reifydb_catalog/store/flow/
list.rs1use reifydb_core::interface::{FlowDef, FlowKey, FlowStatus, Key, NamespaceId, QueryTransaction};
5
6use crate::{CatalogStore, store::flow::layout::flow};
7
8impl CatalogStore {
9 pub async fn list_flows_all(rx: &mut impl QueryTransaction) -> crate::Result<Vec<FlowDef>> {
10 let mut result = Vec::new();
11
12 let batch = rx.range(FlowKey::full_scan()).await?;
13
14 for entry in batch.items {
15 if let Some(key) = Key::decode(&entry.key) {
16 if let Key::Flow(flow_key) = key {
17 let flow_id = flow_key.flow;
18
19 let namespace_id =
20 NamespaceId(flow::LAYOUT.get_u64(&entry.values, flow::NAMESPACE));
21
22 let name = flow::LAYOUT.get_utf8(&entry.values, flow::NAME).to_string();
23
24 let status_u8 = flow::LAYOUT.get_u8(&entry.values, flow::STATUS);
25 let status = FlowStatus::from_u8(status_u8);
26
27 let flow_def = FlowDef {
28 id: flow_id,
29 namespace: namespace_id,
30 name,
31 status,
32 };
33
34 result.push(flow_def);
35 }
36 }
37 }
38
39 Ok(result)
40 }
41}
42
43#[cfg(test)]
44mod tests {
45 use reifydb_core::interface::FlowStatus;
46 use reifydb_engine::test_utils::create_test_command_transaction;
47
48 use crate::{
49 CatalogStore,
50 test_utils::{create_flow, create_namespace},
51 };
52
53 #[tokio::test]
54 async fn test_list_flows_all() {
55 let mut txn = create_test_command_transaction().await;
56 let namespace_one = create_namespace(&mut txn, "namespace_one").await;
57 let namespace_two = create_namespace(&mut txn, "namespace_two").await;
58
59 create_flow(&mut txn, "namespace_one", "flow_one").await;
60 create_flow(&mut txn, "namespace_one", "flow_two").await;
61 create_flow(&mut txn, "namespace_two", "flow_three").await;
62
63 let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
64 assert_eq!(result.len(), 3);
65
66 let flow_names: Vec<_> = result.iter().map(|f| f.name.as_str()).collect();
68 assert!(flow_names.contains(&"flow_one"));
69 assert!(flow_names.contains(&"flow_two"));
70 assert!(flow_names.contains(&"flow_three"));
71
72 for flow in &result {
74 match flow.name.as_str() {
75 "flow_one" => {
76 assert_eq!(flow.namespace, namespace_one.id);
77 assert_eq!(flow.status, FlowStatus::Active);
78 }
79 "flow_two" => {
80 assert_eq!(flow.namespace, namespace_one.id);
81 }
82 "flow_three" => {
83 assert_eq!(flow.namespace, namespace_two.id);
84 }
85 _ => panic!("Unexpected flow name: {}", flow.name),
86 }
87 }
88 }
89
90 #[tokio::test]
91 async fn test_list_flows_empty() {
92 let mut txn = create_test_command_transaction().await;
93
94 let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
95 assert_eq!(result.len(), 0);
96 }
97
98 #[tokio::test]
99 async fn test_list_flows_all_with_different_statuses() {
100 let mut txn = create_test_command_transaction().await;
101 create_namespace(&mut txn, "test_namespace").await;
102
103 create_flow(&mut txn, "test_namespace", "active_flow").await;
105
106 use crate::store::flow::create::FlowToCreate;
108 let namespace =
109 CatalogStore::find_namespace_by_name(&mut txn, "test_namespace").await.unwrap().unwrap();
110 CatalogStore::create_flow(
111 &mut txn,
112 FlowToCreate {
113 fragment: None,
114 name: "paused_flow".to_string(),
115 namespace: namespace.id,
116 status: FlowStatus::Paused,
117 },
118 )
119 .await
120 .unwrap();
121
122 let result = CatalogStore::list_flows_all(&mut txn).await.unwrap();
123 assert_eq!(result.len(), 2);
124
125 for flow in &result {
127 match flow.name.as_str() {
128 "active_flow" => assert_eq!(flow.status, FlowStatus::Active),
129 "paused_flow" => assert_eq!(flow.status, FlowStatus::Paused),
130 _ => panic!("Unexpected flow name: {}", flow.name),
131 }
132 }
133 }
134}