reifydb_sub_flow/operator/
mod.rs

1use reifydb_core::{Row, interface::FlowNodeId};
2use reifydb_engine::StandardRowEvaluator;
3use reifydb_type::RowNumber;
4
5use crate::{flow::FlowChange, transaction::FlowTransaction};
6
7mod apply;
8mod distinct;
9mod extend;
10mod ffi;
11mod filter;
12pub mod join;
13mod map;
14mod sink;
15mod sort;
16mod source;
17pub mod stateful;
18mod take;
19pub mod transform;
20mod union;
21mod window;
22
23pub use apply::ApplyOperator;
24pub use distinct::DistinctOperator;
25pub use extend::ExtendOperator;
26pub use ffi::FFIOperator;
27pub use filter::FilterOperator;
28pub use join::JoinOperator;
29pub use map::MapOperator;
30pub use sink::SinkViewOperator;
31pub use sort::SortOperator;
32pub use source::{SourceTableOperator, SourceViewOperator};
33pub use take::TakeOperator;
34pub use transform::registry::TransformOperatorRegistry;
35pub use union::UnionOperator;
36pub use window::WindowOperator;
37
38pub trait Operator: Send + Sync {
39	fn id(&self) -> FlowNodeId; // FIXME replace by operator id
40
41	fn apply(
42		&self,
43		txn: &mut FlowTransaction,
44		change: FlowChange,
45		evaluator: &StandardRowEvaluator,
46	) -> crate::Result<FlowChange>;
47
48	fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
49}
50
51pub type BoxedOperator = Box<dyn Operator>;
52
53pub enum Operators {
54	SourceTable(SourceTableOperator),
55	SourceView(SourceViewOperator),
56	Filter(FilterOperator),
57	Map(MapOperator),
58	Extend(ExtendOperator),
59	Join(JoinOperator),
60	Sort(SortOperator),
61	Take(TakeOperator),
62	Distinct(DistinctOperator),
63	Union(UnionOperator),
64	Apply(ApplyOperator),
65	SinkView(SinkViewOperator),
66	Window(WindowOperator),
67}
68
69impl Operators {
70	pub fn apply(
71		&self,
72		txn: &mut FlowTransaction,
73		change: FlowChange,
74		evaluator: &StandardRowEvaluator,
75	) -> crate::Result<FlowChange> {
76		match self {
77			Operators::Filter(op) => op.apply(txn, change, evaluator),
78			Operators::Map(op) => op.apply(txn, change, evaluator),
79			Operators::Extend(op) => op.apply(txn, change, evaluator),
80			Operators::Join(op) => op.apply(txn, change, evaluator),
81			Operators::Sort(op) => op.apply(txn, change, evaluator),
82			Operators::Take(op) => op.apply(txn, change, evaluator),
83			Operators::Distinct(op) => op.apply(txn, change, evaluator),
84			Operators::Union(op) => op.apply(txn, change, evaluator),
85			Operators::Apply(op) => op.apply(txn, change, evaluator),
86			Operators::SinkView(op) => op.apply(txn, change, evaluator),
87			Operators::Window(op) => op.apply(txn, change, evaluator),
88			Operators::SourceTable(op) => op.apply(txn, change, evaluator),
89			Operators::SourceView(op) => op.apply(txn, change, evaluator),
90		}
91	}
92
93	fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
94		match self {
95			Operators::Filter(op) => op.get_rows(txn, rows),
96			Operators::Map(op) => op.get_rows(txn, rows),
97			Operators::Extend(op) => op.get_rows(txn, rows),
98			Operators::Join(op) => op.get_rows(txn, rows),
99			Operators::Sort(op) => op.get_rows(txn, rows),
100			Operators::Take(op) => op.get_rows(txn, rows),
101			Operators::Distinct(op) => op.get_rows(txn, rows),
102			Operators::Union(op) => op.get_rows(txn, rows),
103			Operators::Apply(op) => op.get_rows(txn, rows),
104			Operators::SinkView(op) => op.get_rows(txn, rows),
105			Operators::Window(op) => op.get_rows(txn, rows),
106			Operators::SourceTable(op) => op.get_rows(txn, rows),
107			Operators::SourceView(op) => op.get_rows(txn, rows),
108		}
109	}
110}