// reifydb_sub_flow/engine/mod.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file

mod partition;
mod process;
mod register;

use std::{
	collections::HashMap,
	ffi::CStr,
	fs::read_dir,
	path::{Path, PathBuf},
	sync::Arc,
};

use parking_lot::RwLock;
use reifydb_core::{
	Error,
	event::{EventBus, flow::FlowOperatorLoadedEvent},
	interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
	log_debug, log_error,
};
use reifydb_engine::{StandardRowEvaluator, execute::Executor};
use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
use reifydb_type::internal;

use crate::{
	ffi::loader::ffi_operator_loader,
	operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
};

26pub(crate) struct FlowEngineInner {
27	pub(crate) evaluator: StandardRowEvaluator,
28	pub(crate) executor: Executor,
29	pub(crate) registry: TransformOperatorRegistry,
30	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
31	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
32	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
33	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
34	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
35	pub(crate) event_bus: EventBus,
36}
37
38pub struct FlowEngine {
39	pub(crate) inner: Arc<FlowEngineInner>,
40}
41
42impl Clone for FlowEngine {
43	fn clone(&self) -> Self {
44		Self {
45			inner: Arc::clone(&self.inner),
46		}
47	}
48}
49
50impl FlowEngine {
51	pub fn new(
52		evaluator: StandardRowEvaluator,
53		executor: Executor,
54		registry: TransformOperatorRegistry,
55		event_bus: EventBus,
56		operators_dir: Option<PathBuf>,
57	) -> Self {
58		// Load FFI operators if directory specified
59		if let Some(dir) = operators_dir {
60			if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
61				log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
62			}
63		}
64
65		Self {
66			inner: Arc::new(FlowEngineInner {
67				evaluator,
68				executor,
69				registry,
70				operators: RwLock::new(HashMap::new()),
71				flows: RwLock::new(HashMap::new()),
72				sources: RwLock::new(HashMap::new()),
73				sinks: RwLock::new(HashMap::new()),
74				analyzer: RwLock::new(FlowGraphAnalyzer::new()),
75				event_bus,
76			}),
77		}
78	}
79
80	/// Load FFI operators from a directory into the global loader
81	fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
82		let loader = ffi_operator_loader();
83
84		// Scan directory for shared libraries
85		let entries = read_dir(dir).unwrap();
86
87		for entry in entries {
88			let entry = entry.unwrap();
89			let path = entry.path();
90
91			if !path.is_file() {
92				continue;
93			}
94
95			let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
96			if !is_shared_lib {
97				continue;
98			}
99
100			// Load the operator to register it in the global loader
101			// Use a temporary node ID just to extract the name
102			let mut guard = loader.write();
103			let temp_operator = guard.load_operator(&path, &[], FlowNodeId(0))?;
104
105			// Extract operator name and API version from descriptor
106			let descriptor = temp_operator.descriptor();
107			let operator_name =
108				unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
109			let api_version = descriptor.api_version;
110
111			log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
112
113			// Emit event for loaded operator
114			event_bus.emit(FlowOperatorLoadedEvent {
115				operator_name: operator_name.clone(),
116				library_path: path.clone(),
117				api_version,
118			});
119		}
120
121		Ok(())
122	}
123
124	/// Create an FFI operator instance from the global singleton loader
125	pub(crate) fn create_ffi_operator(
126		&self,
127		operator_name: &str,
128		node_id: FlowNodeId,
129	) -> crate::Result<BoxedOperator> {
130		let loader = ffi_operator_loader();
131		let mut loader_write = loader.write();
132
133		let operator = loader_write
134			.create_operator_by_name(operator_name, node_id, &[])
135			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
136
137		Ok(Box::new(operator))
138	}
139
140	/// Check if an operator name corresponds to an FFI operator
141	pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
142		let loader = ffi_operator_loader();
143		let loader_read = loader.read();
144		loader_read.has_operator(operator_name)
145	}
146
147	pub fn has_registered_flows(&self) -> bool {
148		!self.inner.flows.read().is_empty()
149	}
150
151	/// Clears all registered flows, operators, sources, sinks, and dependency graph
152	pub fn clear(&self) {
153		self.inner.operators.write().clear();
154		self.inner.flows.write().clear();
155		self.inner.sources.write().clear();
156		self.inner.sinks.write().clear();
157		self.inner.analyzer.write().clear();
158	}
159
160	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
161		self.inner.analyzer.read().get_dependency_graph().clone()
162	}
163
164	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
165		let analyzer = self.inner.analyzer.read();
166		let dependency_graph = analyzer.get_dependency_graph();
167		analyzer.get_flows_depending_on_table(dependency_graph, table_id)
168	}
169
170	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
171		let analyzer = self.inner.analyzer.read();
172		let dependency_graph = analyzer.get_dependency_graph();
173		analyzer.get_flows_depending_on_view(dependency_graph, view_id)
174	}
175
176	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
177		let analyzer = self.inner.analyzer.read();
178		let dependency_graph = analyzer.get_dependency_graph();
179		analyzer.get_flow_producing_view(dependency_graph, view_id)
180	}
181
182	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
183		let analyzer = self.inner.analyzer.read();
184		let dependency_graph = analyzer.get_dependency_graph();
185		analyzer.calculate_execution_order(dependency_graph)
186	}
187}