reifydb_catalog/store/flow/
find.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{
5	FlowDef, FlowId, FlowKey, FlowStatus, MultiVersionValues, NamespaceFlowKey, NamespaceId, QueryTransaction,
6};
7
8use crate::{
9	CatalogStore,
10	store::flow::layout::{flow, flow_namespace},
11};
12
13impl CatalogStore {
14	pub async fn find_flow(rx: &mut impl QueryTransaction, id: FlowId) -> crate::Result<Option<FlowDef>> {
15		let Some(multi) = rx.get(&FlowKey::encoded(id)).await? else {
16			return Ok(None);
17		};
18
19		let row = multi.values;
20		let id = FlowId(flow::LAYOUT.get_u64(&row, flow::ID));
21		let namespace = NamespaceId(flow::LAYOUT.get_u64(&row, flow::NAMESPACE));
22		let name = flow::LAYOUT.get_utf8(&row, flow::NAME).to_string();
23		let status_u8 = flow::LAYOUT.get_u8(&row, flow::STATUS);
24		let status = FlowStatus::from_u8(status_u8);
25
26		Ok(Some(FlowDef {
27			id,
28			name,
29			namespace,
30			status,
31		}))
32	}
33
34	pub async fn find_flow_by_name(
35		rx: &mut impl QueryTransaction,
36		namespace: NamespaceId,
37		name: impl AsRef<str>,
38	) -> crate::Result<Option<FlowDef>> {
39		let name = name.as_ref();
40		let batch = rx.range(NamespaceFlowKey::full_scan(namespace)).await?;
41		let Some(flow) = batch.items.iter().find_map(|multi: &MultiVersionValues| {
42			let row = &multi.values;
43			let flow_name = flow_namespace::LAYOUT.get_utf8(row, flow_namespace::NAME);
44			if name == flow_name {
45				Some(FlowId(flow_namespace::LAYOUT.get_u64(row, flow_namespace::ID)))
46			} else {
47				None
48			}
49		}) else {
50			return Ok(None);
51		};
52
53		Ok(Some(Self::get_flow(rx, flow).await?))
54	}
55}
56
57#[cfg(test)]
58mod tests {
59	use reifydb_engine::test_utils::create_test_command_transaction;
60
61	use crate::{
62		CatalogStore,
63		test_utils::{create_flow, create_namespace, ensure_test_namespace},
64	};
65
66	#[tokio::test]
67	async fn test_find_flow_by_name_ok() {
68		let mut txn = create_test_command_transaction().await;
69		let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
70		let namespace_two = create_namespace(&mut txn, "namespace_two").await;
71
72		create_flow(&mut txn, "namespace_one", "flow_one").await;
73		create_flow(&mut txn, "namespace_two", "flow_two").await;
74
75		let result =
76			CatalogStore::find_flow_by_name(&mut txn, namespace_two.id, "flow_two").await.unwrap().unwrap();
77		assert_eq!(result.name, "flow_two");
78		assert_eq!(result.namespace, namespace_two.id);
79	}
80
81	#[tokio::test]
82	async fn test_find_flow_by_name_empty() {
83		let mut txn = create_test_command_transaction().await;
84		let test_namespace = ensure_test_namespace(&mut txn).await;
85
86		let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "some_flow").await.unwrap();
87		assert!(result.is_none());
88	}
89
90	#[tokio::test]
91	async fn test_find_flow_by_name_not_found() {
92		let mut txn = create_test_command_transaction().await;
93		let test_namespace = ensure_test_namespace(&mut txn).await;
94
95		create_flow(&mut txn, "test_namespace", "flow_one").await;
96		create_flow(&mut txn, "test_namespace", "flow_two").await;
97
98		let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "flow_three").await.unwrap();
99		assert!(result.is_none());
100	}
101
102	#[tokio::test]
103	async fn test_find_flow_by_name_different_namespace() {
104		let mut txn = create_test_command_transaction().await;
105		let _namespace_one = create_namespace(&mut txn, "namespace_one").await;
106		let namespace_two = create_namespace(&mut txn, "namespace_two").await;
107
108		create_flow(&mut txn, "namespace_one", "my_flow").await;
109
110		// Flow exists in namespace_one but not in namespace_two
111		let result = CatalogStore::find_flow_by_name(&mut txn, namespace_two.id, "my_flow").await.unwrap();
112		assert!(result.is_none());
113	}
114
115	#[tokio::test]
116	async fn test_find_flow_by_name_case_sensitive() {
117		let mut txn = create_test_command_transaction().await;
118		let test_namespace = ensure_test_namespace(&mut txn).await;
119
120		create_flow(&mut txn, "test_namespace", "MyFlow").await;
121
122		// Flow names are case-sensitive
123		let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "myflow").await.unwrap();
124		assert!(result.is_none());
125
126		let result = CatalogStore::find_flow_by_name(&mut txn, test_namespace.id, "MyFlow").await.unwrap();
127		assert!(result.is_some());
128	}
129
130	#[tokio::test]
131	async fn test_find_flow_by_id() {
132		let mut txn = create_test_command_transaction().await;
133		ensure_test_namespace(&mut txn).await;
134
135		let flow = create_flow(&mut txn, "test_namespace", "test_flow").await;
136
137		let result = CatalogStore::find_flow(&mut txn, flow.id).await.unwrap().unwrap();
138		assert_eq!(result.id, flow.id);
139		assert_eq!(result.name, "test_flow");
140	}
141
142	#[tokio::test]
143	async fn test_find_flow_by_id_not_found() {
144		let mut txn = create_test_command_transaction().await;
145
146		let result = CatalogStore::find_flow(&mut txn, 999.into()).await.unwrap();
147		assert!(result.is_none());
148	}
149}