reifydb-sub-flow 0.4.13

Flow subsystem for stream processing and data flows
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{collections::BTreeMap, sync::Arc};

use postcard::{from_bytes, to_stdvec};
use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
use reifydb_core::{
	encoded::shape::{RowShape, RowShapeField},
	interface::{
		catalog::{
			flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId, view::View,
		},
		change::{Change, ChangeOrigin, Diff},
		resolved::ResolvedView,
	},
	internal,
	key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
	value::column::columns::Columns,
};
use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
use reifydb_type::{
	Result,
	error::Error,
	value::{blob::Blob, datetime::DateTime, row_number::RowNumber, r#type::Type},
};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;

use super::{coerce_columns, encode_row_at_index};
use crate::{
	Operator,
	operator::{
		Operators,
		stateful::{raw::RawStatefulOperator, single::SingleStateful},
	},
	transaction::FlowTransaction,
};

/// Persisted mapping between source row numbers and the ring-buffer row
/// numbers they were stored under.
///
/// Only rows whose assigned ring-buffer row number differs from their source
/// row number get an entry; identity placements are left implicit. Entries are
/// dropped when the corresponding ring-buffer row is evicted or removed.
///
/// Serialized with postcard into a single blob column. postcard encodes struct
/// fields positionally, so reordering or retyping these fields would make
/// previously persisted state undecodable.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RingBufferState {
	/// source row number → ring-buffer row number
	forward: BTreeMap<RowNumber, RowNumber>,
	/// ring-buffer row number → source row number
	reverse: BTreeMap<RowNumber, RowNumber>,
}

/// Flow sink that materializes a view's incoming diffs into a ring buffer.
///
/// Inserts are appended at the ring buffer's tail; when the buffer is full the
/// row at `head` is evicted first. Source row numbers and ring-buffer row
/// numbers can diverge, so a `RingBufferState` mapping is kept in operator
/// state to let later updates and removes locate the stored row.
pub struct SinkRingBufferViewOperator {
	/// Upstream operator; stored but never read in this file.
	#[allow(dead_code)]
	parent: Arc<Operators>,
	/// Identifier of this node, returned from `Operator::id`.
	node: FlowNodeId,
	/// View whose column definitions drive coercion and row encoding.
	view: ResolvedView,
	/// Ring buffer the view rows are written into.
	ringbuffer_id: RingBufferId,
	/// Capacity used to seed ring-buffer metadata when none is persisted yet.
	capacity: u64,
	/// Whether evicted rows should be reported downstream.
	/// NOTE(review): eviction propagation is not implemented; the flag is
	/// checked on eviction but that branch is currently empty.
	propagate_evictions: bool,
	/// Layout of this operator's persisted state: one blob column named `state`.
	state_shape: RowShape,
}

impl SinkRingBufferViewOperator {
	pub fn new(
		parent: Arc<Operators>,
		node: FlowNodeId,
		view: ResolvedView,
		ringbuffer_id: RingBufferId,
		capacity: u64,
		propagate_evictions: bool,
	) -> Self {
		Self {
			parent,
			node,
			view,
			ringbuffer_id,
			capacity,
			propagate_evictions,
			state_shape: RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]),
		}
	}

	fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
		match txn.get(&key)? {
			Some(row) => Ok(decode_ringbuffer_metadata(&row)),
			None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
		}
	}

	fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
		let row = encode_ringbuffer_metadata(metadata);
		txn.set(&key, row)
	}

	fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
		let state_row = self.load_state(txn)?;

		if state_row.is_empty() || !state_row.is_defined(0) {
			return Ok(RingBufferState::default());
		}

		let blob = self.state_shape.get_blob(&state_row, 0);
		if blob.is_empty() {
			return Ok(RingBufferState::default());
		}

		from_bytes(blob.as_ref())
			.map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
	}

	fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
		let serialized = to_stdvec(state)
			.map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
		let blob = Blob::from(serialized);

		self.update_state(txn, |shape, row| {
			shape.set_blob(row, 0, &blob);
			Ok(())
		})?;
		Ok(())
	}
}

impl RawStatefulOperator for SinkRingBufferViewOperator {}

impl SingleStateful for SinkRingBufferViewOperator {
	fn layout(&self) -> RowShape {
		self.state_shape.clone()
	}
}

impl Operator for SinkRingBufferViewOperator {
	fn id(&self) -> FlowNodeId {
		self.node
	}

	/// Materializes every diff of `change` into the backing ring buffer.
	///
	/// Ring-buffer metadata and the row-number mapping are loaded once,
	/// threaded through all diffs (so later diffs see earlier inserts and
	/// evictions), and persisted once at the end. The returned change is
	/// empty: this is a sink, and view-level changes are recorded on the
	/// transaction by the per-diff helpers instead.
	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
		// Borrow the view definition rather than cloning it on every call.
		let view: &View = self.view.def();
		let shape: RowShape = view.columns().into();
		let object_id = ShapeId::ringbuffer(self.ringbuffer_id);
		let mut metadata = self.read_metadata(txn)?;
		let mut state = self.load(txn)?;

		for diff in change.diffs.iter() {
			match diff {
				Diff::Insert {
					post,
				} => self.apply_ringbuffer_insert(
					txn,
					view,
					&shape,
					object_id,
					&mut metadata,
					&mut state,
					post,
				)?,
				Diff::Update {
					pre,
					post,
				} => self.apply_ringbuffer_update(
					txn, view, &shape, object_id, &metadata, &mut state, pre, post,
				)?,
				Diff::Remove {
					pre,
				} => self.apply_ringbuffer_remove(txn, view, &shape, object_id, &metadata, &mut state, pre)?,
			}
		}

		self.write_metadata(txn, &metadata)?;
		self.save(txn, &state)?;

		Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
	}

	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
		unreachable!("ring-buffer view sink operator is never pulled")
	}
}

impl SinkRingBufferViewOperator {
	/// Returns the ring-buffer row number currently holding `source_rn`, or
	/// `None` when that source row has no live slot.
	///
	/// Identity placements (slot == source row number) are not recorded in
	/// `forward`. A missing entry therefore means either "identity-mapped" or
	/// "evicted / never stored". Blindly falling back to the identity slot
	/// would let an update resurrect an evicted row, or let an update/remove
	/// overwrite or delete the row that another source row was assigned to.
	/// The identity slot is accepted only when it lies in the live
	/// `head..tail` window and is not claimed by a different source row.
	fn resolve_storage_rn(
		state: &RingBufferState,
		metadata: &RingBufferMetadata,
		source_rn: RowNumber,
	) -> Option<RowNumber> {
		if let Some(&storage_rn) = state.forward.get(&source_rn) {
			return Some(storage_rn);
		}
		let in_live_window = !metadata.is_empty() && (metadata.head..metadata.tail).contains(&source_rn.0);
		(in_live_window && !state.reverse.contains_key(&source_rn)).then_some(source_rn)
	}

	/// Re-points the mapping for the row stored at `storage_rn` from
	/// `old_source_rn` to `new_source_rn`.
	///
	/// Like the insert path, no entry is kept when the new source row number
	/// equals the slot (identity placement).
	fn remap_source(
		state: &mut RingBufferState,
		old_source_rn: RowNumber,
		new_source_rn: RowNumber,
		storage_rn: RowNumber,
	) {
		state.forward.remove(&old_source_rn);
		if new_source_rn == storage_rn {
			state.reverse.remove(&storage_rn);
		} else {
			state.forward.insert(new_source_rn, storage_rn);
			state.reverse.insert(storage_rn, new_source_rn);
		}
	}

	/// Appends each inserted row at the ring buffer's tail, evicting the row at
	/// `head` first whenever the buffer is full.
	///
	/// The source → ring-buffer mapping is recorded only when the two row
	/// numbers differ. Evictions drop any mapping for the evicted slot; they
	/// are not reported downstream even when `propagate_evictions` is set.
	#[inline]
	#[allow(clippy::too_many_arguments)]
	fn apply_ringbuffer_insert(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		shape: &RowShape,
		object_id: ShapeId,
		metadata: &mut RingBufferMetadata,
		state: &mut RingBufferState,
		post: &Arc<Columns>,
	) -> Result<()> {
		let coerced = coerce_columns(post, view.columns())?;
		let row_count = coerced.row_count();
		for row_idx in 0..row_count {
			if metadata.is_full() {
				let oldest_rn = RowNumber(metadata.head);
				let pre_key = RowKey::encoded(object_id, oldest_rn);
				txn.remove(&pre_key)?;
				metadata.head += 1;
				metadata.count -= 1;

				if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
					state.forward.remove(&source_rn);
				}

				if self.propagate_evictions {
					// We could read the old row and emit a Remove diff,
					// but for now we skip (requires reading the old value
					// from storage).
				}
			}

			let source_rn = coerced.row_numbers[row_idx];
			let assigned_rn = RowNumber(metadata.tail);
			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, assigned_rn)?;

			if source_rn != assigned_rn {
				state.forward.insert(source_rn, assigned_rn);
				state.reverse.insert(assigned_rn, source_rn);
			}

			let encoded = ViewRowInterceptor::pre_insert(txn, view, assigned_rn, encoded)?;
			let key = RowKey::encoded(object_id, assigned_rn);
			txn.set(&key, encoded.clone())?;
			ViewRowInterceptor::post_insert(txn, view, assigned_rn, &encoded)?;

			// First row into an empty buffer: the live window starts here.
			if metadata.is_empty() {
				metadata.head = assigned_rn.0;
			}
			metadata.count += 1;
			metadata.tail = assigned_rn.0 + 1;
		}
		emit_view_change(txn, view, Diff::insert(coerced));
		Ok(())
	}

	/// Rewrites each updated row in the slot it already occupies.
	///
	/// Rows with no live slot (e.g. already evicted) are skipped, so they are
	/// neither resurrected nor written over another row. If the source row
	/// number changes, the row stays in its slot and the mapping is
	/// re-pointed. The emitted view change still carries every incoming row.
	#[inline]
	#[allow(clippy::too_many_arguments)]
	fn apply_ringbuffer_update(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		shape: &RowShape,
		object_id: ShapeId,
		metadata: &RingBufferMetadata,
		state: &mut RingBufferState,
		pre: &Arc<Columns>,
		post: &Arc<Columns>,
	) -> Result<()> {
		let coerced_pre = coerce_columns(pre, view.columns())?;
		let coerced_post = coerce_columns(post, view.columns())?;
		let row_count = coerced_post.row_count();
		for row_idx in 0..row_count {
			let pre_source_rn = coerced_pre.row_numbers[row_idx];
			let post_source_rn = coerced_post.row_numbers[row_idx];
			let Some(storage_rn) = Self::resolve_storage_rn(state, metadata, pre_source_rn) else {
				continue;
			};

			let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, storage_rn)?;
			let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, storage_rn)?;

			let post_encoded = ViewRowInterceptor::pre_update(txn, view, storage_rn, post_encoded)?;
			let key = RowKey::encoded(object_id, storage_rn);
			txn.remove(&key)?;
			txn.set(&key, post_encoded.clone())?;
			ViewRowInterceptor::post_update(txn, view, storage_rn, &post_encoded, &pre_encoded)?;

			if post_source_rn != pre_source_rn {
				Self::remap_source(state, pre_source_rn, post_source_rn, storage_rn);
			}
		}
		emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
		Ok(())
	}

	/// Deletes each removed row from its slot and drops its mapping.
	///
	/// Rows with no live slot are skipped, so removing an already-evicted
	/// source row cannot delete the row another source row was assigned to.
	/// The emitted view change still carries every incoming row.
	#[inline]
	#[allow(clippy::too_many_arguments)]
	fn apply_ringbuffer_remove(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		shape: &RowShape,
		object_id: ShapeId,
		metadata: &RingBufferMetadata,
		state: &mut RingBufferState,
		pre: &Arc<Columns>,
	) -> Result<()> {
		let coerced = coerce_columns(pre, view.columns())?;
		let row_count = coerced.row_count();
		for row_idx in 0..row_count {
			let source_rn = coerced.row_numbers[row_idx];
			let Some(storage_rn) = Self::resolve_storage_rn(state, metadata, source_rn) else {
				continue;
			};
			if state.forward.remove(&source_rn).is_some() {
				state.reverse.remove(&storage_rn);
			}
			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, storage_rn)?;
			ViewRowInterceptor::pre_delete(txn, view, storage_rn)?;
			let key = RowKey::encoded(object_id, storage_rn);
			txn.remove(&key)?;
			ViewRowInterceptor::post_delete(txn, view, storage_rn, &encoded)?;
		}
		emit_view_change(txn, view, Diff::remove(coerced));
		Ok(())
	}
}

/// Records a single-diff `Change` for `view` on the transaction.
///
/// The change is stamped with the transaction's version and the current time
/// from the transaction's clock, and is attributed to the view's shape.
#[inline]
fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
	let change = Change {
		origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
		version: txn.version(),
		diffs: smallvec![diff],
		changed_at: DateTime::from_nanos(txn.clock().now_nanos()),
	};
	txn.track_flow_change(change);
}