reifydb_sub_flow/
builder.rs1use std::{
7 collections::{BTreeMap, HashMap},
8 path::PathBuf,
9 sync::Arc,
10};
11
12use reifydb_core::interface::catalog::flow::FlowNodeId;
13use reifydb_sdk::connector::{
14 sink::{FFISink, FFISinkMetadata},
15 source::{FFISource, FFISourceMetadata},
16};
17use reifydb_type::{Result, value::Value};
18
19use crate::{connector::ConnectorRegistry, operator::BoxedOperator};
20
21pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
22
23pub struct FlowBuilder {
24 operators_dir: Option<PathBuf>,
25 num_workers: Option<usize>,
26 custom_operators: HashMap<String, OperatorFactory>,
27 connector_registry: ConnectorRegistry,
28}
29
30impl Default for FlowBuilder {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl FlowBuilder {
37 pub fn new() -> Self {
39 Self {
40 operators_dir: None,
41 num_workers: None,
42 custom_operators: HashMap::new(),
43 connector_registry: ConnectorRegistry::new(),
44 }
45 }
46
47 pub fn operators_dir(mut self, path: PathBuf) -> Self {
49 self.operators_dir = Some(path);
50 self
51 }
52
53 pub fn num_workers(mut self, count: usize) -> Self {
56 self.num_workers = Some(count);
57 self
58 }
59
60 pub fn register_operator(
62 mut self,
63 name: impl Into<String>,
64 factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
65 ) -> Self {
66 self.custom_operators.insert(name.into(), Arc::new(factory));
67 self
68 }
69
70 pub fn register_source<S: FFISource + FFISourceMetadata>(mut self) -> Self {
72 self.connector_registry.register_source::<S>();
73 self
74 }
75
76 pub fn register_sink<S: FFISink + FFISinkMetadata>(mut self) -> Self {
78 self.connector_registry.register_sink::<S>();
79 self
80 }
81
82 pub(crate) fn build_config(self) -> FlowBuilderConfig {
84 FlowBuilderConfig {
85 operators_dir: self.operators_dir,
86 num_workers: self.num_workers.unwrap_or(1),
87 custom_operators: self.custom_operators,
88 connector_registry: self.connector_registry,
89 }
90 }
91}
92
93pub struct FlowBuilderConfig {
95 pub operators_dir: Option<PathBuf>,
97 pub num_workers: usize,
99 pub custom_operators: HashMap<String, OperatorFactory>,
101 pub connector_registry: ConnectorRegistry,
103}