Skip to main content

reifydb_sub_flow/engine/
register.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8	interface::catalog::{
9		flow::{FlowId, FlowNodeId},
10		id::ViewId,
11		shape::ShapeId,
12		view::ViewKind,
13	},
14	internal,
15};
16use reifydb_rql::flow::{
17	flow::FlowDag,
18	node::{
19		FlowNode,
20		FlowNodeType::{
21			self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkRingBufferView,
22			SinkSeriesView, SinkSubscription, SinkTableView, Sort, SourceFlow, SourceInlineData,
23			SourceRingBuffer, SourceSeries, SourceTable, SourceView, Take, Window,
24		},
25	},
26};
27use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
28use reifydb_type::{Result, error::Error};
29use tracing::instrument;
30
31use super::eval::evaluate_operator_config;
32#[cfg(reifydb_target = "native")]
33use crate::operator::apply::ApplyOperator;
34use crate::{
35	engine::FlowEngine,
36	operator::{
37		Operators,
38		append::AppendOperator,
39		distinct::DistinctOperator,
40		extend::ExtendOperator,
41		filter::FilterOperator,
42		gate::GateOperator,
43		join::operator::{JoinOperator, JoinSideConfig},
44		map::MapOperator,
45		scan::{
46			flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
47			series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
48		},
49		sink::{
50			ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
51			view::SinkTableViewOperator,
52		},
53		sort::SortOperator,
54		take::TakeOperator,
55		window::{WindowConfig, WindowOperator},
56	},
57};
58
59impl FlowEngine {
60	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
61	pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
62		self.register_with_transaction(&mut Transaction::Command(txn), flow)
63	}
64
65	#[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
66	pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
67		debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
68
69		for node_id in flow.topological_order()? {
70			let node = flow.get_node(&node_id).unwrap();
71			self.add(txn, &flow, node)?;
72		}
73
74		self.analyzer.add(flow.clone());
75		self.flows.insert(flow.id, flow);
76
77		Ok(())
78	}
79
80	#[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
81	pub fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
82		debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
83		let node = node.clone();
84
85		match node.ty {
86			SourceInlineData {
87				..
88			} => {
89				unimplemented!()
90			}
91			SourceTable {
92				table,
93			} => {
94				let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
95
96				self.add_source(flow.id, node.id, ShapeId::table(table.id));
97				self.operators.insert(
98					node.id,
99					Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
100				);
101			}
102			SourceView {
103				view,
104			} => self.register_source_view(txn, flow, &node, view)?,
105			SourceFlow {
106				flow: source_flow,
107			} => {
108				let source_flow = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
109				self.operators.insert(
110					node.id,
111					Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
112						node.id,
113						source_flow,
114					))),
115				);
116			}
117			SourceRingBuffer {
118				ringbuffer,
119			} => {
120				let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
121				self.add_source(flow.id, node.id, ShapeId::ringbuffer(rb.id));
122				self.operators.insert(
123					node.id,
124					Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
125						node.id, rb,
126					))),
127				);
128			}
129			SourceSeries {
130				series,
131			} => {
132				let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
133				self.add_source(flow.id, node.id, ShapeId::series(s.id));
134				self.operators.insert(
135					node.id,
136					Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
137				);
138			}
139			SinkTableView {
140				view,
141				table,
142			} => {
143				let parent = self
144					.operators
145					.get(&node.inputs[0])
146					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
147					.clone();
148
149				self.add_sink(flow.id, node.id, ShapeId::view(*view));
150				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
151				self.operators.insert(
152					node.id,
153					Arc::new(Operators::SinkTableView(SinkTableViewOperator::new(
154						parent, node.id, resolved, table,
155					))),
156				);
157			}
158			SinkRingBufferView {
159				view,
160				ringbuffer,
161				capacity,
162				propagate_evictions,
163			} => {
164				let parent = self
165					.operators
166					.get(&node.inputs[0])
167					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
168					.clone();
169				self.add_sink(flow.id, node.id, ShapeId::view(*view));
170				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
171				self.operators.insert(
172					node.id,
173					Arc::new(Operators::SinkRingBufferView(SinkRingBufferViewOperator::new(
174						parent,
175						node.id,
176						resolved,
177						ringbuffer,
178						capacity,
179						propagate_evictions,
180					))),
181				);
182			}
183			SinkSeriesView {
184				view,
185				series,
186				key,
187			} => {
188				let parent = self
189					.operators
190					.get(&node.inputs[0])
191					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
192					.clone();
193				self.add_sink(flow.id, node.id, ShapeId::view(*view));
194				let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
195				self.operators.insert(
196					node.id,
197					Arc::new(Operators::SinkSeriesView(SinkSeriesViewOperator::new(
198						parent,
199						node.id,
200						resolved,
201						series,
202						key.clone(),
203					))),
204				);
205			}
206			SinkSubscription {
207				..
208			} => {
209				// Subscriptions are now ephemeral and handled by reifydb-sub-subscription.
210				// Persistent subscription flows are no longer created.
211				return Err(Error(Box::new(internal!(
212					"SinkSubscription nodes are no longer supported in persistent flows"
213				))));
214			}
215			Filter {
216				conditions,
217			} => {
218				let parent = self
219					.operators
220					.get(&node.inputs[0])
221					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
222					.clone();
223				self.operators.insert(
224					node.id,
225					Arc::new(Operators::Filter(FilterOperator::new(
226						parent,
227						node.id,
228						conditions,
229						self.executor.routines.clone(),
230						self.runtime_context.clone(),
231					))),
232				);
233			}
234			Gate {
235				conditions,
236			} => {
237				let parent = self
238					.operators
239					.get(&node.inputs[0])
240					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
241					.clone();
242				self.operators.insert(
243					node.id,
244					Arc::new(Operators::Gate(GateOperator::new(
245						parent,
246						node.id,
247						conditions,
248						self.executor.routines.clone(),
249						self.runtime_context.clone(),
250					))),
251				);
252			}
253			Map {
254				expressions,
255			} => {
256				let parent = self
257					.operators
258					.get(&node.inputs[0])
259					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
260					.clone();
261				self.operators.insert(
262					node.id,
263					Arc::new(Operators::Map(MapOperator::new(
264						parent,
265						node.id,
266						expressions,
267						self.executor.routines.clone(),
268						self.runtime_context.clone(),
269					))),
270				);
271			}
272			Extend {
273				expressions,
274			} => {
275				let parent = self
276					.operators
277					.get(&node.inputs[0])
278					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
279					.clone();
280				self.operators.insert(
281					node.id,
282					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
283				);
284			}
285			Sort {
286				by: _,
287			} => {
288				let parent = self
289					.operators
290					.get(&node.inputs[0])
291					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
292					.clone();
293				self.operators.insert(
294					node.id,
295					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
296				);
297			}
298			Take {
299				limit,
300			} => {
301				let parent = self
302					.operators
303					.get(&node.inputs[0])
304					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
305					.clone();
306				self.operators.insert(
307					node.id,
308					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
309				);
310			}
311			Join {
312				join_type,
313				left,
314				right,
315				alias,
316			} => {
317				if node.inputs.len() != 2 {
318					return Err(Error(Box::new(internal!("Join node must have exactly 2 inputs"))));
319				}
320
321				let left_node = node.inputs[0];
322				let right_node = node.inputs[1];
323
324				let left_parent = self
325					.operators
326					.get(&left_node)
327					.ok_or_else(|| Error(Box::new(internal!("Left parent operator not found"))))?
328					.clone();
329
330				let right_parent = self
331					.operators
332					.get(&right_node)
333					.ok_or_else(|| Error(Box::new(internal!("Right parent operator not found"))))?
334					.clone();
335
336				self.operators.insert(
337					node.id,
338					Arc::new(Operators::Join(JoinOperator::new(
339						JoinSideConfig {
340							parent: left_parent,
341							node: left_node,
342							exprs: left,
343						},
344						JoinSideConfig {
345							parent: right_parent,
346							node: right_node,
347							exprs: right,
348						},
349						node.id,
350						join_type,
351						alias,
352						self.executor.clone(),
353					))),
354				);
355			}
356			Distinct {
357				expressions,
358			} => {
359				let parent = self
360					.operators
361					.get(&node.inputs[0])
362					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
363					.clone();
364				self.operators.insert(
365					node.id,
366					Arc::new(Operators::Distinct(DistinctOperator::new(
367						parent,
368						node.id,
369						expressions,
370						self.executor.routines.clone(),
371						self.runtime_context.clone(),
372					))),
373				);
374			}
375			Append => {
376				if node.inputs.len() < 2 {
377					return Err(Error(Box::new(internal!(
378						"Append node must have at least 2 inputs"
379					))));
380				}
381
382				let mut parents = Vec::with_capacity(node.inputs.len());
383
384				for input_node_id in &node.inputs {
385					let parent = self
386						.operators
387						.get(input_node_id)
388						.ok_or_else(|| {
389							Error(Box::new(internal!(
390								"Parent operator not found for input {:?}",
391								input_node_id
392							)))
393						})?
394						.clone();
395					parents.push(parent);
396				}
397
398				self.operators.insert(
399					node.id,
400					Arc::new(Operators::Append(AppendOperator::new(
401						node.id,
402						parents,
403						node.inputs.clone(),
404					))),
405				);
406			}
407			Apply {
408				operator,
409				expressions,
410			} => {
411				let config = evaluate_operator_config(
412					expressions.as_slice(),
413					&self.executor.routines,
414					&self.runtime_context,
415				)?;
416
417				if let Some(factory) = self.custom_operators.get(operator.as_str()) {
418					let op = factory(node.id, &config)?;
419					self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
420				} else {
421					#[cfg(reifydb_target = "native")]
422					{
423						let parent = self
424							.operators
425							.get(&node.inputs[0])
426							.ok_or_else(|| {
427								Error(Box::new(internal!("Parent operator not found")))
428							})?
429							.clone();
430
431						if !self.is_ffi_operator(operator.as_str()) {
432							return Err(Error(Box::new(internal!(
433								"Unknown operator: {}",
434								operator
435							))));
436						}
437
438						let ffi_op =
439							self.create_ffi_operator(operator.as_str(), node.id, &config)?;
440
441						self.operators.insert(
442							node.id,
443							Arc::new(Operators::Apply(ApplyOperator::new(
444								parent, node.id, ffi_op,
445							))),
446						);
447					}
448					#[cfg(not(reifydb_target = "native"))]
449					{
450						let _ = operator;
451						return Err(Error(Box::new(internal!(
452							"FFI operators are not supported in WASM"
453						))));
454					}
455				}
456			}
457			Aggregate {
458				..
459			} => unimplemented!(),
460			Window {
461				kind,
462				group_by,
463				aggregations,
464				ts,
465			} => {
466				let parent = self
467					.operators
468					.get(&node.inputs[0])
469					.ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
470					.clone();
471				let operator = WindowOperator::new(WindowConfig {
472					parent,
473					node: node.id,
474					kind: kind.clone(),
475					group_by: group_by.clone(),
476					aggregations: aggregations.clone(),
477					ts: ts.clone(),
478					runtime_context: self.runtime_context.clone(),
479					routines: self.executor.routines.clone(),
480				});
481				self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
482			}
483		}
484
485		Ok(())
486	}
487
488	#[inline]
489	fn register_source_view(
490		&mut self,
491		txn: &mut Transaction<'_>,
492		flow: &FlowDag,
493		node: &FlowNode,
494		view: ViewId,
495	) -> Result<()> {
496		let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
497		self.add_source(flow.id, node.id, ShapeId::view(view.id()));
498
499		// Both deferred and transactional view sinks write view-shape rows into
500		// the view's underlying backing shape, so CDC on that shape carries
501		// view-shape rows. Registering it as a source makes a SourceView in any
502		// consumer flow see view output identically regardless of view kind.
503		self.add_source(flow.id, node.id, view.underlying_id());
504
505		// Legacy transactional-view propagation: a downstream DEFERRED flow
506		// reading from a transactional view must also be woken up by the
507		// view's UPSTREAM primitive CDC, because the deferred coordinator
508		// handles cascading transactional-view changes through pre-commit
509		// interceptors that bypass CDC.
510		//
511		// Skip when:
512		//  - the current flow is itself transactional (its pre-commit path already propagates changes through
513		//    `available_changes`), or
514		//  - the current flow is an ephemeral subscription (a `SinkSubscription` is its terminal node).
515		//    Subscription consumers rely on the `view.underlying_id()` CDC registration above, identical across
516		//    view kinds. Registering upstream primitives here would leak raw base-table rows to the subscriber.
517		if view.kind() == ViewKind::Transactional {
518			let current_flow_is_transactional = flow.get_node_ids().any(|nid| {
519				if let Some(n) = flow.get_node(&nid) {
520					let sink_view = match &n.ty {
521						SinkTableView {
522							view,
523							..
524						}
525						| SinkRingBufferView {
526							view,
527							..
528						}
529						| SinkSeriesView {
530							view,
531							..
532						} => Some(view),
533						_ => None,
534					};
535					sink_view
536						.and_then(|v| {
537							self.catalog.find_view(&mut txn.reborrow(), *v).ok().flatten()
538						})
539						.map(|v| v.kind() == ViewKind::Transactional)
540						.unwrap_or(false)
541				} else {
542					false
543				}
544			});
545
546			let current_flow_is_subscription = flow.get_node_ids().any(|nid| {
547				flow.get_node(&nid).map(|n| matches!(n.ty, SinkSubscription { .. })).unwrap_or(false)
548			});
549
550			if !current_flow_is_transactional && !current_flow_is_subscription {
551				let mut additional_sources = Vec::new();
552				if let Some(view_flow) = self.catalog.find_flow_by_name(
553					&mut txn.reborrow(),
554					view.namespace(),
555					view.name(),
556				)? {
557					let flow_nodes = self
558						.catalog
559						.list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
560					for flow_node in &flow_nodes {
561						if let Ok(nt) = from_bytes::<FlowNodeType>(&flow_node.data)
562							&& let Some(shape) = nt.primitive_source_shape_id()
563						{
564							additional_sources.push(shape);
565						}
566					}
567				}
568				for source in additional_sources {
569					self.add_source(flow.id, node.id, source);
570				}
571			}
572		}
573
574		self.operators
575			.insert(node.id, Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))));
576		Ok(())
577	}
578
579	pub fn add_source(&mut self, flow: FlowId, node: FlowNodeId, shape: ShapeId) {
580		let nodes = self.sources.entry(shape).or_default();
581
582		let entry = (flow, node);
583		if !nodes.contains(&entry) {
584			nodes.push(entry);
585		}
586	}
587
588	pub fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: ShapeId) {
589		let nodes = self.sinks.entry(sink).or_default();
590
591		let entry = (flow, node);
592		if !nodes.contains(&entry) {
593			nodes.push(entry);
594		}
595	}
596}