reifydb_catalog/store/flow_node/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{CommandTransaction, FlowNodeDef},
6	key::{FlowNodeByFlowKey, FlowNodeKey},
7};
8
9use crate::store::flow_node::layout::{flow_node, flow_node_by_flow};
10
11impl crate::CatalogStore {
12	pub async fn create_flow_node(txn: &mut impl CommandTransaction, node_def: &FlowNodeDef) -> crate::Result<()> {
13		// Write to main flow_node table
14		let mut row = flow_node::LAYOUT.allocate();
15		flow_node::LAYOUT.set_u64(&mut row, flow_node::ID, node_def.id);
16		flow_node::LAYOUT.set_u64(&mut row, flow_node::FLOW, node_def.flow);
17		flow_node::LAYOUT.set_u8(&mut row, flow_node::TYPE, node_def.node_type);
18		flow_node::LAYOUT.set_blob(&mut row, flow_node::DATA, &node_def.data);
19
20		txn.set(&FlowNodeKey::encoded(node_def.id), row).await?;
21
22		// Write to flow_node_by_flow index
23		let mut index_row = flow_node_by_flow::LAYOUT.allocate();
24		flow_node_by_flow::LAYOUT.set_u64(&mut index_row, flow_node_by_flow::FLOW, node_def.flow);
25		flow_node_by_flow::LAYOUT.set_u64(&mut index_row, flow_node_by_flow::ID, node_def.id);
26
27		txn.set(&FlowNodeByFlowKey::encoded(node_def.flow, node_def.id), index_row).await?;
28
29		Ok(())
30	}
31}
32
33#[cfg(test)]
34mod tests {
35	use reifydb_core::interface::FlowNodeDef;
36	use reifydb_engine::test_utils::create_test_command_transaction;
37	use reifydb_type::Blob;
38
39	use crate::{
40		CatalogStore,
41		store::sequence::flow::next_flow_node_id,
42		test_utils::{create_namespace, ensure_test_flow},
43	};
44
45	#[tokio::test]
46	async fn test_create_flow_node() {
47		let mut txn = create_test_command_transaction().await;
48		let _namespace = create_namespace(&mut txn, "test_namespace").await;
49		let flow = ensure_test_flow(&mut txn).await;
50
51		let node_id = next_flow_node_id(&mut txn).await.unwrap();
52		let node_def = FlowNodeDef {
53			id: node_id,
54			flow: flow.id,
55			node_type: 1, // SourceTable
56			data: Blob::from([0x01u8, 0x02, 0x03].as_slice()),
57		};
58
59		CatalogStore::create_flow_node(&mut txn, &node_def).await.unwrap();
60
61		// Verify node was created
62		let result = CatalogStore::get_flow_node(&mut txn, node_id).await.unwrap();
63		assert_eq!(result.id, node_id);
64		assert_eq!(result.flow, flow.id);
65		assert_eq!(result.node_type, 1);
66		assert_eq!(result.data.as_ref(), &[0x01, 0x02, 0x03]);
67	}
68
69	#[tokio::test]
70	async fn test_create_multiple_nodes_same_flow() {
71		let mut txn = create_test_command_transaction().await;
72		let _namespace = create_namespace(&mut txn, "test_namespace").await;
73		let flow = ensure_test_flow(&mut txn).await;
74
75		// Create first node
76		let node1_id = next_flow_node_id(&mut txn).await.unwrap();
77		let node1 = FlowNodeDef {
78			id: node1_id,
79			flow: flow.id,
80			node_type: 1, // SourceTable
81			data: Blob::from([0x01u8].as_slice()),
82		};
83		CatalogStore::create_flow_node(&mut txn, &node1).await.unwrap();
84
85		// Create second node
86		let node2_id = next_flow_node_id(&mut txn).await.unwrap();
87		let node2 = FlowNodeDef {
88			id: node2_id,
89			flow: flow.id,
90			node_type: 4, // Filter
91			data: Blob::from([0x02u8].as_slice()),
92		};
93		CatalogStore::create_flow_node(&mut txn, &node2).await.unwrap();
94
95		// Verify both nodes exist
96		let result1 = CatalogStore::get_flow_node(&mut txn, node1_id).await.unwrap();
97		let result2 = CatalogStore::get_flow_node(&mut txn, node2_id).await.unwrap();
98
99		assert_eq!(result1.node_type, 1);
100		assert_eq!(result2.node_type, 4);
101	}
102
103	#[tokio::test]
104	async fn test_create_nodes_different_flows() {
105		let mut txn = create_test_command_transaction().await;
106		let _namespace = create_namespace(&mut txn, "test_namespace").await;
107
108		// Create two flows
109		let flow1 = crate::test_utils::create_flow(&mut txn, "test_namespace", "flow_one").await;
110		let flow2 = crate::test_utils::create_flow(&mut txn, "test_namespace", "flow_two").await;
111
112		// Create node in first flow
113		let node1_id = next_flow_node_id(&mut txn).await.unwrap();
114		let node1 = FlowNodeDef {
115			id: node1_id,
116			flow: flow1.id,
117			node_type: 1,
118			data: Blob::from([0x01u8].as_slice()),
119		};
120		CatalogStore::create_flow_node(&mut txn, &node1).await.unwrap();
121
122		// Create node in second flow
123		let node2_id = next_flow_node_id(&mut txn).await.unwrap();
124		let node2 = FlowNodeDef {
125			id: node2_id,
126			flow: flow2.id,
127			node_type: 1,
128			data: Blob::from([0x02u8].as_slice()),
129		};
130		CatalogStore::create_flow_node(&mut txn, &node2).await.unwrap();
131
132		// Verify nodes are in correct flows
133		let result1 = CatalogStore::get_flow_node(&mut txn, node1_id).await.unwrap();
134		let result2 = CatalogStore::get_flow_node(&mut txn, node2_id).await.unwrap();
135
136		assert_eq!(result1.flow, flow1.id);
137		assert_eq!(result2.flow, flow2.id);
138	}
139
140	#[tokio::test]
141	async fn test_node_appears_in_index() {
142		let mut txn = create_test_command_transaction().await;
143		let _namespace = create_namespace(&mut txn, "test_namespace").await;
144		let flow = ensure_test_flow(&mut txn).await;
145
146		let node_id = next_flow_node_id(&mut txn).await.unwrap();
147		let node_def = FlowNodeDef {
148			id: node_id,
149			flow: flow.id,
150			node_type: 1,
151			data: Blob::from([0x01u8].as_slice()),
152		};
153
154		CatalogStore::create_flow_node(&mut txn, &node_def).await.unwrap();
155
156		// Verify node appears in flow index by listing nodes for flow
157		let nodes = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
158		assert_eq!(nodes.len(), 1);
159		assert_eq!(nodes[0].id, node_id);
160	}
161}