reifydb-sub-flow 0.4.5

Flow subsystem for stream processing and data flows
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	encoded::schema::RowSchema,
	interface::{
		catalog::{flow::FlowNodeId, ringbuffer::RingBuffer, schema::SchemaId},
		change::{Change, Diff},
	},
	key::row::RowKey,
	value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_type::{Result, fragment::Fragment, util::cowvec::CowVec, value::row_number::RowNumber};

use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};

pub struct PrimitiveRingBufferOperator {
	node: FlowNodeId,
	ringbuffer: RingBuffer,
}

impl PrimitiveRingBufferOperator {
	pub fn new(node: FlowNodeId, ringbuffer: RingBuffer) -> Self {
		Self {
			node,
			ringbuffer,
		}
	}
}

impl Operator for PrimitiveRingBufferOperator {
	fn id(&self) -> FlowNodeId {
		self.node
	}

	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
		let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
		for diff in change.diffs {
			decoded_diffs.push(match diff {
				Diff::Insert {
					post,
				} => {
					let mut decoded = post;
					decode_dictionary_columns(&mut decoded, txn)?;
					Diff::Insert {
						post: decoded,
					}
				}
				Diff::Update {
					pre,
					post,
				} => {
					let mut decoded_pre = pre;
					let mut decoded_post = post;
					decode_dictionary_columns(&mut decoded_pre, txn)?;
					decode_dictionary_columns(&mut decoded_post, txn)?;
					Diff::Update {
						pre: decoded_pre,
						post: decoded_post,
					}
				}
				Diff::Remove {
					pre,
				} => {
					let mut decoded = pre;
					decode_dictionary_columns(&mut decoded, txn)?;
					Diff::Remove {
						pre: decoded,
					}
				}
			});
		}
		Ok(Change::from_flow(self.node, change.version, decoded_diffs))
	}

	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
		if rows.is_empty() {
			return Ok(self.empty_columns());
		}

		let schema: RowSchema = (&self.ringbuffer.columns).into();
		let fields = schema.fields();

		let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
		for field in fields.iter() {
			columns_vec.push(Column {
				name: Fragment::internal(&field.name),
				data: ColumnData::with_capacity(field.constraint.get_type(), rows.len()),
			});
		}
		let mut row_numbers = Vec::with_capacity(rows.len());

		for row_num in rows {
			let key = RowKey::encoded(SchemaId::ringbuffer(self.ringbuffer.id), *row_num);
			if let Some(encoded) = txn.get(&key)? {
				row_numbers.push(*row_num);
				for (i, _field) in fields.iter().enumerate() {
					let value = schema.get_value(&encoded, i);
					columns_vec[i].data.push_value(value);
				}
			}
		}

		if row_numbers.is_empty() {
			Ok(self.empty_columns())
		} else {
			Ok(Columns {
				row_numbers: CowVec::new(row_numbers),
				columns: CowVec::new(columns_vec),
			})
		}
	}
}

impl PrimitiveRingBufferOperator {
	fn empty_columns(&self) -> Columns {
		let columns: Vec<Column> = self
			.ringbuffer
			.columns
			.iter()
			.map(|col| Column {
				name: Fragment::internal(&col.name),
				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
			})
			.collect();
		Columns {
			row_numbers: CowVec::new(Vec::new()),
			columns: CowVec::new(columns),
		}
	}
}