Skip to main content

reifydb_sub_flow/engine/
register.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	interface::catalog::{
8		flow::{FlowId, FlowNodeId},
9		primitive::PrimitiveId,
10	},
11	internal,
12};
13use reifydb_rql::flow::{
14	flow::FlowDag,
15	node::{
16		FlowNode,
17		FlowNodeType::{
18			Aggregate, Append, Apply, Distinct, Extend, Filter, Join, Map, SinkSubscription, SinkView,
19			Sort, SourceFlow, SourceInlineData, SourceTable, SourceView, Take, Window,
20		},
21	},
22};
23use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
24use reifydb_type::error::Error;
25use tracing::instrument;
26
27use super::eval::evaluate_operator_config;
28use crate::{
29	engine::FlowEngine,
30	operator::{
31		Operators,
32		append::AppendOperator,
33		apply::ApplyOperator,
34		distinct::DistinctOperator,
35		extend::ExtendOperator,
36		filter::FilterOperator,
37		join::operator::JoinOperator,
38		map::MapOperator,
39		scan::{flow::PrimitiveFlowOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator},
40		sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
41		sort::SortOperator,
42		take::TakeOperator,
43		window::WindowOperator,
44	},
45};
46
47impl FlowEngine {
48	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
49	pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> reifydb_type::Result<()> {
50		debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
51
52		for node_id in flow.topological_order()? {
53			let node = flow.get_node(&node_id).unwrap();
54			self.add(txn, &flow, node)?;
55		}
56
57		self.analyzer.add(flow.clone());
58		self.flows.insert(flow.id, flow);
59
60		Ok(())
61	}
62
63	#[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
64	fn add(&mut self, txn: &mut CommandTransaction, flow: &FlowDag, node: &FlowNode) -> reifydb_type::Result<()> {
65		debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
66		let node = node.clone();
67
68		match node.ty {
69			SourceInlineData {
70				..
71			} => {
72				unimplemented!()
73			}
74			SourceTable {
75				table,
76			} => {
77				let table = self.catalog.get_table(&mut Transaction::Command(&mut *txn), table)?;
78
79				self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
80				self.operators.insert(
81					node.id,
82					Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
83				);
84			}
85			SourceView {
86				view,
87			} => {
88				let view = self.catalog.get_view(&mut Transaction::Command(&mut *txn), view)?;
89				self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
90				self.operators.insert(
91					node.id,
92					Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
93				);
94			}
95			SourceFlow {
96				flow: source_flow,
97			} => {
98				let source_flow_def =
99					self.catalog.get_flow(&mut Transaction::Command(&mut *txn), source_flow)?;
100				self.add_source(flow.id, node.id, PrimitiveId::flow(source_flow_def.id));
101				self.operators.insert(
102					node.id,
103					Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
104						node.id,
105						source_flow_def,
106					))),
107				);
108			}
109			SinkView {
110				view,
111			} => {
112				let parent = self
113					.operators
114					.get(&node.inputs[0])
115					.ok_or_else(|| Error(internal!("Parent operator not found")))?
116					.clone();
117
118				self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
119				let resolved = self.catalog.resolve_view(&mut Transaction::Command(&mut *txn), view)?;
120				self.operators.insert(
121					node.id,
122					Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
123				);
124			}
125			SinkSubscription {
126				subscription,
127			} => {
128				// Guard against race condition: flow may have been deleted during loading
129				if node.inputs.is_empty() {
130					return Err(Error(internal!(
131						"SinkSubscription node has no inputs - flow may have been deleted during loading"
132					)));
133				}
134				let parent = self
135					.operators
136					.get(&node.inputs[0])
137					.ok_or_else(|| Error(internal!("Parent operator not found")))?
138					.clone();
139
140				// Note: Subscriptions use UUID-based IDs and are not added to the sinks map
141				// which uses PrimitiveId (u64-based). Subscriptions are ephemeral 1:1 mapped.
142				let resolved = self
143					.catalog
144					.resolve_subscription(&mut Transaction::Command(&mut *txn), subscription)?;
145				self.operators.insert(
146					node.id,
147					Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
148						parent, node.id, resolved,
149					))),
150				);
151			}
152			Filter {
153				conditions,
154			} => {
155				let parent = self
156					.operators
157					.get(&node.inputs[0])
158					.ok_or_else(|| Error(internal!("Parent operator not found")))?
159					.clone();
160				self.operators.insert(
161					node.id,
162					Arc::new(Operators::Filter(FilterOperator::new(
163						parent,
164						node.id,
165						conditions,
166						self.executor.functions.clone(),
167						self.clock.clone(),
168					))),
169				);
170			}
171			Map {
172				expressions,
173			} => {
174				let parent = self
175					.operators
176					.get(&node.inputs[0])
177					.ok_or_else(|| Error(internal!("Parent operator not found")))?
178					.clone();
179				self.operators.insert(
180					node.id,
181					Arc::new(Operators::Map(MapOperator::new(
182						parent,
183						node.id,
184						expressions,
185						self.executor.functions.clone(),
186						self.clock.clone(),
187					))),
188				);
189			}
190			Extend {
191				expressions,
192			} => {
193				let parent = self
194					.operators
195					.get(&node.inputs[0])
196					.ok_or_else(|| Error(internal!("Parent operator not found")))?
197					.clone();
198				self.operators.insert(
199					node.id,
200					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
201				);
202			}
203			Sort {
204				by: _,
205			} => {
206				let parent = self
207					.operators
208					.get(&node.inputs[0])
209					.ok_or_else(|| Error(internal!("Parent operator not found")))?
210					.clone();
211				self.operators.insert(
212					node.id,
213					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
214				);
215			}
216			Take {
217				limit,
218			} => {
219				let parent = self
220					.operators
221					.get(&node.inputs[0])
222					.ok_or_else(|| Error(internal!("Parent operator not found")))?
223					.clone();
224				self.operators.insert(
225					node.id,
226					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
227				);
228			}
229			Join {
230				join_type,
231				left,
232				right,
233				alias,
234			} => {
235				// The join node should have exactly 2 inputs
236				if node.inputs.len() != 2 {
237					return Err(Error(internal!("Join node must have exactly 2 inputs")));
238				}
239
240				let left_node = node.inputs[0];
241				let right_node = node.inputs[1];
242
243				let left_parent = self
244					.operators
245					.get(&left_node)
246					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
247					.clone();
248
249				let right_parent = self
250					.operators
251					.get(&right_node)
252					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
253					.clone();
254
255				self.operators.insert(
256					node.id,
257					Arc::new(Operators::Join(JoinOperator::new(
258						left_parent,
259						right_parent,
260						node.id,
261						join_type,
262						left_node,
263						right_node,
264						left,
265						right,
266						alias,
267						self.executor.clone(),
268					))),
269				);
270			}
271			Distinct {
272				expressions,
273			} => {
274				let parent = self
275					.operators
276					.get(&node.inputs[0])
277					.ok_or_else(|| Error(internal!("Parent operator not found")))?
278					.clone();
279				self.operators.insert(
280					node.id,
281					Arc::new(Operators::Distinct(DistinctOperator::new(
282						parent,
283						node.id,
284						expressions,
285						self.executor.functions.clone(),
286						self.clock.clone(),
287					))),
288				);
289			}
290			Append {} => {
291				// Append requires at least 2 inputs
292				if node.inputs.len() < 2 {
293					return Err(Error(internal!("Append node must have at least 2 inputs")));
294				}
295
296				let mut parents = Vec::with_capacity(node.inputs.len());
297
298				for input_node_id in &node.inputs {
299					let parent = self
300						.operators
301						.get(input_node_id)
302						.ok_or_else(|| {
303							Error(internal!(
304								"Parent operator not found for input {:?}",
305								input_node_id
306							))
307						})?
308						.clone();
309					parents.push(parent);
310				}
311
312				self.operators.insert(
313					node.id,
314					Arc::new(Operators::Append(AppendOperator::new(
315						node.id,
316						parents,
317						node.inputs.clone(),
318					))),
319				);
320			}
321			Apply {
322				operator,
323				expressions,
324			} => {
325				#[cfg(reifydb_target = "native")]
326				{
327					let parent = self
328						.operators
329						.get(&node.inputs[0])
330						.ok_or_else(|| Error(internal!("Parent operator not found")))?
331						.clone();
332
333					if !self.is_ffi_operator(operator.as_str()) {
334						unimplemented!("only ffi operators can be used")
335					}
336
337					let config = evaluate_operator_config(
338						expressions.as_slice(),
339						&self.executor.functions,
340						&self.clock,
341					)?;
342					let operator = self.create_ffi_operator(operator.as_str(), node.id, &config)?;
343
344					self.operators.insert(
345						node.id,
346						Arc::new(Operators::Apply(ApplyOperator::new(
347							parent, node.id, operator,
348						))),
349					);
350				}
351				#[cfg(not(reifydb_target = "native"))]
352				{
353					let _ = (operator, expressions);
354					return Err(Error(internal!("FFI operators are not supported in WASM")));
355				}
356			}
357			Aggregate {
358				..
359			} => unimplemented!(),
360			Window {
361				window_type,
362				size,
363				slide,
364				group_by,
365				aggregations,
366				min_events,
367				max_window_count,
368				max_window_age,
369			} => {
370				let parent = self
371					.operators
372					.get(&node.inputs[0])
373					.ok_or_else(|| Error(internal!("Parent operator not found")))?
374					.clone();
375				let operator = WindowOperator::new(
376					parent,
377					node.id,
378					window_type.clone(),
379					size.clone(),
380					slide.clone(),
381					group_by.clone(),
382					aggregations.clone(),
383					min_events.clone(),
384					max_window_count.clone(),
385					max_window_age.clone(),
386					self.clock.clone(),
387					self.executor.functions.clone(),
388				);
389				self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
390			}
391		}
392
393		Ok(())
394	}
395
396	fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
397		let nodes = self.sources.entry(source).or_insert_with(Vec::new);
398
399		let entry = (flow, node);
400		if !nodes.contains(&entry) {
401			nodes.push(entry);
402		}
403	}
404
405	fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
406		let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
407
408		let entry = (flow, node);
409		if !nodes.contains(&entry) {
410			nodes.push(entry);
411		}
412	}
413}