reifydb_catalog/store/flow/
update.rs1use reifydb_core::interface::{CommandTransaction, FlowId, FlowKey, FlowStatus};
5
6use crate::{CatalogStore, store::flow::layout::flow};
7
8impl CatalogStore {
9 pub async fn update_flow_name(
11 txn: &mut impl CommandTransaction,
12 flow_id: FlowId,
13 new_name: String,
14 ) -> crate::Result<()> {
15 let flow = Self::get_flow(txn, flow_id).await?;
17
18 let mut row = flow::LAYOUT.allocate();
20 flow::LAYOUT.set_u64(&mut row, flow::ID, flow_id.0);
21 flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, flow.namespace.0);
22 flow::LAYOUT.set_utf8(&mut row, flow::NAME, &new_name);
23 flow::LAYOUT.set_u8(&mut row, flow::STATUS, flow.status as u8);
24
25 txn.set(&FlowKey::encoded(flow_id), row).await?;
26
27 Ok(())
28 }
29
30 pub async fn update_flow_status(
32 txn: &mut impl CommandTransaction,
33 flow_id: FlowId,
34 status: FlowStatus,
35 ) -> crate::Result<()> {
36 let flow = Self::get_flow(txn, flow_id).await?;
38
39 let mut row = flow::LAYOUT.allocate();
41 flow::LAYOUT.set_u64(&mut row, flow::ID, flow_id.0);
42 flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, flow.namespace.0);
43 flow::LAYOUT.set_utf8(&mut row, flow::NAME, &flow.name);
44 flow::LAYOUT.set_u8(&mut row, flow::STATUS, status as u8);
45
46 txn.set(&FlowKey::encoded(flow_id), row).await?;
47
48 Ok(())
49 }
50}
51
#[cfg(test)]
mod tests {
    use reifydb_core::interface::FlowStatus;
    use reifydb_engine::test_utils::create_test_command_transaction;

    use super::*;
    use crate::test_utils::ensure_test_flow;

    /// After a rename, the reloaded flow carries the new name while its
    /// namespace and status still match the flow that was originally created.
    #[tokio::test]
    async fn test_update_flow_name() {
        let mut txn = create_test_command_transaction().await;
        let original = ensure_test_flow(&mut txn).await;
        let id = original.id;

        CatalogStore::update_flow_name(&mut txn, id, "new_flow_name".to_string()).await.unwrap();

        let reloaded = CatalogStore::get_flow(&mut txn, id).await.unwrap();
        assert_eq!(reloaded.name, "new_flow_name");
        assert_eq!(reloaded.namespace, original.namespace);
        assert_eq!(reloaded.status, original.status);
    }

    /// Walks the flow through Paused, Failed and back to Active, reloading it
    /// after every update to confirm the stored status changed.
    #[tokio::test]
    async fn test_update_flow_status() {
        let mut txn = create_test_command_transaction().await;
        let original = ensure_test_flow(&mut txn).await;
        let id = original.id;

        // The fixture is expected to start out active.
        assert_eq!(original.status, FlowStatus::Active);

        // Active -> Paused
        CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Paused).await.unwrap();
        let paused = CatalogStore::get_flow(&mut txn, id).await.unwrap();
        assert_eq!(paused.status, FlowStatus::Paused);

        // Paused -> Failed
        CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Failed).await.unwrap();
        let failed = CatalogStore::get_flow(&mut txn, id).await.unwrap();
        assert_eq!(failed.status, FlowStatus::Failed);

        // Failed -> Active
        CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Active).await.unwrap();
        let reactivated = CatalogStore::get_flow(&mut txn, id).await.unwrap();
        assert_eq!(reactivated.status, FlowStatus::Active);
    }
}