1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9 change::{Change, ChangeOrigin, Diff},
10 },
11 key::row::RowKey,
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
13};
14use reifydb_transaction::{
15 interceptor::ringbuffer_row::RingBufferRowInterceptor,
16 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19 fragment::Fragment,
20 util::cowvec::CowVec,
21 value::{datetime::DateTime, row_number::RowNumber},
22};
23use smallvec::smallvec;
24
25use crate::Result;
26
27fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
28 let fields = shape.fields();
29
30 let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
31 for field in fields.iter() {
32 columns_vec.push(ColumnWithName {
33 name: Fragment::internal(&field.name),
34 data: ColumnBuffer::with_capacity(field.constraint.get_type(), 1),
35 });
36 }
37
38 for (i, _) in fields.iter().enumerate() {
39 columns_vec[i].data.push_value(shape.get_value(encoded, i));
40 }
41
42 Columns::with_system_columns(
43 columns_vec,
44 vec![row_number],
45 vec![DateTime::from_nanos(encoded.created_at_nanos())],
46 vec![DateTime::from_nanos(encoded.updated_at_nanos())],
47 )
48}
49
50fn build_ringbuffer_insert_change(
51 rb: &RingBuffer,
52 shape: &RowShape,
53 row_number: RowNumber,
54 encoded: &EncodedRow,
55) -> Change {
56 Change {
57 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
58 version: CommitVersion(0),
59 diffs: smallvec![Diff::insert(build_encoded_columns(shape, row_number, encoded))],
60 changed_at: DateTime::default(),
61 }
62}
63
64fn build_ringbuffer_update_change(
65 rb: &RingBuffer,
66 row_number: RowNumber,
67 pre: &EncodedRow,
68 post: &EncodedRow,
69) -> Change {
70 let shape: RowShape = (&rb.columns).into();
71 Change {
72 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
73 version: CommitVersion(0),
74 diffs: smallvec![Diff::update(
75 build_encoded_columns(&shape, row_number, pre),
76 build_encoded_columns(&shape, row_number, post),
77 )],
78 changed_at: DateTime::default(),
79 }
80}
81
82fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
83 let shape: RowShape = (&rb.columns).into();
84 Change {
85 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
86 version: CommitVersion(0),
87 diffs: smallvec![Diff::remove(build_encoded_columns(&shape, row_number, encoded))],
88 changed_at: DateTime::default(),
89 }
90}
91
92pub trait RingBufferOperations {
93 fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
94
95 fn insert_ringbuffer_at(
96 &mut self,
97 ringbuffer: &RingBuffer,
98 shape: &RowShape,
99 row_number: RowNumber,
100 row: EncodedRow,
101 ) -> Result<EncodedRow>;
102
103 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
104
105 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
106}
107
108impl RingBufferOperations for CommandTransaction {
109 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
110 unimplemented!(
114 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
115 )
116 }
117
118 fn insert_ringbuffer_at(
119 &mut self,
120 ringbuffer: &RingBuffer,
121 shape: &RowShape,
122 row_number: RowNumber,
123 row: EncodedRow,
124 ) -> Result<EncodedRow> {
125 let key = RowKey::encoded(ringbuffer.id, row_number);
126
127 let pre = self.get(&key)?.map(|v| v.row);
129
130 if let Some(ref existing) = pre {
132 RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
133 RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
135 }
136
137 let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
138
139 self.set(&key, row.clone())?;
140
141 RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
142
143 if let Some(pre_row) = pre.as_ref() {
144 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
145 } else {
146 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
147 }
148
149 Ok(row)
150 }
151
152 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
153 let key = RowKey::encoded(ringbuffer.id, id);
154
155 let pre = match self.get(&key)? {
156 Some(v) => v.row,
157 None => return Ok(row),
158 };
159
160 let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
161
162 if self.get_committed(&key)?.is_some() {
163 self.mark_preexisting(&key)?;
164 }
165 self.set(&key, row.clone())?;
166
167 RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, &pre)?;
168
169 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
170
171 Ok(row)
172 }
173
174 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
175 let key = RowKey::encoded(ringbuffer.id, id);
176
177 let displayed = match self.get(&key)? {
178 Some(v) => v.row,
179 None => return Ok(EncodedRow(CowVec::new(vec![]))),
180 };
181 let committed = self.get_committed(&key)?.map(|v| v.row);
182
183 RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
184
185 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
186
187 if committed.is_some() {
188 self.mark_preexisting(&key)?;
189 }
190 self.unset(&key, pre_for_cdc.clone())?;
191
192 RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &pre_for_cdc)?;
193
194 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
195
196 Ok(displayed)
197 }
198}
199
200impl RingBufferOperations for AdminTransaction {
201 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
202 unimplemented!(
203 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
204 )
205 }
206
207 fn insert_ringbuffer_at(
208 &mut self,
209 ringbuffer: &RingBuffer,
210 shape: &RowShape,
211 row_number: RowNumber,
212 row: EncodedRow,
213 ) -> Result<EncodedRow> {
214 let key = RowKey::encoded(ringbuffer.id, row_number);
215
216 let pre = self.get(&key)?.map(|v| v.row);
217
218 if let Some(ref existing) = pre {
219 RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
220 RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
221 }
222
223 let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
224
225 self.set(&key, row.clone())?;
226
227 RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
228
229 if let Some(pre_row) = pre.as_ref() {
230 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
231 } else {
232 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
233 }
234
235 Ok(row)
236 }
237
238 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
239 let key = RowKey::encoded(ringbuffer.id, id);
240
241 let pre = match self.get(&key)? {
242 Some(v) => v.row,
243 None => return Ok(row),
244 };
245
246 let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
247
248 if self.get_committed(&key)?.is_some() {
249 self.mark_preexisting(&key)?;
250 }
251 self.set(&key, row.clone())?;
252
253 RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, &pre)?;
254
255 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
256
257 Ok(row)
258 }
259
260 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
261 let key = RowKey::encoded(ringbuffer.id, id);
262
263 let displayed = match self.get(&key)? {
264 Some(v) => v.row,
265 None => return Ok(EncodedRow(CowVec::new(vec![]))),
266 };
267 let committed = self.get_committed(&key)?.map(|v| v.row);
268
269 RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
270
271 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
272
273 if committed.is_some() {
274 self.mark_preexisting(&key)?;
275 }
276 self.unset(&key, pre_for_cdc.clone())?;
277
278 RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &pre_for_cdc)?;
279
280 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
281
282 Ok(displayed)
283 }
284}
285
286impl RingBufferOperations for Transaction<'_> {
287 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
288 unimplemented!(
289 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
290 )
291 }
292
293 fn insert_ringbuffer_at(
294 &mut self,
295 ringbuffer: &RingBuffer,
296 shape: &RowShape,
297 row_number: RowNumber,
298 row: EncodedRow,
299 ) -> Result<EncodedRow> {
300 match self {
301 Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
302 Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
303 Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
304 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
305 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
306 }
307 }
308
309 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
310 match self {
311 Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
312 Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
313 Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
314 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
315 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
316 }
317 }
318
319 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
320 match self {
321 Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
322 Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
323 Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
324 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
325 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
326 }
327 }
328}