1mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
use std::{
	collections::{HashMap, HashSet},
	fs::read_dir,
	path::{Path, PathBuf},
	sync::Arc,
};
16
17use parking_lot::RwLock;
18use reifydb_core::{
19 CommitVersion, Error,
20 event::{EventBus, flow::FlowOperatorLoadedEvent},
21 interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
22 log_debug, log_error,
23};
24use reifydb_engine::{StandardRowEvaluator, execute::Executor};
25use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
26use reifydb_type::{Value, internal};
27
28use crate::{
29 ffi::loader::ffi_operator_loader,
30 operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
31};
32
33pub(crate) struct FlowEngineInner {
34 pub(crate) evaluator: StandardRowEvaluator,
35 pub(crate) executor: Executor,
36 pub(crate) registry: TransformOperatorRegistry,
37 pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
38 pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
39 pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
40 pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
41 pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
42 pub(crate) event_bus: EventBus,
43 pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
44}
45
46pub struct FlowEngine {
47 pub(crate) inner: Arc<FlowEngineInner>,
48}
49
50impl Clone for FlowEngine {
51 fn clone(&self) -> Self {
52 Self {
53 inner: Arc::clone(&self.inner),
54 }
55 }
56}
57
58impl FlowEngine {
59 pub fn new(
60 evaluator: StandardRowEvaluator,
61 executor: Executor,
62 registry: TransformOperatorRegistry,
63 event_bus: EventBus,
64 operators_dir: Option<PathBuf>,
65 ) -> Self {
66 if let Some(dir) = operators_dir {
68 if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
69 log_error!("Failed to load FFI operators from {:?}: {}", dir, e);
70 }
71 }
72
73 Self {
74 inner: Arc::new(FlowEngineInner {
75 evaluator,
76 executor,
77 registry,
78 operators: RwLock::new(HashMap::new()),
79 flows: RwLock::new(HashMap::new()),
80 sources: RwLock::new(HashMap::new()),
81 sinks: RwLock::new(HashMap::new()),
82 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
83 event_bus,
84 flow_creation_versions: RwLock::new(HashMap::new()),
85 }),
86 }
87 }
88
89 fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
91 let loader = ffi_operator_loader();
92
93 let entries = read_dir(dir).unwrap();
95
96 for entry in entries {
97 let entry = entry.unwrap();
98 let path = entry.path();
99
100 if !path.is_file() {
101 continue;
102 }
103
104 let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
105 if !is_shared_lib {
106 continue;
107 }
108
109 let mut guard = loader.write();
111 let (operator_name, api_version) = match guard.register_operator(&path)? {
112 Some(info) => info,
113 None => {
114 continue;
116 }
117 };
118
119 log_debug!("Registered FFI operator: {} from {:?}", operator_name, path);
120
121 event_bus.emit(FlowOperatorLoadedEvent {
123 operator_name: operator_name.clone(),
124 library_path: path.clone(),
125 api_version,
126 });
127 }
128
129 Ok(())
130 }
131
132 pub(crate) fn create_ffi_operator(
134 &self,
135 operator_name: &str,
136 node_id: FlowNodeId,
137 config: &HashMap<String, Value>,
138 ) -> crate::Result<BoxedOperator> {
139 let loader = ffi_operator_loader();
140 let mut loader_write = loader.write();
141
142 let config_bytes = bincode::serde::encode_to_vec(config, bincode::config::standard())
144 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
145
146 let operator = loader_write
147 .create_operator_by_name(operator_name, node_id, &config_bytes)
148 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
149
150 Ok(Box::new(operator))
151 }
152
153 pub(crate) fn is_ffi_operator(&self, operator_name: &str) -> bool {
155 let loader = ffi_operator_loader();
156 let loader_read = loader.read();
157 loader_read.has_operator(operator_name)
158 }
159
160 pub fn has_registered_flows(&self) -> bool {
161 !self.inner.flows.read().is_empty()
162 }
163
164 pub fn flow_ids(&self) -> HashSet<FlowId> {
166 self.inner.flows.read().keys().copied().collect()
167 }
168
169 pub fn clear(&self) {
171 self.inner.operators.write().clear();
172 self.inner.flows.write().clear();
173 self.inner.sources.write().clear();
174 self.inner.sinks.write().clear();
175 self.inner.analyzer.write().clear();
176 self.inner.flow_creation_versions.write().clear();
177 }
178
179 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
180 self.inner.analyzer.read().get_dependency_graph().clone()
181 }
182
183 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
184 let analyzer = self.inner.analyzer.read();
185 let dependency_graph = analyzer.get_dependency_graph();
186 analyzer.get_flows_depending_on_table(dependency_graph, table_id)
187 }
188
189 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
190 let analyzer = self.inner.analyzer.read();
191 let dependency_graph = analyzer.get_dependency_graph();
192 analyzer.get_flows_depending_on_view(dependency_graph, view_id)
193 }
194
195 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
196 let analyzer = self.inner.analyzer.read();
197 let dependency_graph = analyzer.get_dependency_graph();
198 analyzer.get_flow_producing_view(dependency_graph, view_id)
199 }
200
201 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
202 let analyzer = self.inner.analyzer.read();
203 let dependency_graph = analyzer.get_dependency_graph();
204 analyzer.calculate_execution_order(dependency_graph)
205 }
206}