reifydb_sub_flow/engine/
register.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8	CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9	transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12	CommitVersion, Error,
13	interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17	Flow, FlowNode, FlowNodeType,
18	FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Merge, Sort, Take, Window},
19};
20use reifydb_type::internal;
21use tracing::instrument;
22
23use super::eval::evaluate_operator_config;
24use crate::{
25	engine::FlowEngine,
26	operator::{
27		ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator,
28		MergeOperator, Operators, SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator,
29		SourceViewOperator, TakeOperator, WindowOperator,
30	},
31};
32
33impl FlowEngine {
34	#[instrument(level = "info", skip(self, txn), fields(flow_id = ?flow.id))]
35	pub fn register_without_backfill(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
36		self.register(txn, flow, None)
37	}
38
39	#[instrument(level = "info", skip(self, txn), fields(flow_id = ?flow.id, backfill_version = flow_creation_version.0))]
40	pub fn register_with_backfill(
41		&self,
42		txn: &mut StandardCommandTransaction,
43		flow: Flow,
44		flow_creation_version: CommitVersion,
45	) -> crate::Result<()> {
46		self.register(txn, flow, Some(flow_creation_version))
47	}
48
49	#[instrument(level = "debug", skip(self, txn), fields(flow_id = ?flow.id, has_backfill = flow_creation_version.is_some()))]
50	fn register(
51		&self,
52		txn: &mut StandardCommandTransaction,
53		flow: Flow,
54		flow_creation_version: Option<CommitVersion>,
55	) -> crate::Result<()> {
56		debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
57
58		for node_id in flow.topological_order()? {
59			let node = flow.get_node(&node_id).unwrap();
60			self.add(txn, &flow, node)?;
61		}
62
63		if let Some(flow_creation_version) = flow_creation_version {
64			self.inner.flow_creation_versions.write().insert(flow.id, flow_creation_version);
65
66			if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version) {
67				self.inner.flow_creation_versions.write().remove(&flow.id);
68				return Err(e);
69			}
70		}
71
72		// Add flow to analyzer for dependency tracking
73		self.inner.analyzer.write().add(flow.clone());
74		self.inner.flows.write().insert(flow.id, flow);
75
76		Ok(())
77	}
78
79	#[instrument(level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
80	fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
81		debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
82		let node = node.clone();
83
84		match node.ty {
85			SourceInlineData {
86				..
87			} => {
88				unimplemented!()
89			}
90			SourceTable {
91				table,
92			} => {
93				let table = txn.get_table(table)?;
94
95				self.add_source(flow.id, node.id, SourceId::table(table.id));
96				self.inner.operators.write().insert(
97					node.id,
98					Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
99				);
100			}
101			SourceView {
102				view,
103			} => {
104				let view = txn.get_view(view)?;
105				self.add_source(flow.id, node.id, SourceId::view(view.id));
106				self.inner.operators.write().insert(
107					node.id,
108					Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
109				);
110			}
111			SourceFlow {
112				flow: source_flow,
113			} => {
114				let source_flow_def = txn.get_flow(source_flow)?;
115				self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id));
116				self.inner.operators.write().insert(
117					node.id,
118					Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
119						node.id,
120						source_flow_def,
121					))),
122				);
123			}
124			SinkView {
125				view,
126			} => {
127				let parent = self
128					.inner
129					.operators
130					.read()
131					.get(&node.inputs[0])
132					.ok_or_else(|| Error(internal!("Parent operator not found")))?
133					.clone();
134
135				self.add_sink(flow.id, node.id, SourceId::view(*view));
136				self.inner.operators.write().insert(
137					node.id,
138					Arc::new(Operators::SinkView(SinkViewOperator::new(
139						parent,
140						node.id,
141						resolve_view(txn, view)?,
142					))),
143				);
144			}
145			Filter {
146				conditions,
147			} => {
148				let parent = self
149					.inner
150					.operators
151					.read()
152					.get(&node.inputs[0])
153					.ok_or_else(|| Error(internal!("Parent operator not found")))?
154					.clone();
155				self.inner.operators.write().insert(
156					node.id,
157					Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
158				);
159			}
160			Map {
161				expressions,
162			} => {
163				let parent = self
164					.inner
165					.operators
166					.read()
167					.get(&node.inputs[0])
168					.ok_or_else(|| Error(internal!("Parent operator not found")))?
169					.clone();
170				self.inner.operators.write().insert(
171					node.id,
172					Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
173				);
174			}
175			Extend {
176				expressions,
177			} => {
178				let parent = self
179					.inner
180					.operators
181					.read()
182					.get(&node.inputs[0])
183					.ok_or_else(|| Error(internal!("Parent operator not found")))?
184					.clone();
185				self.inner.operators.write().insert(
186					node.id,
187					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
188				);
189			}
190			Sort {
191				by: _,
192			} => {
193				let parent = self
194					.inner
195					.operators
196					.read()
197					.get(&node.inputs[0])
198					.ok_or_else(|| Error(internal!("Parent operator not found")))?
199					.clone();
200				self.inner.operators.write().insert(
201					node.id,
202					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
203				);
204			}
205			Take {
206				limit,
207			} => {
208				let parent = self
209					.inner
210					.operators
211					.read()
212					.get(&node.inputs[0])
213					.ok_or_else(|| Error(internal!("Parent operator not found")))?
214					.clone();
215				self.inner.operators.write().insert(
216					node.id,
217					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
218				);
219			}
220			Join {
221				join_type,
222				left,
223				right,
224				alias,
225			} => {
226				// Find the left and right node IDs from the flow inputs
227				// The join node should have exactly 2 inputs
228				if node.inputs.len() != 2 {
229					return Err(Error(internal!("Join node must have exactly 2 inputs")));
230				}
231
232				let left_node = node.inputs[0];
233				let right_node = node.inputs[1];
234
235				let operators = self.inner.operators.read();
236				let left_parent = operators
237					.get(&left_node)
238					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
239					.clone();
240
241				let right_parent = operators
242					.get(&right_node)
243					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
244					.clone();
245				drop(operators);
246
247				self.inner.operators.write().insert(
248					node.id,
249					Arc::new(Operators::Join(JoinOperator::new(
250						left_parent,
251						right_parent,
252						node.id,
253						join_type,
254						left_node,
255						right_node,
256						left,
257						right,
258						alias,
259						self.inner.executor.clone(),
260					))),
261				);
262			}
263			Distinct {
264				expressions,
265			} => {
266				let parent = self
267					.inner
268					.operators
269					.read()
270					.get(&node.inputs[0])
271					.ok_or_else(|| Error(internal!("Parent operator not found")))?
272					.clone();
273				self.inner.operators.write().insert(
274					node.id,
275					Arc::new(Operators::Distinct(DistinctOperator::new(
276						parent,
277						node.id,
278						expressions,
279					))),
280				);
281			}
282			Merge {} => {
283				// Merge requires at least 2 inputs
284				if node.inputs.len() < 2 {
285					return Err(Error(internal!("Merge node must have at least 2 inputs")));
286				}
287
288				let operators = self.inner.operators.read();
289				let mut parents = Vec::with_capacity(node.inputs.len());
290
291				for input_node_id in &node.inputs {
292					let parent = operators
293						.get(input_node_id)
294						.ok_or_else(|| {
295							Error(internal!(
296								"Parent operator not found for input {:?}",
297								input_node_id
298							))
299						})?
300						.clone();
301					parents.push(parent);
302				}
303				drop(operators);
304
305				self.inner.operators.write().insert(
306					node.id,
307					Arc::new(Operators::Merge(MergeOperator::new(
308						node.id,
309						parents,
310						node.inputs.clone(),
311					))),
312				);
313			}
314			Apply {
315				operator,
316				expressions,
317			} => {
318				let parent = self
319					.inner
320					.operators
321					.read()
322					.get(&node.inputs[0])
323					.ok_or_else(|| Error(internal!("Parent operator not found")))?
324					.clone();
325
326				// Check if this is an FFI operator and use the appropriate creation method
327				let operator = if self.is_ffi_operator(operator.as_str()) {
328					let config = evaluate_operator_config(
329						expressions.as_slice(),
330						&self.inner.evaluator,
331					)?;
332					self.create_ffi_operator(operator.as_str(), node.id, &config)?
333				} else {
334					// Use registry for non-FFI operators
335					self.inner.registry.create_operator(
336						operator.as_str(),
337						node.id,
338						expressions.as_slice(),
339					)?
340				};
341
342				self.inner.operators.write().insert(
343					node.id,
344					Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
345				);
346			}
347			Aggregate {
348				..
349			} => unimplemented!(),
350			Window {
351				window_type,
352				size,
353				slide,
354				group_by,
355				aggregations,
356				min_events,
357				max_window_count,
358				max_window_age,
359			} => {
360				let parent = self
361					.inner
362					.operators
363					.read()
364					.get(&node.inputs[0])
365					.ok_or_else(|| Error(internal!("Parent operator not found")))?
366					.clone();
367				let operator = WindowOperator::new(
368					parent,
369					node.id,
370					window_type.clone(),
371					size.clone(),
372					slide.clone(),
373					group_by.clone(),
374					aggregations.clone(),
375					min_events.clone(),
376					max_window_count.clone(),
377					max_window_age.clone(),
378				);
379				self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
380			}
381		}
382
383		Ok(())
384	}
385
386	fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
387		let mut sources = self.inner.sources.write();
388		let nodes = sources.entry(source).or_insert_with(Vec::new);
389
390		let entry = (flow, node);
391		if !nodes.contains(&entry) {
392			nodes.push(entry);
393		}
394	}
395
396	fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
397		let mut sinks = self.inner.sinks.write();
398		let nodes = sinks.entry(sink).or_insert_with(Vec::new);
399
400		let entry = (flow, node);
401		if !nodes.contains(&entry) {
402			nodes.push(entry);
403		}
404	}
405}