reifydb_sub_flow/engine/
mod.rs1mod eval;
5mod partition;
6mod process;
7mod register;
8
9use std::{collections::HashMap, ffi::CStr, fs::read_dir, path::PathBuf, sync::Arc};
10
11use parking_lot::RwLock;
12use reifydb_core::{
13 Error,
14 event::{EventBus, flow::FlowOperatorLoadedEvent},
15 interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
16 log_debug, log_error,
17};
18use reifydb_engine::{StandardRowEvaluator, execute::Executor};
19use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
20use reifydb_type::{Value, internal};
21
22use crate::{
23 ffi::loader::ffi_operator_loader,
24 operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
25};
26
27pub(crate) struct FlowEngineInner {
28 pub(crate) evaluator: StandardRowEvaluator,
29 pub(crate) executor: Executor,
30 pub(crate) registry: TransformOperatorRegistry,
31 pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
32 pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
33 pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
34 pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
35 pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
36 pub(crate) event_bus: EventBus,
37}
38
39pub struct FlowEngine {
40 pub(crate) inner: Arc<FlowEngineInner>,
41}
42
43impl Clone for FlowEngine {
44 fn clone(&self) -> Self {
45 Self {
46 inner: Arc::clone(&self.inner),
47 }
48 }
49}
50
51impl FlowEngine {
52 pub fn new(
53 evaluator: StandardRowEvaluator,
54 executor: Executor,
55 registry: TransformOperatorRegistry,
56 event_bus: EventBus,
57 operators_dir: Option<PathBuf>,
58 ) -> Self {
59 if let Some(dir) = operators_dir {
61 if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
62 log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
63 }
64 }
65
66 Self {
67 inner: Arc::new(FlowEngineInner {
68 evaluator,
69 executor,
70 registry,
71 operators: RwLock::new(HashMap::new()),
72 flows: RwLock::new(HashMap::new()),
73 sources: RwLock::new(HashMap::new()),
74 sinks: RwLock::new(HashMap::new()),
75 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
76 event_bus,
77 }),
78 }
79 }
80
81 fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
83 let loader = ffi_operator_loader();
84
85 let entries = read_dir(dir).unwrap();
87
88 for entry in entries {
89 let entry = entry.unwrap();
90 let path = entry.path();
91
92 if !path.is_file() {
93 continue;
94 }
95
96 let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
97 if !is_shared_lib {
98 continue;
99 }
100
101 let mut guard = loader.write();
104 let temp_operator = guard.load_operator(&path, &[], FlowNodeId(0))?;
105
106 let descriptor = temp_operator.descriptor();
108 let operator_name =
109 unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
110 let api_version = descriptor.api_version;
111
112 log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
113
114 event_bus.emit(FlowOperatorLoadedEvent {
116 operator_name: operator_name.clone(),
117 library_path: path.clone(),
118 api_version,
119 });
120 }
121
122 Ok(())
123 }
124
125 pub(crate) fn create_ffi_operator(
127 &self,
128 operator_name: &str,
129 node_id: FlowNodeId,
130 config: &HashMap<String, Value>,
131 ) -> crate::Result<BoxedOperator> {
132 let loader = ffi_operator_loader();
133 let mut loader_write = loader.write();
134
135 let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
137 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
138
139 let operator = loader_write
140 .create_operator_by_name(operator_name, node_id, &config_bytes)
141 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
142
143 Ok(Box::new(operator))
144 }
145
146 pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
148 let loader = ffi_operator_loader();
149 let loader_read = loader.read();
150 loader_read.has_operator(operator_name)
151 }
152
153 pub fn has_registered_flows(&self) -> bool {
154 !self.inner.flows.read().is_empty()
155 }
156
157 pub fn clear(&self) {
159 self.inner.operators.write().clear();
160 self.inner.flows.write().clear();
161 self.inner.sources.write().clear();
162 self.inner.sinks.write().clear();
163 self.inner.analyzer.write().clear();
164 }
165
166 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
167 self.inner.analyzer.read().get_dependency_graph().clone()
168 }
169
170 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
171 let analyzer = self.inner.analyzer.read();
172 let dependency_graph = analyzer.get_dependency_graph();
173 analyzer.get_flows_depending_on_table(dependency_graph, table_id)
174 }
175
176 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
177 let analyzer = self.inner.analyzer.read();
178 let dependency_graph = analyzer.get_dependency_graph();
179 analyzer.get_flows_depending_on_view(dependency_graph, view_id)
180 }
181
182 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
183 let analyzer = self.inner.analyzer.read();
184 let dependency_graph = analyzer.get_dependency_graph();
185 analyzer.get_flow_producing_view(dependency_graph, view_id)
186 }
187
188 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
189 let analyzer = self.inner.analyzer.read();
190 let dependency_graph = analyzer.get_dependency_graph();
191 analyzer.calculate_execution_order(dependency_graph)
192 }
193}