Skip to main content

reifydb_transaction/transaction/catalog/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::catalog::{
5	change::CatalogTrackRingBufferChangeOperations,
6	id::{NamespaceId, RingBufferId},
7	ringbuffer::RingBufferDef,
8};
9use reifydb_type::Result;
10
11use crate::{
12	change::{
13		Change,
14		OperationType::{Create, Delete, Update},
15		TransactionalRingBufferChanges,
16	},
17	transaction::{admin::AdminTransaction, subscription::SubscriptionTransaction},
18};
19
20impl CatalogTrackRingBufferChangeOperations for AdminTransaction {
21	fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
22		let change = Change {
23			pre: None,
24			post: Some(ringbuffer),
25			op: Create,
26		};
27		self.changes.add_ringbuffer_def_change(change);
28		Ok(())
29	}
30
31	fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> Result<()> {
32		let change = Change {
33			pre: Some(pre),
34			post: Some(post),
35			op: Update,
36		};
37		self.changes.add_ringbuffer_def_change(change);
38		Ok(())
39	}
40
41	fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
42		let change = Change {
43			pre: Some(ringbuffer),
44			post: None,
45			op: Delete,
46		};
47		self.changes.add_ringbuffer_def_change(change);
48		Ok(())
49	}
50}
51
52impl TransactionalRingBufferChanges for AdminTransaction {
53	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
54		for change in self.changes.ringbuffer_def.iter().rev() {
55			if let Some(ringbuffer) = &change.post {
56				if ringbuffer.id == id {
57					return Some(ringbuffer);
58				}
59			}
60			if let Some(ringbuffer) = &change.pre {
61				if ringbuffer.id == id && change.op == Delete {
62					return None;
63				}
64			}
65		}
66		None
67	}
68
69	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
70		for change in self.changes.ringbuffer_def.iter().rev() {
71			if let Some(ringbuffer) = &change.post {
72				if ringbuffer.namespace == namespace && ringbuffer.name == name {
73					return Some(ringbuffer);
74				}
75			}
76			if let Some(ringbuffer) = &change.pre {
77				if ringbuffer.namespace == namespace && ringbuffer.name == name && change.op == Delete {
78					return None;
79				}
80			}
81		}
82		None
83	}
84
85	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
86		self.changes
87			.ringbuffer_def
88			.iter()
89			.any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
90	}
91
92	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
93		self.changes.ringbuffer_def.iter().any(|change| {
94			change.op == Delete
95				&& change
96					.pre
97					.as_ref()
98					.map(|rb| rb.namespace == namespace && rb.name == name)
99					.unwrap_or(false)
100		})
101	}
102}
103
104impl CatalogTrackRingBufferChangeOperations for SubscriptionTransaction {
105	fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
106		self.inner.track_ringbuffer_def_created(ringbuffer)
107	}
108
109	fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> Result<()> {
110		self.inner.track_ringbuffer_def_updated(pre, post)
111	}
112
113	fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
114		self.inner.track_ringbuffer_def_deleted(ringbuffer)
115	}
116}
117
118impl TransactionalRingBufferChanges for SubscriptionTransaction {
119	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
120		self.inner.find_ringbuffer(id)
121	}
122
123	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
124		self.inner.find_ringbuffer_by_name(namespace, name)
125	}
126
127	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
128		self.inner.is_ringbuffer_deleted(id)
129	}
130
131	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
132		self.inner.is_ringbuffer_deleted_by_name(namespace, name)
133	}
134}