Skip to main content

reifydb_engine/transaction/operation/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9		change::{Change, ChangeOrigin, Diff},
10	},
11	key::row::RowKey,
12	value::column::{Column, columns::Columns, data::ColumnData},
13};
14use reifydb_transaction::{
15	interceptor::ringbuffer_row::RingBufferRowInterceptor,
16	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19	fragment::Fragment,
20	util::cowvec::CowVec,
21	value::{datetime::DateTime, row_number::RowNumber},
22};
23
24use crate::Result;
25
26fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
27	let fields = shape.fields();
28
29	let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
30	for field in fields.iter() {
31		columns_vec.push(Column {
32			name: Fragment::internal(&field.name),
33			data: ColumnData::with_capacity(field.constraint.get_type(), 1),
34		});
35	}
36
37	for (i, _) in fields.iter().enumerate() {
38		columns_vec[i].data.push_value(shape.get_value(encoded, i));
39	}
40
41	Columns {
42		row_numbers: CowVec::new(vec![row_number]),
43		created_at: CowVec::new(vec![DateTime::from_nanos(encoded.created_at_nanos())]),
44		updated_at: CowVec::new(vec![DateTime::from_nanos(encoded.updated_at_nanos())]),
45		columns: CowVec::new(columns_vec),
46	}
47}
48
49fn build_ringbuffer_insert_change(
50	rb: &RingBuffer,
51	shape: &RowShape,
52	row_number: RowNumber,
53	encoded: &EncodedRow,
54) -> Change {
55	Change {
56		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
57		version: CommitVersion(0),
58		diffs: vec![Diff::Insert {
59			post: build_encoded_columns(shape, row_number, encoded),
60		}],
61		changed_at: DateTime::default(),
62	}
63}
64
65fn build_ringbuffer_update_change(
66	rb: &RingBuffer,
67	row_number: RowNumber,
68	pre: &EncodedRow,
69	post: &EncodedRow,
70) -> Change {
71	let shape: RowShape = (&rb.columns).into();
72	Change {
73		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
74		version: CommitVersion(0),
75		diffs: vec![Diff::Update {
76			pre: build_encoded_columns(&shape, row_number, pre),
77			post: build_encoded_columns(&shape, row_number, post),
78		}],
79		changed_at: DateTime::default(),
80	}
81}
82
83fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
84	let shape: RowShape = (&rb.columns).into();
85	Change {
86		origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
87		version: CommitVersion(0),
88		diffs: vec![Diff::Remove {
89			pre: build_encoded_columns(&shape, row_number, encoded),
90		}],
91		changed_at: DateTime::default(),
92	}
93}
94
95pub trait RingBufferOperations {
96	fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
97
98	fn insert_ringbuffer_at(
99		&mut self,
100		ringbuffer: &RingBuffer,
101		shape: &RowShape,
102		row_number: RowNumber,
103		row: EncodedRow,
104	) -> Result<EncodedRow>;
105
106	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
107
108	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
109}
110
111impl RingBufferOperations for CommandTransaction {
112	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
113		// For ring buffers, the row_number is determined by the caller based on ring buffer metadata
114		// This is different from tables which use RowSequence::next_row_number
115		// The caller must provide the correct row_number based on head/tail position
116		unimplemented!(
117			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
118		)
119	}
120
121	fn insert_ringbuffer_at(
122		&mut self,
123		ringbuffer: &RingBuffer,
124		shape: &RowShape,
125		row_number: RowNumber,
126		row: EncodedRow,
127	) -> Result<EncodedRow> {
128		let key = RowKey::encoded(ringbuffer.id, row_number);
129
130		// Check if we're overwriting existing data (for ring buffer circular behavior)
131		let pre = self.get(&key)?.map(|v| v.row);
132
133		// If there's an existing encoded, we need to delete it first with interceptors
134		if let Some(ref existing) = pre {
135			RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
136			// Don't actually remove, we'll overwrite
137			RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
138		}
139
140		let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
141
142		self.set(&key, row.clone())?;
143
144		RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
145
146		if let Some(pre_row) = pre.as_ref() {
147			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
148		} else {
149			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
150		}
151
152		Ok(row)
153	}
154
155	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
156		let key = RowKey::encoded(ringbuffer.id, id);
157
158		// Get the current encoded before updating (for post-update interceptor)
159		let pre = self.get(&key)?.map(|v| v.row);
160
161		let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
162
163		self.set(&key, row.clone())?;
164
165		if let Some(ref pre) = pre {
166			RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, pre)?;
167			self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, pre, &row));
168		}
169
170		Ok(row)
171	}
172
173	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
174		let key = RowKey::encoded(ringbuffer.id, id);
175
176		// Get the encoded before removing (for post-delete interceptor)
177		let deleted_row = match self.get(&key)? {
178			Some(v) => v.row,
179			None => return Ok(EncodedRow(CowVec::new(vec![]))),
180		};
181
182		// Execute pre-delete interceptors
183		RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
184
185		// Remove the encoded from the database
186		self.unset(&key, deleted_row.clone())?;
187
188		RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &deleted_row)?;
189
190		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &deleted_row));
191
192		Ok(deleted_row)
193	}
194}
195
196impl RingBufferOperations for AdminTransaction {
197	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
198		unimplemented!(
199			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
200		)
201	}
202
203	fn insert_ringbuffer_at(
204		&mut self,
205		ringbuffer: &RingBuffer,
206		shape: &RowShape,
207		row_number: RowNumber,
208		row: EncodedRow,
209	) -> Result<EncodedRow> {
210		let key = RowKey::encoded(ringbuffer.id, row_number);
211
212		let pre = self.get(&key)?.map(|v| v.row);
213
214		if let Some(ref existing) = pre {
215			RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
216			RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
217		}
218
219		let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
220
221		self.set(&key, row.clone())?;
222
223		RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
224
225		if let Some(pre_row) = pre.as_ref() {
226			self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
227		} else {
228			self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
229		}
230
231		Ok(row)
232	}
233
234	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
235		let key = RowKey::encoded(ringbuffer.id, id);
236
237		let pre = self.get(&key)?.map(|v| v.row);
238
239		let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
240
241		self.set(&key, row.clone())?;
242
243		if let Some(ref pre) = pre {
244			RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, pre)?;
245			self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, pre, &row));
246		}
247
248		Ok(row)
249	}
250
251	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
252		let key = RowKey::encoded(ringbuffer.id, id);
253
254		let deleted_row = match self.get(&key)? {
255			Some(v) => v.row,
256			None => return Ok(EncodedRow(CowVec::new(vec![]))),
257		};
258
259		RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
260
261		self.unset(&key, deleted_row.clone())?;
262
263		RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &deleted_row)?;
264
265		self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &deleted_row));
266
267		Ok(deleted_row)
268	}
269}
270
271impl RingBufferOperations for Transaction<'_> {
272	fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
273		unimplemented!(
274			"Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
275		)
276	}
277
278	fn insert_ringbuffer_at(
279		&mut self,
280		ringbuffer: &RingBuffer,
281		shape: &RowShape,
282		row_number: RowNumber,
283		row: EncodedRow,
284	) -> Result<EncodedRow> {
285		match self {
286			Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
287			Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
288			Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
289			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
290			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
291		}
292	}
293
294	fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
295		match self {
296			Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
297			Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
298			Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
299			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
300			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
301		}
302	}
303
304	fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
305		match self {
306			Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
307			Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
308			Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
309			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
310			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
311		}
312	}
313}