reifydb-sub-flow 0.5.0

Flow subsystem for stream processing and data flows
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
use reifydb_sdk::operator::Tick;
use reifydb_type::{Result, value::row_number::RowNumber};

use crate::transaction::FlowTransaction;

pub mod append;
pub mod apply;
pub mod capability_guard;
pub mod distinct;
pub mod extend;
#[cfg(reifydb_target = "native")]
pub mod ffi;
pub mod filter;
pub mod gate;
pub mod join;
pub mod map;
pub mod scan;
pub mod sink;
pub mod sort;
pub mod stateful;
pub mod take;
pub mod window;

use append::AppendOperator;
use apply::ApplyOperator;
use capability_guard::{enforce_apply_capabilities, enforce_pull_capability, enforce_tick_capability};
use distinct::DistinctOperator;
use extend::ExtendOperator;
use filter::FilterOperator;
use gate::GateOperator;
use join::operator::JoinOperator;
use map::MapOperator;
use reifydb_core::interface::change::Change;
use scan::{
	flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
	table::PrimitiveTableOperator, view::PrimitiveViewOperator,
};
use sink::{
	ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator, view::SinkTableViewOperator,
};
use sort::SortOperator;
use take::TakeOperator;
use window::WindowOperator;

pub trait Operator: Send + Sync {
	fn id(&self) -> FlowNodeId;

	fn capabilities(&self) -> u32;

	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;

	fn tick(&self, _txn: &mut FlowTransaction, _tick: Tick) -> Result<Option<Change>> {
		Ok(None)
	}

	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
}

pub type BoxedOperator = Box<dyn Operator + Send + Sync>;

pub enum Operators {
	SourceTable(PrimitiveTableOperator),
	SourceView(PrimitiveViewOperator),
	SourceFlow(PrimitiveFlowOperator),
	SourceRingBuffer(PrimitiveRingBufferOperator),
	SourceSeries(PrimitiveSeriesOperator),
	Filter(FilterOperator),
	Gate(GateOperator),
	Map(MapOperator),
	Extend(ExtendOperator),
	Join(JoinOperator),
	Sort(SortOperator),
	Take(TakeOperator),
	Distinct(DistinctOperator),
	Append(AppendOperator),
	Apply(ApplyOperator),
	SinkTableView(SinkTableViewOperator),
	SinkRingBufferView(SinkRingBufferViewOperator),
	SinkSeriesView(SinkSeriesViewOperator),
	Window(WindowOperator),
	Custom(BoxedOperator),
}

impl Operators {
	pub fn id(&self) -> FlowNodeId {
		match self {
			Operators::Filter(op) => op.id(),
			Operators::Gate(op) => op.id(),
			Operators::Map(op) => op.id(),
			Operators::Extend(op) => op.id(),
			Operators::Join(op) => op.id(),
			Operators::Sort(op) => op.id(),
			Operators::Take(op) => op.id(),
			Operators::Distinct(op) => op.id(),
			Operators::Append(op) => op.id(),
			Operators::Apply(op) => op.id(),
			Operators::SinkTableView(op) => op.id(),
			Operators::SinkRingBufferView(op) => op.id(),
			Operators::SinkSeriesView(op) => op.id(),
			Operators::Window(op) => op.id(),
			Operators::SourceTable(op) => op.id(),
			Operators::SourceView(op) => op.id(),
			Operators::SourceFlow(op) => op.id(),
			Operators::SourceRingBuffer(op) => op.id(),
			Operators::SourceSeries(op) => op.id(),
			Operators::Custom(op) => op.id(),
		}
	}

