reifydb_catalog/store/flow/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	diagnostic::catalog::flow_already_exists,
6	interface::{CommandTransaction, FlowDef, FlowId, FlowKey, FlowStatus, NamespaceFlowKey, NamespaceId},
7	return_error,
8};
9use reifydb_type::Fragment;
10
11use crate::{
12	CatalogStore,
13	store::{
14		flow::layout::{flow, flow_namespace},
15		sequence::flow::next_flow_id,
16	},
17};
18
19#[derive(Debug, Clone)]
20pub struct FlowToCreate {
21	pub fragment: Option<Fragment>,
22	pub name: String,
23	pub namespace: NamespaceId,
24	pub status: FlowStatus,
25}
26
27impl CatalogStore {
28	pub async fn create_flow(txn: &mut impl CommandTransaction, to_create: FlowToCreate) -> crate::Result<FlowDef> {
29		let namespace_id = to_create.namespace;
30
31		// Check if flow already exists
32		if let Some(_flow) = CatalogStore::find_flow_by_name(txn, namespace_id, &to_create.name).await? {
33			let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
34			return_error!(flow_already_exists(
35				to_create.fragment.unwrap_or_else(|| Fragment::None),
36				&namespace.name,
37				&to_create.name
38			));
39		}
40
41		let flow_id = next_flow_id(txn).await?;
42		Self::store_flow(txn, flow_id, namespace_id, &to_create).await?;
43		Self::link_flow_to_namespace(txn, namespace_id, flow_id, &to_create.name).await?;
44
45		Ok(Self::get_flow(txn, flow_id).await?)
46	}
47
48	async fn store_flow(
49		txn: &mut impl CommandTransaction,
50		flow: FlowId,
51		namespace: NamespaceId,
52		to_create: &FlowToCreate,
53	) -> crate::Result<()> {
54		let mut row = flow::LAYOUT.allocate();
55		flow::LAYOUT.set_u64(&mut row, flow::ID, flow);
56		flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, namespace);
57		flow::LAYOUT.set_utf8(&mut row, flow::NAME, &to_create.name);
58		flow::LAYOUT.set_u8(&mut row, flow::STATUS, to_create.status.to_u8());
59
60		txn.set(&FlowKey::encoded(flow), row).await?;
61
62		Ok(())
63	}
64
65	async fn link_flow_to_namespace(
66		txn: &mut impl CommandTransaction,
67		namespace: NamespaceId,
68		flow: FlowId,
69		name: &str,
70	) -> crate::Result<()> {
71		let mut row = flow_namespace::LAYOUT.allocate();
72		flow_namespace::LAYOUT.set_u64(&mut row, flow_namespace::ID, flow);
73		flow_namespace::LAYOUT.set_utf8(&mut row, flow_namespace::NAME, name);
74		txn.set(&NamespaceFlowKey::encoded(namespace, flow), row).await?;
75		Ok(())
76	}
77}
78
79#[cfg(test)]
80mod tests {
81	use reifydb_core::interface::{
82		FlowId, FlowStatus, MultiVersionQueryTransaction, NamespaceFlowKey, NamespaceId,
83	};
84	use reifydb_engine::test_utils::create_test_command_transaction;
85
86	use crate::{
87		CatalogStore,
88		store::flow::{create::FlowToCreate, layout::flow_namespace},
89		test_utils::{create_namespace, ensure_test_namespace},
90	};
91
92	#[tokio::test]
93	async fn test_create_flow() {
94		let mut txn = create_test_command_transaction().await;
95		let test_namespace = ensure_test_namespace(&mut txn).await;
96
97		let to_create = FlowToCreate {
98			fragment: None,
99			name: "test_flow".to_string(),
100			namespace: test_namespace.id,
101			status: FlowStatus::Active,
102		};
103
104		// First creation should succeed
105		let result = CatalogStore::create_flow(&mut txn, to_create.clone()).await.unwrap();
106		assert_eq!(result.id, FlowId(1));
107		assert_eq!(result.namespace, NamespaceId(1025));
108		assert_eq!(result.name, "test_flow");
109		assert_eq!(result.status, FlowStatus::Active);
110
111		// Second creation should fail with duplicate error
112		let err = CatalogStore::create_flow(&mut txn, to_create).await.unwrap_err();
113		assert_eq!(err.diagnostic().code, "CA_030");
114	}
115
116	#[tokio::test]
117	async fn test_flow_linked_to_namespace() {
118		let mut txn = create_test_command_transaction().await;
119		let test_namespace = ensure_test_namespace(&mut txn).await;
120
121		// Create two flows
122		let to_create = FlowToCreate {
123			fragment: None,
124			name: "flow_one".to_string(),
125			namespace: test_namespace.id,
126			status: FlowStatus::Active,
127		};
128		CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
129
130		let to_create = FlowToCreate {
131			fragment: None,
132			name: "flow_two".to_string(),
133			namespace: test_namespace.id,
134			status: FlowStatus::Paused,
135		};
136		CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
137
138		// Verify both are linked to namespace
139		let links = txn
140			.range(NamespaceFlowKey::full_scan(test_namespace.id))
141			.await
142			.unwrap()
143			.items
144			.into_iter()
145			.collect::<Vec<_>>();
146		assert_eq!(links.len(), 2);
147
148		// Verify link metadata (order may vary)
149		let mut found_flow_one = false;
150		let mut found_flow_two = false;
151
152		for link in &links {
153			let row = &link.values;
154			let id = flow_namespace::LAYOUT.get_u64(row, flow_namespace::ID);
155			let name = flow_namespace::LAYOUT.get_utf8(row, flow_namespace::NAME);
156
157			match name {
158				"flow_one" => {
159					assert_eq!(id, 1);
160					found_flow_one = true;
161				}
162				"flow_two" => {
163					assert_eq!(id, 2);
164					found_flow_two = true;
165				}
166				_ => panic!("Unexpected flow name: {}", name),
167			}
168		}
169
170		assert!(found_flow_one, "flow_one not found in namespace links");
171		assert!(found_flow_two, "flow_two not found in namespace links");
172	}
173
174	#[tokio::test]
175	async fn test_create_flow_multiple_namespaces() {
176		let mut txn = create_test_command_transaction().await;
177		let namespace_one = create_namespace(&mut txn, "namespace_one").await;
178		let namespace_two = create_namespace(&mut txn, "namespace_two").await;
179
180		// Create flow in first namespace
181		let to_create = FlowToCreate {
182			fragment: None,
183			name: "shared_name".to_string(),
184			namespace: namespace_one.id,
185			status: FlowStatus::Active,
186		};
187		CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
188
189		// Should be able to create flow with same name in different namespace
190		let to_create = FlowToCreate {
191			fragment: None,
192			name: "shared_name".to_string(),
193			namespace: namespace_two.id,
194			status: FlowStatus::Active,
195		};
196		let result = CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
197		assert_eq!(result.name, "shared_name");
198		assert_eq!(result.namespace, namespace_two.id);
199	}
200}