reifydb_catalog/store/flow/
find.rs1use reifydb_core::interface::{
5 FlowDef, FlowId, FlowKey, FlowStatus, MultiVersionValues, NamespaceFlowKey, NamespaceId, QueryTransaction,
6};
7
8use crate::{
9 CatalogStore,
10 store::flow::layout::{flow, flow_namespace},
11};
12
13impl CatalogStore {
14 pub async fn find_flow(rx: &mut impl QueryTransaction, id: FlowId) -> crate::Result<Option<FlowDef>> {
15 let Some(multi) = rx.get(&FlowKey::encoded(id)).await? else {
16 return Ok(None);
17 };
18
19 let row = multi.values;
20 let id = FlowId(flow::LAYOUT.get_u64(&row, flow::ID));
21 let namespace = NamespaceId(flow::LAYOUT.get_u64(&row, flow::NAMESPACE));
22 let name = flow::LAYOUT.get_utf8(&row, flow::NAME).to_string();
23 let status_u8 = flow::LAYOUT.get_u8(&row, flow::STATUS);
24 let status = FlowStatus::from_u8(status_u8);
25
26 Ok(Some(FlowDef {
27 id,
28 name,
29 namespace,
30 status,
31 }))
32 }
33
34 pub async fn find_flow_by_name(
35 rx: &mut impl QueryTransaction,
36 namespace: NamespaceId,
37 name: impl AsRef<str>,
38 ) -> crate::Result<Option<FlowDef>> {
39 let name = name.as_ref();
40 let batch = rx.range(NamespaceFlowKey::full_scan(namespace)).await?;
41 let Some(flow) = batch.items.iter().find_map(|multi: &MultiVersionValues| {
42 let row = &multi.values;
43 let flow_name = flow_namespace::LAYOUT.get_utf8(row, flow_namespace::NAME);
44 if name == flow_name {
45 Some(FlowId(flow_namespace::LAYOUT.get_u64(row, flow_namespace::ID)))
46 } else {
47 None
48 }
49 }) else {
50 return Ok(None);
51 };
52
53 Ok(Some(Self::get_flow(rx, flow).await?))
54 }
55}
56
57#[cfg(test)]
58mod tests {
59 use reifydb_engine::test_utils::create_test_command_transaction;
60
61 use crate::{
62 CatalogStore,
63 test_utils::{create_flow, create_namespace, ensure_test_namespace},
64 };
65
66 #[tokio::test]
67 async fn test_find_flow_by_name_ok() {
68 let mut txn = create_test_command_transaction().await;
69 let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
70 let namespace_two = create_namespace(&mut txn, "namespace_two").await;
71
72 create_flow(&mut txn, "namespace_one", "flow_one").await;
73 create_flow(&mut txn, "namespace_two", "flow_two").await;
74
75 let result =
76 CatalogStore::find_flow_by_name(&mut txn, namespace_two.id, "flow_two").await.unwrap().unwrap();
77 assert_eq!(result.name, "flow_two");
78 assert_eq!(result.namespace, namespace_two.id);
79 }
80
81 #[tokio::test]
82 async fn test_find_flow_by_name_empty() {
83 let mut txn = create_test_command_transaction().await;
84 let test_namespace = ensure_test_namespace(&mut txn).await;
85
86 let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "some_flow").await.unwrap();
87 assert!(result.is_none());
88 }
89
90 #[tokio::test]
91 async fn test_find_flow_by_name_not_found() {
92 let mut txn = create_test_command_transaction().await;
93 let test_namespace = ensure_test_namespace(&mut txn).await;
94
95 create_flow(&mut txn, "test_namespace", "flow_one").await;
96 create_flow(&mut txn, "test_namespace", "flow_two").await;
97
98 let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "flow_three").await.unwrap();
99 assert!(result.is_none());
100 }
101
102 #[tokio::test]
103 async fn test_find_flow_by_name_different_namespace() {
104 let mut txn = create_test_command_transaction().await;
105 let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
106 let namespace_two = create_namespace(&mut txn, "namespace_two").await;
107
108 create_flow(&mut txn, "namespace_one", "my_flow").await;
109
110 let result = CatalogStore::find_flow_by_name(&mut txn, namespace_two.id, "my_flow").await.unwrap();
112 assert!(result.is_none());
113 }
114
115 #[tokio::test]
116 async fn test_find_flow_by_name_case_sensitive() {
117 let mut txn = create_test_command_transaction().await;
118 let test_namespace = ensure_test_namespace(&mut txn).await;
119
120 create_flow(&mut txn, "test_namespace", "MyFlow").await;
121
122 let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "myflow").await.unwrap();
124 assert!(result.is_none());
125
126 let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "MyFlow").await.unwrap();
127 assert!(result.is_some());
128 }
129
130 #[tokio::test]
131 async fn test_find_flow_by_id() {
132 let mut txn = create_test_command_transaction().await;
133 ensure_test_namespace(&mut txn).await;
134
135 let flow = create_flow(&mut txn, "test_namespace", "test_flow").await;
136
137 let result = CatalogStore::find_flow(&mut txn, flow.id).await.unwrap().unwrap();
138 assert_eq!(result.id, flow.id);
139 assert_eq!(result.name, "test_flow");
140 }
141
142 #[tokio::test]
143 async fn test_find_flow_by_id_not_found() {
144 let mut txn = create_test_command_transaction().await;
145
146 let result = CatalogStore::find_flow(&mut txn, 999.into()).await.unwrap();
147 assert!(result.is_none());
148 }
149}