1pub mod eval;
5pub mod process;
6pub mod register;
7
8use std::{
9 collections::{BTreeMap, BTreeSet, HashMap},
10 sync::Arc,
11};
12
13#[cfg(reifydb_target = "native")]
14use postcard::to_stdvec;
15use reifydb_catalog::catalog::Catalog;
16#[cfg(reifydb_target = "native")]
17use reifydb_core::internal;
18use reifydb_core::{
19 common::CommitVersion,
20 event::EventBus,
21 interface::catalog::{
22 flow::{FlowId, FlowNodeId},
23 id::{TableId, ViewId},
24 shape::ShapeId,
25 },
26};
27use reifydb_engine::vm::executor::Executor;
28#[cfg(reifydb_target = "native")]
29use reifydb_extension::operator::ffi_loader::ffi_operator_loader;
30use reifydb_rql::flow::{
31 analyzer::{FlowDependencyGraph, FlowGraphAnalyzer},
32 flow::FlowDag,
33};
34use reifydb_runtime::context::{RuntimeContext, clock::Clock};
35#[cfg(reifydb_target = "native")]
36use reifydb_type::{Result, error::Error, value::Value};
37use tracing::instrument;
38
39#[cfg(reifydb_target = "native")]
40use crate::operator::BoxedOperator;
41#[cfg(reifydb_target = "native")]
42use crate::operator::ffi::FFIOperator;
43use crate::{builder::OperatorFactory, operator::Operators};
44
/// Registry of flows together with the per-node operators, the shape-to-node
/// source/sink indexes and the dependency analyzer that belong to them.
///
/// All bookkeeping maps start empty (see `FlowEngine::new`) and are emptied
/// again by `FlowEngine::clear`.
pub struct FlowEngine {
    /// Catalog handle supplied at construction time.
    pub(crate) catalog: Catalog,
    /// Executor supplied at construction time; a clone is handed to every FFI
    /// operator created by this engine.
    pub(crate) executor: Executor,
    /// Operators keyed by the flow node they belong to.
    pub operators: BTreeMap<FlowNodeId, Arc<Operators>>,
    /// Flow DAGs keyed by flow id.
    pub flows: BTreeMap<FlowId, FlowDag>,
    /// For each shape, the `(flow, node)` pairs registered as sources.
    // NOTE(review): presumably the nodes that read from the shape - confirm against the registration code.
    pub sources: BTreeMap<ShapeId, Vec<(FlowId, FlowNodeId)>>,
    /// For each shape, the `(flow, node)` pairs registered as sinks.
    // NOTE(review): presumably the nodes that write into the shape - confirm against the registration code.
    pub sinks: BTreeMap<ShapeId, Vec<(FlowId, FlowNodeId)>>,
    /// Analyzer that owns the dependency graph served by the `get_*` query methods.
    pub analyzer: FlowGraphAnalyzer,
    // Stored by the constructor but not read anywhere in this part of the file,
    // hence the lint suppression. TODO confirm it is still needed.
    #[allow(dead_code)]
    pub(crate) event_bus: EventBus,
    /// Commit version recorded per flow.
    // NOTE(review): presumably the version at which the flow was created - confirm against the registration code.
    pub(crate) flow_creation_versions: BTreeMap<FlowId, CommitVersion>,
    /// Runtime context; provides the clock returned by `FlowEngine::clock`.
    pub(crate) runtime_context: RuntimeContext,
    /// Shared map from operator name to the factory for that custom operator.
    pub(crate) custom_operators: Arc<HashMap<String, OperatorFactory>>,
}
59
60impl FlowEngine {
61 #[instrument(
62 name = "flow::engine::new",
63 level = "debug",
64 skip(catalog, executor, event_bus, runtime_context, custom_operators)
65 )]
66 pub fn new(
67 catalog: Catalog,
68 executor: Executor,
69 event_bus: EventBus,
70 runtime_context: RuntimeContext,
71 custom_operators: Arc<HashMap<String, OperatorFactory>>,
72 ) -> Self {
73 Self {
74 catalog,
75 executor,
76 operators: BTreeMap::new(),
77 flows: BTreeMap::new(),
78 sources: BTreeMap::new(),
79 sinks: BTreeMap::new(),
80 analyzer: FlowGraphAnalyzer::new(),
81 event_bus,
82 flow_creation_versions: BTreeMap::new(),
83 runtime_context,
84 custom_operators,
85 }
86 }
87
88 pub fn clock(&self) -> &Clock {
90 &self.runtime_context.clock
91 }
92
93 #[cfg(reifydb_target = "native")]
95 #[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
96 pub(crate) fn create_ffi_operator(
97 &self,
98 operator: &str,
99 node_id: FlowNodeId,
100 config: &BTreeMap<String, Value>,
101 ) -> Result<BoxedOperator> {
102 let loader = ffi_operator_loader();
103 let mut loader_write = loader.write().unwrap();
104
105 let config_bytes = to_stdvec(config)
107 .map_err(|e| Error(Box::new(internal!("Failed to serialize operator config: {:?}", e))))?;
108
109 let (descriptor, instance) = loader_write
110 .create_operator_by_name(operator, node_id, &config_bytes)
111 .map_err(|e| Error(Box::new(internal!("Failed to create FFI operator: {:?}", e))))?;
112
113 Ok(Box::new(FFIOperator::new(descriptor, instance, node_id, self.executor.clone())))
114 }
115
116 #[cfg(reifydb_target = "native")]
118 pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
119 let loader = ffi_operator_loader();
120 let loader_read = loader.read().unwrap();
121 loader_read.has_operator(operator)
122 }
123
124 #[cfg(not(reifydb_target = "native"))]
126 #[allow(dead_code)]
127 pub(crate) fn is_ffi_operator(&self, _operator: &str) -> bool {
128 false
129 }
130
131 pub fn flow_ids(&self) -> BTreeSet<FlowId> {
133 self.flows.keys().copied().collect()
134 }
135
136 pub fn clear(&mut self) {
138 self.operators.clear();
139 self.flows.clear();
140 self.sources.clear();
141 self.sinks.clear();
142 self.analyzer.clear();
143 self.flow_creation_versions.clear();
144 }
145
146 pub fn remove_flow(&mut self, flow_id: FlowId) {
148 let node_ids: Vec<FlowNodeId> =
150 self.flows.get(&flow_id).map(|flow| flow.get_node_ids().collect()).unwrap_or_default();
151
152 for node_id in node_ids {
154 self.operators.remove(&node_id);
155 }
156
157 for entries in self.sources.values_mut() {
159 entries.retain(|(fid, _)| *fid != flow_id);
160 }
161 self.sources.retain(|_, v| !v.is_empty());
162
163 for entries in self.sinks.values_mut() {
165 entries.retain(|(fid, _)| *fid != flow_id);
166 }
167 self.sinks.retain(|_, v| !v.is_empty());
168
169 self.flows.remove(&flow_id);
171
172 self.analyzer.remove(flow_id);
174 }
175
176 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
177 self.analyzer.get_dependency_graph().clone()
178 }
179
180 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
181 let dependency_graph = self.analyzer.get_dependency_graph();
182 self.analyzer.get_flows_depending_on_table(dependency_graph, table_id)
183 }
184
185 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
186 let dependency_graph = self.analyzer.get_dependency_graph();
187 self.analyzer.get_flows_depending_on_view(dependency_graph, view_id)
188 }
189
190 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
191 let dependency_graph = self.analyzer.get_dependency_graph();
192 self.analyzer.get_flow_producing_view(dependency_graph, view_id)
193 }
194
195 pub fn calculate_execution_levels(&self) -> Vec<Vec<FlowId>> {
196 let dependency_graph = self.analyzer.get_dependency_graph();
197 self.analyzer.calculate_execution_levels(dependency_graph)
198 }
199}