reifydb_catalog/store/flow/
list.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{FlowDef, FlowKey, FlowStatus, Key, NamespaceId, QueryTransaction};
5
6use crate::{CatalogStore, store::flow::layout::flow};
7
8impl CatalogStore {
9	pub async fn list_flows_all(rx: &mut impl QueryTransaction) -> crate::Result<Vec<FlowDef>> {
10		let mut result = Vec::new();
11
12		let batch = rx.range(FlowKey::full_scan()).await?;
13
14		for entry in batch.items {
15			if let Some(key) = Key::decode(&entry.key) {
16				if let Key::Flow(flow_key) = key {
17					let flow_id = flow_key.flow;
18
19					let namespace_id =
20						NamespaceId(flow::LAYOUT.get_u64(&entry.values, flow::NAMESPACE));
21
22					let name = flow::LAYOUT.get_utf8(&entry.values, flow::NAME).to_string();
23
24					let status_u8 = flow::LAYOUT.get_u8(&entry.values, flow::STATUS);
25					let status = FlowStatus::from_u8(status_u8);
26
27					let flow_def = FlowDef {
28						id: flow_id,
29						namespace: namespace_id,
30						name,
31						status,
32					};
33
34					result.push(flow_def);
35				}
36			}
37		}
38
39		Ok(result)
40	}
41}
42
#[cfg(test)]
mod tests {
	use reifydb_core::interface::FlowStatus;
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		store::flow::create::FlowToCreate,
		test_utils::{create_flow, create_namespace},
	};

	/// Flows created in two different namespaces must all be returned, each
	/// carrying the namespace it was created in and the status given by the
	/// `create_flow` test helper.
	#[tokio::test]
	async fn test_list_flows_all() {
		let mut txn = create_test_command_transaction().await;
		let namespace_one = create_namespace(&mut txn, "namespace_one").await;
		let namespace_two = create_namespace(&mut txn, "namespace_two").await;

		create_flow(&mut txn, "namespace_one", "flow_one").await;
		create_flow(&mut txn, "namespace_one", "flow_two").await;
		create_flow(&mut txn, "namespace_two", "flow_three").await;

		let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
		assert_eq!(result.len(), 3);

		// Verify all flows are present (order may vary). Together with the
		// length check above this also rules out duplicates.
		let flow_names: Vec<_> = result.iter().map(|f| f.name.as_str()).collect();
		assert!(flow_names.contains(&"flow_one"));
		assert!(flow_names.contains(&"flow_two"));
		assert!(flow_names.contains(&"flow_three"));

		// Verify namespace and status for each flow
		for flow in &result {
			match flow.name.as_str() {
				"flow_one" => {
					assert_eq!(flow.namespace, namespace_one.id);
					assert_eq!(flow.status, FlowStatus::Active);
				}
				"flow_two" => {
					assert_eq!(flow.namespace, namespace_one.id);
					assert_eq!(flow.status, FlowStatus::Active);
				}
				"flow_three" => {
					assert_eq!(flow.namespace, namespace_two.id);
					assert_eq!(flow.status, FlowStatus::Active);
				}
				_ => panic!("Unexpected flow name: {}", flow.name),
			}
		}
	}

	/// Listing flows when none have been created yields an empty result.
	#[tokio::test]
	async fn test_list_flows_empty() {
		let mut txn = create_test_command_transaction().await;

		let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
		assert!(result.is_empty());
	}

	/// The status stored with each flow must be reported back unchanged.
	#[tokio::test]
	async fn test_list_flows_all_with_different_statuses() {
		let mut txn = create_test_command_transaction().await;
		create_namespace(&mut txn, "test_namespace").await;

		// Create flows with different statuses
		create_flow(&mut txn, "test_namespace", "active_flow").await;

		// Create a paused flow by directly using CatalogStore
		let namespace =
			CatalogStore::find_namespace_by_name(&mut txn, "test_namespace").await.unwrap().unwrap();
		CatalogStore::create_flow(
			&mut txn,
			FlowToCreate {
				fragment: None,
				name: "paused_flow".to_string(),
				namespace: namespace.id,
				status: FlowStatus::Paused,
			},
		)
		.await
		.unwrap();

		let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
		assert_eq!(result.len(), 2);

		// Verify both flows are present (order may vary); the length check
		// alone would also pass if one flow were returned twice.
		let flow_names: Vec<_> = result.iter().map(|f| f.name.as_str()).collect();
		assert!(flow_names.contains(&"active_flow"));
		assert!(flow_names.contains(&"paused_flow"));

		// Verify each flow carries the status it was created with
		for flow in &result {
			match flow.name.as_str() {
				"active_flow" => assert_eq!(flow.status, FlowStatus::Active),
				"paused_flow" => assert_eq!(flow.status, FlowStatus::Paused),
				_ => panic!("Unexpected flow name: {}", flow.name),
			}
		}
	}
}