1mod backfill;
5mod eval;
6mod partition;
7mod process;
8mod register;
9
10use std::{
11 collections::{HashMap, HashSet},
12 fs::read_dir,
13 path::PathBuf,
14 sync::Arc,
15};
16
17use reifydb_core::{
18 CommitVersion, Error,
19 event::{
20 EventBus,
21 flow::{FlowOperatorLoadedEvent, OperatorColumnDef},
22 },
23 interface::{FlowId, FlowNodeId, SourceId, TableId, ViewId},
24};
25use reifydb_engine::{StandardRowEvaluator, execute::Executor};
26use reifydb_rql::flow::{Flow, FlowDependencyGraph, FlowGraphAnalyzer};
27use reifydb_type::{Value, internal};
28use tokio::sync::RwLock;
29use tracing::{debug, error, instrument};
30
31use crate::{
32 ffi::loader::{ColumnDefInfo, ffi_operator_loader},
33 operator::{BoxedOperator, Operators, transform::registry::TransformOperatorRegistry},
34};
35
/// Shared state of the flow engine.
///
/// `FlowEngine` wraps this in an `Arc`, so every clone of an engine handle reads and writes the
/// same maps. Each mutable collection sits behind its own async `RwLock`.
pub(crate) struct FlowEngineInner {
	/// Row evaluator supplied to `FlowEngine::new`.
	pub(crate) evaluator: StandardRowEvaluator,
	/// Executor supplied to `FlowEngine::new`.
	pub(crate) executor: Executor,
	/// Transform operator registry supplied to `FlowEngine::new`.
	pub(crate) registry: TransformOperatorRegistry,
	/// Operator instances keyed by flow node id.
	pub(crate) operators: RwLock<HashMap<FlowNodeId, Arc<Operators>>>,
	/// Registered flows keyed by flow id (`has_registered_flows` / `flow_ids` read this map).
	pub(crate) flows: RwLock<HashMap<FlowId, Flow>>,
	/// For each source id, a list of `(flow, node)` pairs — presumably the flow nodes fed by that
	/// source (TODO confirm against the registration code).
	pub(crate) sources: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
	/// For each source id, a list of `(flow, node)` pairs — presumably the flow nodes that write
	/// into that source (TODO confirm against the registration code).
	pub(crate) sinks: RwLock<HashMap<SourceId, Vec<(FlowId, FlowNodeId)>>>,
	/// Analyzer holding the cross-flow dependency graph queried by the `get_flows_*` and
	/// `calculate_execution_order` methods.
	pub(crate) analyzer: RwLock<FlowGraphAnalyzer>,
	/// Event bus supplied to `FlowEngine::new`.
	pub(crate) event_bus: EventBus,
	/// A commit version recorded per flow; judging by the name, the version at which each flow
	/// was created (NOTE(review): confirm against the code that populates it).
	pub(crate) flow_creation_versions: RwLock<HashMap<FlowId, CommitVersion>>,
}
48
49pub struct FlowEngine {
50 pub(crate) inner: Arc<FlowEngineInner>,
51}
52
53impl Clone for FlowEngine {
54 fn clone(&self) -> Self {
55 Self {
56 inner: Arc::clone(&self.inner),
57 }
58 }
59}
60
61impl FlowEngine {
62 #[instrument(name = "flow::engine::new", level = "info", skip(evaluator, executor, registry, event_bus), fields(operators_dir = ?operators_dir))]
63 pub fn new(
64 evaluator: StandardRowEvaluator,
65 executor: Executor,
66 registry: TransformOperatorRegistry,
67 event_bus: EventBus,
68 operators_dir: Option<PathBuf>,
69 ) -> Self {
70 if let Some(dir) = operators_dir {
72 if let Err(e) = Self::load_ffi_operators(&dir, &event_bus) {
73 error!("Failed to load FFI operators from {:?}: {}", dir, e);
74 }
75 }
76
77 Self {
78 inner: Arc::new(FlowEngineInner {
79 evaluator,
80 executor,
81 registry,
82 operators: RwLock::new(HashMap::new()),
83 flows: RwLock::new(HashMap::new()),
84 sources: RwLock::new(HashMap::new()),
85 sinks: RwLock::new(HashMap::new()),
86 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
87 event_bus,
88 flow_creation_versions: RwLock::new(HashMap::new()),
89 }),
90 }
91 }
92
93 #[instrument(name = "flow::engine::load_ffi_operators", level = "debug", skip(event_bus), fields(dir = ?dir))]
95 fn load_ffi_operators(dir: &PathBuf, event_bus: &EventBus) -> reifydb_core::Result<()> {
96 let loader = ffi_operator_loader();
97
98 let entries = read_dir(dir).unwrap();
100
101 for entry in entries {
102 let entry = entry.unwrap();
103 let path = entry.path();
104
105 if !path.is_file() {
106 continue;
107 }
108
109 let is_shared_lib = path.extension().map_or(false, |ext| ext == "so" || ext == "dylib");
110 if !is_shared_lib {
111 continue;
112 }
113
114 let mut guard = loader.write().unwrap();
116 let info = match guard.register_operator(&path)? {
117 Some(info) => info,
118 None => {
119 continue;
121 }
122 };
123
124 debug!("Registered FFI operator: {} from {:?}", info.operator, path);
125
126 fn convert_column_defs(columns: &[ColumnDefInfo]) -> Vec<OperatorColumnDef> {
128 columns.iter()
129 .map(|c| OperatorColumnDef {
130 name: c.name.clone(),
131 field_type: c.field_type,
132 description: c.description.clone(),
133 })
134 .collect()
135 }
136
137 let event_bus = event_bus.clone();
139 let event = FlowOperatorLoadedEvent {
140 operator: info.operator,
141 library_path: info.library_path,
142 api: info.api,
143 version: info.version,
144 description: info.description,
145 input: convert_column_defs(&info.input_columns),
146 output: convert_column_defs(&info.output_columns),
147 capabilities: info.capabilities,
148 };
149 if let Ok(handle) = tokio::runtime::Handle::try_current() {
151 handle.spawn(async move {
152 event_bus.emit(event).await;
153 });
154 }
155 }
156
157 Ok(())
158 }
159
160 #[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
162 pub(crate) fn create_ffi_operator(
163 &self,
164 operator: &str,
165 node_id: FlowNodeId,
166 config: &HashMap<String, Value>,
167 ) -> crate::Result<BoxedOperator> {
168 let loader = ffi_operator_loader();
169 let mut loader_write = loader.write().unwrap();
170
171 let config_bytes = postcard::to_stdvec(config)
173 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
174
175 let operator = loader_write
176 .create_operator_by_name(operator, node_id, &config_bytes)
177 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
178
179 Ok(Box::new(operator))
180 }
181
182 pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
184 let loader = ffi_operator_loader();
185 let loader_read = loader.read().unwrap();
186 loader_read.has_operator(operator)
187 }
188
189 pub async fn has_registered_flows(&self) -> bool {
190 !self.inner.flows.read().await.is_empty()
191 }
192
193 pub async fn flow_ids(&self) -> HashSet<FlowId> {
195 self.inner.flows.read().await.keys().copied().collect()
196 }
197
198 pub async fn clear(&self) {
200 self.inner.operators.write().await.clear();
201 self.inner.flows.write().await.clear();
202 self.inner.sources.write().await.clear();
203 self.inner.sinks.write().await.clear();
204 self.inner.analyzer.write().await.clear();
205 self.inner.flow_creation_versions.write().await.clear();
206 }
207
208 pub async fn get_dependency_graph(&self) -> FlowDependencyGraph {
209 self.inner.analyzer.read().await.get_dependency_graph().clone()
210 }
211
212 pub async fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
213 let analyzer = self.inner.analyzer.read().await;
214 let dependency_graph = analyzer.get_dependency_graph();
215 analyzer.get_flows_depending_on_table(dependency_graph, table_id)
216 }
217
218 pub async fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
219 let analyzer = self.inner.analyzer.read().await;
220 let dependency_graph = analyzer.get_dependency_graph();
221 analyzer.get_flows_depending_on_view(dependency_graph, view_id)
222 }
223
224 pub async fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
225 let analyzer = self.inner.analyzer.read().await;
226 let dependency_graph = analyzer.get_dependency_graph();
227 analyzer.get_flow_producing_view(dependency_graph, view_id)
228 }
229
230 pub async fn calculate_execution_order(&self) -> Vec<FlowId> {
231 let analyzer = self.inner.analyzer.read().await;
232 let dependency_graph = analyzer.get_dependency_graph();
233 analyzer.calculate_execution_order(dependency_graph)
234 }
235}