Skip to main content

varpulis_runtime/engine/
builder.rs

1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//!     .output(tx)
12//!     .build();
13//! ```
14
15use crate::dead_letter::DlqConfig;
16use crate::metrics::Metrics;
17use crate::udf::UdfRegistry;
18use std::sync::Arc;
19use tokio::sync::mpsc;
20
21use super::{Engine, OutputChannel};
22use crate::event::{Event, SharedEvent};
23
24/// Builder for constructing an [`Engine`] with a fluent API.
25///
26/// Use [`Engine::builder()`] to create an instance. Without calling
27/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
28/// the engine runs in benchmark mode (no output channel, no cloning overhead).
29///
30/// # Examples
31///
32/// ```rust,no_run
33/// use varpulis_runtime::{Engine, Metrics};
34/// use tokio::sync::mpsc;
35///
36/// // Simple construction with output channel
37/// let (tx, _rx) = mpsc::channel(100);
38/// let mut engine = Engine::builder()
39///     .output(tx)
40///     .build();
41///
42/// // Full construction with metrics, DLQ, and context name
43/// let (tx2, _rx2) = mpsc::channel(100);
44/// let mut engine = Engine::builder()
45///     .output(tx2)
46///     .metrics(Metrics::new())
47///     .context_name("worker-0")
48///     .dlq_path("/var/log/varpulis-dlq.jsonl")
49///     .build();
50///
51/// // Benchmark mode (no output, no cloning overhead)
52/// let mut engine = Engine::builder().build();
53/// ```
54#[derive(Debug)]
55pub struct EngineBuilder {
56    output_channel: Option<OutputChannel>,
57    metrics: Option<Metrics>,
58    context_name: Option<String>,
59    dlq_path: Option<std::path::PathBuf>,
60    dlq_config: DlqConfig,
61    udf_registry: UdfRegistry,
62}
63
64impl Default for EngineBuilder {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl EngineBuilder {
71    /// Create a new builder with default configuration (benchmark mode, no output).
72    pub fn new() -> Self {
73        Self {
74            output_channel: None,
75            metrics: None,
76            context_name: None,
77            dlq_path: None,
78            dlq_config: DlqConfig::default(),
79            udf_registry: UdfRegistry::new(),
80        }
81    }
82
83    /// Set the output channel for emitted events (legacy owned channel).
84    pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
85        self.output_channel = Some(OutputChannel::Owned(tx));
86        self
87    }
88
89    /// Set a zero-copy shared output channel (recommended for performance).
90    pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
91        self.output_channel = Some(OutputChannel::Shared(tx));
92        self
93    }
94
95    /// Enable Prometheus metrics collection.
96    pub fn metrics(mut self, metrics: Metrics) -> Self {
97        self.metrics = Some(metrics);
98        self
99    }
100
101    /// Set the context name for this engine instance (used in multi-context deployments).
102    pub fn context_name(mut self, name: impl Into<String>) -> Self {
103        self.context_name = Some(name.into());
104        self
105    }
106
107    /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
108    pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
109        self.dlq_path = Some(path.into());
110        self
111    }
112
113    /// Set custom dead-letter queue configuration.
114    pub fn dlq_config(mut self, config: DlqConfig) -> Self {
115        self.dlq_config = config;
116        self
117    }
118
119    /// Register a native scalar UDF.
120    pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
121        self.udf_registry.register_scalar(udf);
122        self
123    }
124
125    /// Register a native aggregate UDF.
126    pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
127        self.udf_registry.register_aggregate(udf);
128        self
129    }
130
131    /// Build the engine. Returns the configured `Engine` ready for `load()`.
132    ///
133    /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
134    /// to load a VPL program. Post-load configuration like
135    /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
136    /// should be called after loading.
137    pub fn build(self) -> Engine {
138        let mut engine = Engine::new_internal(self.output_channel);
139        engine.metrics = self.metrics;
140        engine.context_name = self.context_name;
141        engine.dlq_path = self.dlq_path;
142        engine.dlq_config = self.dlq_config;
143        engine.udf_registry = self.udf_registry;
144        engine
145    }
146}