Skip to main content

reifydb_sub_flow/engine/
register.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8	interface::catalog::{
9		flow::{FlowId, FlowNodeId},
10		primitive::PrimitiveId,
11		view::ViewKind,
12	},
13	internal,
14};
15use reifydb_rql::flow::{
16	flow::FlowDag,
17	node::{
18		FlowNode,
19		FlowNodeType::{
20			self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkSubscription,
21			SinkView, Sort, SourceFlow, SourceInlineData, SourceRingBuffer, SourceSeries, SourceTable,
22			SourceView, Take, Window,
23		},
24	},
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34	engine::FlowEngine,
35	operator::{
36		Operators,
37		append::AppendOperator,
38		distinct::DistinctOperator,
39		extend::ExtendOperator,
40		filter::FilterOperator,
41		gate::GateOperator,
42		join::operator::JoinOperator,
43		map::MapOperator,
44		scan::{
45			flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46			series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47		},
48		sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
49		sort::SortOperator,
50		take::TakeOperator,
51		window::WindowOperator,
52	},
53};
54
55impl FlowEngine {
56	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
57	pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
58		debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
59
60		for node_id in flow.topological_order()? {
61			let node = flow.get_node(&node_id).unwrap();
62			self.add(txn, &flow, node)?;
63		}
64
65		self.analyzer.add(flow.clone());
66		self.flows.insert(flow.id, flow);
67
68		Ok(())
69	}
70
71	#[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
72	fn add(&mut self, txn: &mut CommandTransaction, flow: &FlowDag, node: &FlowNode) -> Result<()> {
73		debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
74		let node = node.clone();
75
76		match node.ty {
77			SourceInlineData {
78				..
79			} => {
80				unimplemented!()
81			}
82			SourceTable {
83				table,
84			} => {
85				let table = self.catalog.get_table(&mut Transaction::Command(&mut *txn), table)?;
86
87				self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
88				self.operators.insert(
89					node.id,
90					Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
91				);
92			}
93			SourceView {
94				view,
95			} => {
96				let view = self.catalog.get_view(&mut Transaction::Command(&mut *txn), view)?;
97				self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
98
99				// For transactional views, also register the underlying table/ringbuffer
100				// sources so the deferred coordinator routes changes correctly. A transactional
101				// view is computed on-the-fly; its changes are never published to CDC. By
102				// registering the view's upstream primitives, the deferred flow is triggered
103				// when the underlying data changes.
104				if view.kind == ViewKind::Transactional {
105					let mut additional_sources = Vec::new();
106					if let Some(view_flow) = self.catalog.find_flow_by_name(
107						&mut Transaction::Command(&mut *txn),
108						view.namespace,
109						&view.name,
110					)? {
111						let flow_nodes = self.catalog.list_flow_nodes_by_flow(
112							&mut Transaction::Command(&mut *txn),
113							view_flow.id,
114						)?;
115						for flow_node in &flow_nodes {
116							// SourceTable = 1, SourceRingBuffer = 17, SourceSeries = 18
117							if flow_node.node_type == 1
118								|| flow_node.node_type == 17 || flow_node.node_type == 18
119							{
120								if let Ok(nt) =
121									from_bytes::<FlowNodeType>(&flow_node.data)
122								{
123									match nt {
124										SourceTable {
125											table: t,
126										} => {
127											additional_sources.push(
128												PrimitiveId::table(t),
129											);
130										}
131										SourceRingBuffer {
132											ringbuffer: rb,
133										} => {
134											additional_sources.push(
135												PrimitiveId::ringbuffer(
136													rb,
137												),
138											);
139										}
140										SourceSeries {
141											series: s,
142										} => {
143											additional_sources.push(
144												PrimitiveId::series(s),
145											);
146										}
147										_ => {}
148									}
149								}
150							}
151						}
152					}
153					for source in additional_sources {
154						self.add_source(flow.id, node.id, source);
155					}
156				}
157
158				self.operators.insert(
159					node.id,
160					Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
161				);
162			}
163			SourceFlow {
164				flow: source_flow,
165			} => {
166				let source_flow_def =
167					self.catalog.get_flow(&mut Transaction::Command(&mut *txn), source_flow)?;
168				self.operators.insert(
169					node.id,
170					Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
171						node.id,
172						source_flow_def,
173					))),
174				);
175			}
176			SourceRingBuffer {
177				ringbuffer,
178			} => {
179				let rb = self
180					.catalog
181					.get_ringbuffer(&mut Transaction::Command(&mut *txn), ringbuffer)?;
182				self.add_source(flow.id, node.id, PrimitiveId::ringbuffer(rb.id));
183				self.operators.insert(
184					node.id,
185					Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
186						node.id, rb,
187					))),
188				);
189			}
190			SourceSeries {
191				series,
192			} => {
193				let s = self.catalog.get_series(&mut Transaction::Command(&mut *txn), series)?;
194				self.add_source(flow.id, node.id, PrimitiveId::series(s.id));
195				self.operators.insert(
196					node.id,
197					Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
198				);
199			}
200			SinkView {
201				view,
202			} => {
203				let parent = self
204					.operators
205					.get(&node.inputs[0])
206					.ok_or_else(|| Error(internal!("Parent operator not found")))?
207					.clone();
208
209				self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
210				let resolved = self.catalog.resolve_view(&mut Transaction::Command(&mut *txn), view)?;
211				self.operators.insert(
212					node.id,
213					Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
214				);
215			}
216			SinkSubscription {
217				subscription,
218			} => {
219				// Guard against race condition: flow may have been deleted during loading
220				if node.inputs.is_empty() {
221					return Err(Error(internal!(
222						"SinkSubscription node has no inputs - flow may have been deleted during loading"
223					)));
224				}
225				let parent = self
226					.operators
227					.get(&node.inputs[0])
228					.ok_or_else(|| Error(internal!("Parent operator not found")))?
229					.clone();
230
231				// Note: Subscriptions use UUID-based IDs and are not added to the sinks map
232				// which uses PrimitiveId (u64-based). Subscriptions are ephemeral 1:1 mapped.
233				let resolved = self
234					.catalog
235					.resolve_subscription(&mut Transaction::Command(&mut *txn), subscription)?;
236				self.operators.insert(
237					node.id,
238					Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
239						parent, node.id, resolved,
240					))),
241				);
242			}
243			Filter {
244				conditions,
245			} => {
246				let parent = self
247					.operators
248					.get(&node.inputs[0])
249					.ok_or_else(|| Error(internal!("Parent operator not found")))?
250					.clone();
251				self.operators.insert(
252					node.id,
253					Arc::new(Operators::Filter(FilterOperator::new(
254						parent,
255						node.id,
256						conditions,
257						self.executor.functions.clone(),
258						self.clock.clone(),
259					))),
260				);
261			}
262			Gate {
263				conditions,
264			} => {
265				let parent = self
266					.operators
267					.get(&node.inputs[0])
268					.ok_or_else(|| Error(internal!("Parent operator not found")))?
269					.clone();
270				self.operators.insert(
271					node.id,
272					Arc::new(Operators::Gate(GateOperator::new(
273						parent,
274						node.id,
275						conditions,
276						self.executor.functions.clone(),
277						self.clock.clone(),
278					))),
279				);
280			}
281			Map {
282				expressions,
283			} => {
284				let parent = self
285					.operators
286					.get(&node.inputs[0])
287					.ok_or_else(|| Error(internal!("Parent operator not found")))?
288					.clone();
289				self.operators.insert(
290					node.id,
291					Arc::new(Operators::Map(MapOperator::new(
292						parent,
293						node.id,
294						expressions,
295						self.executor.functions.clone(),
296						self.clock.clone(),
297					))),
298				);
299			}
300			Extend {
301				expressions,
302			} => {
303				let parent = self
304					.operators
305					.get(&node.inputs[0])
306					.ok_or_else(|| Error(internal!("Parent operator not found")))?
307					.clone();
308				self.operators.insert(
309					node.id,
310					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
311				);
312			}
313			Sort {
314				by: _,
315			} => {
316				let parent = self
317					.operators
318					.get(&node.inputs[0])
319					.ok_or_else(|| Error(internal!("Parent operator not found")))?
320					.clone();
321				self.operators.insert(
322					node.id,
323					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
324				);
325			}
326			Take {
327				limit,
328			} => {
329				let parent = self
330					.operators
331					.get(&node.inputs[0])
332					.ok_or_else(|| Error(internal!("Parent operator not found")))?
333					.clone();
334				self.operators.insert(
335					node.id,
336					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
337				);
338			}
339			Join {
340				join_type,
341				left,
342				right,
343				alias,
344			} => {
345				// The join node should have exactly 2 inputs
346				if node.inputs.len() != 2 {
347					return Err(Error(internal!("Join node must have exactly 2 inputs")));
348				}
349
350				let left_node = node.inputs[0];
351				let right_node = node.inputs[1];
352
353				let left_parent = self
354					.operators
355					.get(&left_node)
356					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
357					.clone();
358
359				let right_parent = self
360					.operators
361					.get(&right_node)
362					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
363					.clone();
364
365				self.operators.insert(
366					node.id,
367					Arc::new(Operators::Join(JoinOperator::new(
368						left_parent,
369						right_parent,
370						node.id,
371						join_type,
372						left_node,
373						right_node,
374						left,
375						right,
376						alias,
377						self.executor.clone(),
378					))),
379				);
380			}
381			Distinct {
382				expressions,
383			} => {
384				let parent = self
385					.operators
386					.get(&node.inputs[0])
387					.ok_or_else(|| Error(internal!("Parent operator not found")))?
388					.clone();
389				self.operators.insert(
390					node.id,
391					Arc::new(Operators::Distinct(DistinctOperator::new(
392						parent,
393						node.id,
394						expressions,
395						self.executor.functions.clone(),
396						self.clock.clone(),
397					))),
398				);
399			}
400			Append {} => {
401				// Append requires at least 2 inputs
402				if node.inputs.len() < 2 {
403					return Err(Error(internal!("Append node must have at least 2 inputs")));
404				}
405
406				let mut parents = Vec::with_capacity(node.inputs.len());
407
408				for input_node_id in &node.inputs {
409					let parent = self
410						.operators
411						.get(input_node_id)
412						.ok_or_else(|| {
413							Error(internal!(
414								"Parent operator not found for input {:?}",
415								input_node_id
416							))
417						})?
418						.clone();
419					parents.push(parent);
420				}
421
422				self.operators.insert(
423					node.id,
424					Arc::new(Operators::Append(AppendOperator::new(
425						node.id,
426						parents,
427						node.inputs.clone(),
428					))),
429				);
430			}
431			Apply {
432				operator,
433				expressions,
434			} => {
435				let config = evaluate_operator_config(
436					expressions.as_slice(),
437					&self.executor.functions,
438					&self.clock,
439				)?;
440
441				if let Some(factory) = self.custom_operators.get(operator.as_str()) {
442					let op = factory(node.id, &config)?;
443					self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
444				} else {
445					#[cfg(reifydb_target = "native")]
446					{
447						let parent = self
448							.operators
449							.get(&node.inputs[0])
450							.ok_or_else(|| Error(internal!("Parent operator not found")))?
451							.clone();
452
453						if !self.is_ffi_operator(operator.as_str()) {
454							return Err(Error(internal!("Unknown operator: {}", operator)));
455						}
456
457						let ffi_op =
458							self.create_ffi_operator(operator.as_str(), node.id, &config)?;
459
460						self.operators.insert(
461							node.id,
462							Arc::new(Operators::Apply(ApplyOperator::new(
463								parent, node.id, ffi_op,
464							))),
465						);
466					}
467					#[cfg(not(reifydb_target = "native"))]
468					{
469						let _ = operator;
470						return Err(Error(internal!(
471							"FFI operators are not supported in WASM"
472						)));
473					}
474				}
475			}
476			Aggregate {
477				..
478			} => unimplemented!(),
479			Window {
480				window_type,
481				size,
482				slide,
483				group_by,
484				aggregations,
485				min_events,
486				max_window_count,
487				max_window_age,
488			} => {
489				let parent = self
490					.operators
491					.get(&node.inputs[0])
492					.ok_or_else(|| Error(internal!("Parent operator not found")))?
493					.clone();
494				let operator = WindowOperator::new(
495					parent,
496					node.id,
497					window_type.clone(),
498					size.clone(),
499					slide.clone(),
500					group_by.clone(),
501					aggregations.clone(),
502					min_events.clone(),
503					max_window_count.clone(),
504					max_window_age.clone(),
505					self.clock.clone(),
506					self.executor.functions.clone(),
507				);
508				self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
509			}
510		}
511
512		Ok(())
513	}
514
515	fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
516		let nodes = self.sources.entry(source).or_insert_with(Vec::new);
517
518		let entry = (flow, node);
519		if !nodes.contains(&entry) {
520			nodes.push(entry);
521		}
522	}
523
524	fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
525		let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
526
527		let entry = (flow, node);
528		if !nodes.contains(&entry) {
529			nodes.push(entry);
530		}
531	}
532}