reifydb_sub_flow/
builder.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use reifydb_core::interface::{CdcConsumerId, FlowNodeId};
9use reifydb_rql::expression::Expression;
10use reifydb_sub_api::Priority;
11
12use crate::{operator::BoxedOperator, subsystem::FlowSubsystemConfig};
13
14pub type OperatorFactory =
16 Arc<dyn Fn(FlowNodeId, &[Expression<'static>]) -> crate::Result<BoxedOperator> + Send + Sync>;
17
18pub struct FlowBuilder {
19 consumer_id: CdcConsumerId,
20 poll_interval: Duration,
21 priority: Priority,
22 operators: Vec<(String, OperatorFactory)>,
23 max_batch_size: Option<u64>,
24 operators_dir: Option<PathBuf>,
25}
26
27impl Default for FlowBuilder {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl FlowBuilder {
34 pub fn new() -> Self {
36 Self {
37 consumer_id: CdcConsumerId::flow_consumer(),
38 poll_interval: Duration::from_millis(1),
39 priority: Priority::Normal,
40 operators: Vec::new(),
41 max_batch_size: Some(10),
42 operators_dir: None,
43 }
44 }
45
46 pub fn consumer_id(mut self, id: CdcConsumerId) -> Self {
48 self.consumer_id = id;
49 self
50 }
51
52 pub fn poll_interval(mut self, interval: Duration) -> Self {
54 self.poll_interval = interval;
55 self
56 }
57
58 pub fn priority(mut self, priority: Priority) -> Self {
60 self.priority = priority;
61 self
62 }
63
64 pub fn max_batch_size(mut self, size: u64) -> Self {
66 self.max_batch_size = Some(size);
67 self
68 }
69
70 pub fn operators_dir(mut self, path: PathBuf) -> Self {
72 self.operators_dir = Some(path);
73 self
74 }
75
76 pub fn register_operator<F>(mut self, name: impl Into<String>, factory: F) -> Self
78 where
79 F: Fn(FlowNodeId, &[Expression<'static>]) -> crate::Result<BoxedOperator> + Send + Sync + 'static,
80 {
81 self.operators.push((name.into(), Arc::new(factory)));
82 self
83 }
84
85 pub(crate) fn build_config(self) -> FlowSubsystemConfig {
87 FlowSubsystemConfig {
88 consumer_id: self.consumer_id,
89 poll_interval: self.poll_interval,
90 priority: self.priority,
91 operators: self.operators,
92 max_batch_size: self.max_batch_size,
93 operators_dir: self.operators_dir,
94 }
95 }
96}