1mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
10use std::{
11 collections::{HashMap, HashSet},
12 fs::read_dir,
13 path::PathBuf,
14 sync::Arc,
15};
16
17use parking_lot::RwLock;
18use reifydb_core::{
19 CommitVersion, Error,
20 event::{
21 EventBus,
22 flow::{FlowOperatorLoadedEvent, OperatorColumnDef},
23 },
24 interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
25};
26use reifydb_engine::{StandardRowEvaluator, execute::Executor};
27use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
28use reifydb_type::{Value, internal};
29use tracing::{debug, error, instrument};
30
31use crate::{
32 ffi::loader::{ColumnDefInfo, ffi_operator_loader},
33 operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
34};
35
36pub(crate) struct FlowEngineInner {
37 pub(crate) evaluator: StandardRowEvaluator,
38 pub(crate) executor: Executor,
39 pub(crate) registry: TransformOperatorRegistry,
40 pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
41 pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
42 pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
43 pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
44 pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
45 pub(crate) event_bus: EventBus,
46 pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
47}
48
49pub struct FlowEngine {
50 pub(crate) inner: Arc<FlowEngineInner>,
51}
52
53impl Clone for FlowEngine {
54 fn clone(&self) -> Self {
55 Self {
56 inner: Arc::clone(&self.inner),
57 }
58 }
59}
60
61impl FlowEngine {
62 #[instrument(level = "info", skip(evaluator, executor, registry, event_bus), fields(operators_dir = ?operators_dir))]
63 pub fn new(
64 evaluator: StandardRowEvaluator,
65 executor: Executor,
66 registry: TransformOperatorRegistry,
67 event_bus: EventBus,
68 operators_dir: Option<PathBuf>,
69 ) -> Self {
70 if let Some(dir) = operators_dir {
72 if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
73 error!("Failed to load FFI operators from {:?}: {}", dir, e);
74 }
75 }
76
77 Self {
78 inner: Arc::new(FlowEngineInner {
79 evaluator,
80 executor,
81 registry,
82 operators: RwLock::new(HashMap::new()),
83 flows: RwLock::new(HashMap::new()),
84 sources: RwLock::new(HashMap::new()),
85 sinks: RwLock::new(HashMap::new()),
86 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
87 event_bus,
88 flow_creation_versions: RwLock::new(HashMap::new()),
89 }),
90 }
91 }
92
93 #[instrument(level = "debug", skip(event_bus), fields(dir = ?dir))]
95 fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
96 let loader = ffi_operator_loader();
97
98 let entries = read_dir(dir).unwrap();
100
101 for entry in entries {
102 let entry = entry.unwrap();
103 let path = entry.path();
104
105 if !path.is_file() {
106 continue;
107 }
108
109 let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
110 if !is_shared_lib {
111 continue;
112 }
113
114 let mut guard = loader.write();
116 let info = match guard.register_operator(&path)? {
117 Some(info) => info,
118 None => {
119 continue;
121 }
122 };
123
124 debug!("Registered FFI operator: {} from {:?}", info.operator, path);
125
126 fn convert_column_defs(columns: &[ColumnDefInfo]) -> Vec<OperatorColumnDef> {
128 columns.iter()
129 .map(|c| OperatorColumnDef {
130 name: c.name.clone(),
131 field_type: c.field_type,
132 description: c.description.clone(),
133 })
134 .collect()
135 }
136
137 event_bus.emit(FlowOperatorLoadedEvent {
139 operator: info.operator,
140 library_path: info.library_path,
141 api: info.api,
142 version: info.version,
143 description: info.description,
144 input: convert_column_defs(&info.input_columns),
145 output: convert_column_defs(&info.output_columns),
146 capabilities: info.capabilities,
147 });
148 }
149
150 Ok(())
151 }
152
153 #[instrument(level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
155 pub(crate) fn create_ffi_operator(
156 &self,
157 operator: &str,
158 node_id: FlowNodeId,
159 config: &HashMap<String, Value>,
160 ) -> crate::Result<BoxedOperator> {
161 let loader = ffi_operator_loader();
162 let mut loader_write = loader.write();
163
164 let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
166 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
167
168 let operator = loader_write
169 .create_operator_by_name(operator, node_id, &config_bytes)
170 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
171
172 Ok(Box::new(operator))
173 }
174
175 pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
177 let loader = ffi_operator_loader();
178 let loader_read = loader.read();
179 loader_read.has_operator(operator)
180 }
181
182 pub fn has_registered_flows(&self) -> bool {
183 !self.inner.flows.read().is_empty()
184 }
185
186 pub fn flow_ids(&self) -> HashSet<FlowId> {
188 self.inner.flows.read().keys().copied().collect()
189 }
190
191 pub fn clear(&self) {
193 self.inner.operators.write().clear();
194 self.inner.flows.write().clear();
195 self.inner.sources.write().clear();
196 self.inner.sinks.write().clear();
197 self.inner.analyzer.write().clear();
198 self.inner.flow_creation_versions.write().clear();
199 }
200
201 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
202 self.inner.analyzer.read().get_dependency_graph().clone()
203 }
204
205 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
206 let analyzer = self.inner.analyzer.read();
207 let dependency_graph = analyzer.get_dependency_graph();
208 analyzer.get_flows_depending_on_table(dependency_graph, table_id)
209 }
210
211 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
212 let analyzer = self.inner.analyzer.read();
213 let dependency_graph = analyzer.get_dependency_graph();
214 analyzer.get_flows_depending_on_view(dependency_graph, view_id)
215 }
216
217 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
218 let analyzer = self.inner.analyzer.read();
219 let dependency_graph = analyzer.get_dependency_graph();
220 analyzer.get_flow_producing_view(dependency_graph, view_id)
221 }
222
223 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
224 let analyzer = self.inner.analyzer.read();
225 let dependency_graph = analyzer.get_dependency_graph();
226 analyzer.calculate_execution_order(dependency_graph)
227 }
228}