reifydb-sub-flow 0.6.0

Flow subsystem for stream processing and data flows
Documentation
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2026 ReifyDB

use std::collections::{BTreeMap, HashSet};

use postcard::{from_bytes, to_stdvec};
use reifydb_abi::operator::capabilities::OperatorCapability;
use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
use reifydb_core::{
	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
	interface::{
		catalog::{
			flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId, view::View,
		},
		change::{Change, ChangeOrigin, Diff},
		resolved::ResolvedView,
	},
	internal,
	key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
	value::column::columns::Columns,
};
use reifydb_value::{
	Result,
	error::Error,
	value::{blob::Blob, datetime::DateTime, row_number::RowNumber},
};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;

use super::{coerce_columns, encode_row_at_index};
use crate::{
	Operator,
	operator::{
		OperatorCell,
		stateful::{raw::RawStatefulOperator, single::SingleStateful},
	},
	transaction::FlowTransaction,
};

/// Persisted mapping between source row numbers and the ring-buffer storage
/// row numbers they were written at.
///
/// Only rows whose storage row number differs from their source row number are
/// recorded; rows stored at an identical row number have no entry.
///
/// This struct is serialized with `postcard` (`to_stdvec` / `from_bytes`), which
/// encodes struct fields positionally, so the field order is part of the stored
/// format and must not be changed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RingBufferState {
	// Source row number -> storage row number the row was written at.
	forward: BTreeMap<RowNumber, RowNumber>,
	// Storage row number -> source row number held there (inverse of `forward`).
	reverse: BTreeMap<RowNumber, RowNumber>,
}

/// Flow sink that materializes a view's rows into a ring buffer.
///
/// Incoming rows are written under `RowKey`s of the ring buffer's shape. When the
/// buffer reports itself full, the row at `head` is evicted before each insert.
pub struct SinkRingBufferViewOperator {
	// Not read anywhere in this operator's logic; presumably retained so the
	// upstream operator cell stays owned by the graph — confirm before removing.
	#[allow(dead_code)]
	parent: OperatorCell,
	// Flow node id reported by `Operator::id`.
	node: FlowNodeId,
	// Target view; incoming columns are coerced to its column definitions.
	view: ResolvedView,
	// Ring buffer that backs the view's storage.
	ringbuffer_id: RingBufferId,
	// Capacity used to initialise metadata when none is stored yet.
	capacity: u64,
	// Whether evictions should be reported downstream (currently checked but unused).
	propagate_evictions: bool,
	// Layout of the single persisted operator-state row holding `RingBufferState`.
	state_shape: RowShape,
}

impl SinkRingBufferViewOperator {
	pub fn new(
		parent: OperatorCell,
		node: FlowNodeId,
		view: ResolvedView,
		ringbuffer_id: RingBufferId,
		capacity: u64,
		propagate_evictions: bool,
	) -> Self {
		Self {
			parent,
			node,
			view,
			ringbuffer_id,
			capacity,
			propagate_evictions,
			state_shape: RowShape::operator_state(),
		}
	}

	fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
		match txn.get(&key)? {
			Some(row) => Ok(decode_ringbuffer_metadata(&row)),
			None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
		}
	}

	fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
		let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
		let row = encode_ringbuffer_metadata(metadata);
		txn.set(&key, row)
	}

	fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
		let state_row = self.load_state(txn)?;

		if state_row.is_empty() || !state_row.is_defined(0) {
			return Ok(RingBufferState::default());
		}

		let blob = self.state_shape.get_blob(&state_row, 0);
		if blob.is_empty() {
			return Ok(RingBufferState::default());
		}

		from_bytes(blob.as_ref())
			.map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
	}

	fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
		let serialized = to_stdvec(state)
			.map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
		let blob = Blob::from(serialized);

		self.update_state(txn, |shape, row| {
			shape.set_blob(row, 0, &blob);
			Ok(())
		})?;
		Ok(())
	}
}

// Uses all of `RawStatefulOperator`'s default methods unchanged; no raw-state
// customization is needed for this sink.
impl RawStatefulOperator for SinkRingBufferViewOperator {}

impl SingleStateful for SinkRingBufferViewOperator {
	/// Layout of the single persisted state row (the operator-state shape chosen in `new`).
	fn layout(&self) -> RowShape {
		let shape = &self.state_shape;
		shape.clone()
	}
}

impl Operator for SinkRingBufferViewOperator {
	/// Flow node this sink is registered under.
	fn id(&self) -> FlowNodeId {
		self.node
	}

	/// This sink advertises the standard operator capability set.
	fn capabilities(&self) -> &[OperatorCapability] {
		OperatorCapability::STANDARD
	}

