reifydb_transaction/transaction/catalog/
sink.rs1use reifydb_core::interface::catalog::{
5 change::CatalogTrackSinkChangeOperations,
6 id::{NamespaceId, SinkId},
7 sink::Sink,
8};
9use reifydb_type::Result;
10
11use crate::{
12 change::{
13 Change,
14 OperationType::{Create, Delete},
15 TransactionalSinkChanges,
16 },
17 transaction::admin::AdminTransaction,
18};
19
20impl CatalogTrackSinkChangeOperations for AdminTransaction {
21 fn track_sink_created(&mut self, sink: Sink) -> Result<()> {
22 let change = Change {
23 pre: None,
24 post: Some(sink),
25 op: Create,
26 };
27 self.changes.add_sink_change(change);
28 Ok(())
29 }
30
31 fn track_sink_deleted(&mut self, sink: Sink) -> Result<()> {
32 let change = Change {
33 pre: Some(sink),
34 post: None,
35 op: Delete,
36 };
37 self.changes.add_sink_change(change);
38 Ok(())
39 }
40}
41
42impl TransactionalSinkChanges for AdminTransaction {
43 fn find_sink(&self, id: SinkId) -> Option<&Sink> {
44 for change in self.changes.sink.iter().rev() {
45 if let Some(sink) = &change.post
46 && sink.id == id
47 {
48 return Some(sink);
49 }
50 if let Some(sink) = &change.pre
51 && sink.id == id && change.op == Delete
52 {
53 return None;
54 }
55 }
56 None
57 }
58
59 fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink> {
60 for change in self.changes.sink.iter().rev() {
61 if let Some(sink) = &change.post
62 && sink.namespace == namespace
63 && sink.name == name
64 {
65 return Some(sink);
66 }
67 if let Some(sink) = &change.pre
68 && sink.namespace == namespace
69 && sink.name == name && change.op == Delete
70 {
71 return None;
72 }
73 }
74 None
75 }
76
77 fn is_sink_deleted(&self, id: SinkId) -> bool {
78 self.changes
79 .sink
80 .iter()
81 .any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id == id).unwrap_or(false))
82 }
83
84 fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
85 self.changes.sink.iter().any(|change| {
86 change.op == Delete
87 && change
88 .pre
89 .as_ref()
90 .map(|s| s.namespace == namespace && s.name == name)
91 .unwrap_or(false)
92 })
93 }
94}