Skip to main content

reifydb_sub_flow/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Builder pattern for configuring the flow subsystem
5
6use std::{
7	collections::{BTreeMap, HashMap},
8	path::PathBuf,
9	sync::Arc,
10};
11
12use reifydb_core::interface::catalog::flow::FlowNodeId;
13use reifydb_type::{Result, value::Value};
14
15use crate::operator::BoxedOperator;
16
17pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
18
19pub struct FlowBuilder {
20	operators_dir: Option<PathBuf>,
21	num_workers: Option<usize>,
22	custom_operators: HashMap<String, OperatorFactory>,
23}
24
25impl Default for FlowBuilder {
26	fn default() -> Self {
27		Self::new()
28	}
29}
30
31impl FlowBuilder {
32	/// Create a new FlowBuilder with default settings
33	pub fn new() -> Self {
34		Self {
35			operators_dir: None,
36			num_workers: None,
37			custom_operators: HashMap::new(),
38		}
39	}
40
41	/// Set the directory to scan for FFI operator shared libraries
42	pub fn operators_dir(mut self, path: PathBuf) -> Self {
43		self.operators_dir = Some(path);
44		self
45	}
46
47	/// Set the number of worker threads for flow processing.
48	/// Defaults to 1 if not set.
49	pub fn num_workers(mut self, count: usize) -> Self {
50		self.num_workers = Some(count);
51		self
52	}
53
54	/// Register a native Rust operator factory by name.
55	pub fn register_operator(
56		mut self,
57		name: impl Into<String>,
58		factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
59	) -> Self {
60		self.custom_operators.insert(name.into(), Arc::new(factory));
61		self
62	}
63
64	/// Build the configuration (internal use only)
65	pub(crate) fn build_config(self) -> FlowBuilderConfig {
66		FlowBuilderConfig {
67			operators_dir: self.operators_dir,
68			num_workers: self.num_workers.unwrap_or(1),
69			custom_operators: self.custom_operators,
70		}
71	}
72}
73
74/// Configuration for FlowSubsystem
75pub struct FlowBuilderConfig {
76	/// Directory containing FFI operator shared libraries (native only)
77	pub operators_dir: Option<PathBuf>,
78	/// Number of worker threads for flow processing
79	pub num_workers: usize,
80	/// Native Rust operator factories registered via FlowBuilder::register_operator
81	pub custom_operators: HashMap<String, OperatorFactory>,
82}