	/// Applies every diff of `change` to the backing ring buffer.
	///
	/// Ring-buffer metadata and the row-number mapping are loaded once, threaded
	/// through each diff in order, and persisted after all diffs are applied.
	/// The returned change has no diffs: nothing flows past this sink.
	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
		let view_def = self.view.def().clone();
		let row_shape: RowShape = view_def.columns().into();
		let target = ShapeId::ringbuffer(self.ringbuffer_id);
		let mut metadata = self.read_metadata(txn)?;
		let mut row_map = self.load(txn)?;

		for diff in change.diffs.iter() {
			match diff {
				Diff::Insert {
					post,
					..
				} => {
					self.apply_ringbuffer_insert(
						txn,
						&view_def,
						&row_shape,
						target,
						&mut metadata,
						&mut row_map,
						post,
					)?;
				}
				Diff::Update {
					pre,
					post,
					..
				} => {
					self.apply_ringbuffer_update(txn, &view_def, &row_shape, target, &row_map, pre, post)?;
				}
				Diff::Remove {
					pre,
					..
				} => {
					self.apply_ringbuffer_remove(txn, &view_def, target, &mut row_map, pre)?;
				}
			}
		}

		self.write_metadata(txn, &metadata)?;
		self.save(txn, &row_map)?;

		let downstream = Change::from_flow(self.node, change.version, Vec::new(), change.changed_at);
		Ok(downstream)
	}
}

impl SinkRingBufferViewOperator {
	/// Resolves which storage row currently holds the source row `source_rn`.
	///
	/// Rows whose storage row number differs from their source row number are
	/// recorded in `state.forward`; all other rows are assumed to live at the
	/// identical row number. That identity fallback is unsafe when `state.reverse`
	/// shows the slot belongs to a *different* source row (e.g. the looked-up row
	/// was already evicted and its mapping dropped). In that case `None` is returned
	/// so callers do not clobber another row's data.
	///
	/// NOTE(review): an identity-stored row that has already been evicted cannot be
	/// distinguished from a live one here, because ring-buffer metadata is not consulted.
	fn resolve_storage_rn(state: &RingBufferState, source_rn: RowNumber) -> Option<RowNumber> {
		if let Some(&storage_rn) = state.forward.get(&source_rn) {
			return Some(storage_rn);
		}
		match state.reverse.get(&source_rn) {
			Some(&owner) if owner != source_rn => None,
			_ => Some(source_rn),
		}
	}

	/// Appends the rows of `post` at the ring buffer's tail.
	///
	/// When the buffer is full, the row at `head` is evicted (its storage key is
	/// removed and its mapping dropped) before each new row is placed. Rows that
	/// get evicted again later in the same batch are never written. The full
	/// coerced batch is then reported as an insert on the view.
	#[inline]
	// Internal helper threading the operator's per-change context; bundling it
	// into a struct would only move the argument list elsewhere.
	#[allow(clippy::too_many_arguments)]
	fn apply_ringbuffer_insert(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		shape: &RowShape,
		object_id: ShapeId,
		metadata: &mut RingBufferMetadata,
		state: &mut RingBufferState,
		post: &Columns,
	) -> Result<()> {
		let coerced = coerce_columns(post, view.columns())?;
		let row_count = coerced.row_count();
		let mut assigned_ids: Vec<RowNumber> = Vec::with_capacity(row_count);
		let mut encoded_rows: Vec<EncodedRow> = Vec::with_capacity(row_count);
		let mut evicted_in_batch: HashSet<RowNumber> = HashSet::new();
		for row_idx in 0..row_count {
			if metadata.is_full() {
				let oldest_rn = RowNumber(metadata.head);
				let pre_key = RowKey::encoded(object_id, oldest_rn);
				txn.remove(&pre_key)?;
				metadata.head += 1;
				metadata.count -= 1;
				evicted_in_batch.insert(oldest_rn);

				if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
					state.forward.remove(&source_rn);
				}

				if self.propagate_evictions {
					// TODO(review): evicted rows are not yet reported downstream when
					// `propagate_evictions` is set; the evicted row's contents are not
					// read before removal, so a remove diff cannot be built here.
				}
			}

			let source_rn = coerced.row_numbers[row_idx];
			let assigned_rn = RowNumber(metadata.tail);
			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, assigned_rn)?;

			// Identity placements are not recorded; `resolve_storage_rn` falls back to them.
			if source_rn != assigned_rn {
				state.forward.insert(source_rn, assigned_rn);
				state.reverse.insert(assigned_rn, source_rn);
			}

			assigned_ids.push(assigned_rn);
			encoded_rows.push(encoded);

