Skip to main content

reifydb_engine/transaction/operation/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9		change::{Change, ChangeOrigin, Diff},
10	},
11	key::row::RowKey,
12	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
13};
14use reifydb_transaction::{
15	interceptor::ringbuffer_row::RingBufferRowInterceptor,
16	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19	fragment::Fragment,
20	util::cowvec::CowVec,
21	value::{datetime::DateTime, row_number::RowNumber},
22};
23use smallvec::smallvec;
24
25use crate::Result;
26
27fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
28	let fields = shape.fields();
29
30	let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
31	for field in fields.iter() {
32		columns_vec.push(ColumnWithName {
33			name: Fragment::internal(&field.name),
34			data: ColumnBuffer::with_capacity(field.constraint.get_type(), 1),
35		});
36	}
37
38	for (i, _) in fields.iter().enumerate() {
39		columns_vec[i].data.push_value(shape.get_value(encoded, i));
40	}
41
42	Columns::with_system_columns(
43		columns_vec,
44		vec![row_number],
45		vec![DateTime::from_nanos(encoded.created_at_nanos())],
46		vec![DateTime::from_nanos(encoded.updated_at_nanos())],
47	)
48}
49
50fn build_ringbuffer_insert_change(
51	rb: &RingBuffer,
52	shape: &RowShape,
53	row_number: RowNumber,
54	encoded: &EncodedRow,
55) -> Change {
56	Change {
57		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
58		version: CommitVersion(0),
59		diffs: smallvec![Diff::insert(build_encoded_columns(shape, row_number, encoded))],
60		changed_at: DateTime::default(),
61	}
62}
63
64fn build_ringbuffer_update_change(
65	rb: &RingBuffer,
66	row_number: RowNumber,
67	pre: &EncodedRow,
68	post: &EncodedRow,
69) -> Change {
70	let shape: RowShape = (&rb.columns).into();
71	Change {
72		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
73		version: CommitVersion(0),
74		diffs: smallvec![Diff::update(
75			build_encoded_columns(&shape, row_number, pre),
76			build_encoded_columns(&shape, row_number, post),
77		)],
78		changed_at: DateTime::default(),
79	}
80}
81
82fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
83	let shape: RowShape = (&rb.columns).into();
84	Change {
85		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
86		version: CommitVersion(0),
87		diffs: smallvec![Diff::remove(build_encoded_columns(&shape, row_number, encoded))],
88		changed_at: DateTime::default(),
89	}
90}
91
92pub trait RingBufferOperations {
93	fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
94
95	fn insert_ringbuffer_at(
96		&mut self,
97		ringbuffer: &RingBuffer,
98		shape: &RowShape,
99		row_number: RowNumber,
100		row: EncodedRow,
101	) -> Result<EncodedRow>;
102
103	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
104
105	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
106}
107
108impl RingBufferOperations for CommandTransaction {
109	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
110		// For ring buffers, the row_number is determined by the caller based on ring buffer metadata
111		// This is different from tables which use RowSequence::next_row_number
112		// The caller must provide the correct row_number based on head/tail position
113		unimplemented!(
114			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
115		)
116	}
117
118	fn insert_ringbuffer_at(
119		&mut self,
120		ringbuffer: &RingBuffer,
121		shape: &RowShape,
122		row_number: RowNumber,
123		row: EncodedRow,
124	) -> Result<EncodedRow> {
125		let key = RowKey::encoded(ringbuffer.id, row_number);
126
127		// Check if we're overwriting existing data (for ring buffer circular behavior)
128		let pre = self.get(&key)?.map(|v| v.row);
129
130		// If there's an existing encoded, we need to delete it first with interceptors
131		if let Some(ref existing) = pre {
132			RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
133			// Don't actually remove, we'll overwrite
134			RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
135		}
136
137		let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
138
139		self.set(&key, row.clone())?;
140
141		RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
142
143		if let Some(pre_row) = pre.as_ref() {
144			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
145		} else {
146			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
147		}
148
149		Ok(row)
150	}
151
152	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
153		let key = RowKey::encoded(ringbuffer.id, id);
154
155		let pre = match self.get(&key)? {
156			Some(v) => v.row,
157			None => return Ok(row),
158		};
159
160		let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
161
162		if self.get_committed(&key)?.is_some() {
163			self.mark_preexisting(&key)?;
164		}
165		self.set(&key, row.clone())?;
166
167		RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, &pre)?;
168
169		self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
170
171		Ok(row)
172	}
173
174	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
175		let key = RowKey::encoded(ringbuffer.id, id);
176
177		let displayed = match self.get(&key)? {
178			Some(v) => v.row,
179			None => return Ok(EncodedRow(CowVec::new(vec![]))),
180		};
181		let committed = self.get_committed(&key)?.map(|v| v.row);
182
183		RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
184
185		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
186
187		if committed.is_some() {
188			self.mark_preexisting(&key)?;
189		}
190		self.unset(&key, pre_for_cdc.clone())?;
191
192		RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &pre_for_cdc)?;
193
194		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
195
196		Ok(displayed)
197	}
198}
199
200impl RingBufferOperations for AdminTransaction {
201	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
202		unimplemented!(
203			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
204		)
205	}
206
207	fn insert_ringbuffer_at(
208		&mut self,
209		ringbuffer: &RingBuffer,
210		shape: &RowShape,
211		row_number: RowNumber,
212		row: EncodedRow,
213	) -> Result<EncodedRow> {
214		let key = RowKey::encoded(ringbuffer.id, row_number);
215
216		let pre = self.get(&key)?.map(|v| v.row);
217
218		if let Some(ref existing) = pre {
219			RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
220			RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
221		}
222
223		let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
224
225		self.set(&key, row.clone())?;
226
227		RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
228
229		if let Some(pre_row) = pre.as_ref() {
230			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
231		} else {
232			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
233		}
234
235		Ok(row)
236	}
237
238	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
239		let key = RowKey::encoded(ringbuffer.id, id);
240
241		let pre = match self.get(&key)? {
242			Some(v) => v.row,
243			None => return Ok(row),
244		};
245
246		let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
247
248		if self.get_committed(&key)?.is_some() {
249			self.mark_preexisting(&key)?;
250		}
251		self.set(&key, row.clone())?;
252
253		RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, &pre)?;
254
255		self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
256
257		Ok(row)
258	}
259
260	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
261		let key = RowKey::encoded(ringbuffer.id, id);
262
263		let displayed = match self.get(&key)? {
264			Some(v) => v.row,
265			None => return Ok(EncodedRow(CowVec::new(vec![]))),
266		};
267		let committed = self.get_committed(&key)?.map(|v| v.row);
268
269		RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
270
271		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
272
273		if committed.is_some() {
274			self.mark_preexisting(&key)?;
275		}
276		self.unset(&key, pre_for_cdc.clone())?;
277
278		RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &pre_for_cdc)?;
279
280		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
281
282		Ok(displayed)
283	}
284}
285
286impl RingBufferOperations for Transaction<'_> {
287	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
288		unimplemented!(
289			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
290		)
291	}
292
293	fn insert_ringbuffer_at(
294		&mut self,
295		ringbuffer: &RingBuffer,
296		shape: &RowShape,
297		row_number: RowNumber,
298		row: EncodedRow,
299	) -> Result<EncodedRow> {
300		match self {
301			Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
302			Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
303			Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
304			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
305			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
306		}
307	}
308
309	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
310		match self {
311			Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
312			Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
313			Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
314			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
315			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
316		}
317	}
318
319	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
320		match self {
321			Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
322			Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
323			Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
324			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
325			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
326		}
327	}
328}