reifydb_catalog/store/flow/
update.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{CommandTransaction, FlowId, FlowKey, FlowStatus};
5
6use crate::{CatalogStore, store::flow::layout::flow};
7
8impl CatalogStore {
9	/// Update the name of a flow
10	pub async fn update_flow_name(
11		txn: &mut impl CommandTransaction,
12		flow_id: FlowId,
13		new_name: String,
14	) -> crate::Result<()> {
15		// Get the existing flow
16		let flow = Self::get_flow(txn, flow_id).await?;
17
18		// Update the name field
19		let mut row = flow::LAYOUT.allocate();
20		flow::LAYOUT.set_u64(&mut row, flow::ID, flow_id.0);
21		flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, flow.namespace.0);
22		flow::LAYOUT.set_utf8(&mut row, flow::NAME, &new_name);
23		flow::LAYOUT.set_u8(&mut row, flow::STATUS, flow.status as u8);
24
25		txn.set(&FlowKey::encoded(flow_id), row).await?;
26
27		Ok(())
28	}
29
30	/// Update the status of a flow
31	pub async fn update_flow_status(
32		txn: &mut impl CommandTransaction,
33		flow_id: FlowId,
34		status: FlowStatus,
35	) -> crate::Result<()> {
36		// Get the existing flow
37		let flow = Self::get_flow(txn, flow_id).await?;
38
39		// Update the status field
40		let mut row = flow::LAYOUT.allocate();
41		flow::LAYOUT.set_u64(&mut row, flow::ID, flow_id.0);
42		flow::LAYOUT.set_u64(&mut row, flow::NAMESPACE, flow.namespace.0);
43		flow::LAYOUT.set_utf8(&mut row, flow::NAME, &flow.name);
44		flow::LAYOUT.set_u8(&mut row, flow::STATUS, status as u8);
45
46		txn.set(&FlowKey::encoded(flow_id), row).await?;
47
48		Ok(())
49	}
50}
51
#[cfg(test)]
mod tests {
	use reifydb_core::interface::FlowStatus;
	use reifydb_engine::test_utils::create_test_command_transaction;

	use super::*;
	use crate::test_utils::ensure_test_flow;

	/// Renaming a flow changes its name and leaves namespace and status as they were.
	#[tokio::test]
	async fn test_update_flow_name() {
		const RENAMED: &str = "new_flow_name";

		let mut txn = create_test_command_transaction().await;
		let original = ensure_test_flow(&mut txn).await;

		CatalogStore::update_flow_name(&mut txn, original.id, RENAMED.to_string()).await.unwrap();

		// Read the flow back and compare it against the pre-update snapshot.
		let reloaded = CatalogStore::get_flow(&mut txn, original.id).await.unwrap();
		assert_eq!(reloaded.name, RENAMED);
		assert_eq!(reloaded.namespace, original.namespace);
		assert_eq!(reloaded.status, original.status);
	}

	/// Walks a flow through Active -> Paused -> Failed -> Active, reading it back after each step.
	#[tokio::test]
	async fn test_update_flow_status() {
		let mut txn = create_test_command_transaction().await;
		let original = ensure_test_flow(&mut txn).await;
		let id = original.id;

		// The test fixture is expected to start out Active.
		assert_eq!(original.status, FlowStatus::Active);

		// Active -> Paused
		CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Paused).await.unwrap();
		let after_pause = CatalogStore::get_flow(&mut txn, id).await.unwrap();
		assert_eq!(after_pause.status, FlowStatus::Paused);

		// Paused -> Failed
		CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Failed).await.unwrap();
		let after_failure = CatalogStore::get_flow(&mut txn, id).await.unwrap();
		assert_eq!(after_failure.status, FlowStatus::Failed);

		// Failed -> Active
		CatalogStore::update_flow_status(&mut txn, id, FlowStatus::Active).await.unwrap();
		let after_resume = CatalogStore::get_flow(&mut txn, id).await.unwrap();
		assert_eq!(after_resume.status, FlowStatus::Active);
	}
}