Skip to main content

reifydb_sub_flow/
builder.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Builder pattern for configuring the flow subsystem
5
6use std::{collections::HashMap, path::PathBuf, sync::Arc};
7
8use reifydb_core::interface::catalog::flow::FlowNodeId;
9use reifydb_type::{Result, value::Value};
10
11use crate::operator::BoxedOperator;
12
13pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &HashMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
14
15pub struct FlowBuilder {
16	operators_dir: Option<PathBuf>,
17	num_workers: Option<usize>,
18	custom_operators: HashMap<String, OperatorFactory>,
19}
20
21impl Default for FlowBuilder {
22	fn default() -> Self {
23		Self::new()
24	}
25}
26
27impl FlowBuilder {
28	/// Create a new FlowBuilder with default settings
29	pub fn new() -> Self {
30		Self {
31			operators_dir: None,
32			num_workers: None,
33			custom_operators: HashMap::new(),
34		}
35	}
36
37	/// Set the directory to scan for FFI operator shared libraries
38	pub fn operators_dir(mut self, path: PathBuf) -> Self {
39		self.operators_dir = Some(path);
40		self
41	}
42
43	/// Set the number of worker threads for flow processing.
44	/// Defaults to 1 if not set.
45	pub fn num_workers(mut self, count: usize) -> Self {
46		self.num_workers = Some(count);
47		self
48	}
49
50	/// Register a native Rust operator factory by name.
51	pub fn register_operator(
52		mut self,
53		name: impl Into<String>,
54		factory: impl Fn(FlowNodeId, &HashMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
55	) -> Self {
56		self.custom_operators.insert(name.into(), Arc::new(factory));
57		self
58	}
59
60	/// Build the configuration (internal use only)
61	pub(crate) fn build_config(self) -> FlowBuilderConfig {
62		FlowBuilderConfig {
63			operators_dir: self.operators_dir,
64			num_workers: self.num_workers.unwrap_or(1),
65			custom_operators: self.custom_operators,
66		}
67	}
68}
69
70/// Configuration for FlowSubsystem
71pub struct FlowBuilderConfig {
72	/// Directory containing FFI operator shared libraries (native only)
73	pub operators_dir: Option<PathBuf>,
74	/// Number of worker threads for flow processing
75	pub num_workers: usize,
76	/// Native Rust operator factories registered via FlowBuilder::register_operator
77	pub custom_operators: HashMap<String, OperatorFactory>,
78}