reifydb_engine/transaction/catalog/
flow.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackFlowChangeOperations;
6use reifydb_core::interface::{
7	Change, FlowDef, FlowId, NamespaceId, OperationType, OperationType::Delete, TransactionalFlowChanges,
8};
9use reifydb_type::IntoFragment;
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackFlowChangeOperations for StandardCommandTransaction {
14	fn track_flow_def_created(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
15		let change = Change {
16			pre: None,
17			post: Some(flow),
18			op: Create,
19		};
20		self.changes.add_flow_def_change(change);
21		Ok(())
22	}
23
24	fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> reifydb_core::Result<()> {
25		let change = Change {
26			pre: Some(pre),
27			post: Some(post),
28			op: Update,
29		};
30		self.changes.add_flow_def_change(change);
31		Ok(())
32	}
33
34	fn track_flow_def_deleted(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
35		let change = Change {
36			pre: Some(flow),
37			post: None,
38			op: Delete,
39		};
40		self.changes.add_flow_def_change(change);
41		Ok(())
42	}
43}
44
45impl TransactionalFlowChanges for StandardCommandTransaction {
46	fn find_flow(&self, id: FlowId) -> Option<&FlowDef> {
47		// Find the last change for this flow ID
48		for change in self.changes.flow_def.iter().rev() {
49			if let Some(flow) = &change.post {
50				if flow.id == id {
51					return Some(flow);
52				}
53			}
54			if let Some(flow) = &change.pre {
55				if flow.id == id && change.op == Delete {
56					// Flow was deleted
57					return None;
58				}
59			}
60		}
61		None
62	}
63
64	fn find_flow_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&FlowDef> {
65		let name = name.into_fragment();
66		// Find the last change for this flow name
67		for change in self.changes.flow_def.iter().rev() {
68			if let Some(flow) = &change.post {
69				if flow.namespace == namespace && flow.name == name.text() {
70					return Some(flow);
71				}
72			}
73			if let Some(flow) = &change.pre {
74				if flow.namespace == namespace && flow.name == name.text() && change.op == Delete {
75					// Flow was deleted
76					return None;
77				}
78			}
79		}
80		None
81	}
82
83	fn is_flow_deleted(&self, id: FlowId) -> bool {
84		// Check if this flow was deleted in this transaction
85		self.changes
86			.flow_def
87			.iter()
88			.any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
89	}
90
91	fn is_flow_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
92		let name = name.into_fragment();
93		// Check if this flow was deleted in this transaction
94		self.changes.flow_def.iter().any(|change| {
95			change.op == Delete
96				&& change
97					.pre
98					.as_ref()
99					.map(|f| f.namespace == namespace && f.name == name.text())
100					.unwrap_or(false)
101		})
102	}
103}
104
105impl TransactionalFlowChanges for StandardQueryTransaction {
106	fn find_flow(&self, _id: FlowId) -> Option<&FlowDef> {
107		None
108	}
109
110	fn find_flow_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> Option<&FlowDef> {
111		None
112	}
113
114	fn is_flow_deleted(&self, _id: FlowId) -> bool {
115		false
116	}
117
118	fn is_flow_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
119		false
120	}
121}