varpulis_runtime/engine/builder.rs
1//! Builder pattern for ergonomic `Engine` construction.
2//!
3//! # Examples
4//!
5//! ```rust,no_run
6//! use varpulis_runtime::Engine;
7//! use tokio::sync::mpsc;
8//!
9//! let (tx, _rx) = mpsc::channel(100);
10//! let engine = Engine::builder()
11//! .output(tx)
12//! .build();
13//! ```
14
15use std::sync::Arc;
16
17use tokio::sync::mpsc;
18
19use super::{Engine, OutputChannel};
20use crate::connector;
21use crate::dead_letter::DlqConfig;
22use crate::event::{Event, SharedEvent};
23use crate::metrics::Metrics;
24use crate::udf::UdfRegistry;
25
26/// Builder for constructing an [`Engine`] with a fluent API.
27///
28/// Use [`Engine::builder()`] to create an instance. Without calling
29/// [`output()`](Self::output) or [`shared_output()`](Self::shared_output),
30/// the engine runs in benchmark mode (no output channel, no cloning overhead).
31///
32/// # Examples
33///
34/// ```rust,no_run
35/// use varpulis_runtime::{Engine, Metrics};
36/// use tokio::sync::mpsc;
37///
38/// // Simple construction with output channel
39/// let (tx, _rx) = mpsc::channel(100);
40/// let mut engine = Engine::builder()
41/// .output(tx)
42/// .build();
43///
44/// // Full construction with metrics, DLQ, and context name
45/// let (tx2, _rx2) = mpsc::channel(100);
46/// let mut engine = Engine::builder()
47/// .output(tx2)
48/// .metrics(Metrics::new())
49/// .context_name("worker-0")
50/// .dlq_path("/var/log/varpulis-dlq.jsonl")
51/// .build();
52///
53/// // Benchmark mode (no output, no cloning overhead)
54/// let mut engine = Engine::builder().build();
55/// ```
56#[derive(Debug)]
57pub struct EngineBuilder {
58 output_channel: Option<OutputChannel>,
59 metrics: Option<Metrics>,
60 context_name: Option<String>,
61 dlq_path: Option<std::path::PathBuf>,
62 dlq_config: DlqConfig,
63 udf_registry: UdfRegistry,
64 credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
65}
66
67impl Default for EngineBuilder {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl EngineBuilder {
74 /// Create a new builder with default configuration (benchmark mode, no output).
75 pub fn new() -> Self {
76 Self {
77 output_channel: None,
78 metrics: None,
79 context_name: None,
80 dlq_path: None,
81 dlq_config: DlqConfig::default(),
82 udf_registry: UdfRegistry::new(),
83 credentials_store: None,
84 }
85 }
86
87 /// Set the output channel for emitted events (legacy owned channel).
88 pub fn output(mut self, tx: mpsc::Sender<Event>) -> Self {
89 self.output_channel = Some(OutputChannel::Owned(tx));
90 self
91 }
92
93 /// Set a zero-copy shared output channel (recommended for performance).
94 pub fn shared_output(mut self, tx: mpsc::Sender<SharedEvent>) -> Self {
95 self.output_channel = Some(OutputChannel::Shared(tx));
96 self
97 }
98
99 /// Enable Prometheus metrics collection.
100 pub fn metrics(mut self, metrics: Metrics) -> Self {
101 self.metrics = Some(metrics);
102 self
103 }
104
105 /// Set the context name for this engine instance (used in multi-context deployments).
106 pub fn context_name(mut self, name: impl Into<String>) -> Self {
107 self.context_name = Some(name.into());
108 self
109 }
110
111 /// Set a custom dead-letter queue file path (default: `varpulis-dlq.jsonl`).
112 pub fn dlq_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
113 self.dlq_path = Some(path.into());
114 self
115 }
116
117 /// Set custom dead-letter queue configuration.
118 pub fn dlq_config(mut self, config: DlqConfig) -> Self {
119 self.dlq_config = config;
120 self
121 }
122
123 /// Register a native scalar UDF.
124 pub fn scalar_udf(mut self, udf: Arc<dyn crate::udf::ScalarUDF>) -> Self {
125 self.udf_registry.register_scalar(udf);
126 self
127 }
128
129 /// Register a native aggregate UDF.
130 pub fn aggregate_udf(mut self, udf: Arc<dyn crate::udf::AggregateUDF>) -> Self {
131 self.udf_registry.register_aggregate(udf);
132 self
133 }
134
135 /// Set the connector credentials store for resolving `profile:` references in VPL.
136 pub fn credentials(mut self, store: Arc<connector::credentials::CredentialsStore>) -> Self {
137 self.credentials_store = Some(store);
138 self
139 }
140
141 /// Build the engine. Returns the configured `Engine` ready for `load()`.
142 ///
143 /// After building, call [`Engine::load()`] or [`Engine::load_with_source()`]
144 /// to load a VPL program. Post-load configuration like
145 /// [`Engine::enable_checkpointing()`] and [`Engine::enable_watermark_tracking()`]
146 /// should be called after loading.
147 pub fn build(self) -> Engine {
148 let mut engine = Engine::new_internal(self.output_channel);
149 engine.metrics = self.metrics;
150 engine.context_name = self.context_name;
151 engine.dlq_path = self.dlq_path;
152 engine.dlq_config = self.dlq_config;
153 engine.udf_registry = self.udf_registry;
154 engine.credentials_store = self.credentials_store;
155 engine
156 }
157}