reifydb_sub_flow/operator/
mod.rs1use reifydb_core::{Row, interface::FlowNodeId};
2use reifydb_engine::StandardRowEvaluator;
3use reifydb_type::RowNumber;
4
5use crate::{flow::FlowChange, transaction::FlowTransaction};
6
7mod apply;
8mod distinct;
9mod extend;
10mod ffi;
11mod filter;
12pub mod join;
13mod map;
14mod sink;
15mod sort;
16mod source;
17pub mod stateful;
18mod take;
19pub mod transform;
20mod union;
21mod window;
22
23pub use apply::ApplyOperator;
24pub use distinct::DistinctOperator;
25pub use extend::ExtendOperator;
26pub use ffi::FFIOperator;
27pub use filter::FilterOperator;
28pub use join::JoinOperator;
29pub use map::MapOperator;
30pub use sink::SinkViewOperator;
31pub use sort::SortOperator;
32pub use source::{SourceTableOperator, SourceViewOperator};
33pub use take::TakeOperator;
34pub use transform::registry::TransformOperatorRegistry;
35pub use union::UnionOperator;
36pub use window::WindowOperator;
37
38pub trait Operator: Send + Sync {
39 fn id(&self) -> FlowNodeId; fn apply(
42 &self,
43 txn: &mut FlowTransaction,
44 change: FlowChange,
45 evaluator: &StandardRowEvaluator,
46 ) -> crate::Result<FlowChange>;
47
48 fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
49}
50
51pub type BoxedOperator = Box<dyn Operator>;
52
53pub enum Operators {
54 SourceTable(SourceTableOperator),
55 SourceView(SourceViewOperator),
56 Filter(FilterOperator),
57 Map(MapOperator),
58 Extend(ExtendOperator),
59 Join(JoinOperator),
60 Sort(SortOperator),
61 Take(TakeOperator),
62 Distinct(DistinctOperator),
63 Union(UnionOperator),
64 Apply(ApplyOperator),
65 SinkView(SinkViewOperator),
66 Window(WindowOperator),
67}
68
69impl Operators {
70 pub fn apply(
71 &self,
72 txn: &mut FlowTransaction,
73 change: FlowChange,
74 evaluator: &StandardRowEvaluator,
75 ) -> crate::Result<FlowChange> {
76 match self {
77 Operators::Filter(op) => op.apply(txn, change, evaluator),
78 Operators::Map(op) => op.apply(txn, change, evaluator),
79 Operators::Extend(op) => op.apply(txn, change, evaluator),
80 Operators::Join(op) => op.apply(txn, change, evaluator),
81 Operators::Sort(op) => op.apply(txn, change, evaluator),
82 Operators::Take(op) => op.apply(txn, change, evaluator),
83 Operators::Distinct(op) => op.apply(txn, change, evaluator),
84 Operators::Union(op) => op.apply(txn, change, evaluator),
85 Operators::Apply(op) => op.apply(txn, change, evaluator),
86 Operators::SinkView(op) => op.apply(txn, change, evaluator),
87 Operators::Window(op) => op.apply(txn, change, evaluator),
88 Operators::SourceTable(op) => op.apply(txn, change, evaluator),
89 Operators::SourceView(op) => op.apply(txn, change, evaluator),
90 }
91 }
92
93 fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
94 match self {
95 Operators::Filter(op) => op.get_rows(txn, rows),
96 Operators::Map(op) => op.get_rows(txn, rows),
97 Operators::Extend(op) => op.get_rows(txn, rows),
98 Operators::Join(op) => op.get_rows(txn, rows),
99 Operators::Sort(op) => op.get_rows(txn, rows),
100 Operators::Take(op) => op.get_rows(txn, rows),
101 Operators::Distinct(op) => op.get_rows(txn, rows),
102 Operators::Union(op) => op.get_rows(txn, rows),
103 Operators::Apply(op) => op.get_rows(txn, rows),
104 Operators::SinkView(op) => op.get_rows(txn, rows),
105 Operators::Window(op) => op.get_rows(txn, rows),
106 Operators::SourceTable(op) => op.get_rows(txn, rows),
107 Operators::SourceView(op) => op.get_rows(txn, rows),
108 }
109 }
110}