reifydb_sub_flow/
builder.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Builder pattern for configuring the flow subsystem
5
6use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use reifydb_core::interface::{CdcConsumerId, FlowNodeId};
9use reifydb_rql::expression::Expression;
10use reifydb_sub_api::Priority;
11
12use crate::{operator::BoxedOperator, subsystem::FlowSubsystemConfig};
13
14/// Type alias for operator factory functions
15pub type OperatorFactory =
16	Arc<dyn Fn(FlowNodeId, &[Expression<'static>]) -> crate::Result<BoxedOperator> + Send + Sync>;
17
18pub struct FlowBuilder {
19	consumer_id: CdcConsumerId,
20	poll_interval: Duration,
21	priority: Priority,
22	operators: Vec<(String, OperatorFactory)>,
23	max_batch_size: Option<u64>,
24	operators_dir: Option<PathBuf>,
25}
26
27impl Default for FlowBuilder {
28	fn default() -> Self {
29		Self::new()
30	}
31}
32
33impl FlowBuilder {
34	/// Create a new FlowBuilder with default settings
35	pub fn new() -> Self {
36		Self {
37			consumer_id: CdcConsumerId::flow_consumer(),
38			poll_interval: Duration::from_millis(1),
39			priority: Priority::Normal,
40			operators: Vec::new(),
41			max_batch_size: Some(10),
42			operators_dir: None,
43		}
44	}
45
46	/// Set the consumer ID for the flow subsystem
47	pub fn consumer_id(mut self, id: CdcConsumerId) -> Self {
48		self.consumer_id = id;
49		self
50	}
51
52	/// Set the poll interval for checking new CDC events
53	pub fn poll_interval(mut self, interval: Duration) -> Self {
54		self.poll_interval = interval;
55		self
56	}
57
58	/// Set the priority for the polling task in the worker pool
59	pub fn priority(mut self, priority: Priority) -> Self {
60		self.priority = priority;
61		self
62	}
63
64	/// Set the maximum batch size for CDC polling
65	pub fn max_batch_size(mut self, size: u64) -> Self {
66		self.max_batch_size = Some(size);
67		self
68	}
69
70	/// Set the directory to scan for FFI operator shared libraries
71	pub fn operators_dir(mut self, path: PathBuf) -> Self {
72		self.operators_dir = Some(path);
73		self
74	}
75
76	/// Register a custom operator factory
77	pub fn register_operator<F>(mut self, name: impl Into<String>, factory: F) -> Self
78	where
79		F: Fn(FlowNodeId, &[Expression<'static>]) -> crate::Result<BoxedOperator> + Send + Sync + 'static,
80	{
81		self.operators.push((name.into(), Arc::new(factory)));
82		self
83	}
84
85	/// Build the configuration
86	pub(crate) fn build_config(self) -> FlowSubsystemConfig {
87		FlowSubsystemConfig {
88			consumer_id: self.consumer_id,
89			poll_interval: self.poll_interval,
90			priority: self.priority,
91			operators: self.operators,
92			max_batch_size: self.max_batch_size,
93			operators_dir: self.operators_dir,
94		}
95	}
96}