1use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view};
8use reifydb_core::{
9 Error,
10 interface::{FlowId, FlowNodeId, SourceId},
11};
12use reifydb_engine::StandardCommandTransaction;
13use reifydb_rql::flow::{
14 Flow, FlowNode, FlowNodeType,
15 FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
16};
17use reifydb_type::internal;
18
19use crate::{
20 engine::FlowEngine,
21 operator::{
22 ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
23 SinkViewOperator, SortOperator, SourceTableOperator, SourceViewOperator, TakeOperator, WindowOperator,
24 },
25};
26
27impl FlowEngine {
28 pub fn register(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
29 debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
30
31 for node_id in flow.topological_order()? {
32 let node = flow.get_node(&node_id).unwrap();
33 self.add(txn, &flow, node)?;
34 }
35
36 self.inner.analyzer.write().add(flow.clone());
38 self.inner.flows.write().insert(flow.id, flow);
39
40 Ok(())
41 }
42
43 fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
44 debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
45 let node = node.clone();
46
47 match node.ty {
48 SourceInlineData {
49 ..
50 } => {
51 unimplemented!()
52 }
53 SourceTable {
54 table,
55 } => {
56 let table = txn.get_table(table)?;
57
58 self.add_source(flow.id, node.id, SourceId::table(table.id));
59 self.inner.operators.write().insert(
60 node.id,
61 Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
62 );
63 }
64 SourceView {
65 view,
66 } => {
67 let view = txn.get_view(view)?;
68 self.add_source(flow.id, node.id, SourceId::view(view.id));
69 self.inner.operators.write().insert(
70 node.id,
71 Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
72 );
73 }
74 SinkView {
75 view,
76 } => {
77 let parent = self
78 .inner
79 .operators
80 .read()
81 .get(&node.inputs[0])
82 .ok_or_else(|| Error(internal!("Parent operator not found")))?
83 .clone();
84
85 self.add_sink(flow.id, node.id, SourceId::view(*view));
86 self.inner.operators.write().insert(
87 node.id,
88 Arc::new(Operators::SinkView(SinkViewOperator::new(
89 parent,
90 node.id,
91 resolve_view(txn, view)?,
92 ))),
93 );
94 }
95 Filter {
96 conditions,
97 } => {
98 let parent = self
99 .inner
100 .operators
101 .read()
102 .get(&node.inputs[0])
103 .ok_or_else(|| Error(internal!("Parent operator not found")))?
104 .clone();
105 self.inner.operators.write().insert(
106 node.id,
107 Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
108 );
109 }
110 Map {
111 expressions,
112 } => {
113 let parent = self
114 .inner
115 .operators
116 .read()
117 .get(&node.inputs[0])
118 .ok_or_else(|| Error(internal!("Parent operator not found")))?
119 .clone();
120 self.inner.operators.write().insert(
121 node.id,
122 Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
123 );
124 }
125 Extend {
126 expressions,
127 } => {
128 let parent = self
129 .inner
130 .operators
131 .read()
132 .get(&node.inputs[0])
133 .ok_or_else(|| Error(internal!("Parent operator not found")))?
134 .clone();
135 self.inner.operators.write().insert(
136 node.id,
137 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
138 );
139 }
140 Sort {
141 by: _,
142 } => {
143 let parent = self
144 .inner
145 .operators
146 .read()
147 .get(&node.inputs[0])
148 .ok_or_else(|| Error(internal!("Parent operator not found")))?
149 .clone();
150 self.inner.operators.write().insert(
151 node.id,
152 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
153 );
154 }
155 Take {
156 limit,
157 } => {
158 let parent = self
159 .inner
160 .operators
161 .read()
162 .get(&node.inputs[0])
163 .ok_or_else(|| Error(internal!("Parent operator not found")))?
164 .clone();
165 self.inner.operators.write().insert(
166 node.id,
167 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
168 );
169 }
170 Join {
171 join_type,
172 left,
173 right,
174 alias,
175 } => {
176 if node.inputs.len() != 2 {
179 return Err(Error(internal!("Join node must have exactly 2 inputs")));
180 }
181
182 let left_node = node.inputs[0];
183 let right_node = node.inputs[1];
184
185 let operators = self.inner.operators.read();
186 let left_parent = operators
187 .get(&left_node)
188 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
189 .clone();
190
191 let right_parent = operators
192 .get(&right_node)
193 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
194 .clone();
195 drop(operators);
196
197 self.inner.operators.write().insert(
198 node.id,
199 Arc::new(Operators::Join(JoinOperator::new(
200 left_parent,
201 right_parent,
202 node.id,
203 join_type,
204 left_node,
205 right_node,
206 left,
207 right,
208 alias,
209 self.inner.executor.clone(),
210 ))),
211 );
212 }
213 Distinct {
214 expressions,
215 } => {
216 let parent = self
217 .inner
218 .operators
219 .read()
220 .get(&node.inputs[0])
221 .ok_or_else(|| Error(internal!("Parent operator not found")))?
222 .clone();
223 self.inner.operators.write().insert(
224 node.id,
225 Arc::new(Operators::Distinct(DistinctOperator::new(
226 parent,
227 node.id,
228 expressions,
229 ))),
230 );
231 }
232 Union {} => unimplemented!(),
234 Apply {
235 operator_name,
236 expressions,
237 } => {
238 let parent = self
239 .inner
240 .operators
241 .read()
242 .get(&node.inputs[0])
243 .ok_or_else(|| Error(internal!("Parent operator not found")))?
244 .clone();
245
246 let operator = if self.is_ffi_operator(operator_name.as_str()) {
248 self.create_ffi_operator(operator_name.as_str(), node.id)?
250 } else {
251 self.inner.registry.create_operator(
253 operator_name.as_str(),
254 node.id,
255 expressions.as_slice(),
256 )?
257 };
258
259 self.inner.operators.write().insert(
260 node.id,
261 Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
262 );
263 }
264 Aggregate {
265 ..
266 } => unimplemented!(),
267 Window {
268 window_type,
269 size,
270 slide,
271 group_by,
272 aggregations,
273 min_events,
274 max_window_count,
275 max_window_age,
276 } => {
277 let parent = self
278 .inner
279 .operators
280 .read()
281 .get(&node.inputs[0])
282 .ok_or_else(|| Error(internal!("Parent operator not found")))?
283 .clone();
284 let operator = WindowOperator::new(
285 parent,
286 node.id,
287 window_type.clone(),
288 size.clone(),
289 slide.clone(),
290 group_by.clone(),
291 aggregations.clone(),
292 min_events.clone(),
293 max_window_count.clone(),
294 max_window_age.clone(),
295 );
296 self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
297 }
298 }
299
300 Ok(())
301 }
302
303 fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
304 let mut sources = self.inner.sources.write();
305 let nodes = sources.entry(source).or_insert_with(Vec::new);
306
307 let entry = (flow, node);
308 if !nodes.contains(&entry) {
309 nodes.push(entry);
310 }
311 }
312
313 fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
314 let mut sinks = self.inner.sinks.write();
315 let nodes = sinks.entry(sink).or_insert_with(Vec::new);
316
317 let entry = (flow, node);
318 if !nodes.contains(&entry) {
319 nodes.push(entry);
320 }
321 }
322}