reifydb_sub_flow/engine/register.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8	CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9	transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12	CommitVersion, Error,
13	interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17	Flow, FlowNode, FlowNodeType,
18	FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
19};
20use reifydb_type::internal;
21
22use super::eval::evaluate_operator_config;
23use crate::{
24	engine::FlowEngine,
25	operator::{
26		ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
27		SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator, SourceViewOperator,
28		TakeOperator, WindowOperator,
29	},
30};
31
32impl FlowEngine {
33	pub fn register_without_backfill(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
34		self.register(txn, flow, None)
35	}
36
37	pub fn register_with_backfill(
38		&self,
39		txn: &mut StandardCommandTransaction,
40		flow: Flow,
41		flow_creation_version: CommitVersion,
42	) -> crate::Result<()> {
43		self.register(txn, flow, Some(flow_creation_version))
44	}
45
46	fn register(
47		&self,
48		txn: &mut StandardCommandTransaction,
49		flow: Flow,
50		flow_creation_version: Option<CommitVersion>,
51	) -> crate::Result<()> {
52		debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
53
54		for node_id in flow.topological_order()? {
55			let node = flow.get_node(&node_id).unwrap();
56			self.add(txn, &flow, node)?;
57		}
58
59		if let Some(flow_creation_version) = flow_creation_version {
60			self.inner.flow_creation_versions.write().insert(flow.id, flow_creation_version);
61
62			if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version) {
63				self.inner.flow_creation_versions.write().remove(&flow.id);
64				return Err(e);
65			}
66		}
67
68		// Add flow to analyzer for dependency tracking
69		self.inner.analyzer.write().add(flow.clone());
70		self.inner.flows.write().insert(flow.id, flow);
71
72		Ok(())
73	}
74
75	fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
76		debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
77		let node = node.clone();
78
79		match node.ty {
80			SourceInlineData {
81				..
82			} => {
83				unimplemented!()
84			}
85			SourceTable {
86				table,
87			} => {
88				let table = txn.get_table(table)?;
89
90				self.add_source(flow.id, node.id, SourceId::table(table.id));
91				self.inner.operators.write().insert(
92					node.id,
93					Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
94				);
95			}
96			SourceView {
97				view,
98			} => {
99				let view = txn.get_view(view)?;
100				self.add_source(flow.id, node.id, SourceId::view(view.id));
101				self.inner.operators.write().insert(
102					node.id,
103					Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
104				);
105			}
106			SourceFlow {
107				flow: source_flow,
108			} => {
109				let source_flow_def = txn.get_flow(source_flow)?;
110				self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id));
111				self.inner.operators.write().insert(
112					node.id,
113					Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
114						node.id,
115						source_flow_def,
116					))),
117				);
118			}
119			SinkView {
120				view,
121			} => {
122				let parent = self
123					.inner
124					.operators
125					.read()
126					.get(&node.inputs[0])
127					.ok_or_else(|| Error(internal!("Parent operator not found")))?
128					.clone();
129
130				self.add_sink(flow.id, node.id, SourceId::view(*view));
131				self.inner.operators.write().insert(
132					node.id,
133					Arc::new(Operators::SinkView(SinkViewOperator::new(
134						parent,
135						node.id,
136						resolve_view(txn, view)?,
137					))),
138				);
139			}
140			Filter {
141				conditions,
142			} => {
143				let parent = self
144					.inner
145					.operators
146					.read()
147					.get(&node.inputs[0])
148					.ok_or_else(|| Error(internal!("Parent operator not found")))?
149					.clone();
150				self.inner.operators.write().insert(
151					node.id,
152					Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
153				);
154			}
155			Map {
156				expressions,
157			} => {
158				let parent = self
159					.inner
160					.operators
161					.read()
162					.get(&node.inputs[0])
163					.ok_or_else(|| Error(internal!("Parent operator not found")))?
164					.clone();
165				self.inner.operators.write().insert(
166					node.id,
167					Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
168				);
169			}
170			Extend {
171				expressions,
172			} => {
173				let parent = self
174					.inner
175					.operators
176					.read()
177					.get(&node.inputs[0])
178					.ok_or_else(|| Error(internal!("Parent operator not found")))?
179					.clone();
180				self.inner.operators.write().insert(
181					node.id,
182					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
183				);
184			}
185			Sort {
186				by: _,
187			} => {
188				let parent = self
189					.inner
190					.operators
191					.read()
192					.get(&node.inputs[0])
193					.ok_or_else(|| Error(internal!("Parent operator not found")))?
194					.clone();
195				self.inner.operators.write().insert(
196					node.id,
197					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
198				);
199			}
200			Take {
201				limit,
202			} => {
203				let parent = self
204					.inner
205					.operators
206					.read()
207					.get(&node.inputs[0])
208					.ok_or_else(|| Error(internal!("Parent operator not found")))?
209					.clone();
210				self.inner.operators.write().insert(
211					node.id,
212					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
213				);
214			}
215			Join {
216				join_type,
217				left,
218				right,
219				alias,
220			} => {
221				// Find the left and right node IDs from the flow inputs
222				// The join node should have exactly 2 inputs
223				if node.inputs.len() != 2 {
224					return Err(Error(internal!("Join node must have exactly 2 inputs")));
225				}
226
227				let left_node = node.inputs[0];
228				let right_node = node.inputs[1];
229
230				let operators = self.inner.operators.read();
231				let left_parent = operators
232					.get(&left_node)
233					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
234					.clone();
235
236				let right_parent = operators
237					.get(&right_node)
238					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
239					.clone();
240				drop(operators);
241
242				self.inner.operators.write().insert(
243					node.id,
244					Arc::new(Operators::Join(JoinOperator::new(
245						left_parent,
246						right_parent,
247						node.id,
248						join_type,
249						left_node,
250						right_node,
251						left,
252						right,
253						alias,
254						self.inner.executor.clone(),
255					))),
256				);
257			}
258			Distinct {
259				expressions,
260			} => {
261				let parent = self
262					.inner
263					.operators
264					.read()
265					.get(&node.inputs[0])
266					.ok_or_else(|| Error(internal!("Parent operator not found")))?
267					.clone();
268				self.inner.operators.write().insert(
269					node.id,
270					Arc::new(Operators::Distinct(DistinctOperator::new(
271						parent,
272						node.id,
273						expressions,
274					))),
275				);
276			}
277			// Union {} => Ok(Operators::Union(UnionOperator::new())),
278			Union {} => unimplemented!(),
279			Apply {
280				operator_name,
281				expressions,
282			} => {
283				let parent = self
284					.inner
285					.operators
286					.read()
287					.get(&node.inputs[0])
288					.ok_or_else(|| Error(internal!("Parent operator not found")))?
289					.clone();
290
291				// Check if this is an FFI operator and use the appropriate creation method
292				let operator = if self.is_ffi_operator(operator_name.as_str()) {
293					let config = evaluate_operator_config(
294						expressions.as_slice(),
295						&self.inner.evaluator,
296					)?;
297					self.create_ffi_operator(operator_name.as_str(), node.id, &config)?
298				} else {
299					// Use registry for non-FFI operators
300					self.inner.registry.create_operator(
301						operator_name.as_str(),
302						node.id,
303						expressions.as_slice(),
304					)?
305				};
306
307				self.inner.operators.write().insert(
308					node.id,
309					Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
310				);
311			}
312			Aggregate {
313				..
314			} => unimplemented!(),
315			Window {
316				window_type,
317				size,
318				slide,
319				group_by,
320				aggregations,
321				min_events,
322				max_window_count,
323				max_window_age,
324			} => {
325				let parent = self
326					.inner
327					.operators
328					.read()
329					.get(&node.inputs[0])
330					.ok_or_else(|| Error(internal!("Parent operator not found")))?
331					.clone();
332				let operator = WindowOperator::new(
333					parent,
334					node.id,
335					window_type.clone(),
336					size.clone(),
337					slide.clone(),
338					group_by.clone(),
339					aggregations.clone(),
340					min_events.clone(),
341					max_window_count.clone(),
342					max_window_age.clone(),
343				);
344				self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
345			}
346		}
347
348		Ok(())
349	}
350
351	fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
352		let mut sources = self.inner.sources.write();
353		let nodes = sources.entry(source).or_insert_with(Vec::new);
354
355		let entry = (flow, node);
356		if !nodes.contains(&entry) {
357			nodes.push(entry);
358		}
359	}
360
361	fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
362		let mut sinks = self.inner.sinks.write();
363		let nodes = sinks.entry(sink).or_insert_with(Vec::new);
364
365		let entry = (flow, node);
366		if !nodes.contains(&entry) {
367			nodes.push(entry);
368		}
369	}
370}