reifydb_sub_flow/
builder.rs1use std::{collections::HashMap, path::PathBuf, sync::Arc};
7
8use reifydb_core::interface::catalog::flow::FlowNodeId;
9use reifydb_type::{Result, value::Value};
10
11use crate::operator::BoxedOperator;
12
13pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &HashMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
14
15pub struct FlowBuilder {
16 operators_dir: Option<PathBuf>,
17 num_workers: Option<usize>,
18 custom_operators: HashMap<String, OperatorFactory>,
19}
20
21impl Default for FlowBuilder {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl FlowBuilder {
28 pub fn new() -> Self {
30 Self {
31 operators_dir: None,
32 num_workers: None,
33 custom_operators: HashMap::new(),
34 }
35 }
36
37 pub fn operators_dir(mut self, path: PathBuf) -> Self {
39 self.operators_dir = Some(path);
40 self
41 }
42
43 pub fn num_workers(mut self, count: usize) -> Self {
46 self.num_workers = Some(count);
47 self
48 }
49
50 pub fn register_operator(
52 mut self,
53 name: impl Into<String>,
54 factory: impl Fn(FlowNodeId, &HashMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
55 ) -> Self {
56 self.custom_operators.insert(name.into(), Arc::new(factory));
57 self
58 }
59
60 pub(crate) fn build_config(self) -> FlowBuilderConfig {
62 FlowBuilderConfig {
63 operators_dir: self.operators_dir,
64 num_workers: self.num_workers.unwrap_or(1),
65 custom_operators: self.custom_operators,
66 }
67 }
68}
69
70pub struct FlowBuilderConfig {
72 pub operators_dir: Option<PathBuf>,
74 pub num_workers: usize,
76 pub custom_operators: HashMap<String, OperatorFactory>,
78}