Skip to main content

rustvello_prometheus/
sink.rs

1//! Prometheus sink implementing [`EventEmitter`].
2
3use std::net::SocketAddr;
4use std::sync::OnceLock;
5use std::time::Duration;
6
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::PrometheusBuilder;
9use rustvello_core::error::{RustvelloError, RustvelloResult};
10use rustvello_core::observability::EventEmitter;
11use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
12
/// Process-wide marker recording that [`PrometheusSink::new`] has already
/// installed the global `metrics` recorder.
///
/// Empty until the first successful installation; once set it stays set for
/// the remainder of the process.
static RECORDER_INSTALLED: OnceLock<()> = OnceLock::new();
15
16/// Prometheus metrics sink for Rustvello.
17///
18/// Registers standard metrics and exposes them via an HTTP `/metrics` endpoint.
19/// The sink spawns a background tokio task for the HTTP server.
20///
21/// **Singleton:** Only one `PrometheusSink` can be created per process because
22/// the `metrics` crate uses a global recorder. A second call to [`new`] will
23/// return an error.
24///
25/// # Metric Naming Convention
26///
27/// `rustvello_<subsystem>_<metric>_<unit>`
28///
29/// # Example
30///
31/// ```rust,no_run
32/// use rustvello_prometheus::PrometheusSink;
33/// use std::net::SocketAddr;
34///
35/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
36/// let sink = PrometheusSink::new("0.0.0.0:9090".parse()?)?;
37/// # Ok(())
38/// # }
39/// ```
40pub struct PrometheusSink {
41    _handle: metrics_exporter_prometheus::PrometheusHandle,
42}
43
44impl PrometheusSink {
45    /// Create a new Prometheus sink and start the HTTP exporter on the given address.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the global metrics recorder has already been installed
50    /// (only one `PrometheusSink` per process).
51    pub fn new(bind: SocketAddr) -> RustvelloResult<Self> {
52        if RECORDER_INSTALLED.get().is_some() {
53            return Err(RustvelloError::Configuration {
54                message:
55                    "Prometheus recorder already installed — only one PrometheusSink per process"
56                        .to_owned(),
57            });
58        }
59        let builder = PrometheusBuilder::new().with_http_listener(bind);
60        let handle = builder
61            .install_recorder()
62            .map_err(|e| RustvelloError::Configuration {
63                message: format!("failed to install Prometheus recorder: {e}"),
64            })?;
65        let _ = RECORDER_INSTALLED.set(());
66        Ok(Self { _handle: handle })
67    }
68}
69
70impl EventEmitter for PrometheusSink {
71    fn on_worker_started(&self, runner_id: &RunnerId) {
72        gauge!("rustvello_workers_active", "runner_id" => runner_id.as_str().to_owned())
73            .increment(1.0);
74    }
75
76    fn on_worker_shutdown(&self, runner_id: &RunnerId) {
77        gauge!("rustvello_workers_active", "runner_id" => runner_id.as_str().to_owned())
78            .decrement(1.0);
79    }
80
81    fn on_task_submitted(&self, task_id: &TaskId, _inv_id: &InvocationId) {
82        counter!("rustvello_tasks_submitted_total", "task_id" => task_id.to_string()).increment(1);
83    }
84
85    fn on_task_started(&self, task_id: &TaskId, _inv_id: &InvocationId) {
86        counter!("rustvello_tasks_started_total", "task_id" => task_id.to_string()).increment(1);
87    }
88
89    fn on_task_succeeded(&self, task_id: &TaskId, _inv_id: &InvocationId, duration: Duration) {
90        let tid = task_id.to_string();
91        counter!("rustvello_tasks_succeeded_total", "task_id" => tid.clone()).increment(1);
92        histogram!("rustvello_task_duration_seconds", "task_id" => tid)
93            .record(duration.as_secs_f64());
94    }
95
96    fn on_task_failed(
97        &self,
98        task_id: &TaskId,
99        _inv_id: &InvocationId,
100        _error: &str,
101        duration: Duration,
102    ) {
103        let tid = task_id.to_string();
104        counter!("rustvello_tasks_failed_total", "task_id" => tid.clone()).increment(1);
105        histogram!("rustvello_task_duration_seconds", "task_id" => tid)
106            .record(duration.as_secs_f64());
107    }
108
109    fn on_task_retried(&self, task_id: &TaskId, _inv_id: &InvocationId, _attempt: u32) {
110        counter!("rustvello_tasks_retried_total", "task_id" => task_id.to_string()).increment(1);
111    }
112
113    fn on_queue_depth(&self, queue: &str, depth: usize) {
114        gauge!("rustvello_queue_depth", "queue" => queue.to_owned()).set(depth as f64);
115    }
116
117    fn on_cc_rejected(&self, task_id: &TaskId) {
118        counter!("rustvello_cc_rejected_total", "task_id" => task_id.to_string()).increment(1);
119    }
120
121    fn on_cc_slot_acquired(&self, task_id: &TaskId) {
122        let tid = task_id.to_string();
123        counter!("rustvello_cc_slot_acquired_total", "task_id" => tid.clone()).increment(1);
124        gauge!("rustvello_cc_slots_active", "task_id" => tid).increment(1.0);
125    }
126
127    fn on_cc_slot_released(&self, task_id: &TaskId) {
128        let tid = task_id.to_string();
129        counter!("rustvello_cc_slot_released_total", "task_id" => tid.clone()).increment(1);
130        gauge!("rustvello_cc_slots_active", "task_id" => tid).decrement(1.0);
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use metrics::Recorder;
    use metrics_exporter_prometheus::PrometheusBuilder;

    /// Build an isolated recorder + handle pair for testing (no HTTP server).
    fn test_recorder() -> (impl Recorder, metrics_exporter_prometheus::PrometheusHandle) {
        let recorder = PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        (recorder, handle)
    }

    /// Runs `emit` against a fresh sink while an isolated recorder is active
    /// (via `metrics::with_local_recorder`) and returns the rendered output.
    ///
    /// Every test gets its own recorder, so tests never touch the global
    /// recorder and do not observe each other's metrics.
    fn render_after(emit: impl FnOnce(&PrometheusSink)) -> String {
        let (rec, handle) = test_recorder();
        let sink = PrometheusSink {
            _handle: handle.clone(),
        };

        metrics::with_local_recorder(&rec, || emit(&sink));

        handle.render()
    }

    fn task_id(s: &str) -> TaskId {
        s.parse().unwrap()
    }

    fn inv_id(s: &str) -> InvocationId {
        InvocationId::from(s)
    }

    fn runner_id(s: &str) -> RunnerId {
        RunnerId::from(s)
    }

    #[test]
    fn worker_started_increments_gauge() {
        let output = render_after(|sink| sink.on_worker_started(&runner_id("r1")));

        assert!(
            output.contains("rustvello_workers_active"),
            "missing gauge:\n{output}"
        );
        assert!(
            output.contains("runner_id=\"r1\""),
            "missing label:\n{output}"
        );
    }

    #[test]
    fn worker_shutdown_decrements_gauge() {
        let output = render_after(|sink| {
            sink.on_worker_started(&runner_id("r1"));
            sink.on_worker_shutdown(&runner_id("r1"));
        });

        // After +1 then -1 the gauge should be 0
        assert!(
            output.contains("rustvello_workers_active{runner_id=\"r1\"} 0"),
            "expected gauge at 0:\n{output}"
        );
    }

    #[test]
    fn task_submitted_increments_counter() {
        let output = render_after(|sink| {
            sink.on_task_submitted(&task_id("my_app.add"), &inv_id("inv-001"));
            sink.on_task_submitted(&task_id("my_app.add"), &inv_id("inv-002"));
        });

        assert!(
            output.contains("rustvello_tasks_submitted_total"),
            "missing counter:\n{output}"
        );
        // Two increments → value 2. Match the whole series line so that an
        // unrelated series ending in "} 2" cannot satisfy the assertion.
        assert!(
            output.contains("rustvello_tasks_submitted_total{task_id=\"my_app.add\"} 2"),
            "expected count 2:\n{output}"
        );
    }

    #[test]
    fn task_started_increments_counter() {
        let output =
            render_after(|sink| sink.on_task_started(&task_id("my_app.run"), &inv_id("inv-001")));

        assert!(
            output.contains("rustvello_tasks_started_total"),
            "missing started counter:\n{output}"
        );
    }

    #[test]
    fn task_succeeded_records_counter_and_histogram() {
        let output = render_after(|sink| {
            sink.on_task_succeeded(
                &task_id("my_app.calc"),
                &inv_id("inv-001"),
                Duration::from_millis(150),
            );
        });

        assert!(
            output.contains("rustvello_tasks_succeeded_total"),
            "missing succeeded counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_task_duration_seconds"),
            "missing duration histogram:\n{output}"
        );
    }

    #[test]
    fn task_failed_records_counter_and_histogram() {
        let output = render_after(|sink| {
            sink.on_task_failed(
                &task_id("my_app.calc"),
                &inv_id("inv-001"),
                "boom",
                Duration::from_secs(1),
            );
        });

        assert!(
            output.contains("rustvello_tasks_failed_total"),
            "missing failed counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_task_duration_seconds"),
            "missing duration histogram:\n{output}"
        );
    }

    #[test]
    fn task_retried_increments_counter() {
        let output = render_after(|sink| {
            sink.on_task_retried(&task_id("my_app.flaky"), &inv_id("inv-001"), 3);
        });

        assert!(
            output.contains("rustvello_tasks_retried_total"),
            "missing retried counter:\n{output}"
        );
    }

    #[test]
    fn queue_depth_sets_gauge() {
        let output = render_after(|sink| sink.on_queue_depth("default", 42));

        assert!(
            output.contains("rustvello_queue_depth{queue=\"default\"} 42"),
            "expected depth 42:\n{output}"
        );
    }

    #[test]
    fn cc_rejected_increments_counter() {
        let output = render_after(|sink| sink.on_cc_rejected(&task_id("my_app.heavy")));

        assert!(
            output.contains("rustvello_cc_rejected_total"),
            "missing rejected counter:\n{output}"
        );
    }

    #[test]
    fn cc_slot_acquired_increments_counter_and_gauge() {
        let output = render_after(|sink| sink.on_cc_slot_acquired(&task_id("my_app.limited")));

        assert!(
            output.contains("rustvello_cc_slot_acquired_total"),
            "missing acquired counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_cc_slots_active"),
            "missing active gauge:\n{output}"
        );
    }

    #[test]
    fn cc_slot_released_decrements_gauge() {
        let output = render_after(|sink| {
            sink.on_cc_slot_acquired(&task_id("my_app.limited"));
            sink.on_cc_slot_released(&task_id("my_app.limited"));
        });

        assert!(
            output.contains("rustvello_cc_slot_released_total"),
            "missing released counter:\n{output}"
        );
        // After acquire + release, gauge should be 0
        assert!(
            output.contains("rustvello_cc_slots_active{task_id=\"my_app.limited\"} 0"),
            "expected gauge at 0:\n{output}"
        );
    }
}