reifydb_catalog/transaction/
ringbuffer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::{
6	interface::{
7		CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
8		interceptor::WithInterceptors,
9	},
10	return_error,
11};
12use reifydb_type::{
13	Fragment,
14	diagnostic::catalog::{ringbuffer_already_exists, ringbuffer_not_found},
15};
16use tracing::instrument;
17
18use crate::{CatalogStore, store::ringbuffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction};
19
20#[async_trait]
21pub trait CatalogRingBufferQueryOperations: Send {
22	async fn find_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
23
24	async fn find_ringbuffer_by_name(
25		&mut self,
26		namespace: NamespaceId,
27		name: &str,
28	) -> crate::Result<Option<RingBufferDef>>;
29
30	async fn get_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
31
32	async fn get_ringbuffer_by_name(
33		&mut self,
34		namespace: NamespaceId,
35		name: impl Into<Fragment> + Send,
36	) -> crate::Result<RingBufferDef>;
37}
38
39#[async_trait]
40impl<QT: QueryTransaction + MaterializedCatalogTransaction + Send + 'static> CatalogRingBufferQueryOperations for QT {
41	#[instrument(name = "catalog::ringbuffer::find", level = "trace", skip(self))]
42	async fn find_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
43		CatalogStore::find_ringbuffer(self, id).await
44	}
45
46	#[instrument(name = "catalog::ringbuffer::find_by_name", level = "trace", skip(self, name))]
47	async fn find_ringbuffer_by_name(
48		&mut self,
49		namespace: NamespaceId,
50		name: &str,
51	) -> crate::Result<Option<RingBufferDef>> {
52		CatalogStore::find_ringbuffer_by_name(self, namespace, name).await
53	}
54
55	#[instrument(name = "catalog::ringbuffer::get", level = "trace", skip(self))]
56	async fn get_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
57		CatalogStore::get_ringbuffer(self, id).await
58	}
59
60	#[instrument(name = "catalog::ringbuffer::get_by_name", level = "trace", skip(self, name))]
61	async fn get_ringbuffer_by_name(
62		&mut self,
63		namespace: NamespaceId,
64		name: impl Into<Fragment> + Send,
65	) -> crate::Result<RingBufferDef> {
66		let name = name.into();
67		let name_text = name.text().to_string();
68		let ringbuffer = self.find_ringbuffer_by_name(namespace, name.text()).await?;
69		match ringbuffer {
70			Some(rb) => Ok(rb),
71			None => {
72				let namespace = CatalogStore::get_namespace(self, namespace).await?;
73				return_error!(ringbuffer_not_found(name, &namespace.name, &name_text))
74			}
75		}
76	}
77}
78
79pub trait CatalogTrackRingBufferChangeOperations {
80	// Ring buffer tracking methods
81	fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> crate::Result<()>;
82
83	fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
84
85	fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> crate::Result<()>;
86}
87
88#[async_trait]
89pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
90	async fn create_ringbuffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
91}
92
93#[async_trait]
94impl<
95	CT: CommandTransaction
96		+ MaterializedCatalogTransaction
97		+ CatalogTrackRingBufferChangeOperations
98		+ WithInterceptors<CT>
99		+ TransactionalChanges
100		+ Send
101		+ 'static,
102> CatalogRingBufferCommandOperations for CT
103{
104	#[instrument(name = "catalog::ringbuffer::create", level = "debug", skip(self, to_create))]
105	async fn create_ringbuffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
106		if let Some(_ringbuffer) =
107			self.find_ringbuffer_by_name(to_create.namespace, to_create.ringbuffer.as_str()).await?
108		{
109			let namespace = CatalogStore::get_namespace(self, to_create.namespace).await?;
110			return_error!(ringbuffer_already_exists(
111				to_create.fragment.unwrap_or_default(),
112				&namespace.name,
113				&to_create.ringbuffer
114			));
115		}
116
117		CatalogStore::create_ringbuffer(self, to_create).await
118	}
119}