Skip to main content

reifydb_sub_flow/engine/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod eval;
5pub mod process;
6pub mod register;
7
8use std::{
9	collections::{BTreeMap, BTreeSet, HashMap},
10	sync::Arc,
11};
12
13#[cfg(reifydb_target = "native")]
14use postcard::to_stdvec;
15use reifydb_catalog::catalog::Catalog;
16#[cfg(reifydb_target = "native")]
17use reifydb_core::internal;
18use reifydb_core::{
19	common::CommitVersion,
20	event::EventBus,
21	interface::catalog::{
22		flow::{FlowId, FlowNodeId},
23		id::{TableId, ViewId},
24		schema::SchemaId,
25	},
26};
27use reifydb_engine::vm::executor::Executor;
28#[cfg(reifydb_target = "native")]
29use reifydb_extension::operator::ffi_loader::ffi_operator_loader;
30use reifydb_rql::flow::{
31	analyzer::{FlowDependencyGraph, FlowGraphAnalyzer},
32	flow::FlowDag,
33};
34use reifydb_runtime::context::RuntimeContext;
35#[cfg(reifydb_target = "native")]
36use reifydb_type::{Result, error::Error, value::Value};
37use tracing::instrument;
38
39#[cfg(reifydb_target = "native")]
40use crate::operator::BoxedOperator;
41#[cfg(reifydb_target = "native")]
42use crate::operator::ffi::FFIOperator;
43use crate::{builder::OperatorFactory, operator::Operators};
44
/// Incremental flow-processing engine: holds the registered flow DAGs,
/// their per-node operator instances, and the source/sink routing tables
/// used to dispatch changes through the flows.
pub struct FlowEngine {
	// Catalog handle for metadata lookups. NOTE(review): exact usage is in
	// the submodules (eval/process/register) — confirm there.
	pub(crate) catalog: Catalog,
	// Query executor; cloned into each FFI operator on creation.
	pub(crate) executor: Executor,
	// Instantiated operator chain per flow node.
	pub(crate) operators: BTreeMap<FlowNodeId, Arc<Operators>>,
	// All currently registered flows, keyed by flow id.
	pub(crate) flows: BTreeMap<FlowId, FlowDag>,
	// Routing: schema -> (flow, node) pairs that read from it — presumably
	// source tables feeding flows; verify against register.rs.
	pub(crate) sources: BTreeMap<SchemaId, Vec<(FlowId, FlowNodeId)>>,
	// Routing: schema -> (flow, node) pairs that write to it.
	pub(crate) sinks: BTreeMap<SchemaId, Vec<(FlowId, FlowNodeId)>>,
	// Maintains the cross-flow dependency graph (see the get_* methods).
	pub(crate) analyzer: FlowGraphAnalyzer,
	// Currently unused here (hence dead_code); kept for event publication.
	#[allow(dead_code)]
	pub(crate) event_bus: EventBus,
	// Commit version recorded per flow — name suggests the version at which
	// the flow was created; used for backfill (see clear()'s doc).
	pub(crate) flow_creation_versions: BTreeMap<FlowId, CommitVersion>,
	// Shared runtime services passed through to operators.
	pub(crate) runtime_context: RuntimeContext,
	// User-registered operator factories, looked up by operator name.
	pub(crate) custom_operators: Arc<HashMap<String, OperatorFactory>>,
}
59
60impl FlowEngine {
61	#[instrument(
62		name = "flow::engine::new",
63		level = "debug",
64		skip(catalog, executor, event_bus, runtime_context, custom_operators)
65	)]
66	pub fn new(
67		catalog: Catalog,
68		executor: Executor,
69		event_bus: EventBus,
70		runtime_context: RuntimeContext,
71		custom_operators: Arc<HashMap<String, OperatorFactory>>,
72	) -> Self {
73		Self {
74			catalog,
75			executor,
76			operators: BTreeMap::new(),
77			flows: BTreeMap::new(),
78			sources: BTreeMap::new(),
79			sinks: BTreeMap::new(),
80			analyzer: FlowGraphAnalyzer::new(),
81			event_bus,
82			flow_creation_versions: BTreeMap::new(),
83			runtime_context,
84			custom_operators,
85		}
86	}
87
88	/// Create an FFI operator instance from the global singleton loader
89	#[cfg(reifydb_target = "native")]
90	#[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
91	pub(crate) fn create_ffi_operator(
92		&self,
93		operator: &str,
94		node_id: FlowNodeId,
95		config: &BTreeMap<String, Value>,
96	) -> Result<BoxedOperator> {
97		let loader = ffi_operator_loader();
98		let mut loader_write = loader.write().unwrap();
99
100		// Serialize config to postcard
101		let config_bytes = to_stdvec(config)
102			.map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
103
104		let (descriptor, instance) = loader_write
105			.create_operator_by_name(operator, node_id, &config_bytes)
106			.map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
107
108		Ok(Box::new(FFIOperator::new(descriptor, instance, node_id, self.executor.clone())))
109	}
110
111	/// Check if an operator name corresponds to an FFI operator
112	#[cfg(reifydb_target = "native")]
113	pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
114		let loader = ffi_operator_loader();
115		let loader_read = loader.read().unwrap();
116		loader_read.has_operator(operator)
117	}
118
119	/// FFI operators are not supported in WASM
120	#[cfg(not(reifydb_target = "native"))]
121	#[allow(dead_code)]
122	pub(crate) fn is_ffi_operator(&self, _operator: &str) -> bool {
123		false
124	}
125
126	/// Returns a set of all currently registered flow IDs
127	pub fn flow_ids(&self) -> BTreeSet<FlowId> {
128		self.flows.keys().copied().collect()
129	}
130
131	/// Clears all registered flows, operators, sources, sinks, dependency graph, and backfill versions
132	pub fn clear(&mut self) {
133		self.operators.clear();
134		self.flows.clear();
135		self.sources.clear();
136		self.sinks.clear();
137		self.analyzer.clear();
138		self.flow_creation_versions.clear();
139	}
140
141	pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
142		self.analyzer.get_dependency_graph().clone()
143	}
144
145	pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
146		let dependency_graph = self.analyzer.get_dependency_graph();
147		self.analyzer.get_flows_depending_on_table(dependency_graph, table_id)
148	}
149
150	pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
151		let dependency_graph = self.analyzer.get_dependency_graph();
152		self.analyzer.get_flows_depending_on_view(dependency_graph, view_id)
153	}
154
155	pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
156		let dependency_graph = self.analyzer.get_dependency_graph();
157		self.analyzer.get_flow_producing_view(dependency_graph, view_id)
158	}
159
160	pub fn calculate_execution_order(&self) -> Vec<FlowId> {
161		let dependency_graph = self.analyzer.get_dependency_graph();
162		self.analyzer.calculate_execution_order(dependency_graph)
163	}
164}