reifydb_sub_flow/
builder.rs1use std::{
5 collections::{BTreeMap, HashMap},
6 path::PathBuf,
7 sync::Arc,
8};
9
10use reifydb_core::interface::catalog::flow::FlowNodeId;
11use reifydb_sdk::connector::{
12 sink::{FFISink, FFISinkMetadata},
13 source::{FFISource, FFISourceMetadata},
14};
15use reifydb_type::{Result, value::Value};
16
17use crate::{connector::ConnectorRegistry, operator::BoxedOperator};
18
19pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
20
21pub struct FlowConfigurator {
22 operators_dir: Option<PathBuf>,
23 custom_operators: HashMap<String, OperatorFactory>,
24 connector_registry: ConnectorRegistry,
25}
26
27impl Default for FlowConfigurator {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl FlowConfigurator {
34 pub fn new() -> Self {
36 Self {
37 operators_dir: None,
38 custom_operators: HashMap::new(),
39 connector_registry: ConnectorRegistry::new(),
40 }
41 }
42
43 pub fn operators_dir(mut self, path: PathBuf) -> Self {
45 self.operators_dir = Some(path);
46 self
47 }
48
49 pub fn register_operator(
51 mut self,
52 name: impl Into<String>,
53 factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
54 ) -> Self {
55 self.custom_operators.insert(name.into(), Arc::new(factory));
56 self
57 }
58
59 pub fn register_source<S: FFISource + FFISourceMetadata>(mut self) -> Self {
61 self.connector_registry.register_source::<S>();
62 self
63 }
64
65 pub fn register_sink<S: FFISink + FFISinkMetadata>(mut self) -> Self {
67 self.connector_registry.register_sink::<S>();
68 self
69 }
70
71 pub(crate) fn configure(self) -> FlowConfig {
73 FlowConfig {
74 operators_dir: self.operators_dir,
75 custom_operators: self.custom_operators,
76 connector_registry: self.connector_registry,
77 }
78 }
79}
80
81pub struct FlowConfig {
83 pub operators_dir: Option<PathBuf>,
85 pub custom_operators: HashMap<String, OperatorFactory>,
87 pub connector_registry: ConnectorRegistry,
89}