reifydb_sub_flow/operator/
mod.rs1use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{
6 Result,
7 value::{datetime::DateTime, row_number::RowNumber},
8};
9
10use crate::transaction::FlowTransaction;
11
12pub mod append;
13pub mod apply;
14pub mod distinct;
15pub mod extend;
16#[cfg(reifydb_target = "native")]
17pub mod ffi;
18pub mod filter;
19pub mod gate;
20pub mod join;
21pub mod map;
22pub mod scan;
23pub mod sink;
24pub mod sort;
25pub mod stateful;
26pub mod take;
27pub mod window;
28
29use append::AppendOperator;
30use apply::ApplyOperator;
31use distinct::DistinctOperator;
32use extend::ExtendOperator;
33use filter::FilterOperator;
34use gate::GateOperator;
35use join::operator::JoinOperator;
36use map::MapOperator;
37use reifydb_core::interface::change::Change;
38use scan::{
39 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
40 table::PrimitiveTableOperator, view::PrimitiveViewOperator,
41};
42use sink::{
43 ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
44 subscription::SinkSubscriptionOperator, view::SinkTableViewOperator,
45};
46use sort::SortOperator;
47use take::TakeOperator;
48use window::WindowOperator;
49
50pub trait Operator: Send + Sync {
51 fn id(&self) -> FlowNodeId;
52
53 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
54
55 fn tick(&self, _txn: &mut FlowTransaction, _timestamp: DateTime) -> Result<Option<Change>> {
58 Ok(None)
59 }
60
61 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
62}
63
/// Heap-allocated, dynamically dispatched [`Operator`].
///
/// The `Send + Sync` bounds are spelled out explicitly even though
/// [`Operator`] already requires them as supertraits.
pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
65
66pub enum Operators {
67 SourceTable(PrimitiveTableOperator),
68 SourceView(PrimitiveViewOperator),
69 SourceFlow(PrimitiveFlowOperator),
70 SourceRingBuffer(PrimitiveRingBufferOperator),
71 SourceSeries(PrimitiveSeriesOperator),
72 Filter(FilterOperator),
73 Gate(GateOperator),
74 Map(MapOperator),
75 Extend(ExtendOperator),
76 Join(JoinOperator),
77 Sort(SortOperator),
78 Take(TakeOperator),
79 Distinct(DistinctOperator),
80 Append(AppendOperator),
81 Apply(ApplyOperator),
82 SinkTableView(SinkTableViewOperator),
83 SinkRingBufferView(SinkRingBufferViewOperator),
84 SinkSeriesView(SinkSeriesViewOperator),
85 SinkSubscription(SinkSubscriptionOperator),
86 Window(WindowOperator),
87 Custom(BoxedOperator),
88}
89
90impl Operators {
91 pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
92 match self {
93 Operators::Filter(op) => op.apply(txn, change),
94 Operators::Gate(op) => op.apply(txn, change),
95 Operators::Map(op) => op.apply(txn, change),
96 Operators::Extend(op) => op.apply(txn, change),
97 Operators::Join(op) => op.apply(txn, change),
98 Operators::Sort(op) => op.apply(txn, change),
99 Operators::Take(op) => op.apply(txn, change),
100 Operators::Distinct(op) => op.apply(txn, change),
101 Operators::Append(op) => op.apply(txn, change),
102 Operators::Apply(op) => op.apply(txn, change),
103 Operators::SinkTableView(op) => op.apply(txn, change),
104 Operators::SinkRingBufferView(op) => op.apply(txn, change),
105 Operators::SinkSeriesView(op) => op.apply(txn, change),
106 Operators::SinkSubscription(op) => op.apply(txn, change),
107 Operators::Window(op) => op.apply(txn, change),
108 Operators::SourceTable(op) => op.apply(txn, change),
109 Operators::SourceView(op) => op.apply(txn, change),
110 Operators::SourceFlow(op) => op.apply(txn, change),
111 Operators::SourceRingBuffer(op) => op.apply(txn, change),
112 Operators::SourceSeries(op) => op.apply(txn, change),
113 Operators::Custom(op) => op.apply(txn, change),
114 }
115 }
116
117 pub fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
118 match self {
119 Operators::Window(op) => op.tick(txn, timestamp),
120 Operators::Custom(op) => op.tick(txn, timestamp),
121 Operators::Apply(op) => op.tick(txn, timestamp),
122 _ => Ok(None),
123 }
124 }
125
126 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
127 match self {
128 Operators::Filter(op) => op.pull(txn, rows),
129 Operators::Gate(op) => op.pull(txn, rows),
130 Operators::Map(op) => op.pull(txn, rows),
131 Operators::Extend(op) => op.pull(txn, rows),
132 Operators::Join(op) => op.pull(txn, rows),
133 Operators::Sort(op) => op.pull(txn, rows),
134 Operators::Take(op) => op.pull(txn, rows),
135 Operators::Distinct(op) => op.pull(txn, rows),
136 Operators::Append(op) => op.pull(txn, rows),
137 Operators::Apply(op) => op.pull(txn, rows),
138 Operators::SinkTableView(op) => op.pull(txn, rows),
139 Operators::SinkRingBufferView(op) => op.pull(txn, rows),
140 Operators::SinkSeriesView(op) => op.pull(txn, rows),
141 Operators::SinkSubscription(op) => op.pull(txn, rows),
142 Operators::Window(op) => op.pull(txn, rows),
143 Operators::SourceTable(op) => op.pull(txn, rows),
144 Operators::SourceView(op) => op.pull(txn, rows),
145 Operators::SourceFlow(op) => op.pull(txn, rows),
146 Operators::SourceRingBuffer(op) => op.pull(txn, rows),
147 Operators::SourceSeries(op) => op.pull(txn, rows),
148 Operators::Custom(op) => op.pull(txn, rows),
149 }
150 }
151}