reifydb_sub_flow/engine/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
10use std::{
11	collections::{HashMap, HashSet},
12	fs::read_dir,
13	path::PathBuf,
14	sync::Arc,
15};
16
17use reifydb_core::{
18	CommitVersion, Error,
19	event::{
20		EventBus,
21		flow::{FlowOperatorLoadedEvent, OperatorColumnDef},
22	},
23	interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
24};
25use reifydb_engine::{StandardRowEvaluator, execute::Executor};
26use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
27use reifydb_type::{Value, internal};
28use tokio::sync::RwLock;
29use tracing::{debug, error, instrument};
30
31use crate::{
32	ffi::loader::{ColumnDefInfo, ffi_operator_loader},
33	operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
34};
35
/// Shared state behind a [`FlowEngine`] handle.
///
/// The runtime-mutable state (operators, flows, sources, sinks, analyzer and
/// per-flow creation versions) is wrapped in `tokio::sync::RwLock`, so it is
/// read and updated with `.read().await` / `.write().await` through the shared
/// `Arc<FlowEngineInner>` held by every `FlowEngine` clone.
pub(crate) struct FlowEngineInner {
	/// Row evaluator supplied to `FlowEngine::new`.
	pub(crate) evaluator: StandardRowEvaluator,
	/// Executor supplied to `FlowEngine::new`.
	pub(crate) executor: Executor,
	/// Transform operator registry supplied to `FlowEngine::new`.
	pub(crate) registry: TransformOperatorRegistry,
	/// Operators keyed by the flow node they belong to.
	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
	/// Registered flows keyed by flow id; empty means no flows are registered.
	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
	/// `(flow, node)` pairs associated with each source id on the input side.
	/// NOTE(review): presumably the nodes that read from the source — confirm against registration code.
	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
	/// `(flow, node)` pairs associated with each source id on the output side.
	/// NOTE(review): presumably the nodes that write into the source — confirm against registration code.
	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
	/// Flow graph analyzer backing the dependency-graph queries on `FlowEngine`.
	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
	/// Event bus supplied to `FlowEngine::new`.
	pub(crate) event_bus: EventBus,
	/// Commit version recorded per flow (called "backfill versions" by `FlowEngine::clear`).
	pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
}
48
49pub struct FlowEngine {
50	pub(crate) inner: Arc<FlowEngineInner>,
51}
52
53impl Clone for FlowEngine {
54	fn clone(&self) -> Self {
55		Self {
56			inner: Arc::clone(&self.inner),
57		}
58	}
59}
60
61impl FlowEngine {
62	#[instrument(name = "flow::engine::new", level = "info", skip(evaluator, executor, registry, event_bus), fields(operators_dir = ?operators_dir))]
63	pub fn new(
64		evaluator: StandardRowEvaluator,
65		executor: Executor,
66		registry: TransformOperatorRegistry,
67		event_bus: EventBus,
68		operators_dir: Option<PathBuf>,
69	) -> Self {
70		// Load FFI operators if directory specified
71		if let Some(dir) = operators_dir {
72			if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
73				error!("Failed to load FFI operators from {:?}: {}", dir, e);
74			}
75		}
76
77		Self {
78			inner: Arc::new(FlowEngineInner {
79				evaluator,
80				executor,
81				registry,
82				operators: RwLock::new(HashMap::new()),
83				flows: RwLock::new(HashMap::new()),
84				sources: RwLock::new(HashMap::new()),
85				sinks: RwLock::new(HashMap::new()),
86				analyzer: RwLock::new(FlowGraphAnalyzer::new()),
87				event_bus,
88				flow_creation_versions: RwLock::new(HashMap::new()),
89			}),
90		}
91	}
92
93	/// Load FFI operators from a directory into the global loader
94	#[instrument(name = "flow::engine::load_ffi_operators", level = "debug", skip(event_bus), fields(dir = ?dir))]
95	fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
96		let loader = ffi_operator_loader();
97
98		// Scan directory for shared libraries
99		let entries = read_dir(dir).unwrap();
100
101		for entry in entries {
102			let entry = entry.unwrap();
103			let path = entry.path();
104
105			if !path.is_file() {
106				continue;
107			}
108
109			let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
110			if !is_shared_lib {
111				continue;
112			}
113
114			// Register the operator without instantiating it
115			let mut guard = loader.write().unwrap();
116			let info = match guard.register_operator(&path)? {
117				Some(info) => info,
118				None => {
119					// Not a valid FFI operator, skip silently
120					continue;
121				}
122			};
123
124			debug!("Registered FFI operator: {} from {:?}", info.operator, path);
125
126			// Convert column definitions to event format
127			fn convert_column_defs(columns: &[ColumnDefInfo]) -> Vec<OperatorColumnDef> {
128				columns.iter()
129					.map(|c| OperatorColumnDef {
130						name: c.name.clone(),
131						field_type: c.field_type,
132						description: c.description.clone(),
133					})
134					.collect()
135			}
136
137			// Emit event for loaded operator
138			let event_bus = event_bus.clone();
139			let event = FlowOperatorLoadedEvent {
140				operator: info.operator,
141				library_path: info.library_path,
142				api: info.api,
143				version: info.version,
144				description: info.description,
145				input: convert_column_defs(&info.input_columns),
146				output: convert_column_defs(&info.output_columns),
147				capabilities: info.capabilities,
148			};
149			// Only spawn if there's a tokio runtime available
150			if let Ok(handle) = tokio::runtime::Handle::try_current() {
151				handle.spawn(async move {
152					event_bus.emit(event).await;
153				});
154			}
155		}
156
157		Ok(())
158	}
159
160	/// Create an FFI operator instance from the global singleton loader
161	#[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
162	pub(crate) fn create_ffi_operator(
163		&self,
164		operator: &str,
165		node_id: FlowNodeId,
166		config: &HashMap<String, Value>,
167	) -> crate::Result<BoxedOperator> {
168		let loader = ffi_operator_loader();
169		let mut loader_write = loader.write().unwrap();
170
171		// Serialize config to postcard
172		let config_bytes = postcard::to_stdvec(config)
173			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
174
175		let operator = loader_write
176			.create_operator_by_name(operator, node_id, &config_bytes)
177			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
178
179		Ok(Box::new(operator))
180	}
181
182	/// Check if an operator name corresponds to an FFI operator
183	pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
184		let loader = ffi_operator_loader();
185		let loader_read = loader.read().unwrap();
186		loader_read.has_operator(operator)
187	}
188
189	pub async fn has_registered_flows(&self) -> bool {
190		!self.inner.flows.read().await.is_empty()
191	}
192
193	/// Returns a set of all currently registered flow IDs
194	pub async fn flow_ids(&self) -> HashSet<FlowId> {
195		self.inner.flows.read().await.keys().copied().collect()
196	}
197
198	/// Clears all registered flows, operators, sources, sinks, dependency graph, and backfill versions
199	pub async fn clear(&self) {
200		self.inner.operators.write().await.clear();
201		self.inner.flows.write().await.clear();
202		self.inner.sources.write().await.clear();
203		self.inner.sinks.write().await.clear();
204		self.inner.analyzer.write().await.clear();
205		self.inner.flow_creation_versions.write().await.clear();
206	}
207
208	pub async fn get_dependency_graph(&self) -> FlowDependencyGraph {
209		self.inner.analyzer.read().await.get_dependency_graph().clone()
210	}
211
212	pub async fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
213		let analyzer = self.inner.analyzer.read().await;
214		let dependency_graph = analyzer.get_dependency_graph();
215		analyzer.get_flows_depending_on_table(dependency_graph, table_id)
216	}
217
218	pub async fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
219		let analyzer = self.inner.analyzer.read().await;
220		let dependency_graph = analyzer.get_dependency_graph();
221		analyzer.get_flows_depending_on_view(dependency_graph, view_id)
222	}
223
224	pub async fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
225		let analyzer = self.inner.analyzer.read().await;
226		let dependency_graph = analyzer.get_dependency_graph();
227		analyzer.get_flow_producing_view(dependency_graph, view_id)
228	}
229
230	pub async fn calculate_execution_order(&self) -> Vec<FlowId> {
231		let analyzer = self.inner.analyzer.read().await;
232		let dependency_graph = analyzer.get_dependency_graph();
233		analyzer.calculate_execution_order(dependency_graph)
234	}
235}