reifydb_sub_flow/operator/
mod.rs1use async_trait::async_trait;
2use reifydb_core::{Row, interface::FlowNodeId};
3use reifydb_engine::StandardRowEvaluator;
4use reifydb_type::RowNumber;
5
6use crate::transaction::FlowTransaction;
7
8mod apply;
9mod distinct;
10mod extend;
11mod ffi;
12mod filter;
13pub mod join;
14mod map;
15mod merge;
16mod sink;
17mod sort;
18mod source;
19pub mod stateful;
20mod take;
21pub mod transform;
22mod window;
23
24pub use apply::ApplyOperator;
25pub use distinct::DistinctOperator;
26pub use extend::ExtendOperator;
27pub use ffi::FFIOperator;
28pub use filter::FilterOperator;
29pub use join::JoinOperator;
30pub use map::MapOperator;
31pub use merge::MergeOperator;
32use reifydb_flow_operator_sdk::FlowChange;
33pub use sink::SinkViewOperator;
34pub use sort::SortOperator;
35pub use source::{SourceFlowOperator, SourceTableOperator, SourceViewOperator};
36pub use take::TakeOperator;
37pub use transform::registry::TransformOperatorRegistry;
38pub use window::WindowOperator;
39
40#[async_trait]
41pub trait Operator: Send + Sync {
42 fn id(&self) -> FlowNodeId; async fn apply(
45 &self,
46 txn: &mut FlowTransaction,
47 change: FlowChange,
48 evaluator: &StandardRowEvaluator,
49 ) -> crate::Result<FlowChange>;
50
51 async fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
52}
53
54pub type BoxedOperator = Box<dyn Operator>;
55
56pub enum Operators {
57 SourceTable(SourceTableOperator),
58 SourceView(SourceViewOperator),
59 SourceFlow(SourceFlowOperator),
60 Filter(FilterOperator),
61 Map(MapOperator),
62 Extend(ExtendOperator),
63 Join(JoinOperator),
64 Sort(SortOperator),
65 Take(TakeOperator),
66 Distinct(DistinctOperator),
67 Merge(MergeOperator),
68 Apply(ApplyOperator),
69 SinkView(SinkViewOperator),
70 Window(WindowOperator),
71}
72
73impl Operators {
74 pub async fn apply(
75 &self,
76 txn: &mut FlowTransaction,
77 change: FlowChange,
78 evaluator: &StandardRowEvaluator,
79 ) -> crate::Result<FlowChange> {
80 match self {
81 Operators::Filter(op) => op.apply(txn, change, evaluator).await,
82 Operators::Map(op) => op.apply(txn, change, evaluator).await,
83 Operators::Extend(op) => op.apply(txn, change, evaluator).await,
84 Operators::Join(op) => op.apply(txn, change, evaluator).await,
85 Operators::Sort(op) => op.apply(txn, change, evaluator).await,
86 Operators::Take(op) => op.apply(txn, change, evaluator).await,
87 Operators::Distinct(op) => op.apply(txn, change, evaluator).await,
88 Operators::Merge(op) => op.apply(txn, change, evaluator).await,
89 Operators::Apply(op) => op.apply(txn, change, evaluator).await,
90 Operators::SinkView(op) => op.apply(txn, change, evaluator).await,
91 Operators::Window(op) => op.apply(txn, change, evaluator).await,
92 Operators::SourceTable(op) => op.apply(txn, change, evaluator).await,
93 Operators::SourceView(op) => op.apply(txn, change, evaluator).await,
94 Operators::SourceFlow(op) => op.apply(txn, change, evaluator).await,
95 }
96 }
97
98 async fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
99 match self {
100 Operators::Filter(op) => op.get_rows(txn, rows).await,
101 Operators::Map(op) => op.get_rows(txn, rows).await,
102 Operators::Extend(op) => op.get_rows(txn, rows).await,
103 Operators::Join(op) => op.get_rows(txn, rows).await,
104 Operators::Sort(op) => op.get_rows(txn, rows).await,
105 Operators::Take(op) => op.get_rows(txn, rows).await,
106 Operators::Distinct(op) => op.get_rows(txn, rows).await,
107 Operators::Merge(op) => op.get_rows(txn, rows).await,
108 Operators::Apply(op) => op.get_rows(txn, rows).await,
109 Operators::SinkView(op) => op.get_rows(txn, rows).await,
110 Operators::Window(op) => op.get_rows(txn, rows).await,
111 Operators::SourceTable(op) => op.get_rows(txn, rows).await,
112 Operators::SourceView(op) => op.get_rows(txn, rows).await,
113 Operators::SourceFlow(op) => op.get_rows(txn, rows).await,
114 }
115 }
116}