Skip to main content

reifydb_sub_flow/operator/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{
6	Result,
7	value::{datetime::DateTime, row_number::RowNumber},
8};
9
10use crate::transaction::FlowTransaction;
11
12pub mod append;
13pub mod apply;
14pub mod distinct;
15pub mod extend;
16#[cfg(reifydb_target = "native")]
17pub mod ffi;
18pub mod filter;
19pub mod gate;
20pub mod join;
21pub mod map;
22pub mod scan;
23pub mod sink;
24pub mod sort;
25pub mod stateful;
26pub mod take;
27pub mod window;
28
29use append::AppendOperator;
30use apply::ApplyOperator;
31use distinct::DistinctOperator;
32use extend::ExtendOperator;
33use filter::FilterOperator;
34use gate::GateOperator;
35use join::operator::JoinOperator;
36use map::MapOperator;
37use reifydb_core::interface::change::Change;
38use scan::{
39	flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
40	table::PrimitiveTableOperator, view::PrimitiveViewOperator,
41};
42use sink::{
43	ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
44	subscription::SinkSubscriptionOperator, view::SinkTableViewOperator,
45};
46use sort::SortOperator;
47use take::TakeOperator;
48use window::WindowOperator;
49
50pub trait Operator: Send + Sync {
51	fn id(&self) -> FlowNodeId;
52
53	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
54
55	/// Periodic tick for time-based maintenance (e.g., window eviction).
56	/// Returns Some(Change) with diffs if maintenance produced changes.
57	fn tick(&self, _txn: &mut FlowTransaction, _timestamp: DateTime) -> Result<Option<Change>> {
58		Ok(None)
59	}
60
61	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
62}
63
64pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
65
66pub enum Operators {
67	SourceTable(PrimitiveTableOperator),
68	SourceView(PrimitiveViewOperator),
69	SourceFlow(PrimitiveFlowOperator),
70	SourceRingBuffer(PrimitiveRingBufferOperator),
71	SourceSeries(PrimitiveSeriesOperator),
72	Filter(FilterOperator),
73	Gate(GateOperator),
74	Map(MapOperator),
75	Extend(ExtendOperator),
76	Join(JoinOperator),
77	Sort(SortOperator),
78	Take(TakeOperator),
79	Distinct(DistinctOperator),
80	Append(AppendOperator),
81	Apply(ApplyOperator),
82	SinkTableView(SinkTableViewOperator),
83	SinkRingBufferView(SinkRingBufferViewOperator),
84	SinkSeriesView(SinkSeriesViewOperator),
85	SinkSubscription(SinkSubscriptionOperator),
86	Window(WindowOperator),
87	Custom(BoxedOperator),
88}
89
90impl Operators {
91	pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
92		match self {
93			Operators::Filter(op) => op.apply(txn, change),
94			Operators::Gate(op) => op.apply(txn, change),
95			Operators::Map(op) => op.apply(txn, change),
96			Operators::Extend(op) => op.apply(txn, change),
97			Operators::Join(op) => op.apply(txn, change),
98			Operators::Sort(op) => op.apply(txn, change),
99			Operators::Take(op) => op.apply(txn, change),
100			Operators::Distinct(op) => op.apply(txn, change),
101			Operators::Append(op) => op.apply(txn, change),
102			Operators::Apply(op) => op.apply(txn, change),
103			Operators::SinkTableView(op) => op.apply(txn, change),
104			Operators::SinkRingBufferView(op) => op.apply(txn, change),
105			Operators::SinkSeriesView(op) => op.apply(txn, change),
106			Operators::SinkSubscription(op) => op.apply(txn, change),
107			Operators::Window(op) => op.apply(txn, change),
108			Operators::SourceTable(op) => op.apply(txn, change),
109			Operators::SourceView(op) => op.apply(txn, change),
110			Operators::SourceFlow(op) => op.apply(txn, change),
111			Operators::SourceRingBuffer(op) => op.apply(txn, change),
112			Operators::SourceSeries(op) => op.apply(txn, change),
113			Operators::Custom(op) => op.apply(txn, change),
114		}
115	}
116
117	pub fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
118		match self {
119			Operators::Window(op) => op.tick(txn, timestamp),
120			Operators::Custom(op) => op.tick(txn, timestamp),
121			Operators::Apply(op) => op.tick(txn, timestamp),
122			_ => Ok(None),
123		}
124	}
125
126	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
127		match self {
128			Operators::Filter(op) => op.pull(txn, rows),
129			Operators::Gate(op) => op.pull(txn, rows),
130			Operators::Map(op) => op.pull(txn, rows),
131			Operators::Extend(op) => op.pull(txn, rows),
132			Operators::Join(op) => op.pull(txn, rows),
133			Operators::Sort(op) => op.pull(txn, rows),
134			Operators::Take(op) => op.pull(txn, rows),
135			Operators::Distinct(op) => op.pull(txn, rows),
136			Operators::Append(op) => op.pull(txn, rows),
137			Operators::Apply(op) => op.pull(txn, rows),
138			Operators::SinkTableView(op) => op.pull(txn, rows),
139			Operators::SinkRingBufferView(op) => op.pull(txn, rows),
140			Operators::SinkSeriesView(op) => op.pull(txn, rows),
141			Operators::SinkSubscription(op) => op.pull(txn, rows),
142			Operators::Window(op) => op.pull(txn, rows),
143			Operators::SourceTable(op) => op.pull(txn, rows),
144			Operators::SourceView(op) => op.pull(txn, rows),
145			Operators::SourceFlow(op) => op.pull(txn, rows),
146			Operators::SourceRingBuffer(op) => op.pull(txn, rows),
147			Operators::SourceSeries(op) => op.pull(txn, rows),
148			Operators::Custom(op) => op.pull(txn, rows),
149		}
150	}
151}