1use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8 CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9 transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12 CommitVersion, Error,
13 interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17 Flow, FlowNode, FlowNodeType,
18 FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Merge, Sort, Take, Window},
19};
20use reifydb_type::internal;
21use tracing::instrument;
22
23use super::eval::evaluate_operator_config;
24use crate::{
25 engine::FlowEngine,
26 operator::{
27 ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator,
28 MergeOperator, Operators, SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator,
29 SourceViewOperator, TakeOperator, WindowOperator,
30 },
31};
32
33impl FlowEngine {
34 #[instrument(level = "info", skip(self, txn), fields(flow_id = ?flow.id))]
35 pub fn register_without_backfill(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
36 self.register(txn, flow, None)
37 }
38
39 #[instrument(level = "info", skip(self, txn), fields(flow_id = ?flow.id, backfill_version = flow_creation_version.0))]
40 pub fn register_with_backfill(
41 &self,
42 txn: &mut StandardCommandTransaction,
43 flow: Flow,
44 flow_creation_version: CommitVersion,
45 ) -> crate::Result<()> {
46 self.register(txn, flow, Some(flow_creation_version))
47 }
48
49 #[instrument(level = "debug", skip(self, txn), fields(flow_id = ?flow.id, has_backfill = flow_creation_version.is_some()))]
50 fn register(
51 &self,
52 txn: &mut StandardCommandTransaction,
53 flow: Flow,
54 flow_creation_version: Option<CommitVersion>,
55 ) -> crate::Result<()> {
56 debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
57
58 for node_id in flow.topological_order()? {
59 let node = flow.get_node(&node_id).unwrap();
60 self.add(txn, &flow, node)?;
61 }
62
63 if let Some(flow_creation_version) = flow_creation_version {
64 self.inner.flow_creation_versions.write().insert(flow.id, flow_creation_version);
65
66 if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version) {
67 self.inner.flow_creation_versions.write().remove(&flow.id);
68 return Err(e);
69 }
70 }
71
72 self.inner.analyzer.write().add(flow.clone());
74 self.inner.flows.write().insert(flow.id, flow);
75
76 Ok(())
77 }
78
79 #[instrument(level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
80 fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
81 debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
82 let node = node.clone();
83
84 match node.ty {
85 SourceInlineData {
86 ..
87 } => {
88 unimplemented!()
89 }
90 SourceTable {
91 table,
92 } => {
93 let table = txn.get_table(table)?;
94
95 self.add_source(flow.id, node.id, SourceId::table(table.id));
96 self.inner.operators.write().insert(
97 node.id,
98 Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
99 );
100 }
101 SourceView {
102 view,
103 } => {
104 let view = txn.get_view(view)?;
105 self.add_source(flow.id, node.id, SourceId::view(view.id));
106 self.inner.operators.write().insert(
107 node.id,
108 Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
109 );
110 }
111 SourceFlow {
112 flow: source_flow,
113 } => {
114 let source_flow_def = txn.get_flow(source_flow)?;
115 self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id));
116 self.inner.operators.write().insert(
117 node.id,
118 Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
119 node.id,
120 source_flow_def,
121 ))),
122 );
123 }
124 SinkView {
125 view,
126 } => {
127 let parent = self
128 .inner
129 .operators
130 .read()
131 .get(&node.inputs[0])
132 .ok_or_else(|| Error(internal!("Parent operator not found")))?
133 .clone();
134
135 self.add_sink(flow.id, node.id, SourceId::view(*view));
136 self.inner.operators.write().insert(
137 node.id,
138 Arc::new(Operators::SinkView(SinkViewOperator::new(
139 parent,
140 node.id,
141 resolve_view(txn, view)?,
142 ))),
143 );
144 }
145 Filter {
146 conditions,
147 } => {
148 let parent = self
149 .inner
150 .operators
151 .read()
152 .get(&node.inputs[0])
153 .ok_or_else(|| Error(internal!("Parent operator not found")))?
154 .clone();
155 self.inner.operators.write().insert(
156 node.id,
157 Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
158 );
159 }
160 Map {
161 expressions,
162 } => {
163 let parent = self
164 .inner
165 .operators
166 .read()
167 .get(&node.inputs[0])
168 .ok_or_else(|| Error(internal!("Parent operator not found")))?
169 .clone();
170 self.inner.operators.write().insert(
171 node.id,
172 Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
173 );
174 }
175 Extend {
176 expressions,
177 } => {
178 let parent = self
179 .inner
180 .operators
181 .read()
182 .get(&node.inputs[0])
183 .ok_or_else(|| Error(internal!("Parent operator not found")))?
184 .clone();
185 self.inner.operators.write().insert(
186 node.id,
187 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
188 );
189 }
190 Sort {
191 by: _,
192 } => {
193 let parent = self
194 .inner
195 .operators
196 .read()
197 .get(&node.inputs[0])
198 .ok_or_else(|| Error(internal!("Parent operator not found")))?
199 .clone();
200 self.inner.operators.write().insert(
201 node.id,
202 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
203 );
204 }
205 Take {
206 limit,
207 } => {
208 let parent = self
209 .inner
210 .operators
211 .read()
212 .get(&node.inputs[0])
213 .ok_or_else(|| Error(internal!("Parent operator not found")))?
214 .clone();
215 self.inner.operators.write().insert(
216 node.id,
217 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
218 );
219 }
220 Join {
221 join_type,
222 left,
223 right,
224 alias,
225 } => {
226 if node.inputs.len() != 2 {
229 return Err(Error(internal!("Join node must have exactly 2 inputs")));
230 }
231
232 let left_node = node.inputs[0];
233 let right_node = node.inputs[1];
234
235 let operators = self.inner.operators.read();
236 let left_parent = operators
237 .get(&left_node)
238 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
239 .clone();
240
241 let right_parent = operators
242 .get(&right_node)
243 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
244 .clone();
245 drop(operators);
246
247 self.inner.operators.write().insert(
248 node.id,
249 Arc::new(Operators::Join(JoinOperator::new(
250 left_parent,
251 right_parent,
252 node.id,
253 join_type,
254 left_node,
255 right_node,
256 left,
257 right,
258 alias,
259 self.inner.executor.clone(),
260 ))),
261 );
262 }
263 Distinct {
264 expressions,
265 } => {
266 let parent = self
267 .inner
268 .operators
269 .read()
270 .get(&node.inputs[0])
271 .ok_or_else(|| Error(internal!("Parent operator not found")))?
272 .clone();
273 self.inner.operators.write().insert(
274 node.id,
275 Arc::new(Operators::Distinct(DistinctOperator::new(
276 parent,
277 node.id,
278 expressions,
279 ))),
280 );
281 }
282 Merge {} => {
283 if node.inputs.len() < 2 {
285 return Err(Error(internal!("Merge node must have at least 2 inputs")));
286 }
287
288 let operators = self.inner.operators.read();
289 let mut parents = Vec::with_capacity(node.inputs.len());
290
291 for input_node_id in &node.inputs {
292 let parent = operators
293 .get(input_node_id)
294 .ok_or_else(|| {
295 Error(internal!(
296 "Parent operator not found for input {:?}",
297 input_node_id
298 ))
299 })?
300 .clone();
301 parents.push(parent);
302 }
303 drop(operators);
304
305 self.inner.operators.write().insert(
306 node.id,
307 Arc::new(Operators::Merge(MergeOperator::new(
308 node.id,
309 parents,
310 node.inputs.clone(),
311 ))),
312 );
313 }
314 Apply {
315 operator,
316 expressions,
317 } => {
318 let parent = self
319 .inner
320 .operators
321 .read()
322 .get(&node.inputs[0])
323 .ok_or_else(|| Error(internal!("Parent operator not found")))?
324 .clone();
325
326 let operator = if self.is_ffi_operator(operator.as_str()) {
328 let config = evaluate_operator_config(
329 expressions.as_slice(),
330 &self.inner.evaluator,
331 )?;
332 self.create_ffi_operator(operator.as_str(), node.id, &config)?
333 } else {
334 self.inner.registry.create_operator(
336 operator.as_str(),
337 node.id,
338 expressions.as_slice(),
339 )?
340 };
341
342 self.inner.operators.write().insert(
343 node.id,
344 Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
345 );
346 }
347 Aggregate {
348 ..
349 } => unimplemented!(),
350 Window {
351 window_type,
352 size,
353 slide,
354 group_by,
355 aggregations,
356 min_events,
357 max_window_count,
358 max_window_age,
359 } => {
360 let parent = self
361 .inner
362 .operators
363 .read()
364 .get(&node.inputs[0])
365 .ok_or_else(|| Error(internal!("Parent operator not found")))?
366 .clone();
367 let operator = WindowOperator::new(
368 parent,
369 node.id,
370 window_type.clone(),
371 size.clone(),
372 slide.clone(),
373 group_by.clone(),
374 aggregations.clone(),
375 min_events.clone(),
376 max_window_count.clone(),
377 max_window_age.clone(),
378 );
379 self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
380 }
381 }
382
383 Ok(())
384 }
385
386 fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
387 let mut sources = self.inner.sources.write();
388 let nodes = sources.entry(source).or_insert_with(Vec::new);
389
390 let entry = (flow, node);
391 if !nodes.contains(&entry) {
392 nodes.push(entry);
393 }
394 }
395
396 fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
397 let mut sinks = self.inner.sinks.write();
398 let nodes = sinks.entry(sink).or_insert_with(Vec::new);
399
400 let entry = (flow, node);
401 if !nodes.contains(&entry) {
402 nodes.push(entry);
403 }
404 }
405}