reifydb_catalog/store/flow_node/
list.rs1use reifydb_core::{
5 interface::{EncodableKey, FlowId, FlowNodeDef, FlowNodeId, QueryTransaction},
6 key::FlowNodeKey,
7};
8
9use crate::{
10 CatalogStore,
11 store::flow_node::layout::{flow_node, flow_node_by_flow},
12};
13
14impl CatalogStore {
15 pub async fn list_flow_nodes_by_flow(
16 txn: &mut impl QueryTransaction,
17 flow_id: FlowId,
18 ) -> crate::Result<Vec<FlowNodeDef>> {
19 let batch = txn.range(reifydb_core::key::FlowNodeByFlowKey::full_scan(flow_id)).await?;
21 let node_ids: Vec<FlowNodeId> = batch
22 .items
23 .iter()
24 .map(|multi| {
25 FlowNodeId(flow_node_by_flow::LAYOUT.get_u64(&multi.values, flow_node_by_flow::ID))
26 })
27 .collect();
28
29 let mut nodes = Vec::new();
31 for node_id in node_ids {
32 if let Some(node) = Self::find_flow_node(txn, node_id).await? {
33 nodes.push(node);
34 }
35 }
36
37 Ok(nodes)
38 }
39
40 pub async fn list_flow_nodes_all(txn: &mut impl QueryTransaction) -> crate::Result<Vec<FlowNodeDef>> {
41 let mut result = Vec::new();
42
43 let batch = txn.range(FlowNodeKey::full_scan()).await?;
44 let entries: Vec<_> = batch.items.into_iter().collect();
45
46 for entry in entries {
47 if let Some(flow_node_key) = FlowNodeKey::decode(&entry.key) {
48 let node_id = flow_node_key.node;
49 let flow_id = FlowId(flow_node::LAYOUT.get_u64(&entry.values, flow_node::FLOW));
50 let node_type = flow_node::LAYOUT.get_u8(&entry.values, flow_node::TYPE);
51 let data = flow_node::LAYOUT.get_blob(&entry.values, flow_node::DATA).clone();
52
53 let node_def = FlowNodeDef {
54 id: node_id,
55 flow: flow_id,
56 node_type,
57 data,
58 };
59
60 result.push(node_def);
61 }
62 }
63
64 Ok(result)
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use reifydb_engine::test_utils::create_test_command_transaction;
71
72 use crate::{
73 CatalogStore,
74 test_utils::{create_flow, create_flow_node, create_namespace, ensure_test_flow},
75 };
76
77 #[tokio::test]
78 async fn test_list_flow_nodes_by_flow() {
79 let mut txn = create_test_command_transaction().await;
80 let _namespace = create_namespace(&mut txn, "test_namespace").await;
81 let flow = ensure_test_flow(&mut txn).await;
82
83 let node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
84
85 let nodes = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
86 assert_eq!(nodes.len(), 1);
87 assert_eq!(nodes[0].id, node.id);
88 }
89
90 #[tokio::test]
91 async fn test_list_flow_nodes_by_flow_empty() {
92 let mut txn = create_test_command_transaction().await;
93 let _namespace = create_namespace(&mut txn, "test_namespace").await;
94 let flow = ensure_test_flow(&mut txn).await;
95
96 let nodes = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
97 assert!(nodes.is_empty());
98 }
99
100 #[tokio::test]
101 async fn test_list_flow_nodes_by_flow_multiple() {
102 let mut txn = create_test_command_transaction().await;
103 let _namespace = create_namespace(&mut txn, "test_namespace").await;
104 let flow = ensure_test_flow(&mut txn).await;
105
106 let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
107 let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
108 let node3 = create_flow_node(&mut txn, flow.id, 5, &[0x03]).await;
109
110 let nodes = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
111 assert_eq!(nodes.len(), 3);
112
113 let ids: Vec<_> = nodes.iter().map(|n| n.id).collect();
115 assert!(ids.contains(&node1.id));
116 assert!(ids.contains(&node2.id));
117 assert!(ids.contains(&node3.id));
118 }
119
120 #[tokio::test]
121 async fn test_list_flow_nodes_all() {
122 let mut txn = create_test_command_transaction().await;
123 let _namespace = create_namespace(&mut txn, "test_namespace").await;
124 let flow = ensure_test_flow(&mut txn).await;
125
126 create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
127 create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
128
129 let nodes = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
130 assert_eq!(nodes.len(), 2);
131 }
132
133 #[tokio::test]
134 async fn test_list_flow_nodes_all_empty() {
135 let mut txn = create_test_command_transaction().await;
136
137 let nodes = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
138 assert!(nodes.is_empty());
139 }
140
141 #[tokio::test]
142 async fn test_list_flow_nodes_all_multiple_flows() {
143 let mut txn = create_test_command_transaction().await;
144 let _namespace = create_namespace(&mut txn, "test_namespace").await;
145
146 let flow1 = create_flow(&mut txn, "test_namespace", "flow_one").await;
147 let flow2 = create_flow(&mut txn, "test_namespace", "flow_two").await;
148
149 create_flow_node(&mut txn, flow1.id, 1, &[0x01]).await;
150 create_flow_node(&mut txn, flow1.id, 4, &[0x02]).await;
151 create_flow_node(&mut txn, flow2.id, 1, &[0x03]).await;
152
153 let all_nodes = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
154 assert_eq!(all_nodes.len(), 3);
155
156 let flow1_nodes: Vec<_> = all_nodes.iter().filter(|n| n.flow == flow1.id).collect();
158 let flow2_nodes: Vec<_> = all_nodes.iter().filter(|n| n.flow == flow2.id).collect();
159
160 assert_eq!(flow1_nodes.len(), 2);
161 assert_eq!(flow2_nodes.len(), 1);
162 }
163}