reifydb_sub_flow/engine/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod eval;
5mod partition;
6mod process;
7mod register;
8
9use std::{collections::HashMap, ffi::CStr, fs::read_dir, path::PathBuf, sync::Arc};
10
11use parking_lot::RwLock;
12use reifydb_core::{
13	Error,
14	event::{EventBus, flow::FlowOperatorLoadedEvent},
15	interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
16	log_debug, log_error,
17};
18use reifydb_engine::{StandardRowEvaluator, execute::Executor};
19use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
20use reifydb_type::{Value, internal};
21
22use crate::{
23	ffi::loader::ffi_operator_loader,
24	operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
25};
26
27pub(crate) struct FlowEngineInner {
28	pub(crate) evaluator: StandardRowEvaluator,
29	pub(crate) executor: Executor,
30	pub(crate) registry: TransformOperatorRegistry,
31	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
32	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
33	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
34	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
35	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
36	pub(crate) event_bus: EventBus,
37}
38
39pub struct FlowEngine {
40	pub(crate) inner: Arc<FlowEngineInner>,
41}
42
43impl Clone for FlowEngine {
44	fn clone(&self) -> Self {
45		Self {
46			inner: Arc::clone(&self.inner),
47		}
48	}
49}
50
51impl FlowEngine {
52	pub fn new(
53		evaluator: StandardRowEvaluator,
54		executor: Executor,
55		registry: TransformOperatorRegistry,
56		event_bus: EventBus,
57		operators_dir: Option<PathBuf>,
58	) -> Self {
59		// Load FFI operators if directory specified
60		if let Some(dir) = operators_dir {
61			if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
62				log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
63			}
64		}
65
66		Self {
67			inner: Arc::new(FlowEngineInner {
68				evaluator,
69				executor,
70				registry,
71				operators: RwLock::new(HashMap::new()),
72				flows: RwLock::new(HashMap::new()),
73				sources: RwLock::new(HashMap::new()),
74				sinks: RwLock::new(HashMap::new()),
75				analyzer: RwLock::new(FlowGraphAnalyzer::new()),
76				event_bus,
77			}),
78		}
79	}
80
81	/// Load FFI operators from a directory into the global loader
82	fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
83		let loader = ffi_operator_loader();
84
85		// Scan directory for shared libraries
86		let entries = read_dir(dir).unwrap();
87
88		for entry in entries {
89			let entry = entry.unwrap();
90			let path = entry.path();
91
92			if !path.is_file() {
93				continue;
94			}
95
96			let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
97			if !is_shared_lib {
98				continue;
99			}
100
101			// Load the operator to register it in the global loader
102			// Use a temporary node ID just to extract the name
103			let mut guard = loader.write();
104			let temp_operator = guard.load_operator(&path, &[], FlowNodeId(0))?;
105
106			// Extract operator name and API version from descriptor
107			let descriptor = temp_operator.descriptor();
108			let operator_name =
109				unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
110			let api_version = descriptor.api_version;
111
112			log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
113
114			// Emit event for loaded operator
115			event_bus.emit(FlowOperatorLoadedEvent {
116				operator_name: operator_name.clone(),
117				library_path: path.clone(),
118				api_version,
119			});
120		}
121
122		Ok(())
123	}
124
125	/// Create an FFI operator instance from the global singleton loader
126	pub(crate) fn create_ffi_operator(
127		&self,
128		operator_name: &str,
129		node_id: FlowNodeId,
130		config: &HashMap<String, Value>,
131	) -> crate::Result<BoxedOperator> {
132		let loader = ffi_operator_loader();
133		let mut loader_write = loader.write();
134
135		// Serialize config to bincode
136		let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
137			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
138
139		let operator = loader_write
140			.create_operator_by_name(operator_name, node_id, &config_bytes)
141			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
142
143		Ok(Box::new(operator))
144	}
145
146	/// Check if an operator name corresponds to an FFI operator
147	pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
148		let loader = ffi_operator_loader();
149		let loader_read = loader.read();
150		loader_read.has_operator(operator_name)
151	}
152
153	pub fn has_registered_flows(&self) -> bool {
154		!self.inner.flows.read().is_empty()
155	}
156
157	/// Clears all registered flows, operators, sources, sinks, and dependency graph
158	pub fn clear(&self) {
159		self.inner.operators.write().clear();
160		self.inner.flows.write().clear();
161		self.inner.sources.write().clear();
162		self.inner.sinks.write().clear();
163		self.inner.analyzer.write().clear();
164	}
165
166	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
167		self.inner.analyzer.read().get_dependency_graph().clone()
168	}
169
170	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
171		let analyzer = self.inner.analyzer.read();
172		let dependency_graph = analyzer.get_dependency_graph();
173		analyzer.get_flows_depending_on_table(dependency_graph, table_id)
174	}
175
176	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
177		let analyzer = self.inner.analyzer.read();
178		let dependency_graph = analyzer.get_dependency_graph();
179		analyzer.get_flows_depending_on_view(dependency_graph, view_id)
180	}
181
182	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
183		let analyzer = self.inner.analyzer.read();
184		let dependency_graph = analyzer.get_dependency_graph();
185		analyzer.get_flow_producing_view(dependency_graph, view_id)
186	}
187
188	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
189		let analyzer = self.inner.analyzer.read();
190		let dependency_graph = analyzer.get_dependency_graph();
191		analyzer.calculate_execution_order(dependency_graph)
192	}
193}