reifydb_sub_flow/operator/
mod.rs1use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{
6 Result,
7 value::{datetime::DateTime, row_number::RowNumber},
8};
9
10use crate::transaction::FlowTransaction;
11
12pub mod append;
13pub mod apply;
14pub mod distinct;
15pub mod extend;
16#[cfg(reifydb_target = "native")]
17pub mod ffi;
18pub mod filter;
19pub mod gate;
20pub mod join;
21pub mod map;
22pub mod scan;
23pub mod sink;
24pub mod sort;
25pub mod stateful;
26pub mod take;
27pub mod window;
28
29use append::AppendOperator;
30use apply::ApplyOperator;
31use distinct::DistinctOperator;
32use extend::ExtendOperator;
33use filter::FilterOperator;
34use gate::GateOperator;
35use join::operator::JoinOperator;
36use map::MapOperator;
37use reifydb_core::interface::change::Change;
38use scan::{
39 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
40 table::PrimitiveTableOperator, view::PrimitiveViewOperator,
41};
42use sink::{
43 ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator, view::SinkTableViewOperator,
44};
45use sort::SortOperator;
46use take::TakeOperator;
47use window::WindowOperator;
48
49pub trait Operator: Send + Sync {
50 fn id(&self) -> FlowNodeId;
51
52 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
53
54 fn tick(&self, _txn: &mut FlowTransaction, _timestamp: DateTime) -> Result<Option<Change>> {
57 Ok(None)
58 }
59
60 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
61}
62
63pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
64
65pub enum Operators {
66 SourceTable(PrimitiveTableOperator),
67 SourceView(PrimitiveViewOperator),
68 SourceFlow(PrimitiveFlowOperator),
69 SourceRingBuffer(PrimitiveRingBufferOperator),
70 SourceSeries(PrimitiveSeriesOperator),
71 Filter(FilterOperator),
72 Gate(GateOperator),
73 Map(MapOperator),
74 Extend(ExtendOperator),
75 Join(JoinOperator),
76 Sort(SortOperator),
77 Take(TakeOperator),
78 Distinct(DistinctOperator),
79 Append(AppendOperator),
80 Apply(ApplyOperator),
81 SinkTableView(SinkTableViewOperator),
82 SinkRingBufferView(SinkRingBufferViewOperator),
83 SinkSeriesView(SinkSeriesViewOperator),
84 Window(WindowOperator),
85 Custom(BoxedOperator),
86}
87
88impl Operators {
89 pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
90 match self {
91 Operators::Filter(op) => op.apply(txn, change),
92 Operators::Gate(op) => op.apply(txn, change),
93 Operators::Map(op) => op.apply(txn, change),
94 Operators::Extend(op) => op.apply(txn, change),
95 Operators::Join(op) => op.apply(txn, change),
96 Operators::Sort(op) => op.apply(txn, change),
97 Operators::Take(op) => op.apply(txn, change),
98 Operators::Distinct(op) => op.apply(txn, change),
99 Operators::Append(op) => op.apply(txn, change),
100 Operators::Apply(op) => op.apply(txn, change),
101 Operators::SinkTableView(op) => op.apply(txn, change),
102 Operators::SinkRingBufferView(op) => op.apply(txn, change),
103 Operators::SinkSeriesView(op) => op.apply(txn, change),
104 Operators::Window(op) => op.apply(txn, change),
105 Operators::SourceTable(op) => op.apply(txn, change),
106 Operators::SourceView(op) => op.apply(txn, change),
107 Operators::SourceFlow(op) => op.apply(txn, change),
108 Operators::SourceRingBuffer(op) => op.apply(txn, change),
109 Operators::SourceSeries(op) => op.apply(txn, change),
110 Operators::Custom(op) => op.apply(txn, change),
111 }
112 }
113
114 pub fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
115 match self {
116 Operators::Window(op) => op.tick(txn, timestamp),
117 Operators::Custom(op) => op.tick(txn, timestamp),
118 Operators::Apply(op) => op.tick(txn, timestamp),
119 _ => Ok(None),
120 }
121 }
122
123 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
124 match self {
125 Operators::Filter(op) => op.pull(txn, rows),
126 Operators::Gate(op) => op.pull(txn, rows),
127 Operators::Map(op) => op.pull(txn, rows),
128 Operators::Extend(op) => op.pull(txn, rows),
129 Operators::Join(op) => op.pull(txn, rows),
130 Operators::Sort(op) => op.pull(txn, rows),
131 Operators::Take(op) => op.pull(txn, rows),
132 Operators::Distinct(op) => op.pull(txn, rows),
133 Operators::Append(op) => op.pull(txn, rows),
134 Operators::Apply(op) => op.pull(txn, rows),
135 Operators::SinkTableView(op) => op.pull(txn, rows),
136 Operators::SinkRingBufferView(op) => op.pull(txn, rows),
137 Operators::SinkSeriesView(op) => op.pull(txn, rows),
138 Operators::Window(op) => op.pull(txn, rows),
139 Operators::SourceTable(op) => op.pull(txn, rows),
140 Operators::SourceView(op) => op.pull(txn, rows),
141 Operators::SourceFlow(op) => op.pull(txn, rows),
142 Operators::SourceRingBuffer(op) => op.pull(txn, rows),
143 Operators::SourceSeries(op) => op.pull(txn, rows),
144 Operators::Custom(op) => op.pull(txn, rows),
145 }
146 }
147}