reifydb_sub_flow/operator/sink/
ringbuffer_view.rs1use std::{collections::BTreeMap, sync::Arc};
5
6use postcard::{from_bytes, to_stdvec};
7use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
8use reifydb_core::{
9 encoded::schema::{RowSchema, RowSchemaField},
10 interface::{
11 catalog::{flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, schema::SchemaId},
12 change::{Change, ChangeOrigin, Diff},
13 resolved::ResolvedView,
14 },
15 internal,
16 key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
17 value::column::columns::Columns,
18};
19use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
20use reifydb_type::{
21 Result,
22 error::Error,
23 value::{blob::Blob, row_number::RowNumber, r#type::Type},
24};
25use serde::{Deserialize, Serialize};
26
27use super::{coerce_columns, encode_row_at_index};
28use crate::{
29 Operator,
30 operator::{
31 Operators,
32 stateful::{raw::RawStatefulOperator, single::SingleStateful},
33 },
34 transaction::FlowTransaction,
35};
36
37#[derive(Debug, Clone, Serialize, Deserialize, Default)]
38struct RingBufferState {
39 forward: BTreeMap<RowNumber, RowNumber>, reverse: BTreeMap<RowNumber, RowNumber>, }
42
43pub struct SinkRingBufferViewOperator {
44 #[allow(dead_code)]
45 parent: Arc<Operators>,
46 node: FlowNodeId,
47 view: ResolvedView,
48 ringbuffer_id: RingBufferId,
49 capacity: u64,
50 propagate_evictions: bool,
51 state_schema: RowSchema,
52}
53
54impl SinkRingBufferViewOperator {
55 pub fn new(
56 parent: Arc<Operators>,
57 node: FlowNodeId,
58 view: ResolvedView,
59 ringbuffer_id: RingBufferId,
60 capacity: u64,
61 propagate_evictions: bool,
62 ) -> Self {
63 Self {
64 parent,
65 node,
66 view,
67 ringbuffer_id,
68 capacity,
69 propagate_evictions,
70 state_schema: RowSchema::new(vec![RowSchemaField::unconstrained("state", Type::Blob)]),
71 }
72 }
73
74 fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
75 let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
76 match txn.get(&key)? {
77 Some(row) => Ok(decode_ringbuffer_metadata(&row)),
78 None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
79 }
80 }
81
82 fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
83 let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
84 let row = encode_ringbuffer_metadata(metadata);
85 txn.set(&key, row)
86 }
87
88 fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
89 let state_row = self.load_state(txn)?;
90
91 if state_row.is_empty() || !state_row.is_defined(0) {
92 return Ok(RingBufferState::default());
93 }
94
95 let blob = self.state_schema.get_blob(&state_row, 0);
96 if blob.is_empty() {
97 return Ok(RingBufferState::default());
98 }
99
100 from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize RingBufferState: {}", e)))
101 }
102
103 fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
104 let serialized =
105 to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize RingBufferState: {}", e)))?;
106 let blob = Blob::from(serialized);
107
108 self.update_state(txn, |schema, row| {
109 schema.set_blob(row, 0, &blob);
110 Ok(())
111 })?;
112 Ok(())
113 }
114}
115
116impl RawStatefulOperator for SinkRingBufferViewOperator {}
117
118impl SingleStateful for SinkRingBufferViewOperator {
119 fn layout(&self) -> RowSchema {
120 self.state_schema.clone()
121 }
122}
123
124impl Operator for SinkRingBufferViewOperator {
125 fn id(&self) -> FlowNodeId {
126 self.node
127 }
128
129 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
130 let view = self.view.def().clone();
131 let schema: RowSchema = view.columns().into();
132 let object_id = SchemaId::ringbuffer(self.ringbuffer_id);
133 let mut metadata = self.read_metadata(txn)?;
134 let mut state = self.load(txn)?;
135
136 for diff in change.diffs.iter() {
137 match diff {
138 Diff::Insert {
139 post,
140 } => {
141 let coerced = coerce_columns(post, view.columns())?;
142 let row_count = coerced.row_count();
143 for row_idx in 0..row_count {
144 if metadata.is_full() {
146 let oldest_rn = RowNumber(metadata.head);
147 let pre_key = RowKey::encoded(object_id, oldest_rn);
148 txn.remove(&pre_key)?;
149 metadata.head += 1;
150 metadata.count -= 1;
151
152 if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
154 state.forward.remove(&source_rn);
155 }
156
157 if self.propagate_evictions {
158 }
162 }
163
164 let source_rn = coerced.row_numbers[row_idx];
165 let assigned_rn = RowNumber(metadata.tail);
166 let (_, encoded) =
167 encode_row_at_index(&coerced, row_idx, &schema, assigned_rn);
168
169 if source_rn != assigned_rn {
171 state.forward.insert(source_rn, assigned_rn);
172 state.reverse.insert(assigned_rn, source_rn);
173 }
174
175 let encoded = ViewRowInterceptor::pre_insert(
176 txn,
177 &view,
178 assigned_rn,
179 encoded,
180 )?;
181 let key = RowKey::encoded(object_id, assigned_rn);
182 txn.set(&key, encoded.clone())?;
183 ViewRowInterceptor::post_insert(txn, &view, assigned_rn, &encoded)?;
184
185 if metadata.is_empty() {
186 metadata.head = assigned_rn.0;
187 }
188 metadata.count += 1;
189 metadata.tail = assigned_rn.0 + 1;
190 }
191 let version = txn.version();
192 txn.track_flow_change(Change {
193 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
194 version,
195 diffs: vec![Diff::Insert {
196 post: coerced,
197 }],
198 });
199 }
200 Diff::Update {
201 pre,
202 post,
203 } => {
204 let coerced_pre = coerce_columns(pre, view.columns())?;
206 let coerced_post = coerce_columns(post, view.columns())?;
207 let row_count = coerced_post.row_count();
208 for row_idx in 0..row_count {
209 let pre_source_rn = coerced_pre.row_numbers[row_idx];
210 let post_source_rn = coerced_post.row_numbers[row_idx];
211 let pre_storage_rn = state
213 .forward
214 .get(&pre_source_rn)
215 .copied()
216 .unwrap_or(pre_source_rn);
217 let post_storage_rn = state
218 .forward
219 .get(&post_source_rn)
220 .copied()
221 .unwrap_or(post_source_rn);
222 let (_, pre_encoded) = encode_row_at_index(
223 &coerced_pre,
224 row_idx,
225 &schema,
226 pre_storage_rn,
227 );
228 let (_, post_encoded) = encode_row_at_index(
229 &coerced_post,
230 row_idx,
231 &schema,
232 post_storage_rn,
233 );
234
235 let post_encoded = ViewRowInterceptor::pre_update(
236 txn,
237 &view,
238 post_storage_rn,
239 post_encoded,
240 )?;
241 let pre_key = RowKey::encoded(object_id, pre_storage_rn);
242 let post_key = RowKey::encoded(object_id, post_storage_rn);
243 txn.remove(&pre_key)?;
244 txn.set(&post_key, post_encoded.clone())?;
245 ViewRowInterceptor::post_update(
246 txn,
247 &view,
248 post_storage_rn,
249 &post_encoded,
250 &pre_encoded,
251 )?;
252 }
253 let version = txn.version();
254 txn.track_flow_change(Change {
255 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
256 version,
257 diffs: vec![Diff::Update {
258 pre: coerced_pre,
259 post: coerced_post,
260 }],
261 });
262 }
263 Diff::Remove {
264 pre,
265 } => {
266 let coerced = coerce_columns(pre, view.columns())?;
267 let row_count = coerced.row_count();
268 for row_idx in 0..row_count {
269 let source_rn = coerced.row_numbers[row_idx];
270 let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
272 state.reverse.remove(&storage_rn);
273 let (_, encoded) =
274 encode_row_at_index(&coerced, row_idx, &schema, storage_rn);
275 ViewRowInterceptor::pre_delete(txn, &view, storage_rn)?;
276 let key = RowKey::encoded(object_id, storage_rn);
277 txn.remove(&key)?;
278 ViewRowInterceptor::post_delete(txn, &view, storage_rn, &encoded)?;
279 }
280 let version = txn.version();
281 txn.track_flow_change(Change {
282 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
283 version,
284 diffs: vec![Diff::Remove {
285 pre: coerced,
286 }],
287 });
288 }
289 }
290 }
291
292 self.write_metadata(txn, &metadata)?;
293 self.save(txn, &state)?;
294
295 Ok(Change::from_flow(self.node, change.version, Vec::new()))
296 }
297
298 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
299 unreachable!()
300 }
301}