varpulis_runtime/engine/builder.rs
1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//! .output(tx)
12//! .build();
13//! ```
14
15use crate::dead_letter::DlqConfig;
16use crate::metrics::Metrics;
17use crate::udf::UdfRegistry;
18use std::sync::Arc;
19use tokio::sync::mpsc;
20
21use super::{Engine, OutputChannel};
22use crate::event::{Event, SharedEvent};
23
24/// Builder for constructing an [`Engine`] with a fluent API.
25///
26/// Use [`Engine::builder()`] to create an instance. Without calling
27/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
28/// the engine runs in benchmark mode (no output channel, no cloning overhead).
29///
30/// # Examples
31///
32/// ```rust,no_run
33/// use varpulis_runtime::{Engine, Metrics};
34/// use tokio::sync::mpsc;
35///
36/// // Simple construction with output channel
37/// let (tx, _rx) = mpsc::channel(100);
38/// let mut engine = Engine::builder()
39/// .output(tx)
40/// .build();
41///
42/// // Full construction with metrics, DLQ, and context name
43/// let (tx2, _rx2) = mpsc::channel(100);
44/// let mut engine = Engine::builder()
45/// .output(tx2)
46/// .metrics(Metrics::new())
47/// .context_name("worker-0")
48/// .dlq_path("/var/log/varpulis-dlq.jsonl")
49/// .build();
50///
51/// // Benchmark mode (no output, no cloning overhead)
52/// let mut engine = Engine::builder().build();
53/// ```
54#[derive(Debug)]
55pub struct EngineBuilder {
56 output_channel: Option<OutputChannel>,
57 metrics: Option<Metrics>,
58 context_name: Option<String>,
59 dlq_path: Option<std::path::PathBuf>,
60 dlq_config: DlqConfig,
61 udf_registry: UdfRegistry,
62}
63
64impl Default for EngineBuilder {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl EngineBuilder {
71 /// Create a new builder with default configuration (benchmark mode, no output).
72 pub fn new() -> Self {
73 Self {
74 output_channel: None,
75 metrics: None,
76 context_name: None,
77 dlq_path: None,
78 dlq_config: DlqConfig::default(),
79 udf_registry: UdfRegistry::new(),
80 }
81 }
82
83 /// Set the output channel for emitted events (legacy owned channel).
84 pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
85 self.output_channel = Some(OutputChannel::Owned(tx));
86 self
87 }
88
89 /// Set a zero-copy shared output channel (recommended for performance).
90 pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
91 self.output_channel = Some(OutputChannel::Shared(tx));
92 self
93 }
94
95 /// Enable Prometheus metrics collection.
96 pub fn metrics(mut self, metrics: Metrics) -> Self {
97 self.metrics = Some(metrics);
98 self
99 }
100
101 /// Set the context name for this engine instance (used in multi-context deployments).
102 pub fn context_name(mut self, name: impl Into<String>) -> Self {
103 self.context_name = Some(name.into());
104 self
105 }
106
107 /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
108 pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
109 self.dlq_path = Some(path.into());
110 self
111 }
112
113 /// Set custom dead-letter queue configuration.
114 pub fn dlq_config(mut self, config: DlqConfig) -> Self {
115 self.dlq_config = config;
116 self
117 }
118
119 /// Register a native scalar UDF.
120 pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
121 self.udf_registry.register_scalar(udf);
122 self
123 }
124
125 /// Register a native aggregate UDF.
126 pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
127 self.udf_registry.register_aggregate(udf);
128 self
129 }
130
131 /// Build the engine. Returns the configured `Engine` ready for `load()`.
132 ///
133 /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
134 /// to load a VPL program. Post-load configuration like
135 /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
136 /// should be called after loading.
137 pub fn build(self) -> Engine {
138 let mut engine = Engine::new_internal(self.output_channel);
139 engine.metrics = self.metrics;
140 engine.context_name = self.context_name;
141 engine.dlq_path = self.dlq_path;
142 engine.dlq_config = self.dlq_config;
143 engine.udf_registry = self.udf_registry;
144 engine
145 }
146}