reifydb_engine/transaction/catalog/
flow.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackFlowChangeOperations;
6use reifydb_core::interface::{
7 Change, FlowDef, FlowId, NamespaceId, OperationType, OperationType::Delete, TransactionalFlowChanges,
8};
9use reifydb_type::IntoFragment;
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackFlowChangeOperations for StandardCommandTransaction {
14 fn track_flow_def_created(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
15 let change = Change {
16 pre: None,
17 post: Some(flow),
18 op: Create,
19 };
20 self.changes.add_flow_def_change(change);
21 Ok(())
22 }
23
24 fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> reifydb_core::Result<()> {
25 let change = Change {
26 pre: Some(pre),
27 post: Some(post),
28 op: Update,
29 };
30 self.changes.add_flow_def_change(change);
31 Ok(())
32 }
33
34 fn track_flow_def_deleted(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
35 let change = Change {
36 pre: Some(flow),
37 post: None,
38 op: Delete,
39 };
40 self.changes.add_flow_def_change(change);
41 Ok(())
42 }
43}
44
45impl TransactionalFlowChanges for StandardCommandTransaction {
46 fn find_flow(&self, id: FlowId) -> Option<&FlowDef> {
47 for change in self.changes.flow_def.iter().rev() {
49 if let Some(flow) = &change.post {
50 if flow.id == id {
51 return Some(flow);
52 }
53 }
54 if let Some(flow) = &change.pre {
55 if flow.id == id && change.op == Delete {
56 return None;
58 }
59 }
60 }
61 None
62 }
63
64 fn find_flow_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&FlowDef> {
65 let name = name.into_fragment();
66 for change in self.changes.flow_def.iter().rev() {
68 if let Some(flow) = &change.post {
69 if flow.namespace == namespace && flow.name == name.text() {
70 return Some(flow);
71 }
72 }
73 if let Some(flow) = &change.pre {
74 if flow.namespace == namespace && flow.name == name.text() && change.op == Delete {
75 return None;
77 }
78 }
79 }
80 None
81 }
82
83 fn is_flow_deleted(&self, id: FlowId) -> bool {
84 self.changes
86 .flow_def
87 .iter()
88 .any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
89 }
90
91 fn is_flow_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
92 let name = name.into_fragment();
93 self.changes.flow_def.iter().any(|change| {
95 change.op == Delete
96 && change
97 .pre
98 .as_ref()
99 .map(|f| f.namespace == namespace && f.name == name.text())
100 .unwrap_or(false)
101 })
102 }
103}
104
105impl TransactionalFlowChanges for StandardQueryTransaction {
106 fn find_flow(&self, _id: FlowId) -> Option<&FlowDef> {
107 None
108 }
109
110 fn find_flow_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> Option<&FlowDef> {
111 None
112 }
113
114 fn is_flow_deleted(&self, _id: FlowId) -> bool {
115 false
116 }
117
118 fn is_flow_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
119 false
120 }
121}