use crate::metrics::collector::MetricsSnapshot;
use prometheus::{IntCounter, Registry};
pub struct PrometheusExporter {
registry: Registry,
started: IntCounter,
completed: IntCounter,
failed: IntCounter,
dead_lettered: IntCounter,
retries: IntCounter,
skipped: IntCounter,
}
impl PrometheusExporter {
pub fn new() -> prometheus::Result<Self> {
let registry = Registry::new();
let started = IntCounter::new("dag_tasks_started_total", "Tasks started")?;
let completed = IntCounter::new("dag_tasks_completed_total", "Tasks completed")?;
let failed = IntCounter::new("dag_tasks_failed_total", "Tasks failed")?;
let dead_lettered =
IntCounter::new("dag_tasks_dead_lettered_total", "Tasks dead-lettered")?;
let retries = IntCounter::new("dag_task_retries_total", "Retry attempts")?;
let skipped = IntCounter::new("dag_tasks_skipped_total", "Tasks skipped")?;
registry.register(Box::new(started.clone()))?;
registry.register(Box::new(completed.clone()))?;
registry.register(Box::new(failed.clone()))?;
registry.register(Box::new(dead_lettered.clone()))?;
registry.register(Box::new(retries.clone()))?;
registry.register(Box::new(skipped.clone()))?;
Ok(PrometheusExporter {
registry,
started,
completed,
failed,
dead_lettered,
retries,
skipped,
})
}
pub fn registry(&self) -> &Registry {
&self.registry
}
pub fn update(&self, snap: &MetricsSnapshot) {
bump(&self.started, snap.tasks_started);
bump(&self.completed, snap.tasks_completed);
bump(&self.failed, snap.tasks_failed);
bump(&self.dead_lettered, snap.tasks_dead_lettered);
bump(&self.retries, snap.retries);
bump(&self.skipped, snap.tasks_skipped);
}
pub fn render(&self) -> prometheus::Result<String> {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let mut buf = Vec::new();
encoder.encode(&self.registry.gather(), &mut buf)?;
Ok(String::from_utf8_lossy(&buf).into_owned())
}
}
fn bump(counter: &IntCounter, target: u64) {
let current = counter.get();
if target > current {
counter.inc_by(target - current);
}
}