varpulis_runtime/engine/builder.rs
1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//! .output(tx)
12//! .build();
13//! ```
14
15use std::sync::Arc;
16
17use tokio::sync::mpsc;
18
19use super::{Engine, OutputChannel};
20use crate::dead_letter::DlqConfig;
21use crate::event::{Event, SharedEvent};
22use crate::metrics::Metrics;
23use crate::udf::UdfRegistry;
24
25/// Builder for constructing an [`Engine`] with a fluent API.
26///
27/// Use [`Engine::builder()`] to create an instance. Without calling
28/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
29/// the engine runs in benchmark mode (no output channel, no cloning overhead).
30///
31/// # Examples
32///
33/// ```rust,no_run
34/// use varpulis_runtime::{Engine, Metrics};
35/// use tokio::sync::mpsc;
36///
37/// // Simple construction with output channel
38/// let (tx, _rx) = mpsc::channel(100);
39/// let mut engine = Engine::builder()
40/// .output(tx)
41/// .build();
42///
43/// // Full construction with metrics, DLQ, and context name
44/// let (tx2, _rx2) = mpsc::channel(100);
45/// let mut engine = Engine::builder()
46/// .output(tx2)
47/// .metrics(Metrics::new())
48/// .context_name("worker-0")
49/// .dlq_path("/var/log/varpulis-dlq.jsonl")
50/// .build();
51///
52/// // Benchmark mode (no output, no cloning overhead)
53/// let mut engine = Engine::builder().build();
54/// ```
55#[derive(Debug)]
56pub struct EngineBuilder {
57 output_channel: Option<OutputChannel>,
58 metrics: Option<Metrics>,
59 context_name: Option<String>,
60 dlq_path: Option<std::path::PathBuf>,
61 dlq_config: DlqConfig,
62 udf_registry: UdfRegistry,
63}
64
65impl Default for EngineBuilder {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl EngineBuilder {
72 /// Create a new builder with default configuration (benchmark mode, no output).
73 pub fn new() -> Self {
74 Self {
75 output_channel: None,
76 metrics: None,
77 context_name: None,
78 dlq_path: None,
79 dlq_config: DlqConfig::default(),
80 udf_registry: UdfRegistry::new(),
81 }
82 }
83
84 /// Set the output channel for emitted events (legacy owned channel).
85 pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
86 self.output_channel = Some(OutputChannel::Owned(tx));
87 self
88 }
89
90 /// Set a zero-copy shared output channel (recommended for performance).
91 pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
92 self.output_channel = Some(OutputChannel::Shared(tx));
93 self
94 }
95
96 /// Enable Prometheus metrics collection.
97 pub fn metrics(mut self, metrics: Metrics) -> Self {
98 self.metrics = Some(metrics);
99 self
100 }
101
102 /// Set the context name for this engine instance (used in multi-context deployments).
103 pub fn context_name(mut self, name: impl Into<String>) -> Self {
104 self.context_name = Some(name.into());
105 self
106 }
107
108 /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
109 pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
110 self.dlq_path = Some(path.into());
111 self
112 }
113
114 /// Set custom dead-letter queue configuration.
115 pub fn dlq_config(mut self, config: DlqConfig) -> Self {
116 self.dlq_config = config;
117 self
118 }
119
120 /// Register a native scalar UDF.
121 pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
122 self.udf_registry.register_scalar(udf);
123 self
124 }
125
126 /// Register a native aggregate UDF.
127 pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
128 self.udf_registry.register_aggregate(udf);
129 self
130 }
131
132 /// Build the engine. Returns the configured `Engine` ready for `load()`.
133 ///
134 /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
135 /// to load a VPL program. Post-load configuration like
136 /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
137 /// should be called after loading.
138 pub fn build(self) -> Engine {
139 let mut engine = Engine::new_internal(self.output_channel);
140 engine.metrics = self.metrics;
141 engine.context_name = self.context_name;
142 engine.dlq_path = self.dlq_path;
143 engine.dlq_config = self.dlq_config;
144 engine.udf_registry = self.udf_registry;
145 engine
146 }
147}