reifydb_catalog/store/flow_node/
list.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{EncodableKey, FlowId, FlowNodeDef, FlowNodeId, QueryTransaction},
6	key::FlowNodeKey,
7};
8
9use crate::{
10	CatalogStore,
11	store::flow_node::layout::{flow_node, flow_node_by_flow},
12};
13
14impl CatalogStore {
15	pub async fn list_flow_nodes_by_flow(
16		txn: &mut impl QueryTransaction,
17		flow_id: FlowId,
18	) -> crate::Result<Vec<FlowNodeDef>> {
19		// First collect all node IDs
20		let batch = txn.range(reifydb_core::key::FlowNodeByFlowKey::full_scan(flow_id)).await?;
21		let node_ids: Vec<FlowNodeId> = batch
22			.items
23			.iter()
24			.map(|multi| {
25				FlowNodeId(flow_node_by_flow::LAYOUT.get_u64(&multi.values, flow_node_by_flow::ID))
26			})
27			.collect();
28
29		// Then fetch each node
30		let mut nodes = Vec::new();
31		for node_id in node_ids {
32			if let Some(node) = Self::find_flow_node(txn, node_id).await? {
33				nodes.push(node);
34			}
35		}
36
37		Ok(nodes)
38	}
39
40	pub async fn list_flow_nodes_all(txn: &mut impl QueryTransaction) -> crate::Result<Vec<FlowNodeDef>> {
41		let mut result = Vec::new();
42
43		let batch = txn.range(FlowNodeKey::full_scan()).await?;
44		let entries: Vec<_> = batch.items.into_iter().collect();
45
46		for entry in entries {
47			if let Some(flow_node_key) = FlowNodeKey::decode(&entry.key) {
48				let node_id = flow_node_key.node;
49				let flow_id = FlowId(flow_node::LAYOUT.get_u64(&entry.values, flow_node::FLOW));
50				let node_type = flow_node::LAYOUT.get_u8(&entry.values, flow_node::TYPE);
51				let data = flow_node::LAYOUT.get_blob(&entry.values, flow_node::DATA).clone();
52
53				let node_def = FlowNodeDef {
54					id: node_id,
55					flow: flow_id,
56					node_type,
57					data,
58				};
59
60				result.push(node_def);
61			}
62		}
63
64		Ok(result)
65	}
66}
67
#[cfg(test)]
mod tests {
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		test_utils::{create_flow, create_flow_node, create_namespace, ensure_test_flow},
	};

	/// A single node created for a flow is the only node listed for that flow.
	#[tokio::test]
	async fn test_list_flow_nodes_by_flow() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let created = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;

		let listed = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
		assert_eq!(listed.len(), 1);
		assert_eq!(listed[0].id, created.id);
	}

	/// A flow without nodes lists as empty.
	#[tokio::test]
	async fn test_list_flow_nodes_by_flow_empty() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let listed = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
		assert!(listed.is_empty());
	}

	/// Every node created for a flow shows up in that flow's listing.
	#[tokio::test]
	async fn test_list_flow_nodes_by_flow_multiple() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		let first = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		let second = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
		let third = create_flow_node(&mut txn, flow.id, 5, &[0x03]).await;

		let listed = CatalogStore::list_flow_nodes_by_flow(&mut txn, flow.id).await.unwrap();
		assert_eq!(listed.len(), 3);

		// Order is not asserted; only membership of each created id.
		for expected in [first.id, second.id, third.id] {
			assert!(listed.iter().any(|n| n.id == expected));
		}
	}

	/// The global listing returns all nodes that were created.
	#[tokio::test]
	async fn test_list_flow_nodes_all() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;

		let listed = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
		assert_eq!(listed.len(), 2);
	}

	/// The global listing is empty on a fresh transaction.
	#[tokio::test]
	async fn test_list_flow_nodes_all_empty() {
		let mut txn = create_test_command_transaction().await;

		let listed = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
		assert!(listed.is_empty());
	}

	/// The global listing spans flows and keeps each node's flow id.
	#[tokio::test]
	async fn test_list_flow_nodes_all_multiple_flows() {
		let mut txn = create_test_command_transaction().await;
		let _namespace = create_namespace(&mut txn, "test_namespace").await;

		let flow_a = create_flow(&mut txn, "test_namespace", "flow_one").await;
		let flow_b = create_flow(&mut txn, "test_namespace", "flow_two").await;

		create_flow_node(&mut txn, flow_a.id, 1, &[0x01]).await;
		create_flow_node(&mut txn, flow_a.id, 4, &[0x02]).await;
		create_flow_node(&mut txn, flow_b.id, 1, &[0x03]).await;

		let listed = CatalogStore::list_flow_nodes_all(&mut txn).await.unwrap();
		assert_eq!(listed.len(), 3);

		// Count nodes per owning flow.
		let in_flow_a = listed.iter().filter(|n| n.flow == flow_a.id).count();
		let in_flow_b = listed.iter().filter(|n| n.flow == flow_b.id).count();

		assert_eq!(in_flow_a, 2);
		assert_eq!(in_flow_b, 1);
	}
}