1use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8 interface::catalog::{
9 flow::{FlowId, FlowNodeId},
10 primitive::PrimitiveId,
11 view::ViewKind,
12 },
13 internal,
14};
15use reifydb_rql::flow::{
16 flow::FlowDag,
17 node::{
18 FlowNode,
19 FlowNodeType::{
20 self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkSubscription,
21 SinkView, Sort, SourceFlow, SourceInlineData, SourceRingBuffer, SourceSeries, SourceTable,
22 SourceView, Take, Window,
23 },
24 },
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34 engine::FlowEngine,
35 operator::{
36 Operators,
37 append::AppendOperator,
38 distinct::DistinctOperator,
39 extend::ExtendOperator,
40 filter::FilterOperator,
41 gate::GateOperator,
42 join::operator::JoinOperator,
43 map::MapOperator,
44 scan::{
45 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46 series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47 },
48 sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
49 sort::SortOperator,
50 take::TakeOperator,
51 window::WindowOperator,
52 },
53};
54
55impl FlowEngine {
56 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
57 pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
58 debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
59
60 for node_id in flow.topological_order()? {
61 let node = flow.get_node(&node_id).unwrap();
62 self.add(txn, &flow, node)?;
63 }
64
65 self.analyzer.add(flow.clone());
66 self.flows.insert(flow.id, flow);
67
68 Ok(())
69 }
70
71 #[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
72 fn add(&mut self, txn: &mut CommandTransaction, flow: &FlowDag, node: &FlowNode) -> Result<()> {
73 debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
74 let node = node.clone();
75
76 match node.ty {
77 SourceInlineData {
78 ..
79 } => {
80 unimplemented!()
81 }
82 SourceTable {
83 table,
84 } => {
85 let table = self.catalog.get_table(&mut Transaction::Command(&mut *txn), table)?;
86
87 self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
88 self.operators.insert(
89 node.id,
90 Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
91 );
92 }
93 SourceView {
94 view,
95 } => {
96 let view = self.catalog.get_view(&mut Transaction::Command(&mut *txn), view)?;
97 self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
98
99 if view.kind == ViewKind::Transactional {
105 let mut additional_sources = Vec::new();
106 if let Some(view_flow) = self.catalog.find_flow_by_name(
107 &mut Transaction::Command(&mut *txn),
108 view.namespace,
109 &view.name,
110 )? {
111 let flow_nodes = self.catalog.list_flow_nodes_by_flow(
112 &mut Transaction::Command(&mut *txn),
113 view_flow.id,
114 )?;
115 for flow_node in &flow_nodes {
116 if flow_node.node_type == 1
118 || flow_node.node_type == 17 || flow_node.node_type == 18
119 {
120 if let Ok(nt) =
121 from_bytes::<FlowNodeType>(&flow_node.data)
122 {
123 match nt {
124 SourceTable {
125 table: t,
126 } => {
127 additional_sources.push(
128 PrimitiveId::table(t),
129 );
130 }
131 SourceRingBuffer {
132 ringbuffer: rb,
133 } => {
134 additional_sources.push(
135 PrimitiveId::ringbuffer(
136 rb,
137 ),
138 );
139 }
140 SourceSeries {
141 series: s,
142 } => {
143 additional_sources.push(
144 PrimitiveId::series(s),
145 );
146 }
147 _ => {}
148 }
149 }
150 }
151 }
152 }
153 for source in additional_sources {
154 self.add_source(flow.id, node.id, source);
155 }
156 }
157
158 self.operators.insert(
159 node.id,
160 Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
161 );
162 }
163 SourceFlow {
164 flow: source_flow,
165 } => {
166 let source_flow_def =
167 self.catalog.get_flow(&mut Transaction::Command(&mut *txn), source_flow)?;
168 self.operators.insert(
169 node.id,
170 Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
171 node.id,
172 source_flow_def,
173 ))),
174 );
175 }
176 SourceRingBuffer {
177 ringbuffer,
178 } => {
179 let rb = self
180 .catalog
181 .get_ringbuffer(&mut Transaction::Command(&mut *txn), ringbuffer)?;
182 self.add_source(flow.id, node.id, PrimitiveId::ringbuffer(rb.id));
183 self.operators.insert(
184 node.id,
185 Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
186 node.id, rb,
187 ))),
188 );
189 }
190 SourceSeries {
191 series,
192 } => {
193 let s = self.catalog.get_series(&mut Transaction::Command(&mut *txn), series)?;
194 self.add_source(flow.id, node.id, PrimitiveId::series(s.id));
195 self.operators.insert(
196 node.id,
197 Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
198 );
199 }
200 SinkView {
201 view,
202 } => {
203 let parent = self
204 .operators
205 .get(&node.inputs[0])
206 .ok_or_else(|| Error(internal!("Parent operator not found")))?
207 .clone();
208
209 self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
210 let resolved = self.catalog.resolve_view(&mut Transaction::Command(&mut *txn), view)?;
211 self.operators.insert(
212 node.id,
213 Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
214 );
215 }
216 SinkSubscription {
217 subscription,
218 } => {
219 if node.inputs.is_empty() {
221 return Err(Error(internal!(
222 "SinkSubscription node has no inputs - flow may have been deleted during loading"
223 )));
224 }
225 let parent = self
226 .operators
227 .get(&node.inputs[0])
228 .ok_or_else(|| Error(internal!("Parent operator not found")))?
229 .clone();
230
231 let resolved = self
234 .catalog
235 .resolve_subscription(&mut Transaction::Command(&mut *txn), subscription)?;
236 self.operators.insert(
237 node.id,
238 Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
239 parent, node.id, resolved,
240 ))),
241 );
242 }
243 Filter {
244 conditions,
245 } => {
246 let parent = self
247 .operators
248 .get(&node.inputs[0])
249 .ok_or_else(|| Error(internal!("Parent operator not found")))?
250 .clone();
251 self.operators.insert(
252 node.id,
253 Arc::new(Operators::Filter(FilterOperator::new(
254 parent,
255 node.id,
256 conditions,
257 self.executor.functions.clone(),
258 self.clock.clone(),
259 ))),
260 );
261 }
262 Gate {
263 conditions,
264 } => {
265 let parent = self
266 .operators
267 .get(&node.inputs[0])
268 .ok_or_else(|| Error(internal!("Parent operator not found")))?
269 .clone();
270 self.operators.insert(
271 node.id,
272 Arc::new(Operators::Gate(GateOperator::new(
273 parent,
274 node.id,
275 conditions,
276 self.executor.functions.clone(),
277 self.clock.clone(),
278 ))),
279 );
280 }
281 Map {
282 expressions,
283 } => {
284 let parent = self
285 .operators
286 .get(&node.inputs[0])
287 .ok_or_else(|| Error(internal!("Parent operator not found")))?
288 .clone();
289 self.operators.insert(
290 node.id,
291 Arc::new(Operators::Map(MapOperator::new(
292 parent,
293 node.id,
294 expressions,
295 self.executor.functions.clone(),
296 self.clock.clone(),
297 ))),
298 );
299 }
300 Extend {
301 expressions,
302 } => {
303 let parent = self
304 .operators
305 .get(&node.inputs[0])
306 .ok_or_else(|| Error(internal!("Parent operator not found")))?
307 .clone();
308 self.operators.insert(
309 node.id,
310 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
311 );
312 }
313 Sort {
314 by: _,
315 } => {
316 let parent = self
317 .operators
318 .get(&node.inputs[0])
319 .ok_or_else(|| Error(internal!("Parent operator not found")))?
320 .clone();
321 self.operators.insert(
322 node.id,
323 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
324 );
325 }
326 Take {
327 limit,
328 } => {
329 let parent = self
330 .operators
331 .get(&node.inputs[0])
332 .ok_or_else(|| Error(internal!("Parent operator not found")))?
333 .clone();
334 self.operators.insert(
335 node.id,
336 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
337 );
338 }
339 Join {
340 join_type,
341 left,
342 right,
343 alias,
344 } => {
345 if node.inputs.len() != 2 {
347 return Err(Error(internal!("Join node must have exactly 2 inputs")));
348 }
349
350 let left_node = node.inputs[0];
351 let right_node = node.inputs[1];
352
353 let left_parent = self
354 .operators
355 .get(&left_node)
356 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
357 .clone();
358
359 let right_parent = self
360 .operators
361 .get(&right_node)
362 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
363 .clone();
364
365 self.operators.insert(
366 node.id,
367 Arc::new(Operators::Join(JoinOperator::new(
368 left_parent,
369 right_parent,
370 node.id,
371 join_type,
372 left_node,
373 right_node,
374 left,
375 right,
376 alias,
377 self.executor.clone(),
378 ))),
379 );
380 }
381 Distinct {
382 expressions,
383 } => {
384 let parent = self
385 .operators
386 .get(&node.inputs[0])
387 .ok_or_else(|| Error(internal!("Parent operator not found")))?
388 .clone();
389 self.operators.insert(
390 node.id,
391 Arc::new(Operators::Distinct(DistinctOperator::new(
392 parent,
393 node.id,
394 expressions,
395 self.executor.functions.clone(),
396 self.clock.clone(),
397 ))),
398 );
399 }
400 Append {} => {
401 if node.inputs.len() < 2 {
403 return Err(Error(internal!("Append node must have at least 2 inputs")));
404 }
405
406 let mut parents = Vec::with_capacity(node.inputs.len());
407
408 for input_node_id in &node.inputs {
409 let parent = self
410 .operators
411 .get(input_node_id)
412 .ok_or_else(|| {
413 Error(internal!(
414 "Parent operator not found for input {:?}",
415 input_node_id
416 ))
417 })?
418 .clone();
419 parents.push(parent);
420 }
421
422 self.operators.insert(
423 node.id,
424 Arc::new(Operators::Append(AppendOperator::new(
425 node.id,
426 parents,
427 node.inputs.clone(),
428 ))),
429 );
430 }
431 Apply {
432 operator,
433 expressions,
434 } => {
435 let config = evaluate_operator_config(
436 expressions.as_slice(),
437 &self.executor.functions,
438 &self.clock,
439 )?;
440
441 if let Some(factory) = self.custom_operators.get(operator.as_str()) {
442 let op = factory(node.id, &config)?;
443 self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
444 } else {
445 #[cfg(reifydb_target = "native")]
446 {
447 let parent = self
448 .operators
449 .get(&node.inputs[0])
450 .ok_or_else(|| Error(internal!("Parent operator not found")))?
451 .clone();
452
453 if !self.is_ffi_operator(operator.as_str()) {
454 return Err(Error(internal!("Unknown operator: {}", operator)));
455 }
456
457 let ffi_op =
458 self.create_ffi_operator(operator.as_str(), node.id, &config)?;
459
460 self.operators.insert(
461 node.id,
462 Arc::new(Operators::Apply(ApplyOperator::new(
463 parent, node.id, ffi_op,
464 ))),
465 );
466 }
467 #[cfg(not(reifydb_target = "native"))]
468 {
469 let _ = operator;
470 return Err(Error(internal!(
471 "FFI operators are not supported in WASM"
472 )));
473 }
474 }
475 }
476 Aggregate {
477 ..
478 } => unimplemented!(),
479 Window {
480 window_type,
481 size,
482 slide,
483 group_by,
484 aggregations,
485 min_events,
486 max_window_count,
487 max_window_age,
488 } => {
489 let parent = self
490 .operators
491 .get(&node.inputs[0])
492 .ok_or_else(|| Error(internal!("Parent operator not found")))?
493 .clone();
494 let operator = WindowOperator::new(
495 parent,
496 node.id,
497 window_type.clone(),
498 size.clone(),
499 slide.clone(),
500 group_by.clone(),
501 aggregations.clone(),
502 min_events.clone(),
503 max_window_count.clone(),
504 max_window_age.clone(),
505 self.clock.clone(),
506 self.executor.functions.clone(),
507 );
508 self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
509 }
510 }
511
512 Ok(())
513 }
514
515 fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
516 let nodes = self.sources.entry(source).or_insert_with(Vec::new);
517
518 let entry = (flow, node);
519 if !nodes.contains(&entry) {
520 nodes.push(entry);
521 }
522 }
523
524 fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
525 let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
526
527 let entry = (flow, node);
528 if !nodes.contains(&entry) {
529 nodes.push(entry);
530 }
531 }
532}