reifydb_sub_flow/engine/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
10use std::{
11	collections::{HashMap, HashSet},
12	fs::read_dir,
13	path::PathBuf,
14	sync::Arc,
15};
16
17use parking_lot::RwLock;
18use reifydb_core::{
19	CommitVersion, Error,
20	event::{EventBus, flow::FlowOperatorLoadedEvent},
21	interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
22	log_debug, log_error,
23};
24use reifydb_engine::{StandardRowEvaluator, execute::Executor};
25use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
26use reifydb_type::{Value, internal};
27
28use crate::{
29	ffi::loader::ffi_operator_loader,
30	operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
31};
32
33pub(crate) struct FlowEngineInner {
34	pub(crate) evaluator: StandardRowEvaluator,
35	pub(crate) executor: Executor,
36	pub(crate) registry: TransformOperatorRegistry,
37	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
38	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
39	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
40	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
41	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
42	pub(crate) event_bus: EventBus,
43	pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
44}
45
46pub struct FlowEngine {
47	pub(crate) inner: Arc<FlowEngineInner>,
48}
49
50impl Clone for FlowEngine {
51	fn clone(&self) -> Self {
52		Self {
53			inner: Arc::clone(&self.inner),
54		}
55	}
56}
57
58impl FlowEngine {
59	pub fn new(
60		evaluator: StandardRowEvaluator,
61		executor: Executor,
62		registry: TransformOperatorRegistry,
63		event_bus: EventBus,
64		operators_dir: Option<PathBuf>,
65	) -> Self {
66		// Load FFI operators if directory specified
67		if let Some(dir) = operators_dir {
68			if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
69				log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
70			}
71		}
72
73		Self {
74			inner: Arc::new(FlowEngineInner {
75				evaluator,
76				executor,
77				registry,
78				operators: RwLock::new(HashMap::new()),
79				flows: RwLock::new(HashMap::new()),
80				sources: RwLock::new(HashMap::new()),
81				sinks: RwLock::new(HashMap::new()),
82				analyzer: RwLock::new(FlowGraphAnalyzer::new()),
83				event_bus,
84				flow_creation_versions: RwLock::new(HashMap::new()),
85			}),
86		}
87	}
88
89	/// Load FFI operators from a directory into the global loader
90	fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
91		let loader = ffi_operator_loader();
92
93		// Scan directory for shared libraries
94		let entries = read_dir(dir).unwrap();
95
96		for entry in entries {
97			let entry = entry.unwrap();
98			let path = entry.path();
99
100			if !path.is_file() {
101				continue;
102			}
103
104			let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
105			if !is_shared_lib {
106				continue;
107			}
108
109			// Register the operator without instantiating it
110			let mut guard = loader.write();
111			let (operator_name, api_version) = match guard.register_operator(&path)? {
112				Some(info) => info,
113				None => {
114					// Not a valid FFI operator, skip silently
115					continue;
116				}
117			};
118
119			log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
120
121			// Emit event for loaded operator
122			event_bus.emit(FlowOperatorLoadedEvent {
123				operator_name: operator_name.clone(),
124				library_path: path.clone(),
125				api_version,
126			});
127		}
128
129		Ok(())
130	}
131
132	/// Create an FFI operator instance from the global singleton loader
133	pub(crate) fn create_ffi_operator(
134		&self,
135		operator_name: &str,
136		node_id: FlowNodeId,
137		config: &HashMap<String, Value>,
138	) -> crate::Result<BoxedOperator> {
139		let loader = ffi_operator_loader();
140		let mut loader_write = loader.write();
141
142		// Serialize config to bincode
143		let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
144			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
145
146		let operator = loader_write
147			.create_operator_by_name(operator_name, node_id, &config_bytes)
148			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
149
150		Ok(Box::new(operator))
151	}
152
153	/// Check if an operator name corresponds to an FFI operator
154	pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
155		let loader = ffi_operator_loader();
156		let loader_read = loader.read();
157		loader_read.has_operator(operator_name)
158	}
159
160	pub fn has_registered_flows(&self) -> bool {
161		!self.inner.flows.read().is_empty()
162	}
163
164	/// Returns a set of all currently registered flow IDs
165	pub fn flow_ids(&self) -> HashSet<FlowId> {
166		self.inner.flows.read().keys().copied().collect()
167	}
168
169	/// Clears all registered flows, operators, sources, sinks, dependency graph, and backfill versions
170	pub fn clear(&self) {
171		self.inner.operators.write().clear();
172		self.inner.flows.write().clear();
173		self.inner.sources.write().clear();
174		self.inner.sinks.write().clear();
175		self.inner.analyzer.write().clear();
176		self.inner.flow_creation_versions.write().clear();
177	}
178
179	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
180		self.inner.analyzer.read().get_dependency_graph().clone()
181	}
182
183	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
184		let analyzer = self.inner.analyzer.read();
185		let dependency_graph = analyzer.get_dependency_graph();
186		analyzer.get_flows_depending_on_table(dependency_graph, table_id)
187	}
188
189	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
190		let analyzer = self.inner.analyzer.read();
191		let dependency_graph = analyzer.get_dependency_graph();
192		analyzer.get_flows_depending_on_view(dependency_graph, view_id)
193	}
194
195	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
196		let analyzer = self.inner.analyzer.read();
197		let dependency_graph = analyzer.get_dependency_graph();
198		analyzer.get_flow_producing_view(dependency_graph, view_id)
199	}
200
201	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
202		let analyzer = self.inner.analyzer.read();
203		let dependency_graph = analyzer.get_dependency_graph();
204		analyzer.calculate_execution_order(dependency_graph)
205	}
206}