reifydb_sub_flow/
builder.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use reifydb_core::interface::{CdcConsumerId, FlowNodeId};
9use reifydb_rql::expression::Expression;
10
11use crate::{operator::BoxedOperator, subsystem::FlowSubsystemConfig};
12
13pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &[Expression]) -> crate::Result<BoxedOperator> + Send + Sync>;
15
16pub struct FlowBuilder {
17 consumer_id: CdcConsumerId,
18 poll_interval: Duration,
19 operators: Vec<(String, OperatorFactory)>,
20 max_batch_size: Option<u64>,
21 operators_dir: Option<PathBuf>,
22}
23
24impl Default for FlowBuilder {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl FlowBuilder {
31 pub fn new() -> Self {
33 Self {
34 consumer_id: CdcConsumerId::flow_consumer(),
35 poll_interval: Duration::from_millis(1),
36 operators: Vec::new(),
37 max_batch_size: Some(10),
38 operators_dir: None,
39 }
40 }
41
42 pub fn consumer_id(mut self, id: CdcConsumerId) -> Self {
44 self.consumer_id = id;
45 self
46 }
47
48 pub fn poll_interval(mut self, interval: Duration) -> Self {
50 self.poll_interval = interval;
51 self
52 }
53
54 pub fn max_batch_size(mut self, size: u64) -> Self {
56 self.max_batch_size = Some(size);
57 self
58 }
59
60 pub fn operators_dir(mut self, path: PathBuf) -> Self {
62 self.operators_dir = Some(path);
63 self
64 }
65
66 pub fn register_operator<F>(mut self, name: impl Into<String>, factory: F) -> Self
68 where
69 F: Fn(FlowNodeId, &[Expression]) -> crate::Result<BoxedOperator> + Send + Sync + 'static,
70 {
71 self.operators.push((name.into(), Arc::new(factory)));
72 self
73 }
74
75 pub(crate) fn build_config(self) -> FlowSubsystemConfig {
77 FlowSubsystemConfig {
78 consumer_id: self.consumer_id,
79 poll_interval: self.poll_interval,
80 operators: self.operators,
81 max_batch_size: self.max_batch_size,
82 operators_dir: self.operators_dir,
83 }
84 }
85}