Skip to main content

reifydb_transaction/transaction/catalog/
flow.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::catalog::{
5	change::CatalogTrackFlowChangeOperations,
6	flow::{Flow, FlowId},
7	id::NamespaceId,
8};
9use reifydb_type::Result;
10
11use crate::{
12	change::{
13		Change,
14		OperationType::{Create, Delete, Update},
15		TransactionalFlowChanges,
16	},
17	transaction::admin::AdminTransaction,
18};
19
20impl CatalogTrackFlowChangeOperations for AdminTransaction {
21	fn track_flow_created(&mut self, flow: Flow) -> Result<()> {
22		let change = Change {
23			pre: None,
24			post: Some(flow),
25			op: Create,
26		};
27		self.changes.add_flow_change(change);
28		Ok(())
29	}
30
31	fn track_flow_updated(&mut self, pre: Flow, post: Flow) -> Result<()> {
32		let change = Change {
33			pre: Some(pre),
34			post: Some(post),
35			op: Update,
36		};
37		self.changes.add_flow_change(change);
38		Ok(())
39	}
40
41	fn track_flow_deleted(&mut self, flow: Flow) -> Result<()> {
42		let change = Change {
43			pre: Some(flow),
44			post: None,
45			op: Delete,
46		};
47		self.changes.add_flow_change(change);
48		Ok(())
49	}
50}
51
52impl TransactionalFlowChanges for AdminTransaction {
53	fn find_flow(&self, id: FlowId) -> Option<&Flow> {
54		for change in self.changes.flow.iter().rev() {
55			if let Some(flow) = &change.post
56				&& flow.id == id
57			{
58				return Some(flow);
59			}
60			if let Some(flow) = &change.pre
61				&& flow.id == id && change.op == Delete
62			{
63				return None;
64			}
65		}
66		None
67	}
68
69	fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow> {
70		for change in self.changes.flow.iter().rev() {
71			if let Some(flow) = &change.post
72				&& flow.namespace == namespace
73				&& flow.name == name
74			{
75				return Some(flow);
76			}
77			if let Some(flow) = &change.pre
78				&& flow.namespace == namespace
79				&& flow.name == name && change.op == Delete
80			{
81				return None;
82			}
83		}
84		None
85	}
86
87	fn is_flow_deleted(&self, id: FlowId) -> bool {
88		self.changes
89			.flow
90			.iter()
91			.any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
92	}
93
94	fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
95		self.changes.flow.iter().any(|change| {
96			change.op == Delete
97				&& change
98					.pre
99					.as_ref()
100					.map(|f| f.namespace == namespace && f.name == name)
101					.unwrap_or(false)
102		})
103	}
104}