reifydb_catalog/store/flow_edge/
get.rs1use reifydb_core::{
5 Error,
6 interface::{FlowEdgeDef, FlowEdgeId, QueryTransaction},
7};
8use reifydb_type::internal;
9
10use crate::CatalogStore;
11
12impl CatalogStore {
13 pub async fn get_flow_edge(txn: &mut impl QueryTransaction, edge_id: FlowEdgeId) -> crate::Result<FlowEdgeDef> {
14 CatalogStore::find_flow_edge(txn, edge_id).await?.ok_or_else(|| {
15 Error(internal!(
16 "Flow edge with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
17 edge_id
18 ))
19 })
20 }
21}
22
#[cfg(test)]
mod tests {
    use reifydb_core::interface::FlowEdgeId;
    use reifydb_engine::test_utils::create_test_command_transaction;

    use crate::{
        CatalogStore,
        test_utils::{create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
    };

    /// An edge created through the test helpers is returned by `get_flow_edge`
    /// with matching id, flow, source and target.
    #[tokio::test]
    async fn test_get_flow_edge_ok() {
        let mut txn = create_test_command_transaction().await;
        // Held in a named binding so the value lives until the end of the test.
        let _ns = create_namespace(&mut txn, "test_namespace").await;
        let flow = ensure_test_flow(&mut txn).await;

        // Two nodes in the same flow, joined by a single edge.
        let source_node = create_flow_node(&mut txn, flow.id, 1, &[0x01]).await;
        let target_node = create_flow_node(&mut txn, flow.id, 4, &[0x02]).await;
        let created = create_flow_edge(&mut txn, flow.id, source_node.id, target_node.id).await;

        let lookup = CatalogStore::get_flow_edge(&mut txn, created.id).await;
        let fetched = lookup.unwrap();

        assert_eq!(fetched.id, created.id);
        assert_eq!(fetched.flow, flow.id);
        assert_eq!(fetched.source, source_node.id);
        assert_eq!(fetched.target, target_node.id);
    }

    /// Requesting an id that was never created yields an internal error whose
    /// message names the id.
    #[tokio::test]
    async fn test_get_flow_edge_not_found() {
        let mut txn = create_test_command_transaction().await;

        let missing_id = FlowEdgeId(999);
        let lookup = CatalogStore::get_flow_edge(&mut txn, missing_id).await;
        let failure = lookup.unwrap_err();

        assert_eq!(failure.code, "INTERNAL_ERROR");
        assert!(failure.message.contains("FlowEdgeId(999)"));
        assert!(failure.message.contains("not found in catalog"));
    }
}