// reifydb_sub_flow/operator/mod.rs

1use async_trait::async_trait;
2use reifydb_core::{Row, interface::FlowNodeId};
3use reifydb_engine::StandardRowEvaluator;
4use reifydb_type::RowNumber;
5
6use crate::transaction::FlowTransaction;
7
8mod apply;
9mod distinct;
10mod extend;
11mod ffi;
12mod filter;
13pub mod join;
14mod map;
15mod merge;
16mod sink;
17mod sort;
18mod source;
19pub mod stateful;
20mod take;
21pub mod transform;
22mod window;
23
24pub use apply::ApplyOperator;
25pub use distinct::DistinctOperator;
26pub use extend::ExtendOperator;
27pub use ffi::FFIOperator;
28pub use filter::FilterOperator;
29pub use join::JoinOperator;
30pub use map::MapOperator;
31pub use merge::MergeOperator;
32use reifydb_flow_operator_sdk::FlowChange;
33pub use sink::SinkViewOperator;
34pub use sort::SortOperator;
35pub use source::{SourceFlowOperator, SourceTableOperator, SourceViewOperator};
36pub use take::TakeOperator;
37pub use transform::registry::TransformOperatorRegistry;
38pub use window::WindowOperator;
39
40#[async_trait]
41pub trait Operator: Send + Sync {
42	fn id(&self) -> FlowNodeId; // FIXME replace by operator id
43
44	async fn apply(
45		&self,
46		txn: &mut FlowTransaction,
47		change: FlowChange,
48		evaluator: &StandardRowEvaluator,
49	) -> crate::Result<FlowChange>;
50
51	async fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>>;
52}
53
54pub type BoxedOperator = Box<dyn Operator>;
55
56pub enum Operators {
57	SourceTable(SourceTableOperator),
58	SourceView(SourceViewOperator),
59	SourceFlow(SourceFlowOperator),
60	Filter(FilterOperator),
61	Map(MapOperator),
62	Extend(ExtendOperator),
63	Join(JoinOperator),
64	Sort(SortOperator),
65	Take(TakeOperator),
66	Distinct(DistinctOperator),
67	Merge(MergeOperator),
68	Apply(ApplyOperator),
69	SinkView(SinkViewOperator),
70	Window(WindowOperator),
71}
72
73impl Operators {
74	pub async fn apply(
75		&self,
76		txn: &mut FlowTransaction,
77		change: FlowChange,
78		evaluator: &StandardRowEvaluator,
79	) -> crate::Result<FlowChange> {
80		match self {
81			Operators::Filter(op) => op.apply(txn, change, evaluator).await,
82			Operators::Map(op) => op.apply(txn, change, evaluator).await,
83			Operators::Extend(op) => op.apply(txn, change, evaluator).await,
84			Operators::Join(op) => op.apply(txn, change, evaluator).await,
85			Operators::Sort(op) => op.apply(txn, change, evaluator).await,
86			Operators::Take(op) => op.apply(txn, change, evaluator).await,
87			Operators::Distinct(op) => op.apply(txn, change, evaluator).await,
88			Operators::Merge(op) => op.apply(txn, change, evaluator).await,
89			Operators::Apply(op) => op.apply(txn, change, evaluator).await,
90			Operators::SinkView(op) => op.apply(txn, change, evaluator).await,
91			Operators::Window(op) => op.apply(txn, change, evaluator).await,
92			Operators::SourceTable(op) => op.apply(txn, change, evaluator).await,
93			Operators::SourceView(op) => op.apply(txn, change, evaluator).await,
94			Operators::SourceFlow(op) => op.apply(txn, change, evaluator).await,
95		}
96	}
97
98	async fn get_rows(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> crate::Result<Vec<Option<Row>>> {
99		match self {
100			Operators::Filter(op) => op.get_rows(txn, rows).await,
101			Operators::Map(op) => op.get_rows(txn, rows).await,
102			Operators::Extend(op) => op.get_rows(txn, rows).await,
103			Operators::Join(op) => op.get_rows(txn, rows).await,
104			Operators::Sort(op) => op.get_rows(txn, rows).await,
105			Operators::Take(op) => op.get_rows(txn, rows).await,
106			Operators::Distinct(op) => op.get_rows(txn, rows).await,
107			Operators::Merge(op) => op.get_rows(txn, rows).await,
108			Operators::Apply(op) => op.get_rows(txn, rows).await,
109			Operators::SinkView(op) => op.get_rows(txn, rows).await,
110			Operators::Window(op) => op.get_rows(txn, rows).await,
111			Operators::SourceTable(op) => op.get_rows(txn, rows).await,
112			Operators::SourceView(op) => op.get_rows(txn, rows).await,
113			Operators::SourceFlow(op) => op.get_rows(txn, rows).await,
114		}
115	}
116}