reifydb_catalog/store/flow_node/
create.rs1use reifydb_core::{
5 interface::{CommandTransaction, FlowNodeDef},
6 key::{FlowNodeByFlowKey, FlowNodeKey},
7};
8
9use crate::store::flow_node::layout::{flow_node, flow_node_by_flow};
10
11impl crate::CatalogStore {
12 pub async fn create_flow_node(txn: &mut impl CommandTransaction, node_def: &FlowNodeDef) -> crate::Result<()> {
13 let mut row = flow_node::LAYOUT.allocate();
15 flow_node::LAYOUT.set_u64(&mut row, flow_node::ID, node_def.id);
16 flow_node::LAYOUT.set_u64(&mut row, flow_node::FLOW, node_def.flow);
17 flow_node::LAYOUT.set_u8(&mut row, flow_node::TYPE, node_def.node_type);
18 flow_node::LAYOUT.set_blob(&mut row, flow_node::DATA, &node_def.data);
19
20 txn.set(&FlowNodeKey::encoded(node_def.id), row).await?;
21
22 let mut index_row = flow_node_by_flow::LAYOUT.allocate();
24 flow_node_by_flow::LAYOUT.set_u64(&mut index_row, flow_node_by_flow::FLOW, node_def.flow);
25 flow_node_by_flow::LAYOUT.set_u64(&mut index_row, flow_node_by_flow::ID, node_def.id);
26
27 txn.set(&FlowNodeByFlowKey::encoded(node_def.flow, node_def.id), index_row).await?;
28
29 Ok(())
30 }
31}
32
33#[cfg(test)]
34mod tests {
35 use reifydb_core::interface::FlowNodeDef;
36 use reifydb_engine::test_utils::create_test_command_transaction;
37 use reifydb_type::Blob;
38
39 use crate::{
40 CatalogStore,
41 store::sequence::flow::next_flow_node_id,
42 test_utils::{create_namespace, ensure_test_flow},
43 };
44
45 #[tokio::test]
46 async fn test_create_flow_node() {
47 let mut txn = create_test_command_transaction().await;
48 let _namespace = create_namespace(&mut txn, "test_namespace").await;
49 let flow = ensure_test_flow(&mut txn).await;
50
51 let node_id = next_flow_node_id(&mut txn).await.unwrap();
52 let node_def = FlowNodeDef {
53 id: node_id,
54 flow: flow.id,
55 node_type: 1, data: Blob::from([0x01u8, 0x02, 0x03].as_slice()),
57 };
58
59 CatalogStore::create_flow_node(&mut txn, &node_def).await.unwrap();
60
61 let result = CatalogStore::get_flow_node(&mut txn, node_id).await.unwrap();
63 assert_eq!(result.id, node_id);
64 assert_eq!(result.flow, flow.id);
65 assert_eq!(result.node_type, 1);
66 assert_eq!(result.data.as_ref(), &[0x01, 0x02, 0x03]);
67 }
68
69 #[tokio::test]
70 async fn test_create_multiple_nodes_same_flow() {
71 let mut txn = create_test_command_transaction().await;
72 let _namespace = create_namespace(&mut txn, "test_namespace").await;
73 let flow = ensure_test_flow(&mut txn).await;
74
75 let node1_id = next_flow_node_id(&mut txn).await.unwrap();
77 let node1 = FlowNodeDef {
78 id: node1_id,
79 flow: flow.id,
80 node_type: 1, data: Blob::from([0x01u8].as_slice()),
82 };
83 CatalogStore::create_flow_node(&mut txn, &node1).await.unwrap();
84
85 let node2_id = next_flow_node_id(&mut txn).await.unwrap();
87 let node2 = FlowNodeDef {
88 id: node2_id,
89 flow: flow.id,
90 node_type: 4, data: Blob::from([0x02u8].as_slice()),
92 };
93 CatalogStore::create_flow_node(&mut txn, &node2).await.unwrap();
94
95 let result1 = CatalogStore::get_flow_node(&mut txn, node1_id).await.unwrap();
97 let result2 = CatalogStore::get_flow_node(&mut txn, node2_id).await.unwrap();
98
99 assert_eq!(result1.node_type, 1);
100 assert_eq!(result2.node_type, 4);
101 }
102
103 #[tokio::test]
104 async fn test_create_nodes_different_flows() {
105 let mut txn = create_test_command_transaction().await;
106 let _namespace = create_namespace(&mut txn, "test_namespace").await;
107
108 let flow1 = crate::test_utils::create_flow(&mut txn, "test_namespace", "flow_one").await;
110 let flow2 = crate::test_utils::create_flow(&mut txn, "test_namespace", "flow_two").await;
111
112 let node1_id = next_flow_node_id(&mut txn).await.unwrap();
114 let node1 = FlowNodeDef {
115 id: node1_id,
116 flow: flow1.id,
117 node_type: 1,
118 data: Blob::from([0x01u8].as_slice()),
119 };
120 CatalogStore::create_flow_node(&mut txn, &node1).await.unwrap();
121
122 let node2_id = next_flow_node_id(&mut txn).await.unwrap();
124 let node2 = FlowNodeDef {
125 id: node2_id,
126 flow: flow2.id,
127 node_type: 1,
128 data: Blob::from([0x02u8].as_slice()),
129 };
130 CatalogStore::create_flow_node(&mut txn, &node2).await.unwrap();
131
132 let result1 = CatalogStore::get_flow_node(&mut txn, node1_id).await.unwrap();
134 let result2 = CatalogStore::get_flow_node(&mut txn, node2_id).await.unwrap();
135
136 assert_eq!(result1.flow, flow1.id);
137 assert_eq!(result2.flow, flow2.id);
138 }
139
140 #[tokio::test]
141 async fn test_node_appears_in_index() {
142 let mut txn = create_test_command_transaction().await;
143 let _namespace = create_namespace(&mut txn, "test_namespace").await;
144 let flow = ensure_test_flow(&mut txn).await;
145
146 let node_id = next_flow_node_id(&mut txn).await.unwrap();
147 let node_def = FlowNodeDef {
148 id: node_id,
149 flow: flow.id,
150 node_type: 1,
151 data: Blob::from([0x01u8].as_slice()),
152 };
153
154 CatalogStore::create_flow_node(&mut txn, &node_def).await.unwrap();
155
156 let nodes = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
158 assert_eq!(nodes.len(), 1);
159 assert_eq!(nodes[0].id, node_id);
160 }
161}