reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{
		flow::{Flow, FlowId, FlowStatus},
		id::NamespaceId,
	},
	key::{flow::FlowKey, namespace_flow::NamespaceFlowKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{fragment::Fragment, value::duration::Duration};

use crate::{
	CatalogStore, Result,
	error::{CatalogError, CatalogObjectKind},
	store::{
		flow::shape::{flow, flow_namespace},
		sequence::flow::next_flow_id,
	},
};

#[derive(Debug, Clone)]
pub struct FlowToCreate {
	pub name: Fragment,
	pub namespace: NamespaceId,
	pub status: FlowStatus,
	pub tick: Option<Duration>,
}

impl CatalogStore {
	pub(crate) fn create_flow(txn: &mut AdminTransaction, to_create: FlowToCreate) -> Result<Flow> {
		let namespace_id = to_create.namespace;
		Self::reject_existing_flow(txn, namespace_id, &to_create.name)?;

		let flow_id = next_flow_id(txn)?;
		Self::install_flow(txn, flow_id, namespace_id, &to_create)?;
		Self::get_flow(&mut Transaction::Admin(&mut *txn), flow_id)
	}

	/// Create a flow with a specific ID (for subscription flows where FlowId == SubscriptionId).
	/// This skips the name uniqueness check since the ID is guaranteed unique by the sequence.
	pub(crate) fn create_flow_with_id(
		txn: &mut AdminTransaction,
		flow_id: FlowId,
		to_create: FlowToCreate,
	) -> Result<Flow> {
		let namespace_id = to_create.namespace;
		Self::install_flow(txn, flow_id, namespace_id, &to_create)?;
		Self::get_flow(&mut Transaction::Admin(&mut *txn), flow_id)
	}

	#[inline]
	fn reject_existing_flow(txn: &mut AdminTransaction, namespace_id: NamespaceId, name: &Fragment) -> Result<()> {
		if CatalogStore::find_flow_by_name(&mut Transaction::Admin(&mut *txn), namespace_id, name.text())?
			.is_none()
		{
			return Ok(());
		}
		let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;
		Err(CatalogError::AlreadyExists {
			kind: CatalogObjectKind::Flow,
			namespace: namespace.name().to_string(),
			name: name.text().to_string(),
			fragment: name.clone(),
		}
		.into())
	}

	#[inline]
	fn install_flow(
		txn: &mut AdminTransaction,
		flow_id: FlowId,
		namespace_id: NamespaceId,
		to_create: &FlowToCreate,
	) -> Result<()> {
		Self::store_flow(txn, flow_id, namespace_id, to_create)?;
		Self::link_flow_to_namespace(txn, namespace_id, flow_id, to_create.name.text())
	}

	fn store_flow(
		txn: &mut AdminTransaction,
		flow: FlowId,
		namespace: NamespaceId,
		to_create: &FlowToCreate,
	) -> Result<()> {
		let mut row = flow::SHAPE.allocate();
		flow::SHAPE.set_u64(&mut row, flow::ID, flow);
		flow::SHAPE.set_u64(&mut row, flow::NAMESPACE, namespace);
		flow::SHAPE.set_utf8(&mut row, flow::NAME, to_create.name.text());
		flow::SHAPE.set_u8(&mut row, flow::STATUS, to_create.status.to_u8());
		let tick_nanos = to_create.tick.map(|d| d.get_nanos() as u64).unwrap_or(0);
		flow::SHAPE.set_u64(&mut row, flow::TICK_NANOS, tick_nanos);

		let key = FlowKey::encoded(flow);
		txn.set(&key, row)?;

		Ok(())
	}

	fn link_flow_to_namespace(
		txn: &mut AdminTransaction,
		namespace: NamespaceId,
		flow: FlowId,
		name: &str,
	) -> Result<()> {
		let mut row = flow_namespace::SHAPE.allocate();
		flow_namespace::SHAPE.set_u64(&mut row, flow_namespace::ID, flow);
		flow_namespace::SHAPE.set_utf8(&mut row, flow_namespace::NAME, name);
		let key = NamespaceFlowKey::encoded(namespace, flow);
		txn.set(&key, row)?;
		Ok(())
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::catalog::{
			flow::{FlowId, FlowStatus},
			id::NamespaceId,
		},
		key::namespace_flow::NamespaceFlowKey,
	};
	use reifydb_engine::test_harness::create_test_admin_transaction;
	use reifydb_type::fragment::Fragment;

	use crate::{
		CatalogStore,
		store::flow::{create::FlowToCreate, shape::flow_namespace},
		test_utils::{create_namespace, ensure_test_namespace},
	};

	#[test]
	fn test_create_flow() {
		let mut txn = create_test_admin_transaction();
		let test_namespace = ensure_test_namespace(&mut txn);

		let to_create = FlowToCreate {
			name: Fragment::internal("test_flow"),
			namespace: test_namespace.id(),
			status: FlowStatus::Active,
			tick: None,
		};

		// First creation should succeed
		let result = CatalogStore::create_flow(&mut txn, to_create.clone()).unwrap();
		assert_eq!(result.id, FlowId(1));
		assert_eq!(result.namespace, NamespaceId(16385));
		assert_eq!(result.name, "test_flow");
		assert_eq!(result.status, FlowStatus::Active);

		// Second creation should fail with duplicate error
		let err = CatalogStore::create_flow(&mut txn, to_create).unwrap_err();
		assert_eq!(err.diagnostic().code, "CA_030");
	}

	#[test]
	fn test_flow_linked_to_namespace() {
		let mut txn = create_test_admin_transaction();
		let test_namespace = ensure_test_namespace(&mut txn);

		// Create two flows
		let to_create = FlowToCreate {
			name: Fragment::internal("flow_one"),
			namespace: test_namespace.id(),
			status: FlowStatus::Active,
			tick: None,
		};
		CatalogStore::create_flow(&mut txn, to_create).unwrap();

		let to_create = FlowToCreate {
			name: Fragment::internal("flow_two"),
			namespace: test_namespace.id(),
			status: FlowStatus::Paused,
			tick: None,
		};
		CatalogStore::create_flow(&mut txn, to_create).unwrap();

		// Verify both are linked to namespace
		let links: Vec<_> = txn
			.range(NamespaceFlowKey::full_scan(test_namespace.id()), 1024)
			.unwrap()
			.collect::<Result<Vec<_>, _>>()
			.unwrap();
		assert_eq!(links.len(), 2);

		// Verify link metadata (order may vary)
		let mut found_flow_one = false;
		let mut found_flow_two = false;

		for link in &links {
			let row = &link.row;
			let id = flow_namespace::SHAPE.get_u64(row, flow_namespace::ID);
			let name = flow_namespace::SHAPE.get_utf8(row, flow_namespace::NAME);

			match name {
				"flow_one" => {
					assert_eq!(id, 1);
					found_flow_one = true;
				}
				"flow_two" => {
					assert_eq!(id, 2);
					found_flow_two = true;
				}
				_ => panic!("Unexpected flow name: {}", name),
			}
		}

		assert!(found_flow_one, "flow_one not found in namespace links");
		assert!(found_flow_two, "flow_two not found in namespace links");
	}

	#[test]
	fn test_create_flow_multiple_namespaces() {
		let mut txn = create_test_admin_transaction();
		let namespace_one = create_namespace(&mut txn, "namespace_one");
		let namespace_two = create_namespace(&mut txn, "namespace_two");

		// Create flow in first namespace
		let to_create = FlowToCreate {
			name: Fragment::internal("shared_name"),
			namespace: namespace_one.id(),
			status: FlowStatus::Active,
			tick: None,
		};
		CatalogStore::create_flow(&mut txn, to_create).unwrap();

		// Should be able to create flow with same name in different namespace
		let to_create = FlowToCreate {
			name: Fragment::internal("shared_name"),
			namespace: namespace_two.id(),
			status: FlowStatus::Active,
			tick: None,
		};
		let result = CatalogStore::create_flow(&mut txn, to_create).unwrap();
		assert_eq!(result.name, "shared_name");
		assert_eq!(result.namespace, namespace_two.id());
	}
}