1pub mod eval;
5pub mod process;
6pub mod register;
7
8use std::{
9 collections::{BTreeMap, BTreeSet, HashMap},
10 sync::Arc,
11};
12
13#[cfg(reifydb_target = "native")]
14use postcard::to_stdvec;
15use reifydb_catalog::catalog::Catalog;
16#[cfg(reifydb_target = "native")]
17use reifydb_core::internal;
18use reifydb_core::{
19 common::CommitVersion,
20 event::EventBus,
21 interface::catalog::{
22 flow::{FlowId, FlowNodeId},
23 id::{TableId, ViewId},
24 schema::SchemaId,
25 },
26};
27use reifydb_engine::vm::executor::Executor;
28#[cfg(reifydb_target = "native")]
29use reifydb_extension::operator::ffi_loader::ffi_operator_loader;
30use reifydb_rql::flow::{
31 analyzer::{FlowDependencyGraph, FlowGraphAnalyzer},
32 flow::FlowDag,
33};
34use reifydb_runtime::context::RuntimeContext;
35#[cfg(reifydb_target = "native")]
36use reifydb_type::{Result, error::Error, value::Value};
37use tracing::instrument;
38
39#[cfg(reifydb_target = "native")]
40use crate::operator::BoxedOperator;
41#[cfg(reifydb_target = "native")]
42use crate::operator::ffi::FFIOperator;
43use crate::{builder::OperatorFactory, operator::Operators};
44
45pub struct FlowEngine {
46 pub(crate) catalog: Catalog,
47 pub(crate) executor: Executor,
48 pub(crate) operators: BTreeMap<FlowNodeId, Arc<Operators>>,
49 pub(crate) flows: BTreeMap<FlowId, FlowDag>,
50 pub(crate) sources: BTreeMap<SchemaId, Vec<(FlowId, FlowNodeId)>>,
51 pub(crate) sinks: BTreeMap<SchemaId, Vec<(FlowId, FlowNodeId)>>,
52 pub(crate) analyzer: FlowGraphAnalyzer,
53 #[allow(dead_code)]
54 pub(crate) event_bus: EventBus,
55 pub(crate) flow_creation_versions: BTreeMap<FlowId, CommitVersion>,
56 pub(crate) runtime_context: RuntimeContext,
57 pub(crate) custom_operators: Arc<HashMap<String, OperatorFactory>>,
58}
59
60impl FlowEngine {
61 #[instrument(
62 name = "flow::engine::new",
63 level = "debug",
64 skip(catalog, executor, event_bus, runtime_context, custom_operators)
65 )]
66 pub fn new(
67 catalog: Catalog,
68 executor: Executor,
69 event_bus: EventBus,
70 runtime_context: RuntimeContext,
71 custom_operators: Arc<HashMap<String, OperatorFactory>>,
72 ) -> Self {
73 Self {
74 catalog,
75 executor,
76 operators: BTreeMap::new(),
77 flows: BTreeMap::new(),
78 sources: BTreeMap::new(),
79 sinks: BTreeMap::new(),
80 analyzer: FlowGraphAnalyzer::new(),
81 event_bus,
82 flow_creation_versions: BTreeMap::new(),
83 runtime_context,
84 custom_operators,
85 }
86 }
87
88 #[cfg(reifydb_target = "native")]
90 #[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
91 pub(crate) fn create_ffi_operator(
92 &self,
93 operator: &str,
94 node_id: FlowNodeId,
95 config: &BTreeMap<String, Value>,
96 ) -> Result<BoxedOperator> {
97 let loader = ffi_operator_loader();
98 let mut loader_write = loader.write().unwrap();
99
100 let config_bytes = to_stdvec(config)
102 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
103
104 let (descriptor, instance) = loader_write
105 .create_operator_by_name(operator, node_id, &config_bytes)
106 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
107
108 Ok(Box::new(FFIOperator::new(descriptor, instance, node_id, self.executor.clone())))
109 }
110
111 #[cfg(reifydb_target = "native")]
113 pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
114 let loader = ffi_operator_loader();
115 let loader_read = loader.read().unwrap();
116 loader_read.has_operator(operator)
117 }
118
119 #[cfg(not(reifydb_target = "native"))]
121 #[allow(dead_code)]
122 pub(crate) fn is_ffi_operator(&self, _operator: &str) -> bool {
123 false
124 }
125
126 pub fn flow_ids(&self) -> BTreeSet<FlowId> {
128 self.flows.keys().copied().collect()
129 }
130
131 pub fn clear(&mut self) {
133 self.operators.clear();
134 self.flows.clear();
135 self.sources.clear();
136 self.sinks.clear();
137 self.analyzer.clear();
138 self.flow_creation_versions.clear();
139 }
140
141 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
142 self.analyzer.get_dependency_graph().clone()
143 }
144
145 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
146 let dependency_graph = self.analyzer.get_dependency_graph();
147 self.analyzer.get_flows_depending_on_table(dependency_graph, table_id)
148 }
149
150 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
151 let dependency_graph = self.analyzer.get_dependency_graph();
152 self.analyzer.get_flows_depending_on_view(dependency_graph, view_id)
153 }
154
155 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
156 let dependency_graph = self.analyzer.get_dependency_graph();
157 self.analyzer.get_flow_producing_view(dependency_graph, view_id)
158 }
159
160 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
161 let dependency_graph = self.analyzer.get_dependency_graph();
162 self.analyzer.calculate_execution_order(dependency_graph)
163 }
164}