	pub fn capabilities(&self) -> u32 {
		match self {
			Operators::Filter(op) => op.capabilities(),
			Operators::Gate(op) => op.capabilities(),
			Operators::Map(op) => op.capabilities(),
			Operators::Extend(op) => op.capabilities(),
			Operators::Join(op) => op.capabilities(),
			Operators::Sort(op) => op.capabilities(),
			Operators::Take(op) => op.capabilities(),
			Operators::Distinct(op) => op.capabilities(),
			Operators::Append(op) => op.capabilities(),
			Operators::Apply(op) => op.capabilities(),
			Operators::SinkTableView(op) => op.capabilities(),
			Operators::SinkRingBufferView(op) => op.capabilities(),
			Operators::SinkSeriesView(op) => op.capabilities(),
			Operators::Window(op) => op.capabilities(),
			Operators::SourceTable(op) => op.capabilities(),
			Operators::SourceView(op) => op.capabilities(),
			Operators::SourceFlow(op) => op.capabilities(),
			Operators::SourceRingBuffer(op) => op.capabilities(),
			Operators::SourceSeries(op) => op.capabilities(),
			Operators::Custom(op) => op.capabilities(),
		}
	}

	pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
		enforce_apply_capabilities(self.id(), self.capabilities(), &change);
		match self {
			Operators::Filter(op) => op.apply(txn, change),
			Operators::Gate(op) => op.apply(txn, change),
			Operators::Map(op) => op.apply(txn, change),
			Operators::Extend(op) => op.apply(txn, change),
			Operators::Join(op) => op.apply(txn, change),
			Operators::Sort(op) => op.apply(txn, change),
			Operators::Take(op) => op.apply(txn, change),
			Operators::Distinct(op) => op.apply(txn, change),
			Operators::Append(op) => op.apply(txn, change),
			Operators::Apply(op) => op.apply(txn, change),
			Operators::SinkTableView(op) => op.apply(txn, change),
			Operators::SinkRingBufferView(op) => op.apply(txn, change),
			Operators::SinkSeriesView(op) => op.apply(txn, change),
			Operators::Window(op) => op.apply(txn, change),
			Operators::SourceTable(op) => op.apply(txn, change),
			Operators::SourceView(op) => op.apply(txn, change),
			Operators::SourceFlow(op) => op.apply(txn, change),
			Operators::SourceRingBuffer(op) => op.apply(txn, change),
			Operators::SourceSeries(op) => op.apply(txn, change),
			Operators::Custom(op) => op.apply(txn, change),
		}
	}

	pub fn tick(&self, txn: &mut FlowTransaction, tick: Tick) -> Result<Option<Change>> {
		match self {
			Operators::Window(op) => {
				enforce_tick_capability(op.id(), op.capabilities());
				op.tick(txn, tick)
			}
			Operators::Custom(op) => {
				enforce_tick_capability(op.id(), op.capabilities());
				op.tick(txn, tick)
			}
			Operators::Apply(op) => {
				enforce_tick_capability(op.id(), op.capabilities());
				op.tick(txn, tick)
			}
			Operators::Distinct(op) => {
				enforce_tick_capability(op.id(), op.capabilities());
				op.tick(txn, tick)
			}
			Operators::Join(op) => {
				enforce_tick_capability(op.id(), op.capabilities());
				op.tick(txn, tick)
			}
			_ => Ok(None),
		}
	}

	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
		enforce_pull_capability(self.id(), self.capabilities());
		match self {
			Operators::Filter(op) => op.pull(txn, rows),
			Operators::Gate(op) => op.pull(txn, rows),
			Operators::Map(op) => op.pull(txn, rows),
			Operators::Extend(op) => op.pull(txn, rows),
			Operators::Join(op) => op.pull(txn, rows),
			Operators::Sort(op) => op.pull(txn, rows),
			Operators::Take(op) => op.pull(txn, rows),
			Operators::Distinct(op) => op.pull(txn, rows),
			Operators::Append(op) => op.pull(txn, rows),
			Operators::Apply(op) => op.pull(txn, rows),
			Operators::SinkTableView(op) => op.pull(txn, rows),
			Operators::SinkRingBufferView(op) => op.pull(txn, rows),
			Operators::SinkSeriesView(op) => op.pull(txn, rows),
			Operators::Window(op) => op.pull(txn, rows),
			Operators::SourceTable(op) => op.pull(txn, rows),
			Operators::SourceView(op) => op.pull(txn, rows),
			Operators::SourceFlow(op) => op.pull(txn, rows),
			Operators::SourceRingBuffer(op) => op.pull(txn, rows),
			Operators::SourceSeries(op) => op.pull(txn, rows),
			Operators::Custom(op) => op.pull(txn, rows),
		}
	}
}