Skip to main content

reifydb_sub_flow/engine/
register.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8	interface::catalog::{
9		flow::{FlowId, FlowNodeId},
10		primitive::PrimitiveId,
11		view::ViewKind,
12	},
13	internal,
14};
15use reifydb_rql::flow::{
16	flow::FlowDag,
17	node::{
18		FlowNode,
19		FlowNodeType::{
20			self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkSubscription,
21			SinkView, Sort, SourceFlow, SourceInlineData, SourceRingBuffer, SourceSeries, SourceTable,
22			SourceView, Take, Window,
23		},
24	},
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34	engine::FlowEngine,
35	operator::{
36		Operators,
37		append::AppendOperator,
38		distinct::DistinctOperator,
39		extend::ExtendOperator,
40		filter::FilterOperator,
41		gate::GateOperator,
42		join::operator::JoinOperator,
43		map::MapOperator,
44		scan::{
45			flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46			series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47		},
48		sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
49		sort::SortOperator,
50		take::TakeOperator,
51		window::WindowOperator,
52	},
53};
54
55impl FlowEngine {
56	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
57	pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
58		self.register_with_transaction(&mut Transaction::Command(txn), flow)
59	}
60
61	#[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
62	pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
63		debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
64
65		for node_id in flow.topological_order()? {
66			let node = flow.get_node(&node_id).unwrap();
67			self.add(txn, &flow, node)?;
68		}
69
70		self.analyzer.add(flow.clone());
71		self.flows.insert(flow.id, flow);
72
73		Ok(())
74	}
75
76	#[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
77	fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
78		debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
79		let node = node.clone();
80
81		match node.ty {
82			SourceInlineData {
83				..
84			} => {
85				unimplemented!()
86			}
87			SourceTable {
88				table,
89			} => {
90				let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
91
92				self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
93				self.operators.insert(
94					node.id,
95					Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
96				);
97			}
98			SourceView {
99				view,
100			} => {
101				let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
102				self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
103
104				// For transactional views, also register the underlying table/ringbuffer
105				// sources so the deferred coordinator routes changes correctly. A transactional
106				// view is computed on-the-fly; its changes are never published to CDC. By
107				// registering the view's upstream primitives, the deferred flow is triggered
108				// when the underlying data changes.
109				if view.kind == ViewKind::Transactional {
110					let mut additional_sources = Vec::new();
111					if let Some(view_flow) = self.catalog.find_flow_by_name(
112						&mut txn.reborrow(),
113						view.namespace,
114						&view.name,
115					)? {
116						let flow_nodes = self
117							.catalog
118							.list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
119						for flow_node in &flow_nodes {
120							// SourceTable = 1, SourceRingBuffer = 17, SourceSeries = 18
121							if flow_node.node_type == 1
122								|| flow_node.node_type == 17 || flow_node.node_type == 18
123							{
124								if let Ok(nt) =
125									from_bytes::<FlowNodeType>(&flow_node.data)
126								{
127									match nt {
128										SourceTable {
129											table: t,
130										} => {
131											additional_sources.push(
132												PrimitiveId::table(t),
133											);
134										}
135										SourceRingBuffer {
136											ringbuffer: rb,
137										} => {
138											additional_sources.push(
139												PrimitiveId::ringbuffer(
140													rb,
141												),
142											);
143										}
144										SourceSeries {
145											series: s,
146										} => {
147											additional_sources.push(
148												PrimitiveId::series(s),
149											);
150										}
151										_ => {}
152									}
153								}
154							}
155						}
156					}
157					for source in additional_sources {
158						self.add_source(flow.id, node.id, source);
159					}
160				}
161
162				self.operators.insert(
163					node.id,
164					Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
165				);
166			}
167			SourceFlow {
168				flow: source_flow,
169			} => {
170				let source_flow_def = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
171				self.operators.insert(
172					node.id,
173					Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
174						node.id,
175						source_flow_def,
176					))),
177				);
178			}
179			SourceRingBuffer {
180				ringbuffer,
181			} => {
182				let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
183				self.add_source(flow.id, node.id, PrimitiveId::ringbuffer(rb.id));
184				self.operators.insert(
185					node.id,
186					Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
187						node.id, rb,
188					))),
189				);
190			}
191			SourceSeries {
192				series,
193			} => {
194				let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
195				self.add_source(flow.id, node.id, PrimitiveId::series(s.id));
196				self.operators.insert(
197					node.id,
198					Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
199				);
200			}
201			SinkView {
202				view,
203			} => {
204				let parent = self
205					.operators
206					.get(&node.inputs[0])
207					.ok_or_else(|| Error(internal!("Parent operator not found")))?
208					.clone();
209
210				self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
211				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
212				self.operators.insert(
213					node.id,
214					Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
215				);
216			}
217			SinkSubscription {
218				subscription,
219			} => {
220				// Guard against race condition: flow may have been deleted during loading
221				if node.inputs.is_empty() {
222					return Err(Error(internal!(
223						"SinkSubscription node has no inputs - flow may have been deleted during loading"
224					)));
225				}
226				let parent = self
227					.operators
228					.get(&node.inputs[0])
229					.ok_or_else(|| Error(internal!("Parent operator not found")))?
230					.clone();
231
232				// Note: Subscriptions use UUID-based IDs and are not added to the sinks map
233				// which uses PrimitiveId (u64-based). Subscriptions are ephemeral 1:1 mapped.
234				let resolved = self.catalog.resolve_subscription(&mut txn.reborrow(), subscription)?;
235				self.operators.insert(
236					node.id,
237					Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
238						parent, node.id, resolved,
239					))),
240				);
241			}
242			Filter {
243				conditions,
244			} => {
245				let parent = self
246					.operators
247					.get(&node.inputs[0])
248					.ok_or_else(|| Error(internal!("Parent operator not found")))?
249					.clone();
250				self.operators.insert(
251					node.id,
252					Arc::new(Operators::Filter(FilterOperator::new(
253						parent,
254						node.id,
255						conditions,
256						self.executor.functions.clone(),
257						self.clock.clone(),
258					))),
259				);
260			}
261			Gate {
262				conditions,
263			} => {
264				let parent = self
265					.operators
266					.get(&node.inputs[0])
267					.ok_or_else(|| Error(internal!("Parent operator not found")))?
268					.clone();
269				self.operators.insert(
270					node.id,
271					Arc::new(Operators::Gate(GateOperator::new(
272						parent,
273						node.id,
274						conditions,
275						self.executor.functions.clone(),
276						self.clock.clone(),
277					))),
278				);
279			}
280			Map {
281				expressions,
282			} => {
283				let parent = self
284					.operators
285					.get(&node.inputs[0])
286					.ok_or_else(|| Error(internal!("Parent operator not found")))?
287					.clone();
288				self.operators.insert(
289					node.id,
290					Arc::new(Operators::Map(MapOperator::new(
291						parent,
292						node.id,
293						expressions,
294						self.executor.functions.clone(),
295						self.clock.clone(),
296					))),
297				);
298			}
299			Extend {
300				expressions,
301			} => {
302				let parent = self
303					.operators
304					.get(&node.inputs[0])
305					.ok_or_else(|| Error(internal!("Parent operator not found")))?
306					.clone();
307				self.operators.insert(
308					node.id,
309					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
310				);
311			}
312			Sort {
313				by: _,
314			} => {
315				let parent = self
316					.operators
317					.get(&node.inputs[0])
318					.ok_or_else(|| Error(internal!("Parent operator not found")))?
319					.clone();
320				self.operators.insert(
321					node.id,
322					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
323				);
324			}
325			Take {
326				limit,
327			} => {
328				let parent = self
329					.operators
330					.get(&node.inputs[0])
331					.ok_or_else(|| Error(internal!("Parent operator not found")))?
332					.clone();
333				self.operators.insert(
334					node.id,
335					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
336				);
337			}
338			Join {
339				join_type,
340				left,
341				right,
342				alias,
343			} => {
344				// The join node should have exactly 2 inputs
345				if node.inputs.len() != 2 {
346					return Err(Error(internal!("Join node must have exactly 2 inputs")));
347				}
348
349				let left_node = node.inputs[0];
350				let right_node = node.inputs[1];
351
352				let left_parent = self
353					.operators
354					.get(&left_node)
355					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
356					.clone();
357
358				let right_parent = self
359					.operators
360					.get(&right_node)
361					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
362					.clone();
363
364				self.operators.insert(
365					node.id,
366					Arc::new(Operators::Join(JoinOperator::new(
367						left_parent,
368						right_parent,
369						node.id,
370						join_type,
371						left_node,
372						right_node,
373						left,
374						right,
375						alias,
376						self.executor.clone(),
377					))),
378				);
379			}
380			Distinct {
381				expressions,
382			} => {
383				let parent = self
384					.operators
385					.get(&node.inputs[0])
386					.ok_or_else(|| Error(internal!("Parent operator not found")))?
387					.clone();
388				self.operators.insert(
389					node.id,
390					Arc::new(Operators::Distinct(DistinctOperator::new(
391						parent,
392						node.id,
393						expressions,
394						self.executor.functions.clone(),
395						self.clock.clone(),
396					))),
397				);
398			}
399			Append {} => {
400				// Append requires at least 2 inputs
401				if node.inputs.len() < 2 {
402					return Err(Error(internal!("Append node must have at least 2 inputs")));
403				}
404
405				let mut parents = Vec::with_capacity(node.inputs.len());
406
407				for input_node_id in &node.inputs {
408					let parent = self
409						.operators
410						.get(input_node_id)
411						.ok_or_else(|| {
412							Error(internal!(
413								"Parent operator not found for input {:?}",
414								input_node_id
415							))
416						})?
417						.clone();
418					parents.push(parent);
419				}
420
421				self.operators.insert(
422					node.id,
423					Arc::new(Operators::Append(AppendOperator::new(
424						node.id,
425						parents,
426						node.inputs.clone(),
427					))),
428				);
429			}
430			Apply {
431				operator,
432				expressions,
433			} => {
434				let config = evaluate_operator_config(
435					expressions.as_slice(),
436					&self.executor.functions,
437					&self.clock,
438				)?;
439
440				if let Some(factory) = self.custom_operators.get(operator.as_str()) {
441					let op = factory(node.id, &config)?;
442					self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
443				} else {
444					#[cfg(reifydb_target = "native")]
445					{
446						let parent = self
447							.operators
448							.get(&node.inputs[0])
449							.ok_or_else(|| Error(internal!("Parent operator not found")))?
450							.clone();
451
452						if !self.is_ffi_operator(operator.as_str()) {
453							return Err(Error(internal!("Unknown operator: {}", operator)));
454						}
455
456						let ffi_op =
457							self.create_ffi_operator(operator.as_str(), node.id, &config)?;
458
459						self.operators.insert(
460							node.id,
461							Arc::new(Operators::Apply(ApplyOperator::new(
462								parent, node.id, ffi_op,
463							))),
464						);
465					}
466					#[cfg(not(reifydb_target = "native"))]
467					{
468						let _ = operator;
469						return Err(Error(internal!(
470							"FFI operators are not supported in WASM"
471						)));
472					}
473				}
474			}
475			Aggregate {
476				..
477			} => unimplemented!(),
478			Window {
479				window_type,
480				size,
481				slide,
482				group_by,
483				aggregations,
484				min_events,
485				max_window_count,
486				max_window_age,
487			} => {
488				let parent = self
489					.operators
490					.get(&node.inputs[0])
491					.ok_or_else(|| Error(internal!("Parent operator not found")))?
492					.clone();
493				let operator = WindowOperator::new(
494					parent,
495					node.id,
496					window_type.clone(),
497					size.clone(),
498					slide.clone(),
499					group_by.clone(),
500					aggregations.clone(),
501					min_events.clone(),
502					max_window_count.clone(),
503					max_window_age.clone(),
504					self.clock.clone(),
505					self.executor.functions.clone(),
506				);
507				self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
508			}
509		}
510
511		Ok(())
512	}
513
514	fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
515		let nodes = self.sources.entry(source).or_insert_with(Vec::new);
516
517		let entry = (flow, node);
518		if !nodes.contains(&entry) {
519			nodes.push(entry);
520		}
521	}
522
523	fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
524		let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
525
526		let entry = (flow, node);
527		if !nodes.contains(&entry) {
528			nodes.push(entry);
529		}
530	}
531}