reifydb_sub_flow/
builder.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Builder pattern for configuring the flow subsystem
5
6use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use reifydb_core::interface::{CdcConsumerId, FlowNodeId};
9use reifydb_rql::expression::Expression;
10
11use crate::{operator::BoxedOperator, subsystem::FlowSubsystemConfig};
12
13/// Type alias for operator factory functions
14pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &[Expression]) -> crate::Result<BoxedOperator> + Send + Sync>;
15
16pub struct FlowBuilder {
17	consumer_id: CdcConsumerId,
18	poll_interval: Duration,
19	operators: Vec<(String, OperatorFactory)>,
20	max_batch_size: Option<u64>,
21	operators_dir: Option<PathBuf>,
22}
23
24impl Default for FlowBuilder {
25	fn default() -> Self {
26		Self::new()
27	}
28}
29
30impl FlowBuilder {
31	/// Create a new FlowBuilder with default settings
32	pub fn new() -> Self {
33		Self {
34			consumer_id: CdcConsumerId::flow_consumer(),
35			poll_interval: Duration::from_millis(1),
36			operators: Vec::new(),
37			max_batch_size: Some(10),
38			operators_dir: None,
39		}
40	}
41
42	/// Set the consumer ID for the flow subsystem
43	pub fn consumer_id(mut self, id: CdcConsumerId) -> Self {
44		self.consumer_id = id;
45		self
46	}
47
48	/// Set the poll interval for checking new CDC events
49	pub fn poll_interval(mut self, interval: Duration) -> Self {
50		self.poll_interval = interval;
51		self
52	}
53
54	/// Set the maximum batch size for CDC polling
55	pub fn max_batch_size(mut self, size: u64) -> Self {
56		self.max_batch_size = Some(size);
57		self
58	}
59
60	/// Set the directory to scan for FFI operator shared libraries
61	pub fn operators_dir(mut self, path: PathBuf) -> Self {
62		self.operators_dir = Some(path);
63		self
64	}
65
66	/// Register a custom operator factory
67	pub fn register_operator<F>(mut self, name: impl Into<String>, factory: F) -> Self
68	where
69		F: Fn(FlowNodeId, &[Expression]) -> crate::Result<BoxedOperator> + Send + Sync + 'static,
70	{
71		self.operators.push((name.into(), Arc::new(factory)));
72		self
73	}
74
75	/// Build the configuration
76	pub(crate) fn build_config(self) -> FlowSubsystemConfig {
77		FlowSubsystemConfig {
78			consumer_id: self.consumer_id,
79			poll_interval: self.poll_interval,
80			operators: self.operators,
81			max_batch_size: self.max_batch_size,
82			operators_dir: self.operators_dir,
83		}
84	}
85}