reifydb_catalog/transaction/
ring_buffer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{
6		CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
7		interceptor::WithInterceptors,
8	},
9	return_error,
10};
11use reifydb_type::{
12	IntoFragment,
13	diagnostic::catalog::{ring_buffer_already_exists, ring_buffer_not_found},
14};
15use tracing::instrument;
16
17use crate::{
18	CatalogStore, store::ring_buffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction,
19};
20
21pub trait CatalogRingBufferQueryOperations {
22	fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
23
24	fn find_ring_buffer_by_name<'a>(
25		&mut self,
26		namespace: NamespaceId,
27		name: impl IntoFragment<'a>,
28	) -> crate::Result<Option<RingBufferDef>>;
29
30	fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
31
32	fn get_ring_buffer_by_name<'a>(
33		&mut self,
34		namespace: NamespaceId,
35		name: impl IntoFragment<'a>,
36	) -> crate::Result<RingBufferDef>;
37}
38
39impl<QT: QueryTransaction + MaterializedCatalogTransaction> CatalogRingBufferQueryOperations for QT {
40	#[instrument(level = "trace", skip(self))]
41	fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
42		CatalogStore::find_ring_buffer(self, id)
43	}
44
45	#[instrument(level = "trace", skip(self, name))]
46	fn find_ring_buffer_by_name<'a>(
47		&mut self,
48		namespace: NamespaceId,
49		name: impl IntoFragment<'a>,
50	) -> crate::Result<Option<RingBufferDef>> {
51		let name = name.into_fragment();
52		CatalogStore::find_ring_buffer_by_name(self, namespace, name.text())
53	}
54
55	#[instrument(level = "trace", skip(self))]
56	fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
57		CatalogStore::get_ring_buffer(self, id)
58	}
59
60	#[instrument(level = "trace", skip(self, name))]
61	fn get_ring_buffer_by_name<'a>(
62		&mut self,
63		namespace: NamespaceId,
64		name: impl IntoFragment<'a>,
65	) -> crate::Result<RingBufferDef> {
66		let name = name.into_fragment();
67		let name_text = name.text().to_string();
68		let ring_buffer = self.find_ring_buffer_by_name(namespace, name.clone())?;
69		match ring_buffer {
70			Some(rb) => Ok(rb),
71			None => {
72				let namespace = CatalogStore::get_namespace(self, namespace)?;
73				return_error!(ring_buffer_not_found(name, &namespace.name, &name_text))
74			}
75		}
76	}
77}
78
79pub trait CatalogTrackRingBufferChangeOperations {
80	// Ring buffer tracking methods
81	fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
82
83	fn track_ring_buffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
84
85	fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
86}
87
88pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
89	fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
90}
91
92impl<
93	CT: CommandTransaction
94		+ MaterializedCatalogTransaction
95		+ CatalogTrackRingBufferChangeOperations
96		+ WithInterceptors<CT>
97		+ TransactionalChanges,
98> CatalogRingBufferCommandOperations for CT
99{
100	#[instrument(level = "debug", skip(self, to_create))]
101	fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
102		if let Some(_ring_buffer) =
103			self.find_ring_buffer_by_name(to_create.namespace, &to_create.ring_buffer)?
104		{
105			let namespace = CatalogStore::get_namespace(self, to_create.namespace)?;
106			return_error!(ring_buffer_already_exists(
107				to_create.fragment.unwrap_or_default(),
108				&namespace.name,
109				&to_create.ring_buffer
110			));
111		}
112
113		CatalogStore::create_ring_buffer(self, to_create)
114	}
115}