reifydb_transaction/transaction/catalog/
flow.rs1use reifydb_core::interface::catalog::{
5 change::CatalogTrackFlowChangeOperations,
6 flow::{Flow, FlowId},
7 id::NamespaceId,
8};
9use reifydb_type::Result;
10
11use crate::{
12 change::{
13 Change,
14 OperationType::{Create, Delete, Update},
15 TransactionalFlowChanges,
16 },
17 transaction::admin::AdminTransaction,
18};
19
20impl CatalogTrackFlowChangeOperations for AdminTransaction {
21 fn track_flow_created(&mut self, flow: Flow) -> Result<()> {
22 let change = Change {
23 pre: None,
24 post: Some(flow),
25 op: Create,
26 };
27 self.changes.add_flow_change(change);
28 Ok(())
29 }
30
31 fn track_flow_updated(&mut self, pre: Flow, post: Flow) -> Result<()> {
32 let change = Change {
33 pre: Some(pre),
34 post: Some(post),
35 op: Update,
36 };
37 self.changes.add_flow_change(change);
38 Ok(())
39 }
40
41 fn track_flow_deleted(&mut self, flow: Flow) -> Result<()> {
42 let change = Change {
43 pre: Some(flow),
44 post: None,
45 op: Delete,
46 };
47 self.changes.add_flow_change(change);
48 Ok(())
49 }
50}
51
52impl TransactionalFlowChanges for AdminTransaction {
53 fn find_flow(&self, id: FlowId) -> Option<&Flow> {
54 for change in self.changes.flow.iter().rev() {
55 if let Some(flow) = &change.post
56 && flow.id == id
57 {
58 return Some(flow);
59 }
60 if let Some(flow) = &change.pre
61 && flow.id == id && change.op == Delete
62 {
63 return None;
64 }
65 }
66 None
67 }
68
69 fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow> {
70 for change in self.changes.flow.iter().rev() {
71 if let Some(flow) = &change.post
72 && flow.namespace == namespace
73 && flow.name == name
74 {
75 return Some(flow);
76 }
77 if let Some(flow) = &change.pre
78 && flow.namespace == namespace
79 && flow.name == name && change.op == Delete
80 {
81 return None;
82 }
83 }
84 None
85 }
86
87 fn is_flow_deleted(&self, id: FlowId) -> bool {
88 self.changes
89 .flow
90 .iter()
91 .any(|change| change.op == Delete && change.pre.as_ref().map(|f| f.id == id).unwrap_or(false))
92 }
93
94 fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
95 self.changes.flow.iter().any(|change| {
96 change.op == Delete
97 && change
98 .pre
99 .as_ref()
100 .map(|f| f.namespace == namespace && f.name == name)
101 .unwrap_or(false)
102 })
103 }
104}