reifydb_catalog/store/flow_edge/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{CommandTransaction, FlowEdgeDef},
6	key::{FlowEdgeByFlowKey, FlowEdgeKey},
7};
8
9use crate::store::flow_edge::layout::{flow_edge, flow_edge_by_flow};
10
11impl crate::CatalogStore {
12	pub async fn create_flow_edge(txn: &mut impl CommandTransaction, edge_def: &FlowEdgeDef) -> crate::Result<()> {
13		// Write to main flow_edge table
14		let mut row = flow_edge::LAYOUT.allocate();
15		flow_edge::LAYOUT.set_u64(&mut row, flow_edge::ID, edge_def.id);
16		flow_edge::LAYOUT.set_u64(&mut row, flow_edge::FLOW, edge_def.flow);
17		flow_edge::LAYOUT.set_u64(&mut row, flow_edge::SOURCE, edge_def.source);
18		flow_edge::LAYOUT.set_u64(&mut row, flow_edge::TARGET, edge_def.target);
19
20		txn.set(&FlowEdgeKey::encoded(edge_def.id), row).await?;
21
22		// Write to flow_edge_by_flow index
23		let mut index_row = flow_edge_by_flow::LAYOUT.allocate();
24		flow_edge_by_flow::LAYOUT.set_u64(&mut index_row, flow_edge_by_flow::FLOW, edge_def.flow);
25		flow_edge_by_flow::LAYOUT.set_u64(&mut index_row, flow_edge_by_flow::ID, edge_def.id);
26
27		txn.set(&FlowEdgeByFlowKey::encoded(edge_def.flow, edge_def.id), index_row).await?;
28
29		Ok(())
30	}
31}
32
#[cfg(test)]
mod tests {
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		test_utils::{create_flow, create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
	};

	/// A single edge can be read back by id with every field intact.
	#[tokio::test]
	async fn test_create_flow_edge() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let source_node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		let target_node = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;

		let created = create_flow_edge(&mut txn, flow.id, source_node.id, target_node.id).await;

		// Read the edge back and compare each stored field
		let stored = CatalogStore::get_flow_edge(&mut txn, created.id).await.unwrap();
		assert_eq!(stored.id, created.id);
		assert_eq!(stored.flow, flow.id);
		assert_eq!(stored.source, source_node.id);
		assert_eq!(stored.target, target_node.id);
	}

	/// Two edges chained through three nodes of one flow are both retrievable.
	#[tokio::test]
	async fn test_create_multiple_edges_same_flow() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let first_node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		let middle_node = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
		let last_node = create_flow_node(&mut txn, flow.id, 5, &[0x03]).await;

		let first_edge = create_flow_edge(&mut txn, flow.id, first_node.id, middle_node.id).await;
		let second_edge = create_flow_edge(&mut txn, flow.id, middle_node.id, last_node.id).await;

		// Fetch both edges before asserting on their endpoints
		let stored_first = CatalogStore::get_flow_edge(&mut txn, first_edge.id).await.unwrap();
		let stored_second = CatalogStore::get_flow_edge(&mut txn, second_edge.id).await.unwrap();

		assert_eq!(stored_first.source, first_node.id);
		assert_eq!(stored_first.target, middle_node.id);
		assert_eq!(stored_second.source, middle_node.id);
		assert_eq!(stored_second.target, last_node.id);
	}

	/// Edges created in separate flows keep the flow id they were created with.
	#[tokio::test]
	async fn test_create_edges_different_flows() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;

		let flow_one = create_flow(&mut txn, "test_namespace", "flow_one").await;
		let flow_two = create_flow(&mut txn, "test_namespace", "flow_two").await;

		let one_source = create_flow_node(&mut txn, flow_one.id, 1, &[0x01]).await;
		let one_target = create_flow_node(&mut txn, flow_one.id, 4, &[0x02]).await;
		let two_source = create_flow_node(&mut txn, flow_two.id, 1, &[0x03]).await;
		let two_target = create_flow_node(&mut txn, flow_two.id, 4, &[0x04]).await;

		let edge_one = create_flow_edge(&mut txn, flow_one.id, one_source.id, one_target.id).await;
		let edge_two = create_flow_edge(&mut txn, flow_two.id, two_source.id, two_target.id).await;

		// Each stored edge must reference its own flow
		let stored_one = CatalogStore::get_flow_edge(&mut txn, edge_one.id).await.unwrap();
		let stored_two = CatalogStore::get_flow_edge(&mut txn, edge_two.id).await.unwrap();

		assert_eq!(stored_one.flow, flow_one.id);
		assert_eq!(stored_two.flow, flow_two.id);
	}

	/// A created edge is returned when listing the edges of its flow.
	#[tokio::test]
	async fn test_edge_appears_in_index() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let source_node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		let target_node = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;

		let created = create_flow_edge(&mut txn, flow.id, source_node.id, target_node.id).await;

		// Listing by flow should yield exactly the edge created above
		let listed = CatalogStore::list_flow_edges_by_flow(&mut txn, flow.id).await.unwrap();
		assert_eq!(listed.len(), 1);
		assert_eq!(listed[0].id, created.id);
	}
}