reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{
		flow::{Flow, FlowId, FlowStatus},
		id::NamespaceId,
	},
	key::{flow::FlowKey, namespace_flow::NamespaceFlowKey},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::duration::Duration;

use crate::{
	CatalogStore, Result,
	store::flow::shape::{flow, flow_namespace},
};

impl CatalogStore {
	pub(crate) fn find_flow(rx: &mut Transaction<'_>, id: FlowId) -> Result<Option<Flow>> {
		let Some(multi) = rx.get(&FlowKey::encoded(id))? else {
			return Ok(None);
		};

		let row = multi.row;
		let id = FlowId(flow::SHAPE.get_u64(&row, flow::ID));
		let namespace = NamespaceId(flow::SHAPE.get_u64(&row, flow::NAMESPACE));
		let name = flow::SHAPE.get_utf8(&row, flow::NAME).to_string();
		let status_u8 = flow::SHAPE.get_u8(&row, flow::STATUS);
		let status = FlowStatus::from_u8(status_u8);

		let tick_nanos = flow::SHAPE.get_u64(&row, flow::TICK_NANOS);
		let tick = if tick_nanos > 0 {
			Some(Duration::from_nanoseconds(tick_nanos as i64)?)
		} else {
			None
		};

		Ok(Some(Flow {
			id,
			name,
			namespace,
			status,
			tick,
		}))
	}

	pub(crate) fn find_flow_by_name(
		rx: &mut Transaction<'_>,
		namespace: NamespaceId,
		name: impl AsRef<str>,
	) -> Result<Option<Flow>> {
		let name = name.as_ref();
		let mut stream = rx.range(NamespaceFlowKey::full_scan(namespace), 1024)?;

		let mut found_flow = None;
		for entry in stream.by_ref() {
			let multi = entry?;
			let row = &multi.row;
			let flow_name = flow_namespace::SHAPE.get_utf8(row, flow_namespace::NAME);
			if name == flow_name {
				found_flow = Some(FlowId(flow_namespace::SHAPE.get_u64(row, flow_namespace::ID)));
				break;
			}
		}

		drop(stream);

		let Some(flow) = found_flow else {
			return Ok(None);
		};

		Ok(Some(Self::get_flow(rx, flow)?))
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_engine::test_harness::create_test_admin_transaction;
	use reifydb_transaction::transaction::Transaction;

	use crate::{
		CatalogStore,
		test_utils::{create_flow, create_namespace, ensure_test_namespace},
	};

	#[test]
	fn test_find_flow_by_name_ok() {
		let mut txn = create_test_admin_transaction();
		let _namespace_one = create_namespace(&mut txn, "namespace_one");
		let namespace_two = create_namespace(&mut txn, "namespace_two");

		create_flow(&mut txn, "namespace_one", "flow_one");
		create_flow(&mut txn, "namespace_two", "flow_two");

		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			namespace_two.id(),
			"flow_two",
		)
		.unwrap()
		.unwrap();
		assert_eq!(result.name, "flow_two");
		assert_eq!(result.namespace, namespace_two.id());
	}

	#[test]
	fn test_find_flow_by_name_empty() {
		let mut txn = create_test_admin_transaction();
		let test_namespace = ensure_test_namespace(&mut txn);

		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			test_namespace.id(),
			"some_flow",
		)
		.unwrap();
		assert!(result.is_none());
	}

	#[test]
	fn test_find_flow_by_name_not_found() {
		let mut txn = create_test_admin_transaction();
		let test_namespace = ensure_test_namespace(&mut txn);

		create_flow(&mut txn, "test_namespace", "flow_one");
		create_flow(&mut txn, "test_namespace", "flow_two");

		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			test_namespace.id(),
			"flow_three",
		)
		.unwrap();
		assert!(result.is_none());
	}

	#[test]
	fn test_find_flow_by_name_different_namespace() {
		let mut txn = create_test_admin_transaction();
		let _namespace_one = create_namespace(&mut txn, "namespace_one");
		let namespace_two = create_namespace(&mut txn, "namespace_two");

		create_flow(&mut txn, "namespace_one", "my_flow");

		// Flow exists in namespace_one but not in namespace_two
		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			namespace_two.id(),
			"my_flow",
		)
		.unwrap();
		assert!(result.is_none());
	}

	#[test]
	fn test_find_flow_by_name_case_sensitive() {
		let mut txn = create_test_admin_transaction();
		let test_namespace = ensure_test_namespace(&mut txn);

		create_flow(&mut txn, "test_namespace", "MyFlow");

		// Flow names are case-sensitive
		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			test_namespace.id(),
			"myflow",
		)
		.unwrap();
		assert!(result.is_none());

		let result = CatalogStore::find_flow_by_name(
			&mut Transaction::Admin(&mut txn),
			test_namespace.id(),
			"MyFlow",
		)
		.unwrap();
		assert!(result.is_some());
	}

	#[test]
	fn test_find_flow_by_id() {
		let mut txn = create_test_admin_transaction();
		ensure_test_namespace(&mut txn);

		let flow = create_flow(&mut txn, "test_namespace", "test_flow");

		let result = CatalogStore::find_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap().unwrap();
		assert_eq!(result.id, flow.id);
		assert_eq!(result.name, "test_flow");
	}

	#[test]
	fn test_find_flow_by_id_not_found() {
		let mut txn = create_test_admin_transaction();

		let result = CatalogStore::find_flow(&mut Transaction::Admin(&mut txn), 999.into()).unwrap();
		assert!(result.is_none());
	}
}