Skip to main content

reifydb_transaction/transaction/catalog/
subscription.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::catalog::{
5	change::CatalogTrackSubscriptionChangeOperations, id::SubscriptionId, subscription::SubscriptionDef,
6};
7use reifydb_type::Result;
8
9use crate::{
10	change::{
11		Change,
12		OperationType::{Create, Delete, Update},
13		TransactionalSubscriptionChanges,
14	},
15	transaction::{admin::AdminTransaction, subscription::SubscriptionTransaction},
16};
17
18impl CatalogTrackSubscriptionChangeOperations for AdminTransaction {
19	fn track_subscription_def_created(&mut self, subscription: SubscriptionDef) -> Result<()> {
20		let change = Change {
21			pre: None,
22			post: Some(subscription),
23			op: Create,
24		};
25		self.changes.add_subscription_def_change(change);
26		Ok(())
27	}
28
29	fn track_subscription_def_updated(&mut self, pre: SubscriptionDef, post: SubscriptionDef) -> Result<()> {
30		let change = Change {
31			pre: Some(pre),
32			post: Some(post),
33			op: Update,
34		};
35		self.changes.add_subscription_def_change(change);
36		Ok(())
37	}
38
39	fn track_subscription_def_deleted(&mut self, subscription: SubscriptionDef) -> Result<()> {
40		let change = Change {
41			pre: Some(subscription),
42			post: None,
43			op: Delete,
44		};
45		self.changes.add_subscription_def_change(change);
46		Ok(())
47	}
48}
49
50impl TransactionalSubscriptionChanges for AdminTransaction {
51	fn find_subscription(&self, id: SubscriptionId) -> Option<&SubscriptionDef> {
52		for change in self.changes.subscription_def.iter().rev() {
53			if let Some(subscription) = &change.post {
54				if subscription.id == id {
55					return Some(subscription);
56				}
57			} else if let Some(subscription) = &change.pre {
58				if subscription.id == id && change.op == Delete {
59					return None;
60				}
61			}
62		}
63		None
64	}
65
66	fn is_subscription_deleted(&self, id: SubscriptionId) -> bool {
67		self.changes
68			.subscription_def
69			.iter()
70			.rev()
71			.any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id) == Some(id))
72	}
73}
74
75impl CatalogTrackSubscriptionChangeOperations for SubscriptionTransaction {
76	fn track_subscription_def_created(&mut self, subscription: SubscriptionDef) -> Result<()> {
77		self.inner.track_subscription_def_created(subscription)
78	}
79
80	fn track_subscription_def_updated(&mut self, pre: SubscriptionDef, post: SubscriptionDef) -> Result<()> {
81		self.inner.track_subscription_def_updated(pre, post)
82	}
83
84	fn track_subscription_def_deleted(&mut self, subscription: SubscriptionDef) -> Result<()> {
85		self.inner.track_subscription_def_deleted(subscription)
86	}
87}
88
89impl TransactionalSubscriptionChanges for SubscriptionTransaction {
90	fn find_subscription(&self, id: SubscriptionId) -> Option<&SubscriptionDef> {
91		self.inner.find_subscription(id)
92	}
93
94	fn is_subscription_deleted(&self, id: SubscriptionId) -> bool {
95		self.inner.is_subscription_deleted(id)
96	}
97}