reifydb_sub_flow/operator/
mod.rs1use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use crate::transaction::FlowTransaction;
8
9pub mod append;
10pub mod apply;
11pub mod distinct;
12pub mod extend;
13#[cfg(reifydb_target = "native")]
14pub mod ffi;
15pub mod filter;
16pub mod gate;
17pub mod join;
18pub mod map;
19pub mod scan;
20pub mod sink;
21pub mod sort;
22pub mod stateful;
23pub mod take;
24pub mod window;
25
26use append::AppendOperator;
27use apply::ApplyOperator;
28use distinct::DistinctOperator;
29use extend::ExtendOperator;
30use filter::FilterOperator;
31use gate::GateOperator;
32use join::operator::JoinOperator;
33use map::MapOperator;
34use reifydb_core::interface::change::Change;
35use scan::{
36 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
37 table::PrimitiveTableOperator, view::PrimitiveViewOperator,
38};
39use sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator};
40use sort::SortOperator;
41use take::TakeOperator;
42use window::WindowOperator;
43
44pub trait Operator: Send + Sync {
45 fn id(&self) -> FlowNodeId;
46
47 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
48
49 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
50}
51
52pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
53
54pub enum Operators {
55 SourceTable(PrimitiveTableOperator),
56 SourceView(PrimitiveViewOperator),
57 SourceFlow(PrimitiveFlowOperator),
58 SourceRingBuffer(PrimitiveRingBufferOperator),
59 SourceSeries(PrimitiveSeriesOperator),
60 Filter(FilterOperator),
61 Gate(GateOperator),
62 Map(MapOperator),
63 Extend(ExtendOperator),
64 Join(JoinOperator),
65 Sort(SortOperator),
66 Take(TakeOperator),
67 Distinct(DistinctOperator),
68 Append(AppendOperator),
69 Apply(ApplyOperator),
70 SinkView(SinkViewOperator),
71 SinkSubscription(SinkSubscriptionOperator),
72 Window(WindowOperator),
73 Custom(BoxedOperator),
74}
75
76impl Operators {
77 pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
78 match self {
79 Operators::Filter(op) => op.apply(txn, change),
80 Operators::Gate(op) => op.apply(txn, change),
81 Operators::Map(op) => op.apply(txn, change),
82 Operators::Extend(op) => op.apply(txn, change),
83 Operators::Join(op) => op.apply(txn, change),
84 Operators::Sort(op) => op.apply(txn, change),
85 Operators::Take(op) => op.apply(txn, change),
86 Operators::Distinct(op) => op.apply(txn, change),
87 Operators::Append(op) => op.apply(txn, change),
88 Operators::Apply(op) => op.apply(txn, change),
89 Operators::SinkView(op) => op.apply(txn, change),
90 Operators::SinkSubscription(op) => op.apply(txn, change),
91 Operators::Window(op) => op.apply(txn, change),
92 Operators::SourceTable(op) => op.apply(txn, change),
93 Operators::SourceView(op) => op.apply(txn, change),
94 Operators::SourceFlow(op) => op.apply(txn, change),
95 Operators::SourceRingBuffer(op) => op.apply(txn, change),
96 Operators::SourceSeries(op) => op.apply(txn, change),
97 Operators::Custom(op) => op.apply(txn, change),
98 }
99 }
100
101 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
102 match self {
103 Operators::Filter(op) => op.pull(txn, rows),
104 Operators::Gate(op) => op.pull(txn, rows),
105 Operators::Map(op) => op.pull(txn, rows),
106 Operators::Extend(op) => op.pull(txn, rows),
107 Operators::Join(op) => op.pull(txn, rows),
108 Operators::Sort(op) => op.pull(txn, rows),
109 Operators::Take(op) => op.pull(txn, rows),
110 Operators::Distinct(op) => op.pull(txn, rows),
111 Operators::Append(op) => op.pull(txn, rows),
112 Operators::Apply(op) => op.pull(txn, rows),
113 Operators::SinkView(op) => op.pull(txn, rows),
114 Operators::SinkSubscription(op) => op.pull(txn, rows),
115 Operators::Window(op) => op.pull(txn, rows),
116 Operators::SourceTable(op) => op.pull(txn, rows),
117 Operators::SourceView(op) => op.pull(txn, rows),
118 Operators::SourceFlow(op) => op.pull(txn, rows),
119 Operators::SourceRingBuffer(op) => op.pull(txn, rows),
120 Operators::SourceSeries(op) => op.pull(txn, rows),
121 Operators::Custom(op) => op.pull(txn, rows),
122 }
123 }
124}