Skip to main content

reifydb_sub_flow/operator/
mod.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{interface::catalog::flow::FlowNodeId, value::column::columns::Columns};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use crate::transaction::FlowTransaction;
8
9pub mod append;
10pub mod apply;
11pub mod distinct;
12pub mod extend;
13#[cfg(reifydb_target = "native")]
14pub mod ffi;
15pub mod filter;
16pub mod gate;
17pub mod join;
18pub mod map;
19pub mod scan;
20pub mod sink;
21pub mod sort;
22pub mod stateful;
23pub mod take;
24pub mod window;
25
26use append::AppendOperator;
27use apply::ApplyOperator;
28use distinct::DistinctOperator;
29use extend::ExtendOperator;
30use filter::FilterOperator;
31use gate::GateOperator;
32use join::operator::JoinOperator;
33use map::MapOperator;
34use reifydb_core::interface::change::Change;
35use scan::{
36	flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator, series::PrimitiveSeriesOperator,
37	table::PrimitiveTableOperator, view::PrimitiveViewOperator,
38};
39use sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator};
40use sort::SortOperator;
41use take::TakeOperator;
42use window::WindowOperator;
43
44pub trait Operator: Send + Sync {
45	fn id(&self) -> FlowNodeId;
46
47	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>;
48
49	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>;
50}
51
52pub type BoxedOperator = Box<dyn Operator + Send + Sync>;
53
54pub enum Operators {
55	SourceTable(PrimitiveTableOperator),
56	SourceView(PrimitiveViewOperator),
57	SourceFlow(PrimitiveFlowOperator),
58	SourceRingBuffer(PrimitiveRingBufferOperator),
59	SourceSeries(PrimitiveSeriesOperator),
60	Filter(FilterOperator),
61	Gate(GateOperator),
62	Map(MapOperator),
63	Extend(ExtendOperator),
64	Join(JoinOperator),
65	Sort(SortOperator),
66	Take(TakeOperator),
67	Distinct(DistinctOperator),
68	Append(AppendOperator),
69	Apply(ApplyOperator),
70	SinkView(SinkViewOperator),
71	SinkSubscription(SinkSubscriptionOperator),
72	Window(WindowOperator),
73	Custom(BoxedOperator),
74}
75
76impl Operators {
77	pub fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
78		match self {
79			Operators::Filter(op) => op.apply(txn, change),
80			Operators::Gate(op) => op.apply(txn, change),
81			Operators::Map(op) => op.apply(txn, change),
82			Operators::Extend(op) => op.apply(txn, change),
83			Operators::Join(op) => op.apply(txn, change),
84			Operators::Sort(op) => op.apply(txn, change),
85			Operators::Take(op) => op.apply(txn, change),
86			Operators::Distinct(op) => op.apply(txn, change),
87			Operators::Append(op) => op.apply(txn, change),
88			Operators::Apply(op) => op.apply(txn, change),
89			Operators::SinkView(op) => op.apply(txn, change),
90			Operators::SinkSubscription(op) => op.apply(txn, change),
91			Operators::Window(op) => op.apply(txn, change),
92			Operators::SourceTable(op) => op.apply(txn, change),
93			Operators::SourceView(op) => op.apply(txn, change),
94			Operators::SourceFlow(op) => op.apply(txn, change),
95			Operators::SourceRingBuffer(op) => op.apply(txn, change),
96			Operators::SourceSeries(op) => op.apply(txn, change),
97			Operators::Custom(op) => op.apply(txn, change),
98		}
99	}
100
101	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
102		match self {
103			Operators::Filter(op) => op.pull(txn, rows),
104			Operators::Gate(op) => op.pull(txn, rows),
105			Operators::Map(op) => op.pull(txn, rows),
106			Operators::Extend(op) => op.pull(txn, rows),
107			Operators::Join(op) => op.pull(txn, rows),
108			Operators::Sort(op) => op.pull(txn, rows),
109			Operators::Take(op) => op.pull(txn, rows),
110			Operators::Distinct(op) => op.pull(txn, rows),
111			Operators::Append(op) => op.pull(txn, rows),
112			Operators::Apply(op) => op.pull(txn, rows),
113			Operators::SinkView(op) => op.pull(txn, rows),
114			Operators::SinkSubscription(op) => op.pull(txn, rows),
115			Operators::Window(op) => op.pull(txn, rows),
116			Operators::SourceTable(op) => op.pull(txn, rows),
117			Operators::SourceView(op) => op.pull(txn, rows),
118			Operators::SourceFlow(op) => op.pull(txn, rows),
119			Operators::SourceRingBuffer(op) => op.pull(txn, rows),
120			Operators::SourceSeries(op) => op.pull(txn, rows),
121			Operators::Custom(op) => op.pull(txn, rows),
122		}
123	}
124}