1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::{ringbuffer::RingBuffer, shape::ShapeId},
9 change::{Change, ChangeOrigin, Diff},
10 },
11 key::row::RowKey,
12 value::column::columns::Columns,
13};
14use reifydb_transaction::{
15 interceptor::ringbuffer_row::RingBufferRowInterceptor,
16 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
17};
18use reifydb_type::{
19 util::cowvec::CowVec,
20 value::{datetime::DateTime, row_number::RowNumber},
21};
22use smallvec::smallvec;
23
24use crate::Result;
25
26fn build_ringbuffer_insert_change(
27 rb: &RingBuffer,
28 shape: &RowShape,
29 row_number: RowNumber,
30 encoded: &EncodedRow,
31) -> Change {
32 let ids = [row_number];
33 let rows = [encoded.clone()];
34 Change {
35 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
36 version: CommitVersion(0),
37 diffs: smallvec![Diff::insert(Columns::from_encoded_rows(shape, &ids, &rows))],
38 changed_at: DateTime::default(),
39 }
40}
41
42fn build_ringbuffer_update_change(
43 rb: &RingBuffer,
44 row_number: RowNumber,
45 pre: &EncodedRow,
46 post: &EncodedRow,
47) -> Change {
48 let shape: RowShape = (&rb.columns).into();
49 let ids = [row_number];
50 let pres = [pre.clone()];
51 let posts = [post.clone()];
52 Change {
53 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
54 version: CommitVersion(0),
55 diffs: smallvec![Diff::update(
56 Columns::from_encoded_rows(&shape, &ids, &pres),
57 Columns::from_encoded_rows(&shape, &ids, &posts),
58 )],
59 changed_at: DateTime::default(),
60 }
61}
62
63fn build_ringbuffer_remove_change(rb: &RingBuffer, row_number: RowNumber, encoded: &EncodedRow) -> Change {
64 let shape: RowShape = (&rb.columns).into();
65 let ids = [row_number];
66 let rows = [encoded.clone()];
67 Change {
68 origin: ChangeOrigin::Shape(ShapeId::ringbuffer(rb.id)),
69 version: CommitVersion(0),
70 diffs: smallvec![Diff::remove(Columns::from_encoded_rows(&shape, &ids, &rows))],
71 changed_at: DateTime::default(),
72 }
73}
74
75pub trait RingBufferOperations {
76 fn insert_ringbuffer(&mut self, ringbuffer: RingBuffer, row: EncodedRow) -> Result<RowNumber>;
77
78 fn insert_ringbuffer_at(
79 &mut self,
80 ringbuffer: &RingBuffer,
81 shape: &RowShape,
82 row_number: RowNumber,
83 row: EncodedRow,
84 ) -> Result<EncodedRow>;
85
86 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
87
88 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow>;
89}
90
91impl RingBufferOperations for CommandTransaction {
92 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
93 unimplemented!(
94 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
95 )
96 }
97
98 fn insert_ringbuffer_at(
99 &mut self,
100 ringbuffer: &RingBuffer,
101 shape: &RowShape,
102 row_number: RowNumber,
103 row: EncodedRow,
104 ) -> Result<EncodedRow> {
105 let key = RowKey::encoded(ringbuffer.id, row_number);
106
107 let pre = self.get(&key)?.map(|v| v.row);
108
109 if let Some(ref existing) = pre {
110 let ids = [row_number];
111 let existing_rows = [existing.clone()];
112 RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
113 RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &existing_rows)?;
114 }
115
116 let mut rows_buf = [row];
117 RingBufferRowInterceptor::pre_insert(self, ringbuffer, &mut rows_buf)?;
118 let [row] = rows_buf;
119
120 self.set(&key, row.clone())?;
121
122 let ids = [row_number];
123 let rows = [row.clone()];
124 RingBufferRowInterceptor::post_insert(self, ringbuffer, &ids, &rows)?;
125
126 if let Some(pre_row) = pre.as_ref() {
127 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
128 } else {
129 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
130 }
131
132 Ok(row)
133 }
134
135 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
136 let key = RowKey::encoded(ringbuffer.id, id);
137
138 let pre = match self.get(&key)? {
139 Some(v) => v.row,
140 None => return Ok(row),
141 };
142
143 let mut rows_buf = [row];
144 let ids = [id];
145 RingBufferRowInterceptor::pre_update(self, &ringbuffer, &ids, &mut rows_buf)?;
146 let [row] = rows_buf;
147
148 if self.get_committed(&key)?.is_some() {
149 self.mark_preexisting(&key)?;
150 }
151 self.set(&key, row.clone())?;
152
153 let posts = [row.clone()];
154 let pres = [pre.clone()];
155 RingBufferRowInterceptor::post_update(self, &ringbuffer, &ids, &posts, &pres)?;
156
157 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
158
159 Ok(row)
160 }
161
162 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
163 let key = RowKey::encoded(ringbuffer.id, id);
164
165 let displayed = match self.get(&key)? {
166 Some(v) => v.row,
167 None => return Ok(EncodedRow(CowVec::new(vec![]))),
168 };
169 let committed = self.get_committed(&key)?.map(|v| v.row);
170
171 let ids = [id];
172 RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
173
174 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
175
176 if committed.is_some() {
177 self.mark_preexisting(&key)?;
178 }
179 self.unset(&key, pre_for_cdc.clone())?;
180
181 let pre_rows = [pre_for_cdc.clone()];
182 RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &pre_rows)?;
183
184 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
185
186 Ok(displayed)
187 }
188}
189
190impl RingBufferOperations for AdminTransaction {
191 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
192 unimplemented!(
193 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
194 )
195 }
196
197 fn insert_ringbuffer_at(
198 &mut self,
199 ringbuffer: &RingBuffer,
200 shape: &RowShape,
201 row_number: RowNumber,
202 row: EncodedRow,
203 ) -> Result<EncodedRow> {
204 let key = RowKey::encoded(ringbuffer.id, row_number);
205
206 let pre = self.get(&key)?.map(|v| v.row);
207
208 if let Some(ref existing) = pre {
209 let ids = [row_number];
210 let existing_rows = [existing.clone()];
211 RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
212 RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &existing_rows)?;
213 }
214
215 let mut rows_buf = [row];
216 RingBufferRowInterceptor::pre_insert(self, ringbuffer, &mut rows_buf)?;
217 let [row] = rows_buf;
218
219 self.set(&key, row.clone())?;
220
221 let ids = [row_number];
222 let rows = [row.clone()];
223 RingBufferRowInterceptor::post_insert(self, ringbuffer, &ids, &rows)?;
224
225 if let Some(pre_row) = pre.as_ref() {
226 self.track_flow_change(build_ringbuffer_update_change(ringbuffer, row_number, pre_row, &row));
227 } else {
228 self.track_flow_change(build_ringbuffer_insert_change(ringbuffer, shape, row_number, &row));
229 }
230
231 Ok(row)
232 }
233
234 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
235 let key = RowKey::encoded(ringbuffer.id, id);
236
237 let pre = match self.get(&key)? {
238 Some(v) => v.row,
239 None => return Ok(row),
240 };
241
242 let mut rows_buf = [row];
243 let ids = [id];
244 RingBufferRowInterceptor::pre_update(self, &ringbuffer, &ids, &mut rows_buf)?;
245 let [row] = rows_buf;
246
247 if self.get_committed(&key)?.is_some() {
248 self.mark_preexisting(&key)?;
249 }
250 self.set(&key, row.clone())?;
251
252 let posts = [row.clone()];
253 let pres = [pre.clone()];
254 RingBufferRowInterceptor::post_update(self, &ringbuffer, &ids, &posts, &pres)?;
255
256 self.track_flow_change(build_ringbuffer_update_change(&ringbuffer, id, &pre, &row));
257
258 Ok(row)
259 }
260
261 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
262 let key = RowKey::encoded(ringbuffer.id, id);
263
264 let displayed = match self.get(&key)? {
265 Some(v) => v.row,
266 None => return Ok(EncodedRow(CowVec::new(vec![]))),
267 };
268 let committed = self.get_committed(&key)?.map(|v| v.row);
269
270 let ids = [id];
271 RingBufferRowInterceptor::pre_delete(self, ringbuffer, &ids)?;
272
273 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
274
275 if committed.is_some() {
276 self.mark_preexisting(&key)?;
277 }
278 self.unset(&key, pre_for_cdc.clone())?;
279
280 let pre_rows = [pre_for_cdc.clone()];
281 RingBufferRowInterceptor::post_delete(self, ringbuffer, &ids, &pre_rows)?;
282
283 self.track_flow_change(build_ringbuffer_remove_change(ringbuffer, id, &pre_for_cdc));
284
285 Ok(displayed)
286 }
287}
288
289impl RingBufferOperations for Transaction<'_> {
290 fn insert_ringbuffer(&mut self, _ringbuffer: RingBuffer, _row: EncodedRow) -> Result<RowNumber> {
291 unimplemented!(
292 "Ring buffer insert must be called with explicit row_number through insert_ringbuffer_at"
293 )
294 }
295
296 fn insert_ringbuffer_at(
297 &mut self,
298 ringbuffer: &RingBuffer,
299 shape: &RowShape,
300 row_number: RowNumber,
301 row: EncodedRow,
302 ) -> Result<EncodedRow> {
303 match self {
304 Transaction::Command(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
305 Transaction::Admin(txn) => txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
306 Transaction::Test(t) => t.inner.insert_ringbuffer_at(ringbuffer, shape, row_number, row),
307 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
308 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
309 }
310 }
311
312 fn update_ringbuffer(&mut self, ringbuffer: RingBuffer, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
313 match self {
314 Transaction::Command(txn) => txn.update_ringbuffer(ringbuffer, id, row),
315 Transaction::Admin(txn) => txn.update_ringbuffer(ringbuffer, id, row),
316 Transaction::Test(t) => t.inner.update_ringbuffer(ringbuffer, id, row),
317 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
318 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
319 }
320 }
321
322 fn remove_from_ringbuffer(&mut self, ringbuffer: &RingBuffer, id: RowNumber) -> Result<EncodedRow> {
323 match self {
324 Transaction::Command(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
325 Transaction::Admin(txn) => txn.remove_from_ringbuffer(ringbuffer, id),
326 Transaction::Test(t) => t.inner.remove_from_ringbuffer(ringbuffer, id),
327 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
328 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
329 }
330 }
331}