reifydb_engine/transaction/catalog/
ring_buffer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackRingBufferChangeOperations;
6use reifydb_core::interface::{
7	Change, NamespaceId, OperationType, OperationType::Delete, RingBufferDef, RingBufferId,
8	TransactionalRingBufferChanges,
9};
10use reifydb_type::IntoFragment;
11
12use crate::{StandardCommandTransaction, StandardQueryTransaction};
13
14impl CatalogTrackRingBufferChangeOperations for StandardCommandTransaction {
15	fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> reifydb_core::Result<()> {
16		let change = Change {
17			pre: None,
18			post: Some(ring_buffer),
19			op: Create,
20		};
21		self.changes.add_ring_buffer_def_change(change);
22		Ok(())
23	}
24
25	fn track_ring_buffer_def_updated(
26		&mut self,
27		pre: RingBufferDef,
28		post: RingBufferDef,
29	) -> reifydb_core::Result<()> {
30		let change = Change {
31			pre: Some(pre),
32			post: Some(post),
33			op: Update,
34		};
35		self.changes.add_ring_buffer_def_change(change);
36		Ok(())
37	}
38
39	fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> reifydb_core::Result<()> {
40		let change = Change {
41			pre: Some(ring_buffer),
42			post: None,
43			op: Delete,
44		};
45		self.changes.add_ring_buffer_def_change(change);
46		Ok(())
47	}
48}
49
50impl TransactionalRingBufferChanges for StandardCommandTransaction {
51	fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
52		// Find the last change for this ring buffer ID
53		for change in self.changes.ring_buffer_def.iter().rev() {
54			if let Some(ring_buffer) = &change.post {
55				if ring_buffer.id == id {
56					return Some(ring_buffer);
57				}
58			}
59			if let Some(ring_buffer) = &change.pre {
60				if ring_buffer.id == id && change.op == Delete {
61					// Ring buffer was deleted
62					return None;
63				}
64			}
65		}
66		None
67	}
68
69	fn find_ring_buffer_by_name<'a>(
70		&self,
71		namespace: NamespaceId,
72		name: impl IntoFragment<'a>,
73	) -> Option<&RingBufferDef> {
74		let name = name.into_fragment();
75		// Find the last change for this ring buffer name
76		for change in self.changes.ring_buffer_def.iter().rev() {
77			if let Some(ring_buffer) = &change.post {
78				if ring_buffer.namespace == namespace && ring_buffer.name == name.text() {
79					return Some(ring_buffer);
80				}
81			}
82			if let Some(ring_buffer) = &change.pre {
83				if ring_buffer.namespace == namespace
84					&& ring_buffer.name == name.text() && change.op == Delete
85				{
86					// Ring buffer was deleted
87					return None;
88				}
89			}
90		}
91		None
92	}
93
94	fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool {
95		// Check if this ring buffer was deleted in this transaction
96		self.changes
97			.ring_buffer_def
98			.iter()
99			.any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
100	}
101
102	fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
103		let name = name.into_fragment();
104		// Check if this ring buffer was deleted in this transaction
105		self.changes.ring_buffer_def.iter().any(|change| {
106			change.op == Delete
107				&& change
108					.pre
109					.as_ref()
110					.map(|rb| rb.namespace == namespace && rb.name == name.text())
111					.unwrap_or(false)
112		})
113	}
114}
115
116impl TransactionalRingBufferChanges for StandardQueryTransaction {
117	fn find_ring_buffer(&self, _id: RingBufferId) -> Option<&RingBufferDef> {
118		None
119	}
120
121	fn find_ring_buffer_by_name<'a>(
122		&self,
123		_namespace: NamespaceId,
124		_name: impl IntoFragment<'a>,
125	) -> Option<&RingBufferDef> {
126		None
127	}
128
129	fn is_ring_buffer_deleted(&self, _id: RingBufferId) -> bool {
130		false
131	}
132
133	fn is_ring_buffer_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
134		false
135	}
136}