reifydb_sub_flow/operator/
mod.rs

1use reifydb_core::{Row, interface::FlowNodeId};
2use reifydb_engine::StandardRowEvaluator;
3use reifydb_type::RowNumber;
4
5use crate::transaction::FlowTransaction;
6
7mod apply;
8mod distinct;
9mod extend;
10mod ffi;
11mod filter;
12pub mod join;
13mod map;
14mod merge;
15mod sink;
16mod sort;
17mod source;
18pub mod stateful;
19mod take;
20pub mod transform;
21mod window;
22
23pub use apply::ApplyOperator;
24pub use distinct::DistinctOperator;
25pub use extend::ExtendOperator;
26pub use ffi::FFIOperator;
27pub use filter::FilterOperator;
28pub use join::JoinOperator;
29pub use map::MapOperator;
30pub use merge::MergeOperator;
31use reifydb_flow_operator_sdk::FlowChange;
32pub use sink::SinkViewOperator;
33pub use sort::SortOperator;
34pub use source::{SourceFlowOperator, SourceTableOperator, SourceViewOperator};
35pub use take::TakeOperator;
36pub use transform::registry::TransformOperatorRegistry;
37pub use window::WindowOperator;
38
39pub trait Operator: Send + Sync {
40	fn id(&self) -> FlowNodeId; // FIXME replace by operator id
41
42	fn apply(
43		&self,
44		txn: &mut FlowTransaction,
45		change: FlowChange,
46		evaluator: &StandardRowEvaluator,
47	) -> crate::Result<FlowChange>;
48
49	fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
50}
51
52pub type BoxedOperator = Box<dyn Operator>;
53
54pub enum Operators {
55	SourceTable(SourceTableOperator),
56	SourceView(SourceViewOperator),
57	SourceFlow(SourceFlowOperator),
58	Filter(FilterOperator),
59	Map(MapOperator),
60	Extend(ExtendOperator),
61	Join(JoinOperator),
62	Sort(SortOperator),
63	Take(TakeOperator),
64	Distinct(DistinctOperator),
65	Merge(MergeOperator),
66	Apply(ApplyOperator),
67	SinkView(SinkViewOperator),
68	Window(WindowOperator),
69}
70
71impl Operators {
72	pub fn apply(
73		&self,
74		txn: &mut FlowTransaction,
75		change: FlowChange,
76		evaluator: &StandardRowEvaluator,
77	) -> crate::Result<FlowChange> {
78		match self {
79			Operators::Filter(op) => op.apply(txn, change, evaluator),
80			Operators::Map(op) => op.apply(txn, change, evaluator),
81			Operators::Extend(op) => op.apply(txn, change, evaluator),
82			Operators::Join(op) => op.apply(txn, change, evaluator),
83			Operators::Sort(op) => op.apply(txn, change, evaluator),
84			Operators::Take(op) => op.apply(txn, change, evaluator),
85			Operators::Distinct(op) => op.apply(txn, change, evaluator),
86			Operators::Merge(op) => op.apply(txn, change, evaluator),
87			Operators::Apply(op) => op.apply(txn, change, evaluator),
88			Operators::SinkView(op) => op.apply(txn, change, evaluator),
89			Operators::Window(op) => op.apply(txn, change, evaluator),
90			Operators::SourceTable(op) => op.apply(txn, change, evaluator),
91			Operators::SourceView(op) => op.apply(txn, change, evaluator),
92			Operators::SourceFlow(op) => op.apply(txn, change, evaluator),
93		}
94	}
95
96	fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
97		match self {
98			Operators::Filter(op) => op.get_rows(txn, rows),
99			Operators::Map(op) => op.get_rows(txn, rows),
100			Operators::Extend(op) => op.get_rows(txn, rows),
101			Operators::Join(op) => op.get_rows(txn, rows),
102			Operators::Sort(op) => op.get_rows(txn, rows),
103			Operators::Take(op) => op.get_rows(txn, rows),
104			Operators::Distinct(op) => op.get_rows(txn, rows),
105			Operators::Merge(op) => op.get_rows(txn, rows),
106			Operators::Apply(op) => op.get_rows(txn, rows),
107			Operators::SinkView(op) => op.get_rows(txn, rows),
108			Operators::Window(op) => op.get_rows(txn, rows),
109			Operators::SourceTable(op) => op.get_rows(txn, rows),
110			Operators::SourceView(op) => op.get_rows(txn, rows),
111			Operators::SourceFlow(op) => op.get_rows(txn, rows),
112		}
113	}
114}