reifydb_engine/transaction/catalog/
flow.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackFlowChangeOperations;
6use reifydb_core::interface::{
7	Change, FlowDef, FlowId, NamespaceId, OperationType, OperationType::Delete, TransactionalFlowChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackFlowChangeOperations for StandardCommandTransaction {
13	fn track_flow_def_created(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
14		let change = Change {
15			pre: None,
16			post: Some(flow),
17			op: Create,
18		};
19		self.changes.add_flow_def_change(change);
20		Ok(())
21	}
22
23	fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> reifydb_core::Result<()> {
24		let change = Change {
25			pre: Some(pre),
26			post: Some(post),
27			op: Update,
28		};
29		self.changes.add_flow_def_change(change);
30		Ok(())
31	}
32
33	fn track_flow_def_deleted(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
34		let change = Change {
35			pre: Some(flow),
36			post: None,
37			op: Delete,
38		};
39		self.changes.add_flow_def_change(change);
40		Ok(())
41	}
42}
43
44impl TransactionalFlowChanges for StandardCommandTransaction {
45	fn find_flow(&self, id: FlowId) -> Option<&FlowDef> {
46		// Find the last change for this flow ID
47		for change in self.changes.flow_def.iter().rev() {
48			if let Some(flow) = &change.post {
49				if flow.id == id {
50					return Some(flow);
51				}
52			}
53			if let Some(flow) = &change.pre {
54				if flow.id == id && change.op == Delete {
55					// Flow was deleted
56					return None;
57				}
58			}
59		}
60		None
61	}
62
63	fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&FlowDef> {
64		// Find the last change for this flow name
65		for change in self.changes.flow_def.iter().rev() {
66			if let Some(flow) = &change.post {
67				if flow.namespace == namespace && flow.name == name {
68					return Some(flow);
69				}
70			}
71			if let Some(flow) = &change.pre {
72				if flow.namespace == namespace && flow.name == name && change.op == Delete {
73					// Flow was deleted
74					return None;
75				}
76			}
77		}
78		None
79	}
80
81	fn is_flow_deleted(&self, id: FlowId) -> bool {
82		// Check if this flow was deleted in this transaction
83		self.changes
84			.flow_def
85			.iter()
86			.any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
87	}
88
89	fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
90		// Check if this flow was deleted in this transaction
91		self.changes.flow_def.iter().any(|change| {
92			change.op == Delete
93				&& change
94					.pre
95					.as_ref()
96					.map(|f| f.namespace == namespace && f.name == name)
97					.unwrap_or(false)
98		})
99	}
100}
101
102impl TransactionalFlowChanges for StandardQueryTransaction {
103	fn find_flow(&self, _id: FlowId) -> Option<&FlowDef> {
104		None
105	}
106
107	fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&FlowDef> {
108		None
109	}
110
111	fn is_flow_deleted(&self, _id: FlowId) -> bool {
112		false
113	}
114
115	fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
116		false
117	}
118}