reifydb_sub_flow/engine/
mod.rs1pub mod eval;
5pub mod process;
6pub mod register;
7
8use std::{
9 collections::{BTreeMap, BTreeSet, HashMap},
10 sync::Arc,
11};
12
13#[cfg(reifydb_target = "native")]
14use postcard::to_stdvec;
15use reifydb_catalog::catalog::Catalog;
16#[cfg(reifydb_target = "native")]
17use reifydb_core::internal;
18use reifydb_core::{
19 common::CommitVersion,
20 event::EventBus,
21 interface::catalog::{
22 flow::{FlowId, FlowNodeId},
23 id::{TableId, ViewId},
24 primitive::PrimitiveId,
25 },
26};
27use reifydb_engine::vm::executor::Executor;
28use reifydb_rql::flow::{
29 analyzer::{FlowDependencyGraph, FlowGraphAnalyzer},
30 flow::FlowDag,
31};
32use reifydb_runtime::clock::Clock;
33#[cfg(reifydb_target = "native")]
34use reifydb_type::{Result, error::Error, value::Value};
35use tracing::instrument;
36
37#[cfg(reifydb_target = "native")]
38use crate::ffi::loader::ffi_operator_loader;
39#[cfg(reifydb_target = "native")]
40use crate::operator::BoxedOperator;
41use crate::{builder::OperatorFactory, operator::Operators};
42
43pub struct FlowEngine {
44 pub(crate) catalog: Catalog,
45 pub(crate) executor: Executor,
46 pub(crate) operators: BTreeMap<FlowNodeId, Arc<Operators>>,
47 pub(crate) flows: BTreeMap<FlowId, FlowDag>,
48 pub(crate) sources: BTreeMap<PrimitiveId, Vec<(FlowId, FlowNodeId)>>,
49 pub(crate) sinks: BTreeMap<PrimitiveId, Vec<(FlowId, FlowNodeId)>>,
50 pub(crate) analyzer: FlowGraphAnalyzer,
51 #[allow(dead_code)]
52 pub(crate) event_bus: EventBus,
53 pub(crate) flow_creation_versions: BTreeMap<FlowId, CommitVersion>,
54 pub(crate) clock: Clock,
55 pub(crate) custom_operators: Arc<HashMap<String, OperatorFactory>>,
56}
57
58impl FlowEngine {
59 #[instrument(
60 name = "flow::engine::new",
61 level = "debug",
62 skip(catalog, executor, event_bus, clock, custom_operators)
63 )]
64 pub fn new(
65 catalog: Catalog,
66 executor: Executor,
67 event_bus: EventBus,
68 clock: Clock,
69 custom_operators: Arc<HashMap<String, OperatorFactory>>,
70 ) -> Self {
71 Self {
72 catalog,
73 executor,
74 operators: BTreeMap::new(),
75 flows: BTreeMap::new(),
76 sources: BTreeMap::new(),
77 sinks: BTreeMap::new(),
78 analyzer: FlowGraphAnalyzer::new(),
79 event_bus,
80 flow_creation_versions: BTreeMap::new(),
81 clock,
82 custom_operators,
83 }
84 }
85
86 #[cfg(reifydb_target = "native")]
88 #[instrument(name = "flow::engine::create_ffi_operator", level = "debug", skip(self, config), fields(operator = %operator, node_id = ?node_id))]
89 pub(crate) fn create_ffi_operator(
90 &self,
91 operator: &str,
92 node_id: FlowNodeId,
93 config: &BTreeMap<String, Value>,
94 ) -> Result<BoxedOperator> {
95 let loader = ffi_operator_loader();
96 let mut loader_write = loader.write().unwrap();
97
98 let config_bytes = to_stdvec(config)
100 .map_err(|e| Error(internal!("Failed to serialize operator config: {:?}", e)))?;
101
102 let operator = loader_write
103 .create_operator_by_name(operator, node_id, &config_bytes, self.executor.clone())
104 .map_err(|e| Error(internal!("Failed to create FFI operator: {:?}", e)))?;
105
106 Ok(Box::new(operator))
107 }
108
109 #[cfg(reifydb_target = "native")]
111 pub(crate) fn is_ffi_operator(&self, operator: &str) -> bool {
112 let loader = ffi_operator_loader();
113 let loader_read = loader.read().unwrap();
114 loader_read.has_operator(operator)
115 }
116
117 #[cfg(not(reifydb_target = "native"))]
119 #[allow(dead_code)]
120 pub(crate) fn is_ffi_operator(&self, _operator: &str) -> bool {
121 false
122 }
123
124 pub fn flow_ids(&self) -> BTreeSet<FlowId> {
126 self.flows.keys().copied().collect()
127 }
128
129 pub fn clear(&mut self) {
131 self.operators.clear();
132 self.flows.clear();
133 self.sources.clear();
134 self.sinks.clear();
135 self.analyzer.clear();
136 self.flow_creation_versions.clear();
137 }
138
139 pub fn get_dependency_graph(&self) -> FlowDependencyGraph {
140 self.analyzer.get_dependency_graph().clone()
141 }
142
143 pub fn get_flows_depending_on_table(&self, table_id: TableId) -> Vec<FlowId> {
144 let dependency_graph = self.analyzer.get_dependency_graph();
145 self.analyzer.get_flows_depending_on_table(dependency_graph, table_id)
146 }
147
148 pub fn get_flows_depending_on_view(&self, view_id: ViewId) -> Vec<FlowId> {
149 let dependency_graph = self.analyzer.get_dependency_graph();
150 self.analyzer.get_flows_depending_on_view(dependency_graph, view_id)
151 }
152
153 pub fn get_flow_producing_view(&self, view_id: ViewId) -> Option<FlowId> {
154 let dependency_graph = self.analyzer.get_dependency_graph();
155 self.analyzer.get_flow_producing_view(dependency_graph, view_id)
156 }
157
158 pub fn calculate_execution_order(&self) -> Vec<FlowId> {
159 let dependency_graph = self.analyzer.get_dependency_graph();
160 self.analyzer.calculate_execution_order(dependency_graph)
161 }
162}