1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9 change::{Change, ChangeOrigin, Diff},
10 },
11 key::row::RowKey,
12 value::column::{Column, columns::Columns, data::ColumnData},
13};
14use reifydb_transaction::{
15 interceptor::ringbuffer_row::RingBufferRowInterceptor,
16 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19 fragment::Fragment,
20 util::cowvec::CowVec,
21 value::{datetime::DateTime, row_number::RowNumber},
22};
23
24use crate::Result;
25
26fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
27 let fields = shape.fields();
28
29 let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
30 for field in fields.iter() {
31 columns_vec.push(Column {
32 name: Fragment::internal(&field.name),
33 data: ColumnData::with_capacity(field.constraint.get_type(), 1),
34 });
35 }
36
37 for (i, _) in fields.iter().enumerate() {
38 columns_vec[i].data.push_value(shape.get_value(encoded, i));
39 }
40
41 Columns {
42 row_numbers: CowVec::new(vec![row_number]),
43 created_at: CowVec::new(vec![DateTime::from_nanos(encoded.created_at_nanos())]),
44 updated_at: CowVec::new(vec![DateTime::from_nanos(encoded.updated_at_nanos())]),
45 columns: CowVec::new(columns_vec),
46 }
47}
48
49fn build_ringbuffer_insert_change(
50 rb: &RingBuffer,
51 shape: &RowShape,
52 row_number: RowNumber,
53 encoded: &EncodedRow,
54) -> Change {
55 Change {
56 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
57 version: CommitVersion(0),
58 diffs: vec![Diff::Insert {
59 post: build_encoded_columns(shape, row_number, encoded),
60 }],
61 changed_at: DateTime::default(),
62 }
63}
64
65fn build_ringbuffer_update_change(
66 rb: &RingBuffer,
67 row_number: RowNumber,
68 pre: &EncodedRow,
69 post: &EncodedRow,
70) -> Change {
71 let shape: RowShape = (&rb.columns).into();
72 Change {
73 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
74 version: CommitVersion(0),
75 diffs: vec![Diff::Update {
76 pre: build_encoded_columns(&shape, row_number, pre),
77 post: build_encoded_columns(&shape, row_number, post),
78 }],
79 changed_at: DateTime::default(),
80 }
81}
82
83fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
84 let shape: RowShape = (&rb.columns).into();
85 Change {
86 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
87 version: CommitVersion(0),
88 diffs: vec![Diff::Remove {
89 pre: build_encoded_columns(&shape, row_number, encoded),
90 }],
91 changed_at: DateTime::default(),
92 }
93}
94
95pub trait RingBufferOperations {
96 fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
97
98 fn insert_ringbuffer_at(
99 &mut self,
100 ringbuffer: &RingBuffer,
101 shape: &RowShape,
102 row_number: RowNumber,
103 row: EncodedRow,
104 ) -> Result<EncodedRow>;
105
106 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
107
108 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
109}
110
111impl RingBufferOperations for CommandTransaction {
112 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
113 unimplemented!(
117 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
118 )
119 }
120
121 fn insert_ringbuffer_at(
122 &mut self,
123 ringbuffer: &RingBuffer,
124 shape: &RowShape,
125 row_number: RowNumber,
126 row: EncodedRow,
127 ) -> Result<EncodedRow> {
128 let key = RowKey::encoded(ringbuffer.id, row_number);
129
130 let pre = self.get(&key)?.map(|v| v.row);
132
133 if let Some(ref existing) = pre {
135 RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
136 RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
138 }
139
140 let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
141
142 self.set(&key, row.clone())?;
143
144 RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
145
146 if let Some(pre_row) = pre.as_ref() {
147 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
148 } else {
149 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
150 }
151
152 Ok(row)
153 }
154
155 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
156 let key = RowKey::encoded(ringbuffer.id, id);
157
158 let pre = self.get(&key)?.map(|v| v.row);
160
161 let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
162
163 self.set(&key, row.clone())?;
164
165 if let Some(ref pre) = pre {
166 RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, pre)?;
167 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, pre, &row));
168 }
169
170 Ok(row)
171 }
172
173 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
174 let key = RowKey::encoded(ringbuffer.id, id);
175
176 let deleted_row = match self.get(&key)? {
178 Some(v) => v.row,
179 None => return Ok(EncodedRow(CowVec::new(vec![]))),
180 };
181
182 RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
184
185 self.unset(&key, deleted_row.clone())?;
187
188 RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &deleted_row)?;
189
190 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &deleted_row));
191
192 Ok(deleted_row)
193 }
194}
195
196impl RingBufferOperations for AdminTransaction {
197 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
198 unimplemented!(
199 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
200 )
201 }
202
203 fn insert_ringbuffer_at(
204 &mut self,
205 ringbuffer: &RingBuffer,
206 shape: &RowShape,
207 row_number: RowNumber,
208 row: EncodedRow,
209 ) -> Result<EncodedRow> {
210 let key = RowKey::encoded(ringbuffer.id, row_number);
211
212 let pre = self.get(&key)?.map(|v| v.row);
213
214 if let Some(ref existing) = pre {
215 RingBufferRowInterceptor::pre_delete(self, ringbuffer, row_number)?;
216 RingBufferRowInterceptor::post_delete(self, ringbuffer, row_number, existing)?;
217 }
218
219 let row = RingBufferRowInterceptor::pre_insert(self, ringbuffer, row)?;
220
221 self.set(&key, row.clone())?;
222
223 RingBufferRowInterceptor::post_insert(self, ringbuffer, row_number, &row)?;
224
225 if let Some(pre_row) = pre.as_ref() {
226 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
227 } else {
228 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
229 }
230
231 Ok(row)
232 }
233
234 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
235 let key = RowKey::encoded(ringbuffer.id, id);
236
237 let pre = self.get(&key)?.map(|v| v.row);
238
239 let row = RingBufferRowInterceptor::pre_update(self, &ringbuffer, id, row)?;
240
241 self.set(&key, row.clone())?;
242
243 if let Some(ref pre) = pre {
244 RingBufferRowInterceptor::post_update(self, &ringbuffer, id, &row, pre)?;
245 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, pre, &row));
246 }
247
248 Ok(row)
249 }
250
251 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
252 let key = RowKey::encoded(ringbuffer.id, id);
253
254 let deleted_row = match self.get(&key)? {
255 Some(v) => v.row,
256 None => return Ok(EncodedRow(CowVec::new(vec![]))),
257 };
258
259 RingBufferRowInterceptor::pre_delete(self, ringbuffer, id)?;
260
261 self.unset(&key, deleted_row.clone())?;
262
263 RingBufferRowInterceptor::post_delete(self, ringbuffer, id, &deleted_row)?;
264
265 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &deleted_row));
266
267 Ok(deleted_row)
268 }
269}
270
271impl RingBufferOperations for Transaction<'_> {
272 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
273 unimplemented!(
274 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
275 )
276 }
277
278 fn insert_ringbuffer_at(
279 &mut self,
280 ringbuffer: &RingBuffer,
281 shape: &RowShape,
282 row_number: RowNumber,
283 row: EncodedRow,
284 ) -> Result<EncodedRow> {
285 match self {
286 Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
287 Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
288 Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
289 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
290 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
291 }
292 }
293
294 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
295 match self {
296 Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
297 Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
298 Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
299 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
300 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
301 }
302 }
303
304 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
305 match self {
306 Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
307 Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
308 Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
309 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
310 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
311 }
312 }
313}