reifydb_catalog/store/flow_node/
get.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	Error,
6	interface::{FlowNodeDef, FlowNodeId, QueryTransaction},
7};
8use reifydb_type::internal;
9
10use crate::CatalogStore;
11
12impl CatalogStore {
13	pub async fn get_flow_node(txn: &mut impl QueryTransaction, node_id: FlowNodeId) -> crate::Result<FlowNodeDef> {
14		CatalogStore::find_flow_node(txn, node_id).await?.ok_or_else(|| {
15			Error(internal!(
16				"Flow node with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
17				node_id
18			))
19		})
20	}
21}
22
23#[cfg(test)]
24mod tests {
25	use reifydb_core::interface::FlowNodeId;
26	use reifydb_engine::test_utils::create_test_command_transaction;
27
28	use crate::{
29		CatalogStore,
30		test_utils::{create_flow_node, create_namespace, ensure_test_flow},
31	};
32
33	#[tokio::test]
34	async fn test_get_flow_node_ok() {
35		let mut txn = create_test_command_transaction().await;
36		let _namespace = create_namespace(&mut txn, "test_namespace").await;
37		let flow = ensure_test_flow(&mut txn).await;
38
39		let node = create_flow_node(&mut txn, flow.id, 1, &[0x01, 0x02, 0x03]).await;
40
41		let result = CatalogStore::get_flow_node(&mut txn, node.id).await.unwrap();
42		assert_eq!(result.id, node.id);
43		assert_eq!(result.flow, flow.id);
44		assert_eq!(result.node_type, 1);
45		assert_eq!(result.data.as_ref(), &[0x01, 0x02, 0x03]);
46	}
47
48	#[tokio::test]
49	async fn test_get_flow_node_not_found() {
50		let mut txn = create_test_command_transaction().await;
51
52		let err = CatalogStore::get_flow_node(&mut txn, FlowNodeId(999)).await.unwrap_err();
53		assert_eq!(err.code, "INTERNAL_ERROR");
54		assert!(err.message.contains("FlowNodeId(999)"));
55		assert!(err.message.contains("not found in catalog"));
56	}
57}