reifydb_catalog/store/flow/
get.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	Error,
6	diagnostic::catalog::flow_not_found,
7	interface::{FlowDef, FlowId, NamespaceId, QueryTransaction},
8};
9use reifydb_type::{Fragment, internal};
10
11use crate::CatalogStore;
12
13impl CatalogStore {
14	pub async fn get_flow(rx: &mut impl QueryTransaction, flow: FlowId) -> crate::Result<FlowDef> {
15		CatalogStore::find_flow(rx, flow).await?.ok_or_else(|| {
16			Error(internal!(
17				"Flow with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
18				flow
19			))
20		})
21	}
22
23	pub async fn get_flow_by_name(
24		rx: &mut impl QueryTransaction,
25		namespace: NamespaceId,
26		name: impl AsRef<str>,
27	) -> crate::Result<FlowDef> {
28		let name_ref = name.as_ref();
29
30		// Look up namespace name for error message
31		let namespace_name = Self::find_namespace(rx, namespace)
32			.await?
33			.map(|s| s.name)
34			.unwrap_or_else(|| format!("namespace_{}", namespace));
35
36		CatalogStore::find_flow_by_name(rx, namespace, name_ref)
37			.await?
38			.ok_or_else(|| Error(flow_not_found(Fragment::None, &namespace_name, name_ref)))
39	}
40}
41
42#[cfg(test)]
43mod tests {
44	use reifydb_core::interface::FlowId;
45	use reifydb_engine::test_utils::create_test_command_transaction;
46
47	use crate::{
48		CatalogStore,
49		test_utils::{create_flow, create_namespace},
50	};
51
52	#[tokio::test]
53	async fn test_get_flow_ok() {
54		let mut txn = create_test_command_transaction().await;
55		let namespace_one = create_namespace(&mut txn, "namespace_one").await;
56		let _namespace_two = create_namespace(&mut txn, "namespace_two").await;
57
58		create_flow(&mut txn, "namespace_one", "flow_one").await;
59		create_flow(&mut txn, "namespace_two", "flow_two").await;
60
61		let result = CatalogStore::get_flow(&mut txn, FlowId(1)).await.unwrap();
62		assert_eq!(result.id, FlowId(1));
63		assert_eq!(result.name, "flow_one");
64		assert_eq!(result.namespace, namespace_one.id);
65	}
66
67	#[tokio::test]
68	async fn test_get_flow_not_found() {
69		let mut txn = create_test_command_transaction().await;
70
71		let err = CatalogStore::get_flow(&mut txn, FlowId(42)).await.unwrap_err();
72		assert_eq!(err.code, "INTERNAL_ERROR");
73		assert!(err.message.contains("FlowId(42)"));
74		assert!(err.message.contains("not found in catalog"));
75	}
76
77	#[tokio::test]
78	async fn test_get_flow_by_name_ok() {
79		let mut txn = create_test_command_transaction().await;
80		let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
81		let namespace_two = create_namespace(&mut txn, "namespace_two").await;
82
83		create_flow(&mut txn, "namespace_one", "flow_one").await;
84		create_flow(&mut txn, "namespace_two", "flow_two").await;
85
86		let result = CatalogStore::get_flow_by_name(&mut txn, namespace_two.id, "flow_two").await.unwrap();
87		assert_eq!(result.name, "flow_two");
88		assert_eq!(result.namespace, namespace_two.id);
89	}
90
91	#[tokio::test]
92	async fn test_get_flow_by_name_not_found() {
93		let mut txn = create_test_command_transaction().await;
94		let namespace = create_namespace(&mut txn, "test_namespace").await;
95
96		create_flow(&mut txn, "test_namespace", "flow_one").await;
97
98		let err = CatalogStore::get_flow_by_name(&mut txn, namespace.id, "flow_two").await.unwrap_err();
99		let diagnostic = err.diagnostic();
100		assert_eq!(diagnostic.code, "CA_031");
101		assert!(diagnostic.message.contains("flow_two"));
102		assert!(diagnostic.message.contains("not found"));
103	}
104
105	#[tokio::test]
106	async fn test_get_flow_by_name_different_namespace() {
107		let mut txn = create_test_command_transaction().await;
108		let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
109		let namespace_two = create_namespace(&mut txn, "namespace_two").await;
110
111		create_flow(&mut txn, "namespace_one", "my_flow").await;
112
113		// Flow exists in namespace_one but we're looking in namespace_two
114		let err = CatalogStore::get_flow_by_name(&mut txn, namespace_two.id, "my_flow").await.unwrap_err();
115		assert_eq!(err.diagnostic().code, "CA_031");
116	}
117}