Skip to main content

reifydb_sub_flow/operator/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{
6	Result,
7	value::{datetime::DateTime, row_number::RowNumber},
8};
9
10use crate::transaction::FlowTransaction;
11
12pub mod append;
13pub mod apply;
14pub mod distinct;
15pub mod extend;
16#[cfg(reifydb_target = "native")]
17pub mod ffi;
18pub mod filter;
19pub mod gate;
20pub mod join;
21pub mod map;
22pub mod scan;
23pub mod sink;
24pub mod sort;
25pub mod stateful;
26pub mod take;
27pub mod window;
28
29use append::AppendOperator;
30use apply::ApplyOperator;
31use distinct::DistinctOperator;
32use extend::ExtendOperator;
33use filter::FilterOperator;
34use gate::GateOperator;
35use join::operator::JoinOperator;
36use map::MapOperator;
37use reifydb_core::interface::change::Change;
38use scan::{
39	flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
40	table::PrimitiveTableOperator, view::PrimitiveViewOperator,
41};
42use sink::{
43	ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator, view::SinkTableViewOperator,
44};
45use sort::SortOperator;
46use take::TakeOperator;
47use window::WindowOperator;
48
49pub trait Operator: Send + Sync {
50	fn id(&self) -> FlowNodeId;
51
52	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
53
54	/// Periodic tick for time-based maintenance (e.g., window eviction).
55	/// Returns Some(Change) with diffs if maintenance produced changes.
56	fn tick(&self, _txn: &mut FlowTransaction, _timestamp: DateTime) -> Result<Option<Change>> {
57		Ok(None)
58	}
59
60	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
61}
62
63pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
64
65pub enum Operators {
66	SourceTable(PrimitiveTableOperator),
67	SourceView(PrimitiveViewOperator),
68	SourceFlow(PrimitiveFlowOperator),
69	SourceRingBuffer(PrimitiveRingBufferOperator),
70	SourceSeries(PrimitiveSeriesOperator),
71	Filter(FilterOperator),
72	Gate(GateOperator),
73	Map(MapOperator),
74	Extend(ExtendOperator),
75	Join(JoinOperator),
76	Sort(SortOperator),
77	Take(TakeOperator),
78	Distinct(DistinctOperator),
79	Append(AppendOperator),
80	Apply(ApplyOperator),
81	SinkTableView(SinkTableViewOperator),
82	SinkRingBufferView(SinkRingBufferViewOperator),
83	SinkSeriesView(SinkSeriesViewOperator),
84	Window(WindowOperator),
85	Custom(BoxedOperator),
86}
87
88impl Operators {
89	pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
90		match self {
91			Operators::Filter(op) => op.apply(txn, change),
92			Operators::Gate(op) => op.apply(txn, change),
93			Operators::Map(op) => op.apply(txn, change),
94			Operators::Extend(op) => op.apply(txn, change),
95			Operators::Join(op) => op.apply(txn, change),
96			Operators::Sort(op) => op.apply(txn, change),
97			Operators::Take(op) => op.apply(txn, change),
98			Operators::Distinct(op) => op.apply(txn, change),
99			Operators::Append(op) => op.apply(txn, change),
100			Operators::Apply(op) => op.apply(txn, change),
101			Operators::SinkTableView(op) => op.apply(txn, change),
102			Operators::SinkRingBufferView(op) => op.apply(txn, change),
103			Operators::SinkSeriesView(op) => op.apply(txn, change),
104			Operators::Window(op) => op.apply(txn, change),
105			Operators::SourceTable(op) => op.apply(txn, change),
106			Operators::SourceView(op) => op.apply(txn, change),
107			Operators::SourceFlow(op) => op.apply(txn, change),
108			Operators::SourceRingBuffer(op) => op.apply(txn, change),
109			Operators::SourceSeries(op) => op.apply(txn, change),
110			Operators::Custom(op) => op.apply(txn, change),
111		}
112	}
113
114	pub fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
115		match self {
116			Operators::Window(op) => op.tick(txn, timestamp),
117			Operators::Custom(op) => op.tick(txn, timestamp),
118			Operators::Apply(op) => op.tick(txn, timestamp),
119			_ => Ok(None),
120		}
121	}
122
123	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
124		match self {
125			Operators::Filter(op) => op.pull(txn, rows),
126			Operators::Gate(op) => op.pull(txn, rows),
127			Operators::Map(op) => op.pull(txn, rows),
128			Operators::Extend(op) => op.pull(txn, rows),
129			Operators::Join(op) => op.pull(txn, rows),
130			Operators::Sort(op) => op.pull(txn, rows),
131			Operators::Take(op) => op.pull(txn, rows),
132			Operators::Distinct(op) => op.pull(txn, rows),
133			Operators::Append(op) => op.pull(txn, rows),
134			Operators::Apply(op) => op.pull(txn, rows),
135			Operators::SinkTableView(op) => op.pull(txn, rows),
136			Operators::SinkRingBufferView(op) => op.pull(txn, rows),
137			Operators::SinkSeriesView(op) => op.pull(txn, rows),
138			Operators::Window(op) => op.pull(txn, rows),
139			Operators::SourceTable(op) => op.pull(txn, rows),
140			Operators::SourceView(op) => op.pull(txn, rows),
141			Operators::SourceFlow(op) => op.pull(txn, rows),
142			Operators::SourceRingBuffer(op) => op.pull(txn, rows),
143			Operators::SourceSeries(op) => op.pull(txn, rows),
144			Operators::Custom(op) => op.pull(txn, rows),
145		}
146	}
147}