reifydb_catalog/store/flow_edge/
list.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use flow_edge_by_flow::LAYOUT;
5use reifydb_core::{
6	interface::{EncodableKey, FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeId, QueryTransaction},
7	key::{FlowEdgeByFlowKey, FlowEdgeKey},
8};
9
10use crate::{
11	CatalogStore,
12	store::flow_edge::layout::{flow_edge, flow_edge_by_flow},
13};
14
15impl CatalogStore {
16	pub async fn list_flow_edges_by_flow(
17		txn: &mut impl QueryTransaction,
18		flow_id: FlowId,
19	) -> crate::Result<Vec<FlowEdgeDef>> {
20		let batch = txn.range(FlowEdgeByFlowKey::full_scan(flow_id)).await?;
21		let edge_ids: Vec<FlowEdgeId> = batch
22			.items
23			.iter()
24			.map(|multi| FlowEdgeId(LAYOUT.get_u64(&multi.values, flow_edge_by_flow::ID)))
25			.collect();
26
27		// Then fetch each edge
28		let mut edges = Vec::new();
29		for edge_id in edge_ids {
30			if let Some(edge) = Self::find_flow_edge(txn, edge_id).await? {
31				edges.push(edge);
32			}
33		}
34
35		// Sort by edge_id to ensure consistent ordering (edges are stored in descending order)
36		edges.sort_by_key(|e| e.id);
37
38		Ok(edges)
39	}
40
41	pub async fn list_flow_edges_all(txn: &mut impl QueryTransaction) -> crate::Result<Vec<FlowEdgeDef>> {
42		let mut result = Vec::new();
43
44		let batch = txn.range(FlowEdgeKey::full_scan()).await?;
45		let entries: Vec<_> = batch.items.into_iter().collect();
46
47		for entry in entries {
48			if let Some(flow_edge_key) = FlowEdgeKey::decode(&entry.key) {
49				let edge_id = flow_edge_key.edge;
50				let flow_id = FlowId(flow_edge::LAYOUT.get_u64(&entry.values, flow_edge::FLOW));
51				let source = FlowNodeId(flow_edge::LAYOUT.get_u64(&entry.values, flow_edge::SOURCE));
52				let target = FlowNodeId(flow_edge::LAYOUT.get_u64(&entry.values, flow_edge::TARGET));
53
54				let edge_def = FlowEdgeDef {
55					id: edge_id,
56					flow: flow_id,
57					source,
58					target,
59				};
60
61				result.push(edge_def);
62			}
63		}
64
65		Ok(result)
66	}
67}
68
69#[cfg(test)]
70mod tests {
71	use reifydb_engine::test_utils::create_test_command_transaction;
72
73	use crate::{
74		CatalogStore,
75		test_utils::{create_flow, create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
76	};
77
78	#[tokio::test]
79	async fn test_list_flow_edges_by_flow() {
80		let mut txn = create_test_command_transaction().await;
81		let _namespace = create_namespace(&mut txn, "test_namespace").await;
82		let flow = ensure_test_flow(&mut txn).await;
83
84		let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
85		let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
86		let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
87
88		let edges = CatalogStore::list_flow_edges_by_flow(&mut txn, flow.id).await.unwrap();
89		assert_eq!(edges.len(), 1);
90		assert_eq!(edges[0].id, edge.id);
91	}
92
93	#[tokio::test]
94	async fn test_list_flow_edges_by_flow_empty() {
95		let mut txn = create_test_command_transaction().await;
96		let _namespace = create_namespace(&mut txn, "test_namespace").await;
97		let flow = ensure_test_flow(&mut txn).await;
98
99		let edges = CatalogStore::list_flow_edges_by_flow(&mut txn, flow.id).await.unwrap();
100		assert!(edges.is_empty());
101	}
102
103	#[tokio::test]
104	async fn test_list_flow_edges_by_flow_multiple() {
105		let mut txn = create_test_command_transaction().await;
106		let _namespace = create_namespace(&mut txn, "test_namespace").await;
107		let flow = ensure_test_flow(&mut txn).await;
108
109		let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
110		let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
111		let node3 = create_flow_node(&mut txn, flow.id, 5, &[0x03]).await;
112
113		let edge1 = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
114		let edge2 = create_flow_edge(&mut txn, flow.id, node2.id, node3.id).await;
115
116		let edges = CatalogStore::list_flow_edges_by_flow(&mut txn, flow.id).await.unwrap();
117		assert_eq!(edges.len(), 2);
118
119		// Verify all edges are present
120		let ids: Vec<_> = edges.iter().map(|e| e.id).collect();
121		assert!(ids.contains(&edge1.id));
122		assert!(ids.contains(&edge2.id));
123	}
124
125	#[tokio::test]
126	async fn test_list_flow_edges_all() {
127		let mut txn = create_test_command_transaction().await;
128		let _namespace = create_namespace(&mut txn, "test_namespace").await;
129		let flow = ensure_test_flow(&mut txn).await;
130
131		let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
132		let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
133
134		create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
135
136		let edges = CatalogStore::list_flow_edges_all(&mut txn).await.unwrap();
137		assert_eq!(edges.len(), 1);
138	}
139
140	#[tokio::test]
141	async fn test_list_flow_edges_all_empty() {
142		let mut txn = create_test_command_transaction().await;
143
144		let edges = CatalogStore::list_flow_edges_all(&mut txn).await.unwrap();
145		assert!(edges.is_empty());
146	}
147
148	#[tokio::test]
149	async fn test_list_flow_edges_all_multiple_flows() {
150		let mut txn = create_test_command_transaction().await;
151		let _namespace = create_namespace(&mut txn, "test_namespace").await;
152
153		let flow1 = create_flow(&mut txn, "test_namespace", "flow_one").await;
154		let flow2 = create_flow(&mut txn, "test_namespace", "flow_two").await;
155
156		let node1a = create_flow_node(&mut txn, flow1.id, 1, &[0x01]).await;
157		let node1b = create_flow_node(&mut txn, flow1.id, 4, &[0x02]).await;
158		let node2a = create_flow_node(&mut txn, flow2.id, 1, &[0x03]).await;
159		let node2b = create_flow_node(&mut txn, flow2.id, 4, &[0x04]).await;
160
161		create_flow_edge(&mut txn, flow1.id, node1a.id, node1b.id).await;
162		create_flow_edge(&mut txn, flow2.id, node2a.id, node2b.id).await;
163
164		let all_edges = CatalogStore::list_flow_edges_all(&mut txn).await.unwrap();
165		assert_eq!(all_edges.len(), 2);
166
167		// Verify edges are from correct flows
168		let flow1_edges: Vec<_> = all_edges.iter().filter(|e| e.flow == flow1.id).collect();
169		let flow2_edges: Vec<_> = all_edges.iter().filter(|e| e.flow == flow2.id).collect();
170
171		assert_eq!(flow1_edges.len(), 1);
172		assert_eq!(flow2_edges.len(), 1);
173	}
174}