Skip to main content

reifydb_sub_flow/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap},
6	path::PathBuf,
7	sync::Arc,
8};
9
10use reifydb_core::interface::catalog::flow::FlowNodeId;
11use reifydb_sdk::connector::{
12	sink::{FFISink, FFISinkMetadata},
13	source::{FFISource, FFISourceMetadata},
14};
15use reifydb_type::{Result, value::Value};
16
17use crate::{connector::ConnectorRegistry, operator::BoxedOperator};
18
19pub type OperatorFactory = Arc<dyn Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync>;
20
21pub struct FlowConfigurator {
22	operators_dir: Option<PathBuf>,
23	custom_operators: HashMap<String, OperatorFactory>,
24	connector_registry: ConnectorRegistry,
25}
26
27impl Default for FlowConfigurator {
28	fn default() -> Self {
29		Self::new()
30	}
31}
32
33impl FlowConfigurator {
34	/// Create a new FlowConfigurator with default settings
35	pub fn new() -> Self {
36		Self {
37			operators_dir: None,
38			custom_operators: HashMap::new(),
39			connector_registry: ConnectorRegistry::new(),
40		}
41	}
42
43	/// Set the directory to scan for FFI operator shared libraries
44	pub fn operators_dir(mut self, path: PathBuf) -> Self {
45		self.operators_dir = Some(path);
46		self
47	}
48
49	/// Register a native Rust operator factory by name.
50	pub fn register_operator(
51		mut self,
52		name: impl Into<String>,
53		factory: impl Fn(FlowNodeId, &BTreeMap<String, Value>) -> Result<BoxedOperator> + Send + Sync + 'static,
54	) -> Self {
55		self.custom_operators.insert(name.into(), Arc::new(factory));
56		self
57	}
58
59	/// Register a native Rust source connector.
60	pub fn register_source<S: FFISource + FFISourceMetadata>(mut self) -> Self {
61		self.connector_registry.register_source::<S>();
62		self
63	}
64
65	/// Register a native Rust sink connector.
66	pub fn register_sink<S: FFISink + FFISinkMetadata>(mut self) -> Self {
67		self.connector_registry.register_sink::<S>();
68		self
69	}
70
71	/// Build the configuration (internal use only)
72	pub(crate) fn configure(self) -> FlowConfig {
73		FlowConfig {
74			operators_dir: self.operators_dir,
75			custom_operators: self.custom_operators,
76			connector_registry: self.connector_registry,
77		}
78	}
79}
80
81/// Configuration for FlowSubsystem
82pub struct FlowConfig {
83	/// Directory containing FFI operator shared libraries (native only)
84	pub operators_dir: Option<PathBuf>,
85	/// Native Rust operator factories registered via FlowConfigurator::register_operator
86	pub custom_operators: HashMap<String, OperatorFactory>,
87	/// Registry of source and sink connectors
88	pub connector_registry: ConnectorRegistry,
89}