Skip to main content

reifydb_sub_flow/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Builder pattern for configuring the flow subsystem
5
6use std::{
7	collections::{BTreeMap, HashMap},
8	path::PathBuf,
9	sync::Arc,
10};
11
12use reifydb_core::interface::catalog::flow::FlowNodeId;
13use reifydb_sdk::connector::{
14	sink::{FFISink, FFISinkMetadata},
15	source::{FFISource, FFISourceMetadata},
16};
17use reifydb_type::{Result, value::Value};
18
19use crate::{connector::ConnectorRegistry, operator::BoxedOperator};
20
21pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
22
23pub struct FlowBuilder {
24	operators_dir: Option<PathBuf>,
25	num_workers: Option<usize>,
26	custom_operators: HashMap<String, OperatorFactory>,
27	connector_registry: ConnectorRegistry,
28}
29
30impl Default for FlowBuilder {
31	fn default() -> Self {
32		Self::new()
33	}
34}
35
36impl FlowBuilder {
37	/// Create a new FlowBuilder with default settings
38	pub fn new() -> Self {
39		Self {
40			operators_dir: None,
41			num_workers: None,
42			custom_operators: HashMap::new(),
43			connector_registry: ConnectorRegistry::new(),
44		}
45	}
46
47	/// Set the directory to scan for FFI operator shared libraries
48	pub fn operators_dir(mut self, path: PathBuf) -> Self {
49		self.operators_dir = Some(path);
50		self
51	}
52
53	/// Set the number of worker threads for flow processing.
54	/// Defaults to 1 if not set.
55	pub fn num_workers(mut self, count: usize) -> Self {
56		self.num_workers = Some(count);
57		self
58	}
59
60	/// Register a native Rust operator factory by name.
61	pub fn register_operator(
62		mut self,
63		name: impl Into<String>,
64		factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
65	) -> Self {
66		self.custom_operators.insert(name.into(), Arc::new(factory));
67		self
68	}
69
70	/// Register a native Rust source connector.
71	pub fn register_source<S: FFISource + FFISourceMetadata>(mut self) -> Self {
72		self.connector_registry.register_source::<S>();
73		self
74	}
75
76	/// Register a native Rust sink connector.
77	pub fn register_sink<S: FFISink + FFISinkMetadata>(mut self) -> Self {
78		self.connector_registry.register_sink::<S>();
79		self
80	}
81
82	/// Build the configuration (internal use only)
83	pub(crate) fn build_config(self) -> FlowBuilderConfig {
84		FlowBuilderConfig {
85			operators_dir: self.operators_dir,
86			num_workers: self.num_workers.unwrap_or(1),
87			custom_operators: self.custom_operators,
88			connector_registry: self.connector_registry,
89		}
90	}
91}
92
93/// Configuration for FlowSubsystem
94pub struct FlowBuilderConfig {
95	/// Directory containing FFI operator shared libraries (native only)
96	pub operators_dir: Option<PathBuf>,
97	/// Number of worker threads for flow processing
98	pub num_workers: usize,
99	/// Native Rust operator factories registered via FlowBuilder::register_operator
100	pub custom_operators: HashMap<String, OperatorFactory>,
101	/// Registry of source and sink connectors
102	pub connector_registry: ConnectorRegistry,
103}