reifydb_sub_flow/engine/
register.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8	CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9	transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12	CommitVersion, Error,
13	interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17	Flow, FlowNode, FlowNodeType,
18	FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Merge, Sort, Take, Window},
19};
20use reifydb_type::internal;
21use tracing::instrument;
22
23use super::eval::evaluate_operator_config;
24use crate::{
25	engine::FlowEngine,
26	operator::{
27		ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator,
28		MergeOperator, Operators, SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator,
29		SourceViewOperator, TakeOperator, WindowOperator,
30	},
31};
32
33impl FlowEngine {
34	#[instrument(name = "flow::register::without_backfill", level = "info", skip(self, txn), fields(flow_id = ?flow.id))]
35	pub async fn register_without_backfill(
36		&self,
37		txn: &mut StandardCommandTransaction,
38		flow: Flow,
39	) -> crate::Result<()> {
40		self.register(txn, flow, None).await
41	}
42
43	#[instrument(name = "flow::register::with_backfill", level = "info", skip(self, txn), fields(flow_id = ?flow.id, backfill_version = flow_creation_version.0))]
44	pub async fn register_with_backfill(
45		&self,
46		txn: &mut StandardCommandTransaction,
47		flow: Flow,
48		flow_creation_version: CommitVersion,
49	) -> crate::Result<()> {
50		self.register(txn, flow, Some(flow_creation_version)).await
51	}
52
53	#[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id, has_backfill = flow_creation_version.is_some()))]
54	async fn register(
55		&self,
56		txn: &mut StandardCommandTransaction,
57		flow: Flow,
58		flow_creation_version: Option<CommitVersion>,
59	) -> crate::Result<()> {
60		debug_assert!(!self.inner.flows.read().await.contains_key(&flow.id), "Flow already registered");
61
62		for node_id in flow.topological_order()? {
63			let node = flow.get_node(&node_id).unwrap();
64			self.add(txn, &flow, node).await?;
65		}
66
67		if let Some(flow_creation_version) = flow_creation_version {
68			self.inner.flow_creation_versions.write().await.insert(flow.id, flow_creation_version);
69
70			if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version).await {
71				self.inner.flow_creation_versions.write().await.remove(&flow.id);
72				return Err(e);
73			}
74		}
75
76		// Add flow to analyzer for dependency tracking
77		self.inner.analyzer.write().await.add(flow.clone());
78		self.inner.flows.write().await.insert(flow.id, flow);
79
80		Ok(())
81	}
82
83	#[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
84	async fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
85		debug_assert!(!self.inner.operators.read().await.contains_key(&node.id), "Operator already registered");
86		let node = node.clone();
87
88		match node.ty {
89			SourceInlineData {
90				..
91			} => {
92				unimplemented!()
93			}
94			SourceTable {
95				table,
96			} => {
97				let table = txn.get_table(table).await?;
98
99				self.add_source(flow.id, node.id, SourceId::table(table.id)).await;
100				self.inner.operators.write().await.insert(
101					node.id,
102					Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
103				);
104			}
105			SourceView {
106				view,
107			} => {
108				let view = txn.get_view(view).await?;
109				self.add_source(flow.id, node.id, SourceId::view(view.id)).await;
110				self.inner.operators.write().await.insert(
111					node.id,
112					Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
113				);
114			}
115			SourceFlow {
116				flow: source_flow,
117			} => {
118				let source_flow_def = txn.get_flow(source_flow).await?;
119				self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id)).await;
120				self.inner.operators.write().await.insert(
121					node.id,
122					Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
123						node.id,
124						source_flow_def,
125					))),
126				);
127			}
128			SinkView {
129				view,
130			} => {
131				let parent = self
132					.inner
133					.operators
134					.read()
135					.await
136					.get(&node.inputs[0])
137					.ok_or_else(|| Error(internal!("Parent operator not found")))?
138					.clone();
139
140				self.add_sink(flow.id, node.id, SourceId::view(*view)).await;
141				let resolved = resolve_view(txn, view).await?;
142				self.inner.operators.write().await.insert(
143					node.id,
144					Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
145				);
146			}
147			Filter {
148				conditions,
149			} => {
150				let parent = self
151					.inner
152					.operators
153					.read()
154					.await
155					.get(&node.inputs[0])
156					.ok_or_else(|| Error(internal!("Parent operator not found")))?
157					.clone();
158				self.inner.operators.write().await.insert(
159					node.id,
160					Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
161				);
162			}
163			Map {
164				expressions,
165			} => {
166				let parent = self
167					.inner
168					.operators
169					.read()
170					.await
171					.get(&node.inputs[0])
172					.ok_or_else(|| Error(internal!("Parent operator not found")))?
173					.clone();
174				self.inner.operators.write().await.insert(
175					node.id,
176					Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
177				);
178			}
179			Extend {
180				expressions,
181			} => {
182				let parent = self
183					.inner
184					.operators
185					.read()
186					.await
187					.get(&node.inputs[0])
188					.ok_or_else(|| Error(internal!("Parent operator not found")))?
189					.clone();
190				self.inner.operators.write().await.insert(
191					node.id,
192					Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
193				);
194			}
195			Sort {
196				by: _,
197			} => {
198				let parent = self
199					.inner
200					.operators
201					.read()
202					.await
203					.get(&node.inputs[0])
204					.ok_or_else(|| Error(internal!("Parent operator not found")))?
205					.clone();
206				self.inner.operators.write().await.insert(
207					node.id,
208					Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
209				);
210			}
211			Take {
212				limit,
213			} => {
214				let parent = self
215					.inner
216					.operators
217					.read()
218					.await
219					.get(&node.inputs[0])
220					.ok_or_else(|| Error(internal!("Parent operator not found")))?
221					.clone();
222				self.inner.operators.write().await.insert(
223					node.id,
224					Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
225				);
226			}
227			Join {
228				join_type,
229				left,
230				right,
231				alias,
232			} => {
233				// Find the left and right node IDs from the flow inputs
234				// The join node should have exactly 2 inputs
235				if node.inputs.len() != 2 {
236					return Err(Error(internal!("Join node must have exactly 2 inputs")));
237				}
238
239				let left_node = node.inputs[0];
240				let right_node = node.inputs[1];
241
242				let operators = self.inner.operators.read().await;
243				let left_parent = operators
244					.get(&left_node)
245					.ok_or_else(|| Error(internal!("Left parent operator not found")))?
246					.clone();
247
248				let right_parent = operators
249					.get(&right_node)
250					.ok_or_else(|| Error(internal!("Right parent operator not found")))?
251					.clone();
252				drop(operators);
253
254				self.inner.operators.write().await.insert(
255					node.id,
256					Arc::new(Operators::Join(JoinOperator::new(
257						left_parent,
258						right_parent,
259						node.id,
260						join_type,
261						left_node,
262						right_node,
263						left,
264						right,
265						alias,
266						self.inner.executor.clone(),
267					))),
268				);
269			}
270			Distinct {
271				expressions,
272			} => {
273				let parent = self
274					.inner
275					.operators
276					.read()
277					.await
278					.get(&node.inputs[0])
279					.ok_or_else(|| Error(internal!("Parent operator not found")))?
280					.clone();
281				self.inner.operators.write().await.insert(
282					node.id,
283					Arc::new(Operators::Distinct(DistinctOperator::new(
284						parent,
285						node.id,
286						expressions,
287					))),
288				);
289			}
290			Merge {} => {
291				// Merge requires at least 2 inputs
292				if node.inputs.len() < 2 {
293					return Err(Error(internal!("Merge node must have at least 2 inputs")));
294				}
295
296				let operators = self.inner.operators.read().await;
297				let mut parents = Vec::with_capacity(node.inputs.len());
298
299				for input_node_id in &node.inputs {
300					let parent = operators
301						.get(input_node_id)
302						.ok_or_else(|| {
303							Error(internal!(
304								"Parent operator not found for input {:?}",
305								input_node_id
306							))
307						})?
308						.clone();
309					parents.push(parent);
310				}
311				drop(operators);
312
313				self.inner.operators.write().await.insert(
314					node.id,
315					Arc::new(Operators::Merge(MergeOperator::new(
316						node.id,
317						parents,
318						node.inputs.clone(),
319					))),
320				);
321			}
322			Apply {
323				operator,
324				expressions,
325			} => {
326				let parent = self
327					.inner
328					.operators
329					.read()
330					.await
331					.get(&node.inputs[0])
332					.ok_or_else(|| Error(internal!("Parent operator not found")))?
333					.clone();
334
335				// Check if this is an FFI operator and use the appropriate creation method
336				let operator = if self.is_ffi_operator(operator.as_str()) {
337					let config = evaluate_operator_config(
338						expressions.as_slice(),
339						&self.inner.evaluator,
340					)?;
341					self.create_ffi_operator(operator.as_str(), node.id, &config)?
342				} else {
343					// Use registry for non-FFI operators
344					self.inner.registry.create_operator(
345						operator.as_str(),
346						node.id,
347						expressions.as_slice(),
348					)?
349				};
350
351				self.inner.operators.write().await.insert(
352					node.id,
353					Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
354				);
355			}
356			Aggregate {
357				..
358			} => unimplemented!(),
359			Window {
360				window_type,
361				size,
362				slide,
363				group_by,
364				aggregations,
365				min_events,
366				max_window_count,
367				max_window_age,
368			} => {
369				let parent = self
370					.inner
371					.operators
372					.read()
373					.await
374					.get(&node.inputs[0])
375					.ok_or_else(|| Error(internal!("Parent operator not found")))?
376					.clone();
377				let operator = WindowOperator::new(
378					parent,
379					node.id,
380					window_type.clone(),
381					size.clone(),
382					slide.clone(),
383					group_by.clone(),
384					aggregations.clone(),
385					min_events.clone(),
386					max_window_count.clone(),
387					max_window_age.clone(),
388				);
389				self.inner
390					.operators
391					.write()
392					.await
393					.insert(node.id, Arc::new(Operators::Window(operator)));
394			}
395		}
396
397		Ok(())
398	}
399
400	async fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
401		let mut sources = self.inner.sources.write().await;
402		let nodes = sources.entry(source).or_insert_with(Vec::new);
403
404		let entry = (flow, node);
405		if !nodes.contains(&entry) {
406			nodes.push(entry);
407		}
408	}
409
410	async fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
411		let mut sinks = self.inner.sinks.write().await;
412		let nodes = sinks.entry(sink).or_insert_with(Vec::new);
413
414		let entry = (flow, node);
415		if !nodes.contains(&entry) {
416			nodes.push(entry);
417		}
418	}
419}