			if metadata.is_empty() {
				metadata.head = assigned_rn.0;
			}
			metadata.count += 1;
			metadata.tail = assigned_rn.0 + 1;
		}

		// Persist only rows that survived this batch; move the encodings instead of cloning.
		for (assigned_rn, encoded) in assigned_ids.into_iter().zip(encoded_rows) {
			if evicted_in_batch.contains(&assigned_rn) {
				continue;
			}
			let key = RowKey::encoded(object_id, assigned_rn);
			txn.set(&key, encoded)?;
		}
		emit_view_change(txn, view, Diff::insert(coerced));
		Ok(())
	}

	/// Rewrites updated rows in place within the ring buffer.
	///
	/// Each pre/post source row number is mapped to its storage row via
	/// `resolve_storage_rn`; rows that cannot be safely addressed (their identity
	/// slot belongs to another source row) are skipped instead of overwriting that
	/// other row. All keys and encodings are computed before any write, so an
	/// encoding error leaves storage untouched. The full coerced update is then
	/// reported on the view.
	#[inline]
	// Internal helper threading the operator's per-change context.
	#[allow(clippy::too_many_arguments)]
	fn apply_ringbuffer_update(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		shape: &RowShape,
		object_id: ShapeId,
		state: &RingBufferState,
		pre: &Columns,
		post: &Columns,
	) -> Result<()> {
		let coerced_pre = coerce_columns(pre, view.columns())?;
		let coerced_post = coerce_columns(post, view.columns())?;
		let row_count = coerced_post.row_count();
		let mut writes: Vec<(EncodedKey, EncodedKey, EncodedRow)> = Vec::with_capacity(row_count);
		for row_idx in 0..row_count {
			let pre_source_rn = coerced_pre.row_numbers[row_idx];
			let post_source_rn = coerced_post.row_numbers[row_idx];
			let (pre_storage_rn, post_storage_rn) = match (
				Self::resolve_storage_rn(state, pre_source_rn),
				Self::resolve_storage_rn(state, post_source_rn),
			) {
				(Some(pre_rn), Some(post_rn)) => (pre_rn, post_rn),
				// Not safely addressable: writing would clobber another source row.
				_ => continue,
			};
			let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_storage_rn)?;

			writes.push((
				RowKey::encoded(object_id, pre_storage_rn),
				RowKey::encoded(object_id, post_storage_rn),
				post_encoded,
			));
		}

		for (pre_key, post_key, post_encoded) in writes {
			txn.remove(&pre_key)?;
			txn.set(&post_key, post_encoded)?;
		}
		emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
		Ok(())
	}

	/// Deletes removed rows from the ring buffer and drops their mappings.
	///
	/// A reverse mapping is removed only when it belongs to the removed row; when
	/// the identity fallback slot is owned by another source row, that row and its
	/// mapping are left untouched. The full coerced batch is reported as a remove
	/// on the view.
	///
	/// NOTE(review): ring-buffer metadata (`count`/`head`) is not adjusted here,
	/// since this method has no access to it.
	#[inline]
	fn apply_ringbuffer_remove(
		&self,
		txn: &mut FlowTransaction,
		view: &View,
		object_id: ShapeId,
		state: &mut RingBufferState,
		pre: &Columns,
	) -> Result<()> {
		let coerced = coerce_columns(pre, view.columns())?;
		let row_count = coerced.row_count();
		let mut storage_ids: Vec<RowNumber> = Vec::with_capacity(row_count);
		for row_idx in 0..row_count {
			let source_rn = coerced.row_numbers[row_idx];
			let storage_rn = match state.forward.remove(&source_rn) {
				Some(mapped_rn) => {
					// The paired reverse entry belongs to this row; drop it too.
					state.reverse.remove(&mapped_rn);
					mapped_rn
				}
				None => match Self::resolve_storage_rn(state, source_rn) {
					Some(identity_rn) => identity_rn,
					// Identity slot holds a different source row: leave it alone.
					None => continue,
				},
			};
			storage_ids.push(storage_rn);
		}
		for storage_rn in storage_ids {
			let key = RowKey::encoded(object_id, storage_rn);
			txn.remove(&key)?;
		}
		emit_view_change(txn, view, Diff::remove(coerced));
		Ok(())
	}
}

/// Records `diff` as a flow change originating from `view`'s shape, stamped with
/// the transaction's version and the transaction clock's current time.
#[inline]
fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
	let version = txn.version();
	let now_nanos = txn.clock().now_nanos();
	let changed_at = DateTime::from_nanos(now_nanos);
	let origin = ChangeOrigin::Shape(ShapeId::view(view.id()));
	let view_change = Change {
		origin,
		version,
		diffs: smallvec![diff],
		changed_at,
	};
	txn.track_flow_change(view_change);
}