reifydb_sub_flow/
builder.rs1use std::{
7 collections::{BTreeMap, HashMap},
8 path::PathBuf,
9 sync::Arc,
10};
11
12use reifydb_core::interface::catalog::flow::FlowNodeId;
13use reifydb_type::{Result, value::Value};
14
15use crate::operator::BoxedOperator;
16
17pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
18
19pub struct FlowBuilder {
20 operators_dir: Option<PathBuf>,
21 num_workers: Option<usize>,
22 custom_operators: HashMap<String, OperatorFactory>,
23}
24
25impl Default for FlowBuilder {
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
31impl FlowBuilder {
32 pub fn new() -> Self {
34 Self {
35 operators_dir: None,
36 num_workers: None,
37 custom_operators: HashMap::new(),
38 }
39 }
40
41 pub fn operators_dir(mut self, path: PathBuf) -> Self {
43 self.operators_dir = Some(path);
44 self
45 }
46
47 pub fn num_workers(mut self, count: usize) -> Self {
50 self.num_workers = Some(count);
51 self
52 }
53
54 pub fn register_operator(
56 mut self,
57 name: impl Into<String>,
58 factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
59 ) -> Self {
60 self.custom_operators.insert(name.into(), Arc::new(factory));
61 self
62 }
63
64 pub(crate) fn build_config(self) -> FlowBuilderConfig {
66 FlowBuilderConfig {
67 operators_dir: self.operators_dir,
68 num_workers: self.num_workers.unwrap_or(1),
69 custom_operators: self.custom_operators,
70 }
71 }
72}
73
74pub struct FlowBuilderConfig {
76 pub operators_dir: Option<PathBuf>,
78 pub num_workers: usize,
80 pub custom_operators: HashMap<String, OperatorFactory>,
82}