reifydb_engine/transaction/catalog/
flow.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackFlowChangeOperations;
6use reifydb_core::interface::{
7 Change, FlowDef, FlowId, NamespaceId, OperationType, OperationType::Delete, TransactionalFlowChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackFlowChangeOperations for StandardCommandTransaction {
13 fn track_flow_def_created(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
14 let change = Change {
15 pre: None,
16 post: Some(flow),
17 op: Create,
18 };
19 self.changes.add_flow_def_change(change);
20 Ok(())
21 }
22
23 fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> reifydb_core::Result<()> {
24 let change = Change {
25 pre: Some(pre),
26 post: Some(post),
27 op: Update,
28 };
29 self.changes.add_flow_def_change(change);
30 Ok(())
31 }
32
33 fn track_flow_def_deleted(&mut self, flow: FlowDef) -> reifydb_core::Result<()> {
34 let change = Change {
35 pre: Some(flow),
36 post: None,
37 op: Delete,
38 };
39 self.changes.add_flow_def_change(change);
40 Ok(())
41 }
42}
43
44impl TransactionalFlowChanges for StandardCommandTransaction {
45 fn find_flow(&self, id: FlowId) -> Option<&FlowDef> {
46 for change in self.changes.flow_def.iter().rev() {
48 if let Some(flow) = &change.post {
49 if flow.id == id {
50 return Some(flow);
51 }
52 }
53 if let Some(flow) = &change.pre {
54 if flow.id == id && change.op == Delete {
55 return None;
57 }
58 }
59 }
60 None
61 }
62
63 fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&FlowDef> {
64 for change in self.changes.flow_def.iter().rev() {
66 if let Some(flow) = &change.post {
67 if flow.namespace == namespace && flow.name == name {
68 return Some(flow);
69 }
70 }
71 if let Some(flow) = &change.pre {
72 if flow.namespace == namespace && flow.name == name && change.op == Delete {
73 return None;
75 }
76 }
77 }
78 None
79 }
80
81 fn is_flow_deleted(&self, id: FlowId) -> bool {
82 self.changes
84 .flow_def
85 .iter()
86 .any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
87 }
88
89 fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
90 self.changes.flow_def.iter().any(|change| {
92 change.op == Delete
93 && change
94 .pre
95 .as_ref()
96 .map(|f| f.namespace == namespace && f.name == name)
97 .unwrap_or(false)
98 })
99 }
100}
101
102impl TransactionalFlowChanges for StandardQueryTransaction {
103 fn find_flow(&self, _id: FlowId) -> Option<&FlowDef> {
104 None
105 }
106
107 fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&FlowDef> {
108 None
109 }
110
111 fn is_flow_deleted(&self, _id: FlowId) -> bool {
112 false
113 }
114
115 fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
116 false
117 }
118}