Skip to main content

reifydb_sub_flow/operator/sink/
ringbuffer_view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::BTreeMap, sync::Arc};
5
6use postcard::{from_bytes, to_stdvec};
7use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
8use reifydb_core::{
9	encoded::shape::{RowShape, RowShapeField},
10	interface::{
11		catalog::{
12			flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId, view::View,
13		},
14		change::{Change, ChangeOrigin, Diff},
15		resolved::ResolvedView,
16	},
17	internal,
18	key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
19	value::column::columns::Columns,
20};
21use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
22use reifydb_type::{
23	Result,
24	error::Error,
25	value::{blob::Blob, datetime::DateTime, row_number::RowNumber, r#type::Type},
26};
27use serde::{Deserialize, Serialize};
28use smallvec::smallvec;
29
30use super::{coerce_columns, encode_row_at_index};
31use crate::{
32	Operator,
33	operator::{
34		Operators,
35		stateful::{raw::RawStatefulOperator, single::SingleStateful},
36	},
37	transaction::FlowTransaction,
38};
39
40#[derive(Debug, Clone, Serialize, Deserialize, Default)]
41struct RingBufferState {
42	forward: BTreeMap<RowNumber, RowNumber>, // source_rn → ringbuffer_key
43	reverse: BTreeMap<RowNumber, RowNumber>, // ringbuffer_key → source_rn
44}
45
46pub struct SinkRingBufferViewOperator {
47	#[allow(dead_code)]
48	parent: Arc<Operators>,
49	node: FlowNodeId,
50	view: ResolvedView,
51	ringbuffer_id: RingBufferId,
52	capacity: u64,
53	propagate_evictions: bool,
54	state_shape: RowShape,
55}
56
57impl SinkRingBufferViewOperator {
58	pub fn new(
59		parent: Arc<Operators>,
60		node: FlowNodeId,
61		view: ResolvedView,
62		ringbuffer_id: RingBufferId,
63		capacity: u64,
64		propagate_evictions: bool,
65	) -> Self {
66		Self {
67			parent,
68			node,
69			view,
70			ringbuffer_id,
71			capacity,
72			propagate_evictions,
73			state_shape: RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]),
74		}
75	}
76
77	fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
78		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
79		match txn.get(&key)? {
80			Some(row) => Ok(decode_ringbuffer_metadata(&row)),
81			None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
82		}
83	}
84
85	fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
86		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
87		let row = encode_ringbuffer_metadata(metadata);
88		txn.set(&key, row)
89	}
90
91	fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
92		let state_row = self.load_state(txn)?;
93
94		if state_row.is_empty() || !state_row.is_defined(0) {
95			return Ok(RingBufferState::default());
96		}
97
98		let blob = self.state_shape.get_blob(&state_row, 0);
99		if blob.is_empty() {
100			return Ok(RingBufferState::default());
101		}
102
103		from_bytes(blob.as_ref())
104			.map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
105	}
106
107	fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
108		let serialized = to_stdvec(state)
109			.map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
110		let blob = Blob::from(serialized);
111
112		self.update_state(txn, |shape, row| {
113			shape.set_blob(row, 0, &blob);
114			Ok(())
115		})?;
116		Ok(())
117	}
118}
119
120impl RawStatefulOperator for SinkRingBufferViewOperator {}
121
122impl SingleStateful for SinkRingBufferViewOperator {
123	fn layout(&self) -> RowShape {
124		self.state_shape.clone()
125	}
126}
127
128impl Operator for SinkRingBufferViewOperator {
129	fn id(&self) -> FlowNodeId {
130		self.node
131	}
132
133	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
134		let view = self.view.def().clone();
135		let shape: RowShape = view.columns().into();
136		let object_id = ShapeId::ringbuffer(self.ringbuffer_id);
137		let mut metadata = self.read_metadata(txn)?;
138		let mut state = self.load(txn)?;
139
140		for diff in change.diffs.iter() {
141			match diff {
142				Diff::Insert {
143					post,
144				} => self.apply_ringbuffer_insert(
145					txn,
146					&view,
147					&shape,
148					object_id,
149					&mut metadata,
150					&mut state,
151					post,
152				)?,
153				Diff::Update {
154					pre,
155					post,
156				} => self.apply_ringbuffer_update(txn, &view, &shape, object_id, &state, pre, post)?,
157				Diff::Remove {
158					pre,
159				} => self.apply_ringbuffer_remove(txn, &view, &shape, object_id, &mut state, pre)?,
160			}
161		}
162
163		self.write_metadata(txn, &metadata)?;
164		self.save(txn, &state)?;
165
166		Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
167	}
168
169	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
170		unreachable!()
171	}
172}
173
174impl SinkRingBufferViewOperator {
175	#[inline]
176	#[allow(clippy::too_many_arguments)]
177	fn apply_ringbuffer_insert(
178		&self,
179		txn: &mut FlowTransaction,
180		view: &View,
181		shape: &RowShape,
182		object_id: ShapeId,
183		metadata: &mut RingBufferMetadata,
184		state: &mut RingBufferState,
185		post: &Arc<Columns>,
186	) -> Result<()> {
187		let coerced = coerce_columns(post, view.columns())?;
188		let row_count = coerced.row_count();
189		for row_idx in 0..row_count {
190			if metadata.is_full() {
191				let oldest_rn = RowNumber(metadata.head);
192				let pre_key = RowKey::encoded(object_id, oldest_rn);
193				txn.remove(&pre_key)?;
194				metadata.head += 1;
195				metadata.count -= 1;
196
197				if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
198					state.forward.remove(&source_rn);
199				}
200
201				if self.propagate_evictions {
202					// We could read the old row and emit a Remove diff,
203					// but for now we skip (requires reading the old value
204					// from storage).
205				}
206			}
207
208			let source_rn = coerced.row_numbers[row_idx];
209			let assigned_rn = RowNumber(metadata.tail);
210			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, assigned_rn)?;
211
212			if source_rn != assigned_rn {
213				state.forward.insert(source_rn, assigned_rn);
214				state.reverse.insert(assigned_rn, source_rn);
215			}
216
217			let encoded = ViewRowInterceptor::pre_insert(txn, view, assigned_rn, encoded)?;
218			let key = RowKey::encoded(object_id, assigned_rn);
219			txn.set(&key, encoded.clone())?;
220			ViewRowInterceptor::post_insert(txn, view, assigned_rn, &encoded)?;
221
222			if metadata.is_empty() {
223				metadata.head = assigned_rn.0;
224			}
225			metadata.count += 1;
226			metadata.tail = assigned_rn.0 + 1;
227		}
228		emit_view_change(txn, view, Diff::insert(coerced));
229		Ok(())
230	}
231
232	#[inline]
233	#[allow(clippy::too_many_arguments)]
234	fn apply_ringbuffer_update(
235		&self,
236		txn: &mut FlowTransaction,
237		view: &View,
238		shape: &RowShape,
239		object_id: ShapeId,
240		state: &RingBufferState,
241		pre: &Arc<Columns>,
242		post: &Arc<Columns>,
243	) -> Result<()> {
244		let coerced_pre = coerce_columns(pre, view.columns())?;
245		let coerced_post = coerce_columns(post, view.columns())?;
246		let row_count = coerced_post.row_count();
247		for row_idx in 0..row_count {
248			let pre_source_rn = coerced_pre.row_numbers[row_idx];
249			let post_source_rn = coerced_post.row_numbers[row_idx];
250			let pre_storage_rn = state.forward.get(&pre_source_rn).copied().unwrap_or(pre_source_rn);
251			let post_storage_rn = state.forward.get(&post_source_rn).copied().unwrap_or(post_source_rn);
252			let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_storage_rn)?;
253			let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_storage_rn)?;
254
255			let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_storage_rn, post_encoded)?;
256			let pre_key = RowKey::encoded(object_id, pre_storage_rn);
257			let post_key = RowKey::encoded(object_id, post_storage_rn);
258			txn.remove(&pre_key)?;
259			txn.set(&post_key, post_encoded.clone())?;
260			ViewRowInterceptor::post_update(txn, view, post_storage_rn, &post_encoded, &pre_encoded)?;
261		}
262		emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
263		Ok(())
264	}
265
266	#[inline]
267	fn apply_ringbuffer_remove(
268		&self,
269		txn: &mut FlowTransaction,
270		view: &View,
271		shape: &RowShape,
272		object_id: ShapeId,
273		state: &mut RingBufferState,
274		pre: &Arc<Columns>,
275	) -> Result<()> {
276		let coerced = coerce_columns(pre, view.columns())?;
277		let row_count = coerced.row_count();
278		for row_idx in 0..row_count {
279			let source_rn = coerced.row_numbers[row_idx];
280			let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
281			state.reverse.remove(&storage_rn);
282			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, storage_rn)?;
283			ViewRowInterceptor::pre_delete(txn, view, storage_rn)?;
284			let key = RowKey::encoded(object_id, storage_rn);
285			txn.remove(&key)?;
286			ViewRowInterceptor::post_delete(txn, view, storage_rn, &encoded)?;
287		}
288		emit_view_change(txn, view, Diff::remove(coerced));
289		Ok(())
290	}
291}
292
293#[inline]
294fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
295	let version = txn.version();
296	let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
297	txn.track_flow_change(Change {
298		origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
299		version,
300		diffs: smallvec![diff],
301		changed_at,
302	});
303}