Skip to main content

varpulis_runtime/engine/
builder.rs

1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//!     .output(tx)
12//!     .build();
13//! ```
14
15use std::sync::Arc;
16
17use tokio::sync::mpsc;
18
19use super::{Engine, OutputChannel};
20use crate::connector;
21use crate::dead_letter::DlqConfig;
22use crate::event::{Event, SharedEvent};
23use crate::metrics::Metrics;
24use crate::udf::UdfRegistry;
25
26/// Builder for constructing an [`Engine`] with a fluent API.
27///
28/// Use [`Engine::builder()`] to create an instance. Without calling
29/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
30/// the engine runs in benchmark mode (no output channel, no cloning overhead).
31///
32/// # Examples
33///
34/// ```rust,no_run
35/// use varpulis_runtime::{Engine, Metrics};
36/// use tokio::sync::mpsc;
37///
38/// // Simple construction with output channel
39/// let (tx, _rx) = mpsc::channel(100);
40/// let mut engine = Engine::builder()
41///     .output(tx)
42///     .build();
43///
44/// // Full construction with metrics, DLQ, and context name
45/// let (tx2, _rx2) = mpsc::channel(100);
46/// let mut engine = Engine::builder()
47///     .output(tx2)
48///     .metrics(Metrics::new())
49///     .context_name("worker-0")
50///     .dlq_path("/var/log/varpulis-dlq.jsonl")
51///     .build();
52///
53/// // Benchmark mode (no output, no cloning overhead)
54/// let mut engine = Engine::builder().build();
55/// ```
56#[derive(Debug)]
57pub struct EngineBuilder {
58    output_channel: Option<OutputChannel>,
59    metrics: Option<Metrics>,
60    context_name: Option<String>,
61    dlq_path: Option<std::path::PathBuf>,
62    dlq_config: DlqConfig,
63    udf_registry: UdfRegistry,
64    credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
65}
66
67impl Default for EngineBuilder {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl EngineBuilder {
74    /// Create a new builder with default configuration (benchmark mode, no output).
75    pub fn new() -> Self {
76        Self {
77            output_channel: None,
78            metrics: None,
79            context_name: None,
80            dlq_path: None,
81            dlq_config: DlqConfig::default(),
82            udf_registry: UdfRegistry::new(),
83            credentials_store: None,
84        }
85    }
86
87    /// Set the output channel for emitted events (legacy owned channel).
88    pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
89        self.output_channel = Some(OutputChannel::Owned(tx));
90        self
91    }
92
93    /// Set a zero-copy shared output channel (recommended for performance).
94    pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
95        self.output_channel = Some(OutputChannel::Shared(tx));
96        self
97    }
98
99    /// Enable Prometheus metrics collection.
100    pub fn metrics(mut self, metrics: Metrics) -> Self {
101        self.metrics = Some(metrics);
102        self
103    }
104
105    /// Set the context name for this engine instance (used in multi-context deployments).
106    pub fn context_name(mut self, name: impl Into<String>) -> Self {
107        self.context_name = Some(name.into());
108        self
109    }
110
111    /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
112    pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
113        self.dlq_path = Some(path.into());
114        self
115    }
116
117    /// Set custom dead-letter queue configuration.
118    pub fn dlq_config(mut self, config: DlqConfig) -> Self {
119        self.dlq_config = config;
120        self
121    }
122
123    /// Register a native scalar UDF.
124    pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
125        self.udf_registry.register_scalar(udf);
126        self
127    }
128
129    /// Register a native aggregate UDF.
130    pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
131        self.udf_registry.register_aggregate(udf);
132        self
133    }
134
135    /// Set the connector credentials store for resolving `profile:` references in VPL.
136    pub fn credentials(mut self, store: Arc<connector::credentials::CredentialsStore>) -> Self {
137        self.credentials_store = Some(store);
138        self
139    }
140
141    /// Build the engine. Returns the configured `Engine` ready for `load()`.
142    ///
143    /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
144    /// to load a VPL program. Post-load configuration like
145    /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
146    /// should be called after loading.
147    pub fn build(self) -> Engine {
148        let mut engine = Engine::new_internal(self.output_channel);
149        engine.metrics = self.metrics;
150        engine.context_name = self.context_name;
151        engine.dlq_path = self.dlq_path;
152        engine.dlq_config = self.dlq_config;
153        engine.udf_registry = self.udf_registry;
154        engine.credentials_store = self.credentials_store;
155        engine
156    }
157}