reifydb_sub_flow/engine/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
10use std::{
11	collections::{HashMap, HashSet},
12	fs::read_dir,
13	path::PathBuf,
14	sync::Arc,
15};
16
17use parking_lot::RwLock;
18use reifydb_core::{
19	CommitVersion, Error,
20	event::{
21		EventBus,
22		flow::{FlowOperatorLoadedEvent, OperatorColumnDef},
23	},
24	interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
25};
26use reifydb_engine::{StandardRowEvaluator, execute::Executor};
27use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
28use reifydb_type::{Value, internal};
29use tracing::{debug, error, instrument};
30
31use crate::{
32	ffi::loader::{ColumnDefInfo, ffi_operator_loader},
33	operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
34};
35
36pub(crate) struct FlowEngineInner {
37	pub(crate) evaluator: StandardRowEvaluator,
38	pub(crate) executor: Executor,
39	pub(crate) registry: TransformOperatorRegistry,
40	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
41	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
42	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
43	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
44	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
45	pub(crate) event_bus: EventBus,
46	pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
47}
48
49pub struct FlowEngine {
50	pub(crate) inner: Arc<FlowEngineInner>,
51}
52
53impl Clone for FlowEngine {
54	fn clone(&self) -> Self {
55		Self {
56			inner: Arc::clone(&self.inner),
57		}
58	}
59}
60
61impl FlowEngine {
62	#[instrument(level = "info", skip(evaluator, executor, registry, event_bus), fields(operators_dir = ?operators_dir))]
63	pub fn new(
64		evaluator: StandardRowEvaluator,
65		executor: Executor,
66		registry: TransformOperatorRegistry,
67		event_bus: EventBus,
68		operators_dir: Option<PathBuf>,
69	) -> Self {
70		// Load FFI operators if directory specified
71		if let Some(dir) = operators_dir {
72			if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
73				error!("Failed to load FFI operators from {:?}: {}", dir, e);
74			}
75		}
76
77		Self {
78			inner: Arc::new(FlowEngineInner {
79				evaluator,
80				executor,
81				registry,
82				operators: RwLock::new(HashMap::new()),
83				flows: RwLock::new(HashMap::new()),
84				sources: RwLock::new(HashMap::new()),
85				sinks: RwLock::new(HashMap::new()),
86				analyzer: RwLock::new(FlowGraphAnalyzer::new()),
87				event_bus,
88				flow_creation_versions: RwLock::new(HashMap::new()),
89			}),
90		}
91	}
92
93	/// Load FFI operators from a directory into the global loader
94	#[instrument(level = "debug", skip(event_bus), fields(dir = ?dir))]
95	fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
96		let loader = ffi_operator_loader();
97
98		// Scan directory for shared libraries
99		let entries = read_dir(dir).unwrap();
100
101		for entry in entries {
102			let entry = entry.unwrap();
103			let path = entry.path();
104
105			if !path.is_file() {
106				continue;
107			}
108
109			let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
110			if !is_shared_lib {
111				continue;
112			}
113
114			// Register the operator without instantiating it
115			let mut guard = loader.write();
116			let info = match guard.register_operator(&path)? {
117				Some(info) => info,
118				None => {
119					// Not a valid FFI operator, skip silently
120					continue;
121				}
122			};
123
124			debug!("Registered FFI operator: {} from {:?}", info.operator, path);
125
126			// Convert column definitions to event format
127			fn convert_column_defs(columns: &[ColumnDefInfo]) -> Vec<OperatorColumnDef> {
128				columns.iter()
129					.map(|c| OperatorColumnDef {
130						name: c.name.clone(),
131						field_type: c.field_type,
132						description: c.description.clone(),
133					})
134					.collect()
135			}
136
137			// Emit event for loaded operator
138			event_bus.emit(FlowOperatorLoadedEvent {
139				operator: info.operator,
140				library_path: info.library_path,
141				api: info.api,
142				version: info.version,
143				description: info.description,
144				input: convert_column_defs(&info.input_columns),
145				output: convert_column_defs(&info.output_columns),
146				capabilities: info.capabilities,
147			});
148		}
149
150		Ok(())
151	}
152
153	/// Create an FFI operator instance from the global singleton loader
154	#[instrument(level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
155	pub(crate) fn create_ffi_operator(
156		&self,
157		operator: &str,
158		node_id: FlowNodeId,
159		config: &HashMap<String, Value>,
160	) -> crate::Result<BoxedOperator> {
161		let loader = ffi_operator_loader();
162		let mut loader_write = loader.write();
163
164		// Serialize config to bincode
165		let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
166			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
167
168		let operator = loader_write
169			.create_operator_by_name(operator, node_id, &config_bytes)
170			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
171
172		Ok(Box::new(operator))
173	}
174
175	/// Check if an operator name corresponds to an FFI operator
176	pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
177		let loader = ffi_operator_loader();
178		let loader_read = loader.read();
179		loader_read.has_operator(operator)
180	}
181
182	pub fn has_registered_flows(&self) -> bool {
183		!self.inner.flows.read().is_empty()
184	}
185
186	/// Returns a set of all currently registered flow IDs
187	pub fn flow_ids(&self) -> HashSet<FlowId> {
188		self.inner.flows.read().keys().copied().collect()
189	}
190
191	/// Clears all registered flows, operators, sources, sinks, dependency graph, and backfill versions
192	pub fn clear(&self) {
193		self.inner.operators.write().clear();
194		self.inner.flows.write().clear();
195		self.inner.sources.write().clear();
196		self.inner.sinks.write().clear();
197		self.inner.analyzer.write().clear();
198		self.inner.flow_creation_versions.write().clear();
199	}
200
201	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
202		self.inner.analyzer.read().get_dependency_graph().clone()
203	}
204
205	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
206		let analyzer = self.inner.analyzer.read();
207		let dependency_graph = analyzer.get_dependency_graph();
208		analyzer.get_flows_depending_on_table(dependency_graph, table_id)
209	}
210
211	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
212		let analyzer = self.inner.analyzer.read();
213		let dependency_graph = analyzer.get_dependency_graph();
214		analyzer.get_flows_depending_on_view(dependency_graph, view_id)
215	}
216
217	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
218		let analyzer = self.inner.analyzer.read();
219		let dependency_graph = analyzer.get_dependency_graph();
220		analyzer.get_flow_producing_view(dependency_graph, view_id)
221	}
222
223	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
224		let analyzer = self.inner.analyzer.read();
225		let dependency_graph = analyzer.get_dependency_graph();
226		analyzer.calculate_execution_order(dependency_graph)
227	}
228}