reifydb_sub_flow/engine/
register.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view};
8use reifydb_core::{
9	Error,
10	interface::{FlowId, FlowNodeId, SourceId},
11};
12use reifydb_engine::StandardCommandTransaction;
13use reifydb_rql::flow::{
14	Flow, FlowNode, FlowNodeType,
15	FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
16};
17use reifydb_type::internal;
18
19use crate::{
20	engine::FlowEngine,
21	operator::{
22		ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
23		SinkViewOperator, SortOperator, SourceTableOperator, SourceViewOperator, TakeOperator, WindowOperator,
24	},
25};
26
27impl FlowEngine {
28	pub fn register(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
29		debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
30
31		for node_id in flow.topological_order()? {
32			let node = flow.get_node(&node_id).unwrap();
33			self.add(txn, &flow, node)?;
34		}
35
36		// Add flow to analyzer for dependency tracking
37		self.inner.analyzer.write().add(flow.clone());
38		self.inner.flows.write().insert(flow.id, flow);
39
40		Ok(())
41	}
42
43	fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
44		debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
45		let node = node.clone();
46
47		match node.ty {
48			SourceInlineData {
49				..
50			} => {
51				unimplemented!()
52			}
53			SourceTable {
54				table,
55			} => {
56				let table = txn.get_table(table)?;
57
58				self.add_source(flow.id, node.id, SourceId::table(table.id));
59				self.inner.operators.write().insert(
60					node.id,
61					Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
62				);
63			}
64			SourceView {
65				view,
66			} => {
67				let view = txn.get_view(view)?;
68				self.add_source(flow.id, node.id, SourceId::view(view.id));
69				self.inner.operators.write().insert(
70					node.id,
71					Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
72				);
73			}
74			SinkView {
75				view,
76			} => {
77				let parent = self
78					.inner
79					.operators
80					.read()
81					.get(&node.inputs[0])
82					.ok_or_else(|| Error(internal!("Parent operator not found")))?
83					.clone();
84
85				self.add_sink(flow.id, node.id, SourceId::view(*view));
86				self.inner.operators.write().insert(
87					node.id,
88					Arc::new(Operators::SinkView(SinkViewOperator::new(
89						parent,
90						node.id,
91						resolve_view(txn, view)?,
92					))),
93				);
94			}
95			Filter {
96				conditions,
97			} => {
98				let parent = self
99					.inner
100					.operators
101					.read()
102					.get(&node.inputs[0])
103					.ok_or_else(|| Error(internal!("Parent operator not found")))?
104					.clone();
105				self.inner.operators.write().insert(
106					node.id,
107					Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
108				);
109			}
110			Map {
111				expressions,
112			} => {
113				let parent = self
114					.inner
115					.operators
116					.read()
117					.get(&node.inputs[0])
118					.ok_or_else(|| Error(internal!("Parent operator not found")))?
119					.clone();
120				self.inner.operators.write().insert(
121					node.id,
122					Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
123				);
124			}
125			Extend {
126				expressions,
127			} => {
128				let parent = self
129					.inner
130					.operators
131					.read()
132					.get(&node.inputs[0])
133					.ok_or_else(|| Error(internal!("Parent operator not found")))?
134					.clone();
135				self.inner.operators.write().insert(
136					node.id,
137					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
138				);
139			}
140			Sort {
141				by: _,
142			} => {
143				let parent = self
144					.inner
145					.operators
146					.read()
147					.get(&node.inputs[0])
148					.ok_or_else(|| Error(internal!("Parent operator not found")))?
149					.clone();
150				self.inner.operators.write().insert(
151					node.id,
152					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
153				);
154			}
155			Take {
156				limit,
157			} => {
158				let parent = self
159					.inner
160					.operators
161					.read()
162					.get(&node.inputs[0])
163					.ok_or_else(|| Error(internal!("Parent operator not found")))?
164					.clone();
165				self.inner.operators.write().insert(
166					node.id,
167					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
168				);
169			}
170			Join {
171				join_type,
172				left,
173				right,
174				alias,
175			} => {
176				// Find the left and right node IDs from the flow inputs
177				// The join node should have exactly 2 inputs
178				if node.inputs.len() != 2 {
179					return Err(Error(internal!("Join node must have exactly 2 inputs")));
180				}
181
182				let left_node = node.inputs[0];
183				let right_node = node.inputs[1];
184
185				let operators = self.inner.operators.read();
186				let left_parent = operators
187					.get(&left_node)
188					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
189					.clone();
190
191				let right_parent = operators
192					.get(&right_node)
193					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
194					.clone();
195				drop(operators);
196
197				self.inner.operators.write().insert(
198					node.id,
199					Arc::new(Operators::Join(JoinOperator::new(
200						left_parent,
201						right_parent,
202						node.id,
203						join_type,
204						left_node,
205						right_node,
206						left,
207						right,
208						alias,
209						self.inner.executor.clone(),
210					))),
211				);
212			}
213			Distinct {
214				expressions,
215			} => {
216				let parent = self
217					.inner
218					.operators
219					.read()
220					.get(&node.inputs[0])
221					.ok_or_else(|| Error(internal!("Parent operator not found")))?
222					.clone();
223				self.inner.operators.write().insert(
224					node.id,
225					Arc::new(Operators::Distinct(DistinctOperator::new(
226						parent,
227						node.id,
228						expressions,
229					))),
230				);
231			}
232			// Union {} => Ok(Operators::Union(UnionOperator::new())),
233			Union {} => unimplemented!(),
234			Apply {
235				operator_name,
236				expressions,
237			} => {
238				let parent = self
239					.inner
240					.operators
241					.read()
242					.get(&node.inputs[0])
243					.ok_or_else(|| Error(internal!("Parent operator not found")))?
244					.clone();
245
246				// Check if this is an FFI operator and use the appropriate creation method
247				let operator = if self.is_ffi_operator(operator_name.as_str()) {
248					// Use the shared loader to prevent library unloading
249					self.create_ffi_operator(operator_name.as_str(), node.id)?
250				} else {
251					// Use registry for non-FFI operators
252					self.inner.registry.create_operator(
253						operator_name.as_str(),
254						node.id,
255						expressions.as_slice(),
256					)?
257				};
258
259				self.inner.operators.write().insert(
260					node.id,
261					Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
262				);
263			}
264			Aggregate {
265				..
266			} => unimplemented!(),
267			Window {
268				window_type,
269				size,
270				slide,
271				group_by,
272				aggregations,
273				min_events,
274				max_window_count,
275				max_window_age,
276			} => {
277				let parent = self
278					.inner
279					.operators
280					.read()
281					.get(&node.inputs[0])
282					.ok_or_else(|| Error(internal!("Parent operator not found")))?
283					.clone();
284				let operator = WindowOperator::new(
285					parent,
286					node.id,
287					window_type.clone(),
288					size.clone(),
289					slide.clone(),
290					group_by.clone(),
291					aggregations.clone(),
292					min_events.clone(),
293					max_window_count.clone(),
294					max_window_age.clone(),
295				);
296				self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
297			}
298		}
299
300		Ok(())
301	}
302
303	fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
304		let mut sources = self.inner.sources.write();
305		let nodes = sources.entry(source).or_insert_with(Vec::new);
306
307		let entry = (flow, node);
308		if !nodes.contains(&entry) {
309			nodes.push(entry);
310		}
311	}
312
313	fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
314		let mut sinks = self.inner.sinks.write();
315		let nodes = sinks.entry(sink).or_insert_with(Vec::new);
316
317		let entry = (flow, node);
318		if !nodes.contains(&entry) {
319			nodes.push(entry);
320		}
321	}
322}