reifydb_sub_flow/operator/
mod.rs1use reifydb_core::{Row, interface::FlowNodeId};
2use reifydb_engine::StandardRowEvaluator;
3use reifydb_type::RowNumber;
4
5use crate::transaction::FlowTransaction;
6
7mod apply;
8mod distinct;
9mod extend;
10mod ffi;
11mod filter;
12pub mod join;
13mod map;
14mod merge;
15mod sink;
16mod sort;
17mod source;
18pub mod stateful;
19mod take;
20pub mod transform;
21mod window;
22
23pub use apply::ApplyOperator;
24pub use distinct::DistinctOperator;
25pub use extend::ExtendOperator;
26pub use ffi::FFIOperator;
27pub use filter::FilterOperator;
28pub use join::JoinOperator;
29pub use map::MapOperator;
30pub use merge::MergeOperator;
31use reifydb_flow_operator_sdk::FlowChange;
32pub use sink::SinkViewOperator;
33pub use sort::SortOperator;
34pub use source::{SourceFlowOperator, SourceTableOperator, SourceViewOperator};
35pub use take::TakeOperator;
36pub use transform::registry::TransformOperatorRegistry;
37pub use window::WindowOperator;
38
39pub trait Operator: Send + Sync {
40 fn id(&self) -> FlowNodeId; fn apply(
43 &self,
44 txn: &mut FlowTransaction,
45 change: FlowChange,
46 evaluator: &StandardRowEvaluator,
47 ) -> crate::Result<FlowChange>;
48
49 fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
50}
51
52pub type BoxedOperator = Box<dyn Operator>;
53
54pub enum Operators {
55 SourceTable(SourceTableOperator),
56 SourceView(SourceViewOperator),
57 SourceFlow(SourceFlowOperator),
58 Filter(FilterOperator),
59 Map(MapOperator),
60 Extend(ExtendOperator),
61 Join(JoinOperator),
62 Sort(SortOperator),
63 Take(TakeOperator),
64 Distinct(DistinctOperator),
65 Merge(MergeOperator),
66 Apply(ApplyOperator),
67 SinkView(SinkViewOperator),
68 Window(WindowOperator),
69}
70
71impl Operators {
72 pub fn apply(
73 &self,
74 txn: &mut FlowTransaction,
75 change: FlowChange,
76 evaluator: &StandardRowEvaluator,
77 ) -> crate::Result<FlowChange> {
78 match self {
79 Operators::Filter(op) => op.apply(txn, change, evaluator),
80 Operators::Map(op) => op.apply(txn, change, evaluator),
81 Operators::Extend(op) => op.apply(txn, change, evaluator),
82 Operators::Join(op) => op.apply(txn, change, evaluator),
83 Operators::Sort(op) => op.apply(txn, change, evaluator),
84 Operators::Take(op) => op.apply(txn, change, evaluator),
85 Operators::Distinct(op) => op.apply(txn, change, evaluator),
86 Operators::Merge(op) => op.apply(txn, change, evaluator),
87 Operators::Apply(op) => op.apply(txn, change, evaluator),
88 Operators::SinkView(op) => op.apply(txn, change, evaluator),
89 Operators::Window(op) => op.apply(txn, change, evaluator),
90 Operators::SourceTable(op) => op.apply(txn, change, evaluator),
91 Operators::SourceView(op) => op.apply(txn, change, evaluator),
92 Operators::SourceFlow(op) => op.apply(txn, change, evaluator),
93 }
94 }
95
96 fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
97 match self {
98 Operators::Filter(op) => op.get_rows(txn, rows),
99 Operators::Map(op) => op.get_rows(txn, rows),
100 Operators::Extend(op) => op.get_rows(txn, rows),
101 Operators::Join(op) => op.get_rows(txn, rows),
102 Operators::Sort(op) => op.get_rows(txn, rows),
103 Operators::Take(op) => op.get_rows(txn, rows),
104 Operators::Distinct(op) => op.get_rows(txn, rows),
105 Operators::Merge(op) => op.get_rows(txn, rows),
106 Operators::Apply(op) => op.get_rows(txn, rows),
107 Operators::SinkView(op) => op.get_rows(txn, rows),
108 Operators::Window(op) => op.get_rows(txn, rows),
109 Operators::SourceTable(op) => op.get_rows(txn, rows),
110 Operators::SourceView(op) => op.get_rows(txn, rows),
111 Operators::SourceFlow(op) => op.get_rows(txn, rows),
112 }
113 }
114}