Skip to main content

reifydb_sub_flow/engine/
register.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8	interface::catalog::{
9		flow::{FlowId, FlowNodeId},
10		schema::SchemaId,
11		view::ViewKind,
12	},
13	internal,
14};
15use reifydb_rql::flow::{
16	flow::FlowDag,
17	node::{
18		FlowNode,
19		FlowNodeType::{
20			self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkRingBufferView,
21			SinkSeriesView, SinkSubscription, SinkTableView, Sort, SourceFlow, SourceInlineData,
22			SourceRingBuffer, SourceSeries, SourceTable, SourceView, Take, Window,
23		},
24	},
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34	engine::FlowEngine,
35	operator::{
36		Operators,
37		append::AppendOperator,
38		distinct::DistinctOperator,
39		extend::ExtendOperator,
40		filter::FilterOperator,
41		gate::GateOperator,
42		join::operator::JoinOperator,
43		map::MapOperator,
44		scan::{
45			flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46			series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47		},
48		sink::{
49			ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
50			subscription::SinkSubscriptionOperator, view::SinkTableViewOperator,
51		},
52		sort::SortOperator,
53		take::TakeOperator,
54		window::WindowOperator,
55	},
56};
57
58impl FlowEngine {
59	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
60	pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
61		self.register_with_transaction(&mut Transaction::Command(txn), flow)
62	}
63
64	#[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
65	pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
66		debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
67
68		for node_id in flow.topological_order()? {
69			let node = flow.get_node(&node_id).unwrap();
70			self.add(txn, &flow, node)?;
71		}
72
73		self.analyzer.add(flow.clone());
74		self.flows.insert(flow.id, flow);
75
76		Ok(())
77	}
78
79	#[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
80	fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
81		debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
82		let node = node.clone();
83
84		match node.ty {
85			SourceInlineData {
86				..
87			} => {
88				unimplemented!()
89			}
90			SourceTable {
91				table,
92			} => {
93				let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
94
95				self.add_source(flow.id, node.id, SchemaId::table(table.id));
96				self.operators.insert(
97					node.id,
98					Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
99				);
100			}
101			SourceView {
102				view,
103			} => {
104				let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
105				self.add_source(flow.id, node.id, SchemaId::view(view.id()));
106
107				// For deferred views, also register the underlying primitive as a source.
108				// Deferred view sinks write to the underlying primitive's key space,
109				// so CDC from upstream deferred view commits uses the underlying SchemaId.
110				if view.kind() == ViewKind::Deferred {
111					self.add_source(flow.id, node.id, view.underlying_id());
112				}
113
114				// For transactional views, also register the underlying table/ringbuffer
115				// sources so the deferred coordinator routes changes correctly. A transactional
116				// view is computed on-the-fly; its changes are never published to CDC. By
117				// registering the view's upstream primitives, the deferred flow is triggered
118				// when the underlying data changes.
119				if view.kind() == ViewKind::Transactional {
120					let mut additional_sources = Vec::new();
121					if let Some(view_flow) = self.catalog.find_flow_by_name(
122						&mut txn.reborrow(),
123						view.namespace(),
124						view.name(),
125					)? {
126						let flow_nodes = self
127							.catalog
128							.list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
129						for flow_node in &flow_nodes {
130							// SourceTable = 1, SourceRingBuffer = 17, SourceSeries = 18
131							if flow_node.node_type == 1
132								|| flow_node.node_type == 17 || flow_node.node_type == 18
133							{
134								if let Ok(nt) =
135									from_bytes::<FlowNodeType>(&flow_node.data)
136								{
137									match nt {
138										SourceTable {
139											table: t,
140										} => {
141											additional_sources.push(
142												SchemaId::table(t),
143											);
144										}
145										SourceRingBuffer {
146											ringbuffer: rb,
147										} => {
148											additional_sources.push(
149												SchemaId::ringbuffer(
150													rb,
151												),
152											);
153										}
154										SourceSeries {
155											series: s,
156										} => {
157											additional_sources.push(
158												SchemaId::series(s),
159											);
160										}
161										_ => {}
162									}
163								}
164							}
165						}
166					}
167					for source in additional_sources {
168						self.add_source(flow.id, node.id, source);
169					}
170				}
171
172				self.operators.insert(
173					node.id,
174					Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
175				);
176			}
177			SourceFlow {
178				flow: source_flow,
179			} => {
180				let source_flow = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
181				self.operators.insert(
182					node.id,
183					Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
184						node.id,
185						source_flow,
186					))),
187				);
188			}
189			SourceRingBuffer {
190				ringbuffer,
191			} => {
192				let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
193				self.add_source(flow.id, node.id, SchemaId::ringbuffer(rb.id));
194				self.operators.insert(
195					node.id,
196					Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
197						node.id, rb,
198					))),
199				);
200			}
201			SourceSeries {
202				series,
203			} => {
204				let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
205				self.add_source(flow.id, node.id, SchemaId::series(s.id));
206				self.operators.insert(
207					node.id,
208					Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
209				);
210			}
211			SinkTableView {
212				view,
213				table,
214			} => {
215				let parent = self
216					.operators
217					.get(&node.inputs[0])
218					.ok_or_else(|| Error(internal!("Parent operator not found")))?
219					.clone();
220
221				self.add_sink(flow.id, node.id, SchemaId::view(*view));
222				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
223				self.operators.insert(
224					node.id,
225					Arc::new(Operators::SinkTableView(SinkTableViewOperator::new(
226						parent, node.id, resolved, table,
227					))),
228				);
229			}
230			SinkRingBufferView {
231				view,
232				ringbuffer,
233				capacity,
234				propagate_evictions,
235			} => {
236				let parent = self
237					.operators
238					.get(&node.inputs[0])
239					.ok_or_else(|| Error(internal!("Parent operator not found")))?
240					.clone();
241				self.add_sink(flow.id, node.id, SchemaId::view(*view));
242				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
243				self.operators.insert(
244					node.id,
245					Arc::new(Operators::SinkRingBufferView(SinkRingBufferViewOperator::new(
246						parent,
247						node.id,
248						resolved,
249						ringbuffer,
250						capacity,
251						propagate_evictions,
252					))),
253				);
254			}
255			SinkSeriesView {
256				view,
257				series,
258				key,
259			} => {
260				let parent = self
261					.operators
262					.get(&node.inputs[0])
263					.ok_or_else(|| Error(internal!("Parent operator not found")))?
264					.clone();
265				self.add_sink(flow.id, node.id, SchemaId::view(*view));
266				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
267				self.operators.insert(
268					node.id,
269					Arc::new(Operators::SinkSeriesView(SinkSeriesViewOperator::new(
270						parent,
271						node.id,
272						resolved,
273						series,
274						key.clone(),
275					))),
276				);
277			}
278			SinkSubscription {
279				subscription,
280			} => {
281				// Guard against race condition: flow may have been deleted during loading
282				if node.inputs.is_empty() {
283					return Err(Error(internal!(
284						"SinkSubscription node has no inputs - flow may have been deleted during loading"
285					)));
286				}
287				let parent = self
288					.operators
289					.get(&node.inputs[0])
290					.ok_or_else(|| Error(internal!("Parent operator not found")))?
291					.clone();
292
293				// Note: Subscriptions use UUID-based IDs and are not added to the sinks map
294				// which uses SchemaId (u64-based). Subscriptions are ephemeral 1:1 mapped.
295				let resolved = self.catalog.resolve_subscription(&mut txn.reborrow(), subscription)?;
296				self.operators.insert(
297					node.id,
298					Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
299						parent, node.id, resolved,
300					))),
301				);
302			}
303			Filter {
304				conditions,
305			} => {
306				let parent = self
307					.operators
308					.get(&node.inputs[0])
309					.ok_or_else(|| Error(internal!("Parent operator not found")))?
310					.clone();
311				self.operators.insert(
312					node.id,
313					Arc::new(Operators::Filter(FilterOperator::new(
314						parent,
315						node.id,
316						conditions,
317						self.executor.functions.clone(),
318						self.runtime_context.clone(),
319					))),
320				);
321			}
322			Gate {
323				conditions,
324			} => {
325				let parent = self
326					.operators
327					.get(&node.inputs[0])
328					.ok_or_else(|| Error(internal!("Parent operator not found")))?
329					.clone();
330				self.operators.insert(
331					node.id,
332					Arc::new(Operators::Gate(GateOperator::new(
333						parent,
334						node.id,
335						conditions,
336						self.executor.functions.clone(),
337						self.runtime_context.clone(),
338					))),
339				);
340			}
341			Map {
342				expressions,
343			} => {
344				let parent = self
345					.operators
346					.get(&node.inputs[0])
347					.ok_or_else(|| Error(internal!("Parent operator not found")))?
348					.clone();
349				self.operators.insert(
350					node.id,
351					Arc::new(Operators::Map(MapOperator::new(
352						parent,
353						node.id,
354						expressions,
355						self.executor.functions.clone(),
356						self.runtime_context.clone(),
357					))),
358				);
359			}
360			Extend {
361				expressions,
362			} => {
363				let parent = self
364					.operators
365					.get(&node.inputs[0])
366					.ok_or_else(|| Error(internal!("Parent operator not found")))?
367					.clone();
368				self.operators.insert(
369					node.id,
370					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
371				);
372			}
373			Sort {
374				by: _,
375			} => {
376				let parent = self
377					.operators
378					.get(&node.inputs[0])
379					.ok_or_else(|| Error(internal!("Parent operator not found")))?
380					.clone();
381				self.operators.insert(
382					node.id,
383					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
384				);
385			}
386			Take {
387				limit,
388			} => {
389				let parent = self
390					.operators
391					.get(&node.inputs[0])
392					.ok_or_else(|| Error(internal!("Parent operator not found")))?
393					.clone();
394				self.operators.insert(
395					node.id,
396					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
397				);
398			}
399			Join {
400				join_type,
401				left,
402				right,
403				alias,
404			} => {
405				// The join node should have exactly 2 inputs
406				if node.inputs.len() != 2 {
407					return Err(Error(internal!("Join node must have exactly 2 inputs")));
408				}
409
410				let left_node = node.inputs[0];
411				let right_node = node.inputs[1];
412
413				let left_parent = self
414					.operators
415					.get(&left_node)
416					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
417					.clone();
418
419				let right_parent = self
420					.operators
421					.get(&right_node)
422					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
423					.clone();
424
425				self.operators.insert(
426					node.id,
427					Arc::new(Operators::Join(JoinOperator::new(
428						left_parent,
429						right_parent,
430						node.id,
431						join_type,
432						left_node,
433						right_node,
434						left,
435						right,
436						alias,
437						self.executor.clone(),
438					))),
439				);
440			}
441			Distinct {
442				expressions,
443			} => {
444				let parent = self
445					.operators
446					.get(&node.inputs[0])
447					.ok_or_else(|| Error(internal!("Parent operator not found")))?
448					.clone();
449				self.operators.insert(
450					node.id,
451					Arc::new(Operators::Distinct(DistinctOperator::new(
452						parent,
453						node.id,
454						expressions,
455						self.executor.functions.clone(),
456						self.runtime_context.clone(),
457					))),
458				);
459			}
460			Append {} => {
461				// Append requires at least 2 inputs
462				if node.inputs.len() < 2 {
463					return Err(Error(internal!("Append node must have at least 2 inputs")));
464				}
465
466				let mut parents = Vec::with_capacity(node.inputs.len());
467
468				for input_node_id in &node.inputs {
469					let parent = self
470						.operators
471						.get(input_node_id)
472						.ok_or_else(|| {
473							Error(internal!(
474								"Parent operator not found for input {:?}",
475								input_node_id
476							))
477						})?
478						.clone();
479					parents.push(parent);
480				}
481
482				self.operators.insert(
483					node.id,
484					Arc::new(Operators::Append(AppendOperator::new(
485						node.id,
486						parents,
487						node.inputs.clone(),
488					))),
489				);
490			}
491			Apply {
492				operator,
493				expressions,
494			} => {
495				let config = evaluate_operator_config(
496					expressions.as_slice(),
497					&self.executor.functions,
498					&self.runtime_context,
499				)?;
500
501				if let Some(factory) = self.custom_operators.get(operator.as_str()) {
502					let op = factory(node.id, &config)?;
503					self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
504				} else {
505					#[cfg(reifydb_target = "native")]
506					{
507						let parent = self
508							.operators
509							.get(&node.inputs[0])
510							.ok_or_else(|| Error(internal!("Parent operator not found")))?
511							.clone();
512
513						if !self.is_ffi_operator(operator.as_str()) {
514							return Err(Error(internal!("Unknown operator: {}", operator)));
515						}
516
517						let ffi_op =
518							self.create_ffi_operator(operator.as_str(), node.id, &config)?;
519
520						self.operators.insert(
521							node.id,
522							Arc::new(Operators::Apply(ApplyOperator::new(
523								parent, node.id, ffi_op,
524							))),
525						);
526					}
527					#[cfg(not(reifydb_target = "native"))]
528					{
529						let _ = operator;
530						return Err(Error(internal!(
531							"FFI operators are not supported in WASM"
532						)));
533					}
534				}
535			}
536			Aggregate {
537				..
538			} => unimplemented!(),
539			Window {
540				kind,
541				group_by,
542				aggregations,
543				ts,
544			} => {
545				let parent = self
546					.operators
547					.get(&node.inputs[0])
548					.ok_or_else(|| Error(internal!("Parent operator not found")))?
549					.clone();
550				let operator = WindowOperator::new(
551					parent,
552					node.id,
553					kind.clone(),
554					group_by.clone(),
555					aggregations.clone(),
556					ts.clone(),
557					self.runtime_context.clone(),
558					self.executor.functions.clone(),
559				);
560				self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
561			}
562		}
563
564		Ok(())
565	}
566
567	fn add_source(&mut self, flow: FlowId, node: FlowNodeId, schema: SchemaId) {
568		let nodes = self.sources.entry(schema).or_insert_with(Vec::new);
569
570		let entry = (flow, node);
571		if !nodes.contains(&entry) {
572			nodes.push(entry);
573		}
574	}
575
576	fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: SchemaId) {
577		let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
578
579		let entry = (flow, node);
580		if !nodes.contains(&entry) {
581			nodes.push(entry);
582		}
583	}
584}