reifydb_catalog/store/flow_edge/
find.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use flow_edge::LAYOUT;
5use reifydb_core::{
6	interface::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeId, QueryTransaction},
7	key::FlowEdgeKey,
8};
9
10use crate::{CatalogStore, store::flow_edge::layout::flow_edge};
11
12impl CatalogStore {
13	pub async fn find_flow_edge(
14		txn: &mut impl QueryTransaction,
15		edge: FlowEdgeId,
16	) -> crate::Result<Option<FlowEdgeDef>> {
17		let Some(multi) = txn.get(&FlowEdgeKey::encoded(edge)).await? else {
18			return Ok(None);
19		};
20
21		let row = multi.values;
22		let id = FlowEdgeId(LAYOUT.get_u64(&row, flow_edge::ID));
23		let flow = FlowId(LAYOUT.get_u64(&row, flow_edge::FLOW));
24		let source = FlowNodeId(LAYOUT.get_u64(&row, flow_edge::SOURCE));
25		let target = FlowNodeId(LAYOUT.get_u64(&row, flow_edge::TARGET));
26
27		Ok(Some(FlowEdgeDef {
28			id,
29			flow,
30			source,
31			target,
32		}))
33	}
34}
35
36#[cfg(test)]
37mod tests {
38	use reifydb_core::interface::FlowEdgeId;
39	use reifydb_engine::test_utils::create_test_command_transaction;
40
41	use crate::{
42		CatalogStore,
43		test_utils::{create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
44	};
45
46	#[tokio::test]
47	async fn test_find_flow_edge_ok() {
48		let mut txn = create_test_command_transaction().await;
49		let _namespace = create_namespace(&mut txn, "test_namespace").await;
50		let flow = ensure_test_flow(&mut txn).await;
51
52		let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
53		let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
54		let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id).await;
55
56		let result = CatalogStore::find_flow_edge(&mut txn, edge.id).await.unwrap();
57		assert!(result.is_some());
58		let found = result.unwrap();
59		assert_eq!(found.id, edge.id);
60		assert_eq!(found.flow, flow.id);
61		assert_eq!(found.source, node1.id);
62		assert_eq!(found.target, node2.id);
63	}
64
65	#[tokio::test]
66	async fn test_find_flow_edge_not_found() {
67		let mut txn = create_test_command_transaction().await;
68
69		let result = CatalogStore::find_flow_edge(&mut txn, FlowEdgeId(999)).await.unwrap();
70		assert!(result.is_none());
71	}
72}