reifydb_sub_flow/engine/
mod.rs1mod partition;
5mod process;
6mod register;
7
8use std::{collections::HashMap, ffi::CStr, fs::read_dir, path::PathBuf, sync::Arc};
9
10use parking_lot::RwLock;
11use reifydb_core::{
12 Error,
13 event::{EventBus, flow::FlowOperatorLoadedEvent},
14 interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
15 log_debug, log_error,
16};
17use reifydb_engine::{StandardRowEvaluator, execute::Executor};
18use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
19use reifydb_type::internal;
20
21use crate::{
22 ffi::loader::ffi_operator_loader,
23 operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
24};
25
/// Shared state behind every [`FlowEngine`] handle.
///
/// The mutable collections are each wrapped in their own `parking_lot::RwLock`, so they are
/// locked independently of one another.
pub(crate) struct FlowEngineInner {
    /// Row evaluator supplied to `FlowEngine::new`.
    pub(crate) evaluator: StandardRowEvaluator,
    /// Executor supplied to `FlowEngine::new`.
    pub(crate) executor: Executor,
    /// Transform operator registry supplied to `FlowEngine::new`.
    pub(crate) registry: TransformOperatorRegistry,
    /// Operators keyed by the flow node they belong to; starts empty.
    pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
    /// Flows keyed by id; starts empty. `FlowEngine::has_registered_flows` reports whether
    /// this map has any entries.
    pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
    /// `(flow, node)` pairs indexed by source id; starts empty.
    /// NOTE(review): presumably the nodes that read from each source - populated outside this
    /// file section, confirm against the registration code.
    pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
    /// `(flow, node)` pairs indexed by source id; starts empty.
    /// NOTE(review): presumably the nodes that write to each source - populated outside this
    /// file section, confirm against the registration code.
    pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
    /// Analyzer that owns the flow dependency graph queried by the `FlowEngine` accessors.
    pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
    /// Event bus supplied to `FlowEngine::new`.
    pub(crate) event_bus: EventBus,
}
37
/// Handle to the flow engine.
///
/// The handle only holds an `Arc` to [`FlowEngineInner`]; the `Clone` implementation clones
/// that `Arc`, so every clone refers to the same underlying state.
pub struct FlowEngine {
    /// Reference-counted pointer to the shared engine state.
    pub(crate) inner: Arc<FlowEngineInner>,
}
41
42impl Clone for FlowEngine {
43 fn clone(&self) -> Self {
44 Self {
45 inner: Arc::clone(&self.inner),
46 }
47 }
48}
49
50impl FlowEngine {
51 pub fn new(
52 evaluator: StandardRowEvaluator,
53 executor: Executor,
54 registry: TransformOperatorRegistry,
55 event_bus: EventBus,
56 operators_dir: Option<PathBuf>,
57 ) -> Self {
58 if let Some(dir) = operators_dir {
60 if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
61 log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
62 }
63 }
64
65 Self {
66 inner: Arc::new(FlowEngineInner {
67 evaluator,
68 executor,
69 registry,
70 operators: RwLock::new(HashMap::new()),
71 flows: RwLock::new(HashMap::new()),
72 sources: RwLock::new(HashMap::new()),
73 sinks: RwLock::new(HashMap::new()),
74 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
75 event_bus,
76 }),
77 }
78 }
79
80 fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
82 let loader = ffi_operator_loader();
83
84 let entries = read_dir(dir).unwrap();
86
87 for entry in entries {
88 let entry = entry.unwrap();
89 let path = entry.path();
90
91 if !path.is_file() {
92 continue;
93 }
94
95 let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
96 if !is_shared_lib {
97 continue;
98 }
99
100 let mut guard = loader.write();
103 let temp_operator = guard.load_operator(&path, &[], FlowNodeId(0))?;
104
105 let descriptor = temp_operator.descriptor();
107 let operator_name =
108 unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
109 let api_version = descriptor.api_version;
110
111 log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
112
113 event_bus.emit(FlowOperatorLoadedEvent {
115 operator_name: operator_name.clone(),
116 library_path: path.clone(),
117 api_version,
118 });
119 }
120
121 Ok(())
122 }
123
124 pub(crate) fn create_ffi_operator(
126 &self,
127 operator_name: &str,
128 node_id: FlowNodeId,
129 ) -> crate::Result<BoxedOperator> {
130 let loader = ffi_operator_loader();
131 let mut loader_write = loader.write();
132
133 let operator = loader_write
134 .create_operator_by_name(operator_name, node_id, &[])
135 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
136
137 Ok(Box::new(operator))
138 }
139
140 pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
142 let loader = ffi_operator_loader();
143 let loader_read = loader.read();
144 loader_read.has_operator(operator_name)
145 }
146
147 pub fn has_registered_flows(&self) -> bool {
148 !self.inner.flows.read().is_empty()
149 }
150
151 pub fn clear(&self) {
153 self.inner.operators.write().clear();
154 self.inner.flows.write().clear();
155 self.inner.sources.write().clear();
156 self.inner.sinks.write().clear();
157 self.inner.analyzer.write().clear();
158 }
159
160 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
161 self.inner.analyzer.read().get_dependency_graph().clone()
162 }
163
164 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
165 let analyzer = self.inner.analyzer.read();
166 let dependency_graph = analyzer.get_dependency_graph();
167 analyzer.get_flows_depending_on_table(dependency_graph, table_id)
168 }
169
170 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
171 let analyzer = self.inner.analyzer.read();
172 let dependency_graph = analyzer.get_dependency_graph();
173 analyzer.get_flows_depending_on_view(dependency_graph, view_id)
174 }
175
176 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
177 let analyzer = self.inner.analyzer.read();
178 let dependency_graph = analyzer.get_dependency_graph();
179 analyzer.get_flow_producing_view(dependency_graph, view_id)
180 }
181
182 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
183 let analyzer = self.inner.analyzer.read();
184 let dependency_graph = analyzer.get_dependency_graph();
185 analyzer.calculate_execution_order(dependency_graph)
186 }
187}