// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! Prometheus exposition for collected metrics.
//!
//! This module is only compiled when the `metrics` feature is enabled. It
//! translates a [`MetricsSnapshot`] into the Prometheus text exposition format,
//! which can be served from any HTTP handler (kept out of this crate to avoid
//! pulling in a web server).

use crate::metrics::collector::MetricsSnapshot;
use prometheus::{IntCounter, Registry};

/// Holds Prometheus counters mirroring the executor's metrics.
/// Holds Prometheus counters mirroring the executor's metrics.
///
/// The exporter owns a private [`Registry`] plus one [`IntCounter`] handle per
/// executor metric. The handles are what `update` advances; the registry is
/// what `render` gathers from.
pub struct PrometheusExporter {
    /// Registry that every counter below is registered with in `new`.
    registry: Registry,
    /// Registered as `dag_tasks_started_total`.
    started: IntCounter,
    /// Registered as `dag_tasks_completed_total`.
    completed: IntCounter,
    /// Registered as `dag_tasks_failed_total`.
    failed: IntCounter,
    /// Registered as `dag_tasks_dead_lettered_total`.
    dead_lettered: IntCounter,
    /// Registered as `dag_task_retries_total`.
    retries: IntCounter,
    /// Registered as `dag_tasks_skipped_total`.
    skipped: IntCounter,
}

impl PrometheusExporter {
    /// Create an exporter backed by its own, freshly created [`Registry`].
    ///
    /// All six counters are constructed first and then registered with the
    /// registry, in field order.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing a counter or while
    /// registering it with the registry.
    pub fn new() -> prometheus::Result<Self> {
        let registry = Registry::new();
        let counter = |name: &str, help: &str| IntCounter::new(name, help);

        let started = counter("dag_tasks_started_total", "Tasks started")?;
        let completed = counter("dag_tasks_completed_total", "Tasks completed")?;
        let failed = counter("dag_tasks_failed_total", "Tasks failed")?;
        let dead_lettered = counter("dag_tasks_dead_lettered_total", "Tasks dead-lettered")?;
        let retries = counter("dag_task_retries_total", "Retry attempts")?;
        let skipped = counter("dag_tasks_skipped_total", "Tasks skipped")?;

        // The registry receives a clone of each counter; the exporter keeps
        // the handle it later drives from `update`.
        let handles = [
            &started,
            &completed,
            &failed,
            &dead_lettered,
            &retries,
            &skipped,
        ];
        for handle in handles {
            registry.register(Box::new(IntCounter::clone(handle)))?;
        }

        Ok(Self {
            registry,
            started,
            completed,
            failed,
            dead_lettered,
            retries,
            skipped,
        })
    }

    /// Borrow the underlying registry, e.g. to register additional collectors.
    pub fn registry(&self) -> &Registry {
        &self.registry
    }

    /// Advance every counter to the value recorded in `snap`.
    ///
    /// Prometheus counters are monotonic, so a counter is only ever moved
    /// forward by the difference between the snapshot value and its current
    /// value. A snapshot value that is not greater than the current one leaves
    /// the counter untouched.
    pub fn update(&self, snap: &MetricsSnapshot) {
        let targets = [
            (&self.started, snap.tasks_started),
            (&self.completed, snap.tasks_completed),
            (&self.failed, snap.tasks_failed),
            (&self.dead_lettered, snap.tasks_dead_lettered),
            (&self.retries, snap.retries),
            (&self.skipped, snap.tasks_skipped),
        ];
        for (counter, target) in targets {
            bump(counter, target);
        }
    }

    /// Encode everything currently in the registry as Prometheus text
    /// exposition format.
    ///
    /// # Errors
    ///
    /// Returns the encoder's error if the gathered metric families cannot be
    /// encoded.
    pub fn render(&self) -> prometheus::Result<String> {
        use prometheus::Encoder;

        let families = self.registry.gather();
        let mut encoded = Vec::new();
        prometheus::TextEncoder::new().encode(&families, &mut encoded)?;
        // Lossy conversion: invalid UTF-8 (if any) is replaced rather than
        // turned into an error.
        Ok(String::from_utf8_lossy(&encoded).into_owned())
    }
}

/// Move `counter` forward so that it reads `target`.
///
/// Does nothing when `target` is not greater than the counter's current
/// value, so the counter is never decreased.
fn bump(counter: &IntCounter, target: u64) {
    // Saturates to zero when the counter is already at or past `target`.
    let delta = target.saturating_sub(counter.get());
    if delta != 0 {
        counter.inc_by(delta);
    }
}