reifydb_catalog/store/flow/
create.rs1use reifydb_core::{
5 diagnostic::catalog::flow_already_exists,
6 interface::{CommandTransaction, FlowDef, FlowId, FlowKey, FlowStatus, NamespaceFlowKey, NamespaceId},
7 return_error,
8};
9use reifydb_type::Fragment;
10
11use crate::{
12 CatalogStore,
13 store::{
14 flow::layout::{flow, flow_namespace},
15 sequence::flow::next_flow_id,
16 },
17};
18
19#[derive(Debug, Clone)]
20pub struct FlowToCreate {
21 pub fragment: Option<Fragment>,
22 pub name: String,
23 pub namespace: NamespaceId,
24 pub status: FlowStatus,
25}
26
27impl CatalogStore {
28 pub async fn create_flow(txn: &mut impl CommandTransaction, to_create: FlowToCreate) -> crate::Result<FlowDef> {
29 let namespace_id = to_create.namespace;
30
31 if let Some(_flow) = CatalogStore::find_flow_by_name(txn, namespace_id, &to_create.name).await? {
33 let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
34 return_error!(flow_already_exists(
35 to_create.fragment.unwrap_or_else(|| Fragment::None),
36 &namespace.name,
37 &to_create.name
38 ));
39 }
40
41 let flow_id = next_flow_id(txn).await?;
42 Self::store_flow(txn, flow_id, namespace_id, &to_create).await?;
43 Self::link_flow_to_namespace(txn, namespace_id, flow_id, &to_create.name).await?;
44
45 Ok(Self::get_flow(txn, flow_id).await?)
46 }
47
48 async fn store_flow(
49 txn: &mut impl CommandTransaction,
50 flow: FlowId,
51 namespace: NamespaceId,
52 to_create: &FlowToCreate,
53 ) -> crate::Result<()> {
54 let mut row = flow::LAYOUT.allocate();
55 flow::LAYOUT.set_u64(&mut row, flow::ID, flow);
56 flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, namespace);
57 flow::LAYOUT.set_utf8(&mut row, flow::NAME, &to_create.name);
58 flow::LAYOUT.set_u8(&mut row, flow::STATUS, to_create.status.to_u8());
59
60 txn.set(&FlowKey::encoded(flow), row).await?;
61
62 Ok(())
63 }
64
65 async fn link_flow_to_namespace(
66 txn: &mut impl CommandTransaction,
67 namespace: NamespaceId,
68 flow: FlowId,
69 name: &str,
70 ) -> crate::Result<()> {
71 let mut row = flow_namespace::LAYOUT.allocate();
72 flow_namespace::LAYOUT.set_u64(&mut row, flow_namespace::ID, flow);
73 flow_namespace::LAYOUT.set_utf8(&mut row, flow_namespace::NAME, name);
74 txn.set(&NamespaceFlowKey::encoded(namespace, flow), row).await?;
75 Ok(())
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use reifydb_core::interface::{
82 FlowId, FlowStatus, MultiVersionQueryTransaction, NamespaceFlowKey, NamespaceId,
83 };
84 use reifydb_engine::test_utils::create_test_command_transaction;
85
86 use crate::{
87 CatalogStore,
88 store::flow::{create::FlowToCreate, layout::flow_namespace},
89 test_utils::{create_namespace, ensure_test_namespace},
90 };
91
92 #[tokio::test]
93 async fn test_create_flow() {
94 let mut txn = create_test_command_transaction().await;
95 let test_namespace = ensure_test_namespace(&mut txn).await;
96
97 let to_create = FlowToCreate {
98 fragment: None,
99 name: "test_flow".to_string(),
100 namespace: test_namespace.id,
101 status: FlowStatus::Active,
102 };
103
104 let result = CatalogStore::create_flow(&mut txn, to_create.clone()).await.unwrap();
106 assert_eq!(result.id, FlowId(1));
107 assert_eq!(result.namespace, NamespaceId(1025));
108 assert_eq!(result.name, "test_flow");
109 assert_eq!(result.status, FlowStatus::Active);
110
111 let err = CatalogStore::create_flow(&mut txn, to_create).await.unwrap_err();
113 assert_eq!(err.diagnostic().code, "CA_030");
114 }
115
116 #[tokio::test]
117 async fn test_flow_linked_to_namespace() {
118 let mut txn = create_test_command_transaction().await;
119 let test_namespace = ensure_test_namespace(&mut txn).await;
120
121 let to_create = FlowToCreate {
123 fragment: None,
124 name: "flow_one".to_string(),
125 namespace: test_namespace.id,
126 status: FlowStatus::Active,
127 };
128 CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
129
130 let to_create = FlowToCreate {
131 fragment: None,
132 name: "flow_two".to_string(),
133 namespace: test_namespace.id,
134 status: FlowStatus::Paused,
135 };
136 CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
137
138 let links = txn
140 .range(NamespaceFlowKey::full_scan(test_namespace.id))
141 .await
142 .unwrap()
143 .items
144 .into_iter()
145 .collect::<Vec<_>>();
146 assert_eq!(links.len(), 2);
147
148 let mut found_flow_one = false;
150 let mut found_flow_two = false;
151
152 for link in &links {
153 let row = &link.values;
154 let id = flow_namespace::LAYOUT.get_u64(row, flow_namespace::ID);
155 let name = flow_namespace::LAYOUT.get_utf8(row, flow_namespace::NAME);
156
157 match name {
158 "flow_one" => {
159 assert_eq!(id, 1);
160 found_flow_one = true;
161 }
162 "flow_two" => {
163 assert_eq!(id, 2);
164 found_flow_two = true;
165 }
166 _ => panic!("Unexpected flow name: {}", name),
167 }
168 }
169
170 assert!(found_flow_one, "flow_one not found in namespace links");
171 assert!(found_flow_two, "flow_two not found in namespace links");
172 }
173
174 #[tokio::test]
175 async fn test_create_flow_multiple_namespaces() {
176 let mut txn = create_test_command_transaction().await;
177 let namespace_one = create_namespace(&mut txn, "namespace_one").await;
178 let namespace_two = create_namespace(&mut txn, "namespace_two").await;
179
180 let to_create = FlowToCreate {
182 fragment: None,
183 name: "shared_name".to_string(),
184 namespace: namespace_one.id,
185 status: FlowStatus::Active,
186 };
187 CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
188
189 let to_create = FlowToCreate {
191 fragment: None,
192 name: "shared_name".to_string(),
193 namespace: namespace_two.id,
194 status: FlowStatus::Active,
195 };
196 let result = CatalogStore::create_flow(&mut txn, to_create).await.unwrap();
197 assert_eq!(result.name, "shared_name");
198 assert_eq!(result.namespace, namespace_two.id);
199 }
200}