reifydb_sub_flow/engine/
register.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view};
8use reifydb_core::{
9	Error,
10	interface::{FlowId, FlowNodeId, SourceId},
11};
12use reifydb_engine::StandardCommandTransaction;
13use reifydb_rql::flow::{
14	Flow, FlowNode, FlowNodeType,
15	FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
16};
17use reifydb_type::internal;
18
19use super::eval::evaluate_operator_config;
20use crate::{
21	engine::FlowEngine,
22	operator::{
23		ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
24		SinkViewOperator, SortOperator, SourceTableOperator, SourceViewOperator, TakeOperator, WindowOperator,
25	},
26};
27
28impl FlowEngine {
29	pub fn register(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
30		debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
31
32		for node_id in flow.topological_order()? {
33			let node = flow.get_node(&node_id).unwrap();
34			self.add(txn, &flow, node)?;
35		}
36
37		// Add flow to analyzer for dependency tracking
38		self.inner.analyzer.write().add(flow.clone());
39		self.inner.flows.write().insert(flow.id, flow);
40
41		Ok(())
42	}
43
44	fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
45		debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
46		let node = node.clone();
47
48		match node.ty {
49			SourceInlineData {
50				..
51			} => {
52				unimplemented!()
53			}
54			SourceTable {
55				table,
56			} => {
57				let table = txn.get_table(table)?;
58
59				self.add_source(flow.id, node.id, SourceId::table(table.id));
60				self.inner.operators.write().insert(
61					node.id,
62					Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
63				);
64			}
65			SourceView {
66				view,
67			} => {
68				let view = txn.get_view(view)?;
69				self.add_source(flow.id, node.id, SourceId::view(view.id));
70				self.inner.operators.write().insert(
71					node.id,
72					Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
73				);
74			}
75			SinkView {
76				view,
77			} => {
78				let parent = self
79					.inner
80					.operators
81					.read()
82					.get(&node.inputs[0])
83					.ok_or_else(|| Error(internal!("Parent operator not found")))?
84					.clone();
85
86				self.add_sink(flow.id, node.id, SourceId::view(*view));
87				self.inner.operators.write().insert(
88					node.id,
89					Arc::new(Operators::SinkView(SinkViewOperator::new(
90						parent,
91						node.id,
92						resolve_view(txn, view)?,
93					))),
94				);
95			}
96			Filter {
97				conditions,
98			} => {
99				let parent = self
100					.inner
101					.operators
102					.read()
103					.get(&node.inputs[0])
104					.ok_or_else(|| Error(internal!("Parent operator not found")))?
105					.clone();
106				self.inner.operators.write().insert(
107					node.id,
108					Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
109				);
110			}
111			Map {
112				expressions,
113			} => {
114				let parent = self
115					.inner
116					.operators
117					.read()
118					.get(&node.inputs[0])
119					.ok_or_else(|| Error(internal!("Parent operator not found")))?
120					.clone();
121				self.inner.operators.write().insert(
122					node.id,
123					Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
124				);
125			}
126			Extend {
127				expressions,
128			} => {
129				let parent = self
130					.inner
131					.operators
132					.read()
133					.get(&node.inputs[0])
134					.ok_or_else(|| Error(internal!("Parent operator not found")))?
135					.clone();
136				self.inner.operators.write().insert(
137					node.id,
138					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
139				);
140			}
141			Sort {
142				by: _,
143			} => {
144				let parent = self
145					.inner
146					.operators
147					.read()
148					.get(&node.inputs[0])
149					.ok_or_else(|| Error(internal!("Parent operator not found")))?
150					.clone();
151				self.inner.operators.write().insert(
152					node.id,
153					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
154				);
155			}
156			Take {
157				limit,
158			} => {
159				let parent = self
160					.inner
161					.operators
162					.read()
163					.get(&node.inputs[0])
164					.ok_or_else(|| Error(internal!("Parent operator not found")))?
165					.clone();
166				self.inner.operators.write().insert(
167					node.id,
168					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
169				);
170			}
171			Join {
172				join_type,
173				left,
174				right,
175				alias,
176			} => {
177				// Find the left and right node IDs from the flow inputs
178				// The join node should have exactly 2 inputs
179				if node.inputs.len() != 2 {
180					return Err(Error(internal!("Join node must have exactly 2 inputs")));
181				}
182
183				let left_node = node.inputs[0];
184				let right_node = node.inputs[1];
185
186				let operators = self.inner.operators.read();
187				let left_parent = operators
188					.get(&left_node)
189					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
190					.clone();
191
192				let right_parent = operators
193					.get(&right_node)
194					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
195					.clone();
196				drop(operators);
197
198				self.inner.operators.write().insert(
199					node.id,
200					Arc::new(Operators::Join(JoinOperator::new(
201						left_parent,
202						right_parent,
203						node.id,
204						join_type,
205						left_node,
206						right_node,
207						left,
208						right,
209						alias,
210						self.inner.executor.clone(),
211					))),
212				);
213			}
214			Distinct {
215				expressions,
216			} => {
217				let parent = self
218					.inner
219					.operators
220					.read()
221					.get(&node.inputs[0])
222					.ok_or_else(|| Error(internal!("Parent operator not found")))?
223					.clone();
224				self.inner.operators.write().insert(
225					node.id,
226					Arc::new(Operators::Distinct(DistinctOperator::new(
227						parent,
228						node.id,
229						expressions,
230					))),
231				);
232			}
233			// Union {} => Ok(Operators::Union(UnionOperator::new())),
234			Union {} => unimplemented!(),
235			Apply {
236				operator_name,
237				expressions,
238			} => {
239				let parent = self
240					.inner
241					.operators
242					.read()
243					.get(&node.inputs[0])
244					.ok_or_else(|| Error(internal!("Parent operator not found")))?
245					.clone();
246
247				// Check if this is an FFI operator and use the appropriate creation method
248				let operator = if self.is_ffi_operator(operator_name.as_str()) {
249					let config = evaluate_operator_config(
250						expressions.as_slice(),
251						&self.inner.evaluator,
252					)?;
253					self.create_ffi_operator(operator_name.as_str(), node.id, &config)?
254				} else {
255					// Use registry for non-FFI operators
256					self.inner.registry.create_operator(
257						operator_name.as_str(),
258						node.id,
259						expressions.as_slice(),
260					)?
261				};
262
263				self.inner.operators.write().insert(
264					node.id,
265					Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
266				);
267			}
268			Aggregate {
269				..
270			} => unimplemented!(),
271			Window {
272				window_type,
273				size,
274				slide,
275				group_by,
276				aggregations,
277				min_events,
278				max_window_count,
279				max_window_age,
280			} => {
281				let parent = self
282					.inner
283					.operators
284					.read()
285					.get(&node.inputs[0])
286					.ok_or_else(|| Error(internal!("Parent operator not found")))?
287					.clone();
288				let operator = WindowOperator::new(
289					parent,
290					node.id,
291					window_type.clone(),
292					size.clone(),
293					slide.clone(),
294					group_by.clone(),
295					aggregations.clone(),
296					min_events.clone(),
297					max_window_count.clone(),
298					max_window_age.clone(),
299				);
300				self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
301			}
302		}
303
304		Ok(())
305	}
306
307	fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
308		let mut sources = self.inner.sources.write();
309		let nodes = sources.entry(source).or_insert_with(Vec::new);
310
311		let entry = (flow, node);
312		if !nodes.contains(&entry) {
313			nodes.push(entry);
314		}
315	}
316
317	fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
318		let mut sinks = self.inner.sinks.write();
319		let nodes = sinks.entry(sink).or_insert_with(Vec::new);
320
321		let entry = (flow, node);
322		if !nodes.contains(&entry) {
323			nodes.push(entry);
324		}
325	}
326}