reifydb_engine/transaction/catalog/
ringbuffer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackRingBufferChangeOperations;
6use reifydb_core::interface::{
7	Change, NamespaceId, OperationType, OperationType::Delete, RingBufferDef, RingBufferId,
8	TransactionalRingBufferChanges,
9};
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackRingBufferChangeOperations for StandardCommandTransaction {
14	fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> reifydb_core::Result<()> {
15		let change = Change {
16			pre: None,
17			post: Some(ringbuffer),
18			op: Create,
19		};
20		self.changes.add_ringbuffer_def_change(change);
21		Ok(())
22	}
23
24	fn track_ringbuffer_def_updated(
25		&mut self,
26		pre: RingBufferDef,
27		post: RingBufferDef,
28	) -> reifydb_core::Result<()> {
29		let change = Change {
30			pre: Some(pre),
31			post: Some(post),
32			op: Update,
33		};
34		self.changes.add_ringbuffer_def_change(change);
35		Ok(())
36	}
37
38	fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> reifydb_core::Result<()> {
39		let change = Change {
40			pre: Some(ringbuffer),
41			post: None,
42			op: Delete,
43		};
44		self.changes.add_ringbuffer_def_change(change);
45		Ok(())
46	}
47}
48
49impl TransactionalRingBufferChanges for StandardCommandTransaction {
50	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
51		// Find the last change for this ring buffer ID
52		for change in self.changes.ringbuffer_def.iter().rev() {
53			if let Some(ringbuffer) = &change.post {
54				if ringbuffer.id == id {
55					return Some(ringbuffer);
56				}
57			}
58			if let Some(ringbuffer) = &change.pre {
59				if ringbuffer.id == id && change.op == Delete {
60					// Ring buffer was deleted
61					return None;
62				}
63			}
64		}
65		None
66	}
67
68	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
69		// Find the last change for this ring buffer name
70		for change in self.changes.ringbuffer_def.iter().rev() {
71			if let Some(ringbuffer) = &change.post {
72				if ringbuffer.namespace == namespace && ringbuffer.name == name {
73					return Some(ringbuffer);
74				}
75			}
76			if let Some(ringbuffer) = &change.pre {
77				if ringbuffer.namespace == namespace && ringbuffer.name == name && change.op == Delete {
78					// Ring buffer was deleted
79					return None;
80				}
81			}
82		}
83		None
84	}
85
86	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
87		// Check if this ring buffer was deleted in this transaction
88		self.changes
89			.ringbuffer_def
90			.iter()
91			.any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
92	}
93
94	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
95		// Check if this ring buffer was deleted in this transaction
96		self.changes.ringbuffer_def.iter().any(|change| {
97			change.op == Delete
98				&& change
99					.pre
100					.as_ref()
101					.map(|rb| rb.namespace == namespace && rb.name == name)
102					.unwrap_or(false)
103		})
104	}
105}
106
107impl TransactionalRingBufferChanges for StandardQueryTransaction {
108	fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBufferDef> {
109		None
110	}
111
112	fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBufferDef> {
113		None
114	}
115
116	fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
117		false
118	}
119
120	fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
121		false
122	}
123}