Skip to main content

reifydb_engine/transaction/operation/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9		change::{Change, ChangeOrigin, Diff},
10	},
11	key::row::RowKey,
12	value::column::columns::Columns,
13};
14use reifydb_transaction::{
15	interceptor::ringbuffer_row::RingBufferRowInterceptor,
16	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19	util::cowvec::CowVec,
20	value::{datetime::DateTime, row_number::RowNumber},
21};
22use smallvec::smallvec;
23
24use crate::Result;
25
26fn build_ringbuffer_insert_change(
27	rb: &RingBuffer,
28	shape: &RowShape,
29	row_number: RowNumber,
30	encoded: &EncodedRow,
31) -> Change {
32	let ids = [row_number];
33	let rows = [encoded.clone()];
34	Change {
35		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
36		version: CommitVersion(0),
37		diffs: smallvec![Diff::insert(Columns::from_encoded_rows(shape, &ids, &rows))],
38		changed_at: DateTime::default(),
39	}
40}
41
42fn build_ringbuffer_update_change(
43	rb: &RingBuffer,
44	row_number: RowNumber,
45	pre: &EncodedRow,
46	post: &EncodedRow,
47) -> Change {
48	let shape: RowShape = (&rb.columns).into();
49	let ids = [row_number];
50	let pres = [pre.clone()];
51	let posts = [post.clone()];
52	Change {
53		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
54		version: CommitVersion(0),
55		diffs: smallvec![Diff::update(
56			Columns::from_encoded_rows(&shape, &ids, &pres),
57			Columns::from_encoded_rows(&shape, &ids, &posts),
58		)],
59		changed_at: DateTime::default(),
60	}
61}
62
63fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
64	let shape: RowShape = (&rb.columns).into();
65	let ids = [row_number];
66	let rows = [encoded.clone()];
67	Change {
68		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
69		version: CommitVersion(0),
70		diffs: smallvec![Diff::remove(Columns::from_encoded_rows(&shape, &ids, &rows))],
71		changed_at: DateTime::default(),
72	}
73}
74
75pub trait RingBufferOperations {
76	fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
77
78	fn insert_ringbuffer_at(
79		&mut self,
80		ringbuffer: &RingBuffer,
81		shape: &RowShape,
82		row_number: RowNumber,
83		row: EncodedRow,
84	) -> Result<EncodedRow>;
85
86	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
87
88	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
89}
90
91impl RingBufferOperations for CommandTransaction {
92	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
93		unimplemented!(
94			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
95		)
96	}
97
98	fn insert_ringbuffer_at(
99		&mut self,
100		ringbuffer: &RingBuffer,
101		shape: &RowShape,
102		row_number: RowNumber,
103		row: EncodedRow,
104	) -> Result<EncodedRow> {
105		let key = RowKey::encoded(ringbuffer.id, row_number);
106
107		let pre = self.get(&key)?.map(|v| v.row);
108
109		if let Some(ref existing) = pre {
110			let ids = [row_number];
111			let existing_rows = [existing.clone()];
112			RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
113			RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &existing_rows)?;
114		}
115
116		let mut rows_buf = [row];
117		RingBufferRowInterceptor::pre_insert(self, ringbuffer, &mut rows_buf)?;
118		let [row] = rows_buf;
119
120		self.set(&key, row.clone())?;
121
122		let ids = [row_number];
123		let rows = [row.clone()];
124		RingBufferRowInterceptor::post_insert(self, ringbuffer, &ids, &rows)?;
125
126		if let Some(pre_row) = pre.as_ref() {
127			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
128		} else {
129			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
130		}
131
132		Ok(row)
133	}
134
135	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
136		let key = RowKey::encoded(ringbuffer.id, id);
137
138		let pre = match self.get(&key)? {
139			Some(v) => v.row,
140			None => return Ok(row),
141		};
142
143		let mut rows_buf = [row];
144		let ids = [id];
145		RingBufferRowInterceptor::pre_update(self, &ringbuffer, &ids, &mut rows_buf)?;
146		let [row] = rows_buf;
147
148		if self.get_committed(&key)?.is_some() {
149			self.mark_preexisting(&key)?;
150		}
151		self.set(&key, row.clone())?;
152
153		let posts = [row.clone()];
154		let pres = [pre.clone()];
155		RingBufferRowInterceptor::post_update(self, &ringbuffer, &ids, &posts, &pres)?;
156
157		self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
158
159		Ok(row)
160	}
161
162	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
163		let key = RowKey::encoded(ringbuffer.id, id);
164
165		let displayed = match self.get(&key)? {
166			Some(v) => v.row,
167			None => return Ok(EncodedRow(CowVec::new(vec![]))),
168		};
169		let committed = self.get_committed(&key)?.map(|v| v.row);
170
171		let ids = [id];
172		RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
173
174		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
175
176		if committed.is_some() {
177			self.mark_preexisting(&key)?;
178		}
179		self.unset(&key, pre_for_cdc.clone())?;
180
181		let pre_rows = [pre_for_cdc.clone()];
182		RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &pre_rows)?;
183
184		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
185
186		Ok(displayed)
187	}
188}
189
190impl RingBufferOperations for AdminTransaction {
191	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
192		unimplemented!(
193			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
194		)
195	}
196
197	fn insert_ringbuffer_at(
198		&mut self,
199		ringbuffer: &RingBuffer,
200		shape: &RowShape,
201		row_number: RowNumber,
202		row: EncodedRow,
203	) -> Result<EncodedRow> {
204		let key = RowKey::encoded(ringbuffer.id, row_number);
205
206		let pre = self.get(&key)?.map(|v| v.row);
207
208		if let Some(ref existing) = pre {
209			let ids = [row_number];
210			let existing_rows = [existing.clone()];
211			RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
212			RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &existing_rows)?;
213		}
214
215		let mut rows_buf = [row];
216		RingBufferRowInterceptor::pre_insert(self, ringbuffer, &mut rows_buf)?;
217		let [row] = rows_buf;
218
219		self.set(&key, row.clone())?;
220
221		let ids = [row_number];
222		let rows = [row.clone()];
223		RingBufferRowInterceptor::post_insert(self, ringbuffer, &ids, &rows)?;
224
225		if let Some(pre_row) = pre.as_ref() {
226			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
227		} else {
228			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
229		}
230
231		Ok(row)
232	}
233
234	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
235		let key = RowKey::encoded(ringbuffer.id, id);
236
237		let pre = match self.get(&key)? {
238			Some(v) => v.row,
239			None => return Ok(row),
240		};
241
242		let mut rows_buf = [row];
243		let ids = [id];
244		RingBufferRowInterceptor::pre_update(self, &ringbuffer, &ids, &mut rows_buf)?;
245		let [row] = rows_buf;
246
247		if self.get_committed(&key)?.is_some() {
248			self.mark_preexisting(&key)?;
249		}
250		self.set(&key, row.clone())?;
251
252		let posts = [row.clone()];
253		let pres = [pre.clone()];
254		RingBufferRowInterceptor::post_update(self, &ringbuffer, &ids, &posts, &pres)?;
255
256		self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
257
258		Ok(row)
259	}
260
261	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
262		let key = RowKey::encoded(ringbuffer.id, id);
263
264		let displayed = match self.get(&key)? {
265			Some(v) => v.row,
266			None => return Ok(EncodedRow(CowVec::new(vec![]))),
267		};
268		let committed = self.get_committed(&key)?.map(|v| v.row);
269
270		let ids = [id];
271		RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
272
273		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
274
275		if committed.is_some() {
276			self.mark_preexisting(&key)?;
277		}
278		self.unset(&key, pre_for_cdc.clone())?;
279
280		let pre_rows = [pre_for_cdc.clone()];
281		RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &pre_rows)?;
282
283		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
284
285		Ok(displayed)
286	}
287}
288
289impl RingBufferOperations for Transaction<'_> {
290	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
291		unimplemented!(
292			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
293		)
294	}
295
296	fn insert_ringbuffer_at(
297		&mut self,
298		ringbuffer: &RingBuffer,
299		shape: &RowShape,
300		row_number: RowNumber,
301		row: EncodedRow,
302	) -> Result<EncodedRow> {
303		match self {
304			Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
305			Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
306			Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
307			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
308			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
309		}
310	}
311
312	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
313		match self {
314			Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
315			Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
316			Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
317			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
318			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
319		}
320	}
321
322	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
323		match self {
324			Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
325			Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
326			Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
327			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
328			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
329		}
330	}
331}