reifydb_catalog/store/flow/
get.rs1use reifydb_core::{
5 Error,
6 diagnostic::catalog::flow_not_found,
7 interface::{FlowDef, FlowId, NamespaceId, QueryTransaction},
8};
9use reifydb_type::{Fragment, internal};
10
11use crate::CatalogStore;
12
13impl CatalogStore {
14 pub async fn get_flow(rx: &mut impl QueryTransaction, flow: FlowId) -> crate::Result<FlowDef> {
15 CatalogStore::find_flow(rx, flow).await?.ok_or_else(|| {
16 Error(internal!(
17 "Flow with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
18 flow
19 ))
20 })
21 }
22
23 pub async fn get_flow_by_name(
24 rx: &mut impl QueryTransaction,
25 namespace: NamespaceId,
26 name: impl AsRef<str>,
27 ) -> crate::Result<FlowDef> {
28 let name_ref = name.as_ref();
29
30 let namespace_name = Self::find_namespace(rx, namespace)
32 .await?
33 .map(|s| s.name)
34 .unwrap_or_else(|| format!("namespace_{}", namespace));
35
36 CatalogStore::find_flow_by_name(rx, namespace, name_ref)
37 .await?
38 .ok_or_else(|| Error(flow_not_found(Fragment::None, &namespace_name, name_ref)))
39 }
40}
41
#[cfg(test)]
mod tests {
    use reifydb_core::interface::FlowId;
    use reifydb_engine::test_utils::create_test_command_transaction;

    use crate::{
        CatalogStore,
        test_utils::{create_flow, create_namespace},
    };

    /// Fetching an existing flow by ID returns its stored definition.
    #[tokio::test]
    async fn test_get_flow_ok() {
        let mut tx = create_test_command_transaction().await;
        let first_ns = create_namespace(&mut tx, "namespace_one").await;
        let _second_ns = create_namespace(&mut tx, "namespace_two").await;

        create_flow(&mut tx, "namespace_one", "flow_one").await;
        create_flow(&mut tx, "namespace_two", "flow_two").await;

        // The test expects the first flow created to carry `FlowId(1)`.
        let flow = CatalogStore::get_flow(&mut tx, FlowId(1)).await.unwrap();

        assert_eq!(flow.id, FlowId(1));
        assert_eq!(flow.name, "flow_one");
        assert_eq!(flow.namespace, first_ns.id);
    }

    /// Fetching an unknown flow ID is reported as an internal error.
    #[tokio::test]
    async fn test_get_flow_not_found() {
        let mut tx = create_test_command_transaction().await;

        let lookup = CatalogStore::get_flow(&mut tx, FlowId(42)).await;
        let err = lookup.unwrap_err();

        assert_eq!(err.code, "INTERNAL_ERROR");
        assert!(err.message.contains("FlowId(42)"));
        assert!(err.message.contains("not found in catalog"));
    }

    /// Fetching by name returns the flow that lives in the requested namespace.
    #[tokio::test]
    async fn test_get_flow_by_name_ok() {
        let mut tx = create_test_command_transaction().await;
        let _first_ns = create_namespace(&mut tx, "namespace_one").await;
        let second_ns = create_namespace(&mut tx, "namespace_two").await;

        create_flow(&mut tx, "namespace_one", "flow_one").await;
        create_flow(&mut tx, "namespace_two", "flow_two").await;

        let lookup = CatalogStore::get_flow_by_name(&mut tx, second_ns.id, "flow_two").await;
        let flow = lookup.unwrap();

        assert_eq!(flow.name, "flow_two");
        assert_eq!(flow.namespace, second_ns.id);
    }

    /// An unknown flow name inside an existing namespace yields diagnostic `CA_031`.
    #[tokio::test]
    async fn test_get_flow_by_name_not_found() {
        let mut tx = create_test_command_transaction().await;
        let ns = create_namespace(&mut tx, "test_namespace").await;

        create_flow(&mut tx, "test_namespace", "flow_one").await;

        let lookup = CatalogStore::get_flow_by_name(&mut tx, ns.id, "flow_two").await;
        let err = lookup.unwrap_err();
        let diag = err.diagnostic();

        assert_eq!(diag.code, "CA_031");
        assert!(diag.message.contains("flow_two"));
        assert!(diag.message.contains("not found"));
    }

    /// A flow name that exists only in another namespace is not visible from this one.
    #[tokio::test]
    async fn test_get_flow_by_name_different_namespace() {
        let mut tx = create_test_command_transaction().await;
        let _first_ns = create_namespace(&mut tx, "namespace_one").await;
        let second_ns = create_namespace(&mut tx, "namespace_two").await;

        create_flow(&mut tx, "namespace_one", "my_flow").await;

        let lookup = CatalogStore::get_flow_by_name(&mut tx, second_ns.id, "my_flow").await;
        let err = lookup.unwrap_err();

        assert_eq!(err.diagnostic().code, "CA_031");
    }
}