reifydb_catalog/store/flow_edge/
get.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	Error,
6	interface::{FlowEdgeDef, FlowEdgeId, QueryTransaction},
7};
8use reifydb_type::internal;
9
10use crate::CatalogStore;
11
12impl CatalogStore {
13	pub async fn get_flow_edge(txn: &mut impl QueryTransaction, edge_id: FlowEdgeId) -> crate::Result<FlowEdgeDef> {
14		CatalogStore::find_flow_edge(txn, edge_id).await?.ok_or_else(|| {
15			Error(internal!(
16				"Flow edge with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
17				edge_id
18			))
19		})
20	}
21}
22
#[cfg(test)]
mod tests {
	use reifydb_core::interface::FlowEdgeId;
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		test_utils::{create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
	};

	/// An edge created between two nodes of a flow is returned by `get_flow_edge`
	/// with the same id, flow, source and target.
	#[tokio::test]
	async fn test_get_flow_edge_ok() {
		let mut txn = create_test_command_transaction().await;
		let _ns = create_namespace(&mut txn, "test_namespace").await;
		let flow = ensure_test_flow(&mut txn).await;

		// Two nodes in the same flow serve as the endpoints of the edge under test.
		let source_node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
		let target_node = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
		let created = create_flow_edge(&mut txn, flow.id, source_node.id, target_node.id).await;

		let lookup = CatalogStore::get_flow_edge(&mut txn, created.id).await;
		let fetched = lookup.unwrap();

		assert_eq!(fetched.id, created.id);
		assert_eq!(fetched.flow, flow.id);
		assert_eq!(fetched.source, source_node.id);
		assert_eq!(fetched.target, target_node.id);
	}

	/// Looking up an id that was never created yields an internal error whose
	/// message names the missing id.
	#[tokio::test]
	async fn test_get_flow_edge_not_found() {
		let mut txn = create_test_command_transaction().await;

		let missing_id = FlowEdgeId(999);
		let lookup = CatalogStore::get_flow_edge(&mut txn, missing_id).await;
		let err = lookup.unwrap_err();

		assert_eq!(err.code, "INTERNAL_ERROR");
		assert!(err.message.contains("FlowEdgeId(999)"));
		assert!(err.message.contains("not found in catalog"));
	}
}