Skip to main content

dag_executor/metrics/
prometheus.rs

//! Prometheus exposition for collected metrics.
//!
//! This module is only compiled when the `metrics` feature is enabled. It
//! translates a [`MetricsSnapshot`] into the Prometheus text exposition format,
//! which can be served from any HTTP handler (kept out of this crate to avoid
//! pulling in a web server).
7
8use crate::metrics::collector::MetricsSnapshot;
9use prometheus::{IntCounter, Registry};
10
11/// Holds Prometheus counters mirroring the executor's metrics.
12pub struct PrometheusExporter {
13    registry: Registry,
14    started: IntCounter,
15    completed: IntCounter,
16    failed: IntCounter,
17    dead_lettered: IntCounter,
18    retries: IntCounter,
19    skipped: IntCounter,
20}
21
22impl PrometheusExporter {
23    /// Build an exporter with a fresh registry.
24    pub fn new() -> prometheus::Result<Self> {
25        let registry = Registry::new();
26        let started = IntCounter::new("dag_tasks_started_total", "Tasks started")?;
27        let completed = IntCounter::new("dag_tasks_completed_total", "Tasks completed")?;
28        let failed = IntCounter::new("dag_tasks_failed_total", "Tasks failed")?;
29        let dead_lettered =
30            IntCounter::new("dag_tasks_dead_lettered_total", "Tasks dead-lettered")?;
31        let retries = IntCounter::new("dag_task_retries_total", "Retry attempts")?;
32        let skipped = IntCounter::new("dag_tasks_skipped_total", "Tasks skipped")?;
33        registry.register(Box::new(started.clone()))?;
34        registry.register(Box::new(completed.clone()))?;
35        registry.register(Box::new(failed.clone()))?;
36        registry.register(Box::new(dead_lettered.clone()))?;
37        registry.register(Box::new(retries.clone()))?;
38        registry.register(Box::new(skipped.clone()))?;
39        Ok(PrometheusExporter {
40            registry,
41            started,
42            completed,
43            failed,
44            dead_lettered,
45            retries,
46            skipped,
47        })
48    }
49
50    /// The underlying registry, e.g. to register additional collectors.
51    pub fn registry(&self) -> &Registry {
52        &self.registry
53    }
54
55    /// Overwrite counter values from a snapshot.
56    ///
57    /// Prometheus counters are monotonic, so we only ever move them forward by
58    /// the delta between their current value and the snapshot.
59    pub fn update(&self, snap: &MetricsSnapshot) {
60        bump(&self.started, snap.tasks_started);
61        bump(&self.completed, snap.tasks_completed);
62        bump(&self.failed, snap.tasks_failed);
63        bump(&self.dead_lettered, snap.tasks_dead_lettered);
64        bump(&self.retries, snap.retries);
65        bump(&self.skipped, snap.tasks_skipped);
66    }
67
68    /// Render the current registry in Prometheus text exposition format.
69    pub fn render(&self) -> prometheus::Result<String> {
70        use prometheus::Encoder;
71        let encoder = prometheus::TextEncoder::new();
72        let mut buf = Vec::new();
73        encoder.encode(&self.registry.gather(), &mut buf)?;
74        Ok(String::from_utf8_lossy(&buf).into_owned())
75    }
76}
77
78fn bump(counter: &IntCounter, target: u64) {
79    let current = counter.get();
80    if target > current {
81        counter.inc_by(target - current);
82    }
83}