reifydb_transaction/transaction/catalog/
subscription.rs1use reifydb_core::interface::catalog::{
5 change::CatalogTrackSubscriptionChangeOperations, id::SubscriptionId, subscription::SubscriptionDef,
6};
7use reifydb_type::Result;
8
9use crate::{
10 change::{
11 Change,
12 OperationType::{Create, Delete, Update},
13 TransactionalSubscriptionChanges,
14 },
15 transaction::{admin::AdminTransaction, subscription::SubscriptionTransaction},
16};
17
18impl CatalogTrackSubscriptionChangeOperations for AdminTransaction {
19 fn track_subscription_def_created(&mut self, subscription: SubscriptionDef) -> Result<()> {
20 let change = Change {
21 pre: None,
22 post: Some(subscription),
23 op: Create,
24 };
25 self.changes.add_subscription_def_change(change);
26 Ok(())
27 }
28
29 fn track_subscription_def_updated(&mut self, pre: SubscriptionDef, post: SubscriptionDef) -> Result<()> {
30 let change = Change {
31 pre: Some(pre),
32 post: Some(post),
33 op: Update,
34 };
35 self.changes.add_subscription_def_change(change);
36 Ok(())
37 }
38
39 fn track_subscription_def_deleted(&mut self, subscription: SubscriptionDef) -> Result<()> {
40 let change = Change {
41 pre: Some(subscription),
42 post: None,
43 op: Delete,
44 };
45 self.changes.add_subscription_def_change(change);
46 Ok(())
47 }
48}
49
50impl TransactionalSubscriptionChanges for AdminTransaction {
51 fn find_subscription(&self, id: SubscriptionId) -> Option<&SubscriptionDef> {
52 for change in self.changes.subscription_def.iter().rev() {
53 if let Some(subscription) = &change.post {
54 if subscription.id == id {
55 return Some(subscription);
56 }
57 } else if let Some(subscription) = &change.pre {
58 if subscription.id == id && change.op == Delete {
59 return None;
60 }
61 }
62 }
63 None
64 }
65
66 fn is_subscription_deleted(&self, id: SubscriptionId) -> bool {
67 self.changes
68 .subscription_def
69 .iter()
70 .rev()
71 .any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id) == Some(id))
72 }
73}
74
75impl CatalogTrackSubscriptionChangeOperations for SubscriptionTransaction {
76 fn track_subscription_def_created(&mut self, subscription: SubscriptionDef) -> Result<()> {
77 self.inner.track_subscription_def_created(subscription)
78 }
79
80 fn track_subscription_def_updated(&mut self, pre: SubscriptionDef, post: SubscriptionDef) -> Result<()> {
81 self.inner.track_subscription_def_updated(pre, post)
82 }
83
84 fn track_subscription_def_deleted(&mut self, subscription: SubscriptionDef) -> Result<()> {
85 self.inner.track_subscription_def_deleted(subscription)
86 }
87}
88
89impl TransactionalSubscriptionChanges for SubscriptionTransaction {
90 fn find_subscription(&self, id: SubscriptionId) -> Option<&SubscriptionDef> {
91 self.inner.find_subscription(id)
92 }
93
94 fn is_subscription_deleted(&self, id: SubscriptionId) -> bool {
95 self.inner.is_subscription_deleted(id)
96 }
97}