// reifydb_sub_flow/engine/mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4pub mod eval;
5pub mod process;
6pub mod register;
7
8use std::{
9	collections::{HashMap, HashSet},
10	sync::Arc,
11};
12
13#[cfg(reifydb_target = "native")]
14use postcard::to_stdvec;
15use reifydb_catalog::catalog::Catalog;
16#[cfg(reifydb_target = "native")]
17use reifydb_core::internal;
18use reifydb_core::{
19	common::CommitVersion,
20	event::EventBus,
21	interface::catalog::{
22		flow::{FlowId, FlowNodeId},
23		id::{TableId, ViewId},
24		primitive::PrimitiveId,
25	},
26};
27use reifydb_engine::vm::executor::Executor;
28use reifydb_rql::flow::{
29	analyzer::{FlowDependencyGraph, FlowGraphAnalyzer},
30	flow::FlowDag,
31};
32use reifydb_runtime::clock::Clock;
33#[cfg(reifydb_target = "native")]
34use reifydb_type::{Result, error::Error, value::Value};
35use tracing::instrument;
36
37#[cfg(reifydb_target = "native")]
38use crate::ffi::loader::ffi_operator_loader;
39#[cfg(reifydb_target = "native")]
40use crate::operator::BoxedOperator;
41use crate::{builder::OperatorFactory, operator::Operators};
42
43pub struct FlowEngine {
44	pub(crate) catalog: Catalog,
45	pub(crate) executor: Executor,
46	pub(crate) operators: HashMap<FlowNodeId, Arc<Operators>>,
47	pub(crate) flows: HashMap<FlowId, FlowDag>,
48	pub(crate) sources: HashMap<PrimitiveId, Vec<(FlowId, FlowNodeId)>>,
49	pub(crate) sinks: HashMap<PrimitiveId, Vec<(FlowId, FlowNodeId)>>,
50	pub(crate) analyzer: FlowGraphAnalyzer,
51	#[allow(dead_code)]
52	pub(crate) event_bus: EventBus,
53	pub(crate) flow_creation_versions: HashMap<FlowId, CommitVersion>,
54	pub(crate) clock: Clock,
55	pub(crate) custom_operators: Arc<HashMap<String, OperatorFactory>>,
56}
57
58impl FlowEngine {
59	#[instrument(
60		name = "flow::engine::new",
61		level = "debug",
62		skip(catalog, executor, event_bus, clock, custom_operators)
63	)]
64	pub fn new(
65		catalog: Catalog,
66		executor: Executor,
67		event_bus: EventBus,
68		clock: Clock,
69		custom_operators: Arc<HashMap<String, OperatorFactory>>,
70	) -> Self {
71		Self {
72			catalog,
73			executor,
74			operators: HashMap::new(),
75			flows: HashMap::new(),
76			sources: HashMap::new(),
77			sinks: HashMap::new(),
78			analyzer: FlowGraphAnalyzer::new(),
79			event_bus,
80			flow_creation_versions: HashMap::new(),
81			clock,
82			custom_operators,
83		}
84	}
85
86	/// Create an FFI operator instance from the global singleton loader
87	#[cfg(reifydb_target = "native")]
88	#[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
89	pub(crate) fn create_ffi_operator(
90		&self,
91		operator: &str,
92		node_id: FlowNodeId,
93		config: &HashMap<String, Value>,
94	) -> Result<BoxedOperator> {
95		let loader = ffi_operator_loader();
96		let mut loader_write = loader.write().unwrap();
97
98		// Serialize config to postcard
99		let config_bytes = to_stdvec(config)
100			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
101
102		let operator = loader_write
103			.create_operator_by_name(operator, node_id, &config_bytes, self.executor.clone())
104			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
105
106		Ok(Box::new(operator))
107	}
108
109	/// Check if an operator name corresponds to an FFI operator
110	#[cfg(reifydb_target = "native")]
111	pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
112		let loader = ffi_operator_loader();
113		let loader_read = loader.read().unwrap();
114		loader_read.has_operator(operator)
115	}
116
117	/// FFI operators are not supported in WASM
118	#[cfg(not(reifydb_target = "native"))]
119	#[allow(dead_code)]
120	pub(crate) fn is_ffi_operator(&self, _operator: &str) -> bool {
121		false
122	}
123
124	/// Returns a set of all currently registered flow IDs
125	pub fn flow_ids(&self) -> HashSet<FlowId> {
126		self.flows.keys().copied().collect()
127	}
128
129	/// Clears all registered flows, operators, sources, sinks, dependency graph, and backfill versions
130	pub fn clear(&mut self) {
131		self.operators.clear();
132		self.flows.clear();
133		self.sources.clear();
134		self.sinks.clear();
135		self.analyzer.clear();
136		self.flow_creation_versions.clear();
137	}
138
139	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
140		self.analyzer.get_dependency_graph().clone()
141	}
142
143	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
144		let dependency_graph = self.analyzer.get_dependency_graph();
145		self.analyzer.get_flows_depending_on_table(dependency_graph, table_id)
146	}
147
148	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
149		let dependency_graph = self.analyzer.get_dependency_graph();
150		self.analyzer.get_flows_depending_on_view(dependency_graph, view_id)
151	}
152
153	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
154		let dependency_graph = self.analyzer.get_dependency_graph();
155		self.analyzer.get_flow_producing_view(dependency_graph, view_id)
156	}
157
158	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
159		let dependency_graph = self.analyzer.get_dependency_graph();
160		self.analyzer.calculate_execution_order(dependency_graph)
161	}
162}