Skip to main content

reifydb_sub_flow/operator/sink/
ringbuffer_view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::BTreeMap, sync::Arc};
5
6use postcard::{from_bytes, to_stdvec};
7use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
8use reifydb_core::{
9	encoded::schema::{RowSchema, RowSchemaField},
10	interface::{
11		catalog::{flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, schema::SchemaId},
12		change::{Change, ChangeOrigin, Diff},
13		resolved::ResolvedView,
14	},
15	internal,
16	key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
17	value::column::columns::Columns,
18};
19use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
20use reifydb_type::{
21	Result,
22	error::Error,
23	value::{blob::Blob, row_number::RowNumber, r#type::Type},
24};
25use serde::{Deserialize, Serialize};
26
27use super::{coerce_columns, encode_row_at_index};
28use crate::{
29	Operator,
30	operator::{
31		Operators,
32		stateful::{raw::RawStatefulOperator, single::SingleStateful},
33	},
34	transaction::FlowTransaction,
35};
36
37#[derive(Debug, Clone, Serialize, Deserialize, Default)]
38struct RingBufferState {
39	forward: BTreeMap<RowNumber, RowNumber>, // source_rn → ringbuffer_key
40	reverse: BTreeMap<RowNumber, RowNumber>, // ringbuffer_key → source_rn
41}
42
43pub struct SinkRingBufferViewOperator {
44	#[allow(dead_code)]
45	parent: Arc<Operators>,
46	node: FlowNodeId,
47	view: ResolvedView,
48	ringbuffer_id: RingBufferId,
49	capacity: u64,
50	propagate_evictions: bool,
51	state_schema: RowSchema,
52}
53
54impl SinkRingBufferViewOperator {
55	pub fn new(
56		parent: Arc<Operators>,
57		node: FlowNodeId,
58		view: ResolvedView,
59		ringbuffer_id: RingBufferId,
60		capacity: u64,
61		propagate_evictions: bool,
62	) -> Self {
63		Self {
64			parent,
65			node,
66			view,
67			ringbuffer_id,
68			capacity,
69			propagate_evictions,
70			state_schema: RowSchema::new(vec![RowSchemaField::unconstrained("state", Type::Blob)]),
71		}
72	}
73
74	fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
75		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
76		match txn.get(&key)? {
77			Some(row) => Ok(decode_ringbuffer_metadata(&row)),
78			None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
79		}
80	}
81
82	fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
83		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
84		let row = encode_ringbuffer_metadata(metadata);
85		txn.set(&key, row)
86	}
87
88	fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
89		let state_row = self.load_state(txn)?;
90
91		if state_row.is_empty() || !state_row.is_defined(0) {
92			return Ok(RingBufferState::default());
93		}
94
95		let blob = self.state_schema.get_blob(&state_row, 0);
96		if blob.is_empty() {
97			return Ok(RingBufferState::default());
98		}
99
100		from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize RingBufferState: {}", e)))
101	}
102
103	fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
104		let serialized =
105			to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize RingBufferState: {}", e)))?;
106		let blob = Blob::from(serialized);
107
108		self.update_state(txn, |schema, row| {
109			schema.set_blob(row, 0, &blob);
110			Ok(())
111		})?;
112		Ok(())
113	}
114}
115
116impl RawStatefulOperator for SinkRingBufferViewOperator {}
117
118impl SingleStateful for SinkRingBufferViewOperator {
119	fn layout(&self) -> RowSchema {
120		self.state_schema.clone()
121	}
122}
123
124impl Operator for SinkRingBufferViewOperator {
125	fn id(&self) -> FlowNodeId {
126		self.node
127	}
128
129	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
130		let view = self.view.def().clone();
131		let schema: RowSchema = view.columns().into();
132		let object_id = SchemaId::ringbuffer(self.ringbuffer_id);
133		let mut metadata = self.read_metadata(txn)?;
134		let mut state = self.load(txn)?;
135
136		for diff in change.diffs.iter() {
137			match diff {
138				Diff::Insert {
139					post,
140				} => {
141					let coerced = coerce_columns(post, view.columns())?;
142					let row_count = coerced.row_count();
143					for row_idx in 0..row_count {
144						// Evict oldest if full
145						if metadata.is_full() {
146							let oldest_rn = RowNumber(metadata.head);
147							let pre_key = RowKey::encoded(object_id, oldest_rn);
148							txn.remove(&pre_key)?;
149							metadata.head += 1;
150							metadata.count -= 1;
151
152							// Clean up alias for evicted row
153							if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
154								state.forward.remove(&source_rn);
155							}
156
157							if self.propagate_evictions {
158								// We could read the old row and emit a Remove diff,
159								// but for now we skip (requires reading the old value
160								// from storage)
161							}
162						}
163
164						let source_rn = coerced.row_numbers[row_idx];
165						let assigned_rn = RowNumber(metadata.tail);
166						let (_, encoded) =
167							encode_row_at_index(&coerced, row_idx, &schema, assigned_rn);
168
169						// Track alias when source row number differs from assigned key
170						if source_rn != assigned_rn {
171							state.forward.insert(source_rn, assigned_rn);
172							state.reverse.insert(assigned_rn, source_rn);
173						}
174
175						let encoded = ViewRowInterceptor::pre_insert(
176							txn,
177							&view,
178							assigned_rn,
179							encoded,
180						)?;
181						let key = RowKey::encoded(object_id, assigned_rn);
182						txn.set(&key, encoded.clone())?;
183						ViewRowInterceptor::post_insert(txn, &view, assigned_rn, &encoded)?;
184
185						if metadata.is_empty() {
186							metadata.head = assigned_rn.0;
187						}
188						metadata.count += 1;
189						metadata.tail = assigned_rn.0 + 1;
190					}
191					let version = txn.version();
192					txn.track_flow_change(Change {
193						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
194						version,
195						diffs: vec![Diff::Insert {
196							post: coerced,
197						}],
198					});
199				}
200				Diff::Update {
201					pre,
202					post,
203				} => {
204					// Ringbuffer views support update (same as table view)
205					let coerced_pre = coerce_columns(pre, view.columns())?;
206					let coerced_post = coerce_columns(post, view.columns())?;
207					let row_count = coerced_post.row_count();
208					for row_idx in 0..row_count {
209						let pre_source_rn = coerced_pre.row_numbers[row_idx];
210						let post_source_rn = coerced_post.row_numbers[row_idx];
211						// Resolve state to storage keys
212						let pre_storage_rn = state
213							.forward
214							.get(&pre_source_rn)
215							.copied()
216							.unwrap_or(pre_source_rn);
217						let post_storage_rn = state
218							.forward
219							.get(&post_source_rn)
220							.copied()
221							.unwrap_or(post_source_rn);
222						let (_, pre_encoded) = encode_row_at_index(
223							&coerced_pre,
224							row_idx,
225							&schema,
226							pre_storage_rn,
227						);
228						let (_, post_encoded) = encode_row_at_index(
229							&coerced_post,
230							row_idx,
231							&schema,
232							post_storage_rn,
233						);
234
235						let post_encoded = ViewRowInterceptor::pre_update(
236							txn,
237							&view,
238							post_storage_rn,
239							post_encoded,
240						)?;
241						let pre_key = RowKey::encoded(object_id, pre_storage_rn);
242						let post_key = RowKey::encoded(object_id, post_storage_rn);
243						txn.remove(&pre_key)?;
244						txn.set(&post_key, post_encoded.clone())?;
245						ViewRowInterceptor::post_update(
246							txn,
247							&view,
248							post_storage_rn,
249							&post_encoded,
250							&pre_encoded,
251						)?;
252					}
253					let version = txn.version();
254					txn.track_flow_change(Change {
255						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
256						version,
257						diffs: vec![Diff::Update {
258							pre: coerced_pre,
259							post: coerced_post,
260						}],
261					});
262				}
263				Diff::Remove {
264					pre,
265				} => {
266					let coerced = coerce_columns(pre, view.columns())?;
267					let row_count = coerced.row_count();
268					for row_idx in 0..row_count {
269						let source_rn = coerced.row_numbers[row_idx];
270						// Resolve alias to storage key
271						let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
272						state.reverse.remove(&storage_rn);
273						let (_, encoded) =
274							encode_row_at_index(&coerced, row_idx, &schema, storage_rn);
275						ViewRowInterceptor::pre_delete(txn, &view, storage_rn)?;
276						let key = RowKey::encoded(object_id, storage_rn);
277						txn.remove(&key)?;
278						ViewRowInterceptor::post_delete(txn, &view, storage_rn, &encoded)?;
279					}
280					let version = txn.version();
281					txn.track_flow_change(Change {
282						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
283						version,
284						diffs: vec![Diff::Remove {
285							pre: coerced,
286						}],
287					});
288				}
289			}
290		}
291
292		self.write_metadata(txn, &metadata)?;
293		self.save(txn, &state)?;
294
295		Ok(Change::from_flow(self.node, change.version, Vec::new()))
296	}
297
298	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
299		unreachable!()
300	}
301}