1use std::{collections::BTreeMap, sync::Arc};
5
6use postcard::{from_bytes, to_stdvec};
7use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
8use reifydb_core::{
9 encoded::shape::{RowShape, RowShapeField},
10 interface::{
11 catalog::{
12 flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId, view::View,
13 },
14 change::{Change, ChangeOrigin, Diff},
15 resolved::ResolvedView,
16 },
17 internal,
18 key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
19 value::column::columns::Columns,
20};
21use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
22use reifydb_type::{
23 Result,
24 error::Error,
25 value::{blob::Blob, datetime::DateTime, row_number::RowNumber, r#type::Type},
26};
27use serde::{Deserialize, Serialize};
28use smallvec::smallvec;
29
30use super::{coerce_columns, encode_row_at_index};
31use crate::{
32 Operator,
33 operator::{
34 Operators,
35 stateful::{raw::RawStatefulOperator, single::SingleStateful},
36 },
37 transaction::FlowTransaction,
38};
39
40#[derive(Debug, Clone, Serialize, Deserialize, Default)]
41struct RingBufferState {
42 forward: BTreeMap<RowNumber, RowNumber>, reverse: BTreeMap<RowNumber, RowNumber>, }
45
46pub struct SinkRingBufferViewOperator {
47 #[allow(dead_code)]
48 parent: Arc<Operators>,
49 node: FlowNodeId,
50 view: ResolvedView,
51 ringbuffer_id: RingBufferId,
52 capacity: u64,
53 propagate_evictions: bool,
54 state_shape: RowShape,
55}
56
57impl SinkRingBufferViewOperator {
58 pub fn new(
59 parent: Arc<Operators>,
60 node: FlowNodeId,
61 view: ResolvedView,
62 ringbuffer_id: RingBufferId,
63 capacity: u64,
64 propagate_evictions: bool,
65 ) -> Self {
66 Self {
67 parent,
68 node,
69 view,
70 ringbuffer_id,
71 capacity,
72 propagate_evictions,
73 state_shape: RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]),
74 }
75 }
76
77 fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
78 let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
79 match txn.get(&key)? {
80 Some(row) => Ok(decode_ringbuffer_metadata(&row)),
81 None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
82 }
83 }
84
85 fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
86 let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
87 let row = encode_ringbuffer_metadata(metadata);
88 txn.set(&key, row)
89 }
90
91 fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
92 let state_row = self.load_state(txn)?;
93
94 if state_row.is_empty() || !state_row.is_defined(0) {
95 return Ok(RingBufferState::default());
96 }
97
98 let blob = self.state_shape.get_blob(&state_row, 0);
99 if blob.is_empty() {
100 return Ok(RingBufferState::default());
101 }
102
103 from_bytes(blob.as_ref())
104 .map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
105 }
106
107 fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
108 let serialized = to_stdvec(state)
109 .map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
110 let blob = Blob::from(serialized);
111
112 self.update_state(txn, |shape, row| {
113 shape.set_blob(row, 0, &blob);
114 Ok(())
115 })?;
116 Ok(())
117 }
118}
119
120impl RawStatefulOperator for SinkRingBufferViewOperator {}
121
122impl SingleStateful for SinkRingBufferViewOperator {
123 fn layout(&self) -> RowShape {
124 self.state_shape.clone()
125 }
126}
127
128impl Operator for SinkRingBufferViewOperator {
129 fn id(&self) -> FlowNodeId {
130 self.node
131 }
132
133 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
134 let view = self.view.def().clone();
135 let shape: RowShape = view.columns().into();
136 let object_id = ShapeId::ringbuffer(self.ringbuffer_id);
137 let mut metadata = self.read_metadata(txn)?;
138 let mut state = self.load(txn)?;
139
140 for diff in change.diffs.iter() {
141 match diff {
142 Diff::Insert {
143 post,
144 } => self.apply_ringbuffer_insert(
145 txn,
146 &view,
147 &shape,
148 object_id,
149 &mut metadata,
150 &mut state,
151 post,
152 )?,
153 Diff::Update {
154 pre,
155 post,
156 } => self.apply_ringbuffer_update(txn, &view, &shape, object_id, &state, pre, post)?,
157 Diff::Remove {
158 pre,
159 } => self.apply_ringbuffer_remove(txn, &view, &shape, object_id, &mut state, pre)?,
160 }
161 }
162
163 self.write_metadata(txn, &metadata)?;
164 self.save(txn, &state)?;
165
166 Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
167 }
168
169 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
170 unreachable!()
171 }
172}
173
174impl SinkRingBufferViewOperator {
175 #[inline]
176 #[allow(clippy::too_many_arguments)]
177 fn apply_ringbuffer_insert(
178 &self,
179 txn: &mut FlowTransaction,
180 view: &View,
181 shape: &RowShape,
182 object_id: ShapeId,
183 metadata: &mut RingBufferMetadata,
184 state: &mut RingBufferState,
185 post: &Arc<Columns>,
186 ) -> Result<()> {
187 let coerced = coerce_columns(post, view.columns())?;
188 let row_count = coerced.row_count();
189 for row_idx in 0..row_count {
190 if metadata.is_full() {
191 let oldest_rn = RowNumber(metadata.head);
192 let pre_key = RowKey::encoded(object_id, oldest_rn);
193 txn.remove(&pre_key)?;
194 metadata.head += 1;
195 metadata.count -= 1;
196
197 if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
198 state.forward.remove(&source_rn);
199 }
200
201 if self.propagate_evictions {
202 }
206 }
207
208 let source_rn = coerced.row_numbers[row_idx];
209 let assigned_rn = RowNumber(metadata.tail);
210 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, assigned_rn)?;
211
212 if source_rn != assigned_rn {
213 state.forward.insert(source_rn, assigned_rn);
214 state.reverse.insert(assigned_rn, source_rn);
215 }
216
217 let encoded = ViewRowInterceptor::pre_insert(txn, view, assigned_rn, encoded)?;
218 let key = RowKey::encoded(object_id, assigned_rn);
219 txn.set(&key, encoded.clone())?;
220 ViewRowInterceptor::post_insert(txn, view, assigned_rn, &encoded)?;
221
222 if metadata.is_empty() {
223 metadata.head = assigned_rn.0;
224 }
225 metadata.count += 1;
226 metadata.tail = assigned_rn.0 + 1;
227 }
228 emit_view_change(txn, view, Diff::insert(coerced));
229 Ok(())
230 }
231
232 #[inline]
233 #[allow(clippy::too_many_arguments)]
234 fn apply_ringbuffer_update(
235 &self,
236 txn: &mut FlowTransaction,
237 view: &View,
238 shape: &RowShape,
239 object_id: ShapeId,
240 state: &RingBufferState,
241 pre: &Arc<Columns>,
242 post: &Arc<Columns>,
243 ) -> Result<()> {
244 let coerced_pre = coerce_columns(pre, view.columns())?;
245 let coerced_post = coerce_columns(post, view.columns())?;
246 let row_count = coerced_post.row_count();
247 for row_idx in 0..row_count {
248 let pre_source_rn = coerced_pre.row_numbers[row_idx];
249 let post_source_rn = coerced_post.row_numbers[row_idx];
250 let pre_storage_rn = state.forward.get(&pre_source_rn).copied().unwrap_or(pre_source_rn);
251 let post_storage_rn = state.forward.get(&post_source_rn).copied().unwrap_or(post_source_rn);
252 let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_storage_rn)?;
253 let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_storage_rn)?;
254
255 let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_storage_rn, post_encoded)?;
256 let pre_key = RowKey::encoded(object_id, pre_storage_rn);
257 let post_key = RowKey::encoded(object_id, post_storage_rn);
258 txn.remove(&pre_key)?;
259 txn.set(&post_key, post_encoded.clone())?;
260 ViewRowInterceptor::post_update(txn, view, post_storage_rn, &post_encoded, &pre_encoded)?;
261 }
262 emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
263 Ok(())
264 }
265
266 #[inline]
267 fn apply_ringbuffer_remove(
268 &self,
269 txn: &mut FlowTransaction,
270 view: &View,
271 shape: &RowShape,
272 object_id: ShapeId,
273 state: &mut RingBufferState,
274 pre: &Arc<Columns>,
275 ) -> Result<()> {
276 let coerced = coerce_columns(pre, view.columns())?;
277 let row_count = coerced.row_count();
278 for row_idx in 0..row_count {
279 let source_rn = coerced.row_numbers[row_idx];
280 let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
281 state.reverse.remove(&storage_rn);
282 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, storage_rn)?;
283 ViewRowInterceptor::pre_delete(txn, view, storage_rn)?;
284 let key = RowKey::encoded(object_id, storage_rn);
285 txn.remove(&key)?;
286 ViewRowInterceptor::post_delete(txn, view, storage_rn, &encoded)?;
287 }
288 emit_view_change(txn, view, Diff::remove(coerced));
289 Ok(())
290 }
291}
292
293#[inline]
294fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
295 let version = txn.version();
296 let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
297 txn.track_flow_change(Change {
298 origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
299 version,
300 diffs: smallvec![diff],
301 changed_at,
302 });
303}