Skip to main content

varpulis_runtime/engine/
builder.rs

1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//!     .output(tx)
12//!     .build();
13//! ```
14
15use std::sync::Arc;
16
17use tokio::sync::mpsc;
18
19use super::{Engine, OutputChannel};
20use crate::dead_letter::DlqConfig;
21use crate::event::{Event, SharedEvent};
22use crate::metrics::Metrics;
23use crate::udf::UdfRegistry;
24
25/// Builder for constructing an [`Engine`] with a fluent API.
26///
27/// Use [`Engine::builder()`] to create an instance. Without calling
28/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
29/// the engine runs in benchmark mode (no output channel, no cloning overhead).
30///
31/// # Examples
32///
33/// ```rust,no_run
34/// use varpulis_runtime::{Engine, Metrics};
35/// use tokio::sync::mpsc;
36///
37/// // Simple construction with output channel
38/// let (tx, _rx) = mpsc::channel(100);
39/// let mut engine = Engine::builder()
40///     .output(tx)
41///     .build();
42///
43/// // Full construction with metrics, DLQ, and context name
44/// let (tx2, _rx2) = mpsc::channel(100);
45/// let mut engine = Engine::builder()
46///     .output(tx2)
47///     .metrics(Metrics::new())
48///     .context_name("worker-0")
49///     .dlq_path("/var/log/varpulis-dlq.jsonl")
50///     .build();
51///
52/// // Benchmark mode (no output, no cloning overhead)
53/// let mut engine = Engine::builder().build();
54/// ```
55#[derive(Debug)]
56pub struct EngineBuilder {
57    output_channel: Option<OutputChannel>,
58    metrics: Option<Metrics>,
59    context_name: Option<String>,
60    dlq_path: Option<std::path::PathBuf>,
61    dlq_config: DlqConfig,
62    udf_registry: UdfRegistry,
63}
64
65impl Default for EngineBuilder {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl EngineBuilder {
72    /// Create a new builder with default configuration (benchmark mode, no output).
73    pub fn new() -> Self {
74        Self {
75            output_channel: None,
76            metrics: None,
77            context_name: None,
78            dlq_path: None,
79            dlq_config: DlqConfig::default(),
80            udf_registry: UdfRegistry::new(),
81        }
82    }
83
84    /// Set the output channel for emitted events (legacy owned channel).
85    pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
86        self.output_channel = Some(OutputChannel::Owned(tx));
87        self
88    }
89
90    /// Set a zero-copy shared output channel (recommended for performance).
91    pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
92        self.output_channel = Some(OutputChannel::Shared(tx));
93        self
94    }
95
96    /// Enable Prometheus metrics collection.
97    pub fn metrics(mut self, metrics: Metrics) -> Self {
98        self.metrics = Some(metrics);
99        self
100    }
101
102    /// Set the context name for this engine instance (used in multi-context deployments).
103    pub fn context_name(mut self, name: impl Into<String>) -> Self {
104        self.context_name = Some(name.into());
105        self
106    }
107
108    /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
109    pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
110        self.dlq_path = Some(path.into());
111        self
112    }
113
114    /// Set custom dead-letter queue configuration.
115    pub fn dlq_config(mut self, config: DlqConfig) -> Self {
116        self.dlq_config = config;
117        self
118    }
119
120    /// Register a native scalar UDF.
121    pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
122        self.udf_registry.register_scalar(udf);
123        self
124    }
125
126    /// Register a native aggregate UDF.
127    pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
128        self.udf_registry.register_aggregate(udf);
129        self
130    }
131
132    /// Build the engine. Returns the configured `Engine` ready for `load()`.
133    ///
134    /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
135    /// to load a VPL program. Post-load configuration like
136    /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
137    /// should be called after loading.
138    pub fn build(self) -> Engine {
139        let mut engine = Engine::new_internal(self.output_channel);
140        engine.metrics = self.metrics;
141        engine.context_name = self.context_name;
142        engine.dlq_path = self.dlq_path;
143        engine.dlq_config = self.dlq_config;
144        engine.udf_registry = self.udf_registry;
145        engine
146    }
147}