reifydb_catalog/store/flow_edge/
create.rs1use reifydb_core::{
5 interface::{CommandTransaction, FlowEdgeDef},
6 key::{FlowEdgeByFlowKey, FlowEdgeKey},
7};
8
9use crate::store::flow_edge::layout::{flow_edge, flow_edge_by_flow};
10
11impl crate::CatalogStore {
12 pub async fn create_flow_edge(txn: &mut impl CommandTransaction, edge_def: &FlowEdgeDef) -> crate::Result<()> {
13 let mut row = flow_edge::LAYOUT.allocate();
15 flow_edge::LAYOUT.set_u64(&mut row, flow_edge::ID, edge_def.id);
16 flow_edge::LAYOUT.set_u64(&mut row, flow_edge::FLOW, edge_def.flow);
17 flow_edge::LAYOUT.set_u64(&mut row, flow_edge::SOURCE, edge_def.source);
18 flow_edge::LAYOUT.set_u64(&mut row, flow_edge::TARGET, edge_def.target);
19
20 txn.set(&FlowEdgeKey::encoded(edge_def.id), row).await?;
21
22 let mut index_row = flow_edge_by_flow::LAYOUT.allocate();
24 flow_edge_by_flow::LAYOUT.set_u64(&mut index_row, flow_edge_by_flow::FLOW, edge_def.flow);
25 flow_edge_by_flow::LAYOUT.set_u64(&mut index_row, flow_edge_by_flow::ID, edge_def.id);
26
27 txn.set(&FlowEdgeByFlowKey::encoded(edge_def.flow, edge_def.id), index_row).await?;
28
29 Ok(())
30 }
31}
32
33#[cfg(test)]
34mod tests {
35 use reifydb_engine::test_utils::create_test_command_transaction;
36
37 use crate::{
38 CatalogStore,
39 test_utils::{create_flow, create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
40 };
41
42 #[tokio::test]
43 async fn test_create_flow_edge() {
44 let mut txn = create_test_command_transaction().await;
45 let _namespace = create_namespace(&mut txn, "test_namespace").await;
46 let flow = ensure_test_flow(&mut txn).await;
47
48 let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
49 let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
50
51 let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
52
53 let result = CatalogStore::get_flow_edge(&mut txn, edge.id).await.unwrap();
55 assert_eq!(result.id, edge.id);
56 assert_eq!(result.flow, flow.id);
57 assert_eq!(result.source, node1.id);
58 assert_eq!(result.target, node2.id);
59 }
60
61 #[tokio::test]
62 async fn test_create_multiple_edges_same_flow() {
63 let mut txn = create_test_command_transaction().await;
64 let _namespace = create_namespace(&mut txn, "test_namespace").await;
65 let flow = ensure_test_flow(&mut txn).await;
66
67 let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
68 let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
69 let node3 = create_flow_node(&mut txn, flow.id, 5, &[0x03]).await;
70
71 let edge1 = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
72 let edge2 = create_flow_edge(&mut txn, flow.id, node2.id, node3.id).await;
73
74 let result1 = CatalogStore::get_flow_edge(&mut txn, edge1.id).await.unwrap();
76 let result2 = CatalogStore::get_flow_edge(&mut txn, edge2.id).await.unwrap();
77
78 assert_eq!(result1.source, node1.id);
79 assert_eq!(result1.target, node2.id);
80 assert_eq!(result2.source, node2.id);
81 assert_eq!(result2.target, node3.id);
82 }
83
84 #[tokio::test]
85 async fn test_create_edges_different_flows() {
86 let mut txn = create_test_command_transaction().await;
87 let _namespace = create_namespace(&mut txn, "test_namespace").await;
88
89 let flow1 = create_flow(&mut txn, "test_namespace", "flow_one").await;
90 let flow2 = create_flow(&mut txn, "test_namespace", "flow_two").await;
91
92 let node1a = create_flow_node(&mut txn, flow1.id, 1, &[0x01]).await;
93 let node1b = create_flow_node(&mut txn, flow1.id, 4, &[0x02]).await;
94 let node2a = create_flow_node(&mut txn, flow2.id, 1, &[0x03]).await;
95 let node2b = create_flow_node(&mut txn, flow2.id, 4, &[0x04]).await;
96
97 let edge1 = create_flow_edge(&mut txn, flow1.id, node1a.id, node1b.id).await;
98 let edge2 = create_flow_edge(&mut txn, flow2.id, node2a.id, node2b.id).await;
99
100 let result1 = CatalogStore::get_flow_edge(&mut txn, edge1.id).await.unwrap();
102 let result2 = CatalogStore::get_flow_edge(&mut txn, edge2.id).await.unwrap();
103
104 assert_eq!(result1.flow, flow1.id);
105 assert_eq!(result2.flow, flow2.id);
106 }
107
108 #[tokio::test]
109 async fn test_edge_appears_in_index() {
110 let mut txn = create_test_command_transaction().await;
111 let _namespace = create_namespace(&mut txn, "test_namespace").await;
112 let flow = ensure_test_flow(&mut txn).await;
113
114 let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
115 let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
116
117 let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
118
119 let edges = CatalogStore::list_flow_edges_by_flow(&mut txn, flow.id).await.unwrap();
121 assert_eq!(edges.len(), 1);
122 assert_eq!(edges[0].id, edge.id);
123 }
124}