dag_executor/metrics/
prometheus.rs1use crate::metrics::collector::MetricsSnapshot;
9use prometheus::{IntCounter, Registry};
10
11pub struct PrometheusExporter {
13 registry: Registry,
14 started: IntCounter,
15 completed: IntCounter,
16 failed: IntCounter,
17 dead_lettered: IntCounter,
18 retries: IntCounter,
19 skipped: IntCounter,
20}
21
22impl PrometheusExporter {
23 pub fn new() -> prometheus::Result<Self> {
25 let registry = Registry::new();
26 let started = IntCounter::new("dag_tasks_started_total", "Tasks started")?;
27 let completed = IntCounter::new("dag_tasks_completed_total", "Tasks completed")?;
28 let failed = IntCounter::new("dag_tasks_failed_total", "Tasks failed")?;
29 let dead_lettered =
30 IntCounter::new("dag_tasks_dead_lettered_total", "Tasks dead-lettered")?;
31 let retries = IntCounter::new("dag_task_retries_total", "Retry attempts")?;
32 let skipped = IntCounter::new("dag_tasks_skipped_total", "Tasks skipped")?;
33 registry.register(Box::new(started.clone()))?;
34 registry.register(Box::new(completed.clone()))?;
35 registry.register(Box::new(failed.clone()))?;
36 registry.register(Box::new(dead_lettered.clone()))?;
37 registry.register(Box::new(retries.clone()))?;
38 registry.register(Box::new(skipped.clone()))?;
39 Ok(PrometheusExporter {
40 registry,
41 started,
42 completed,
43 failed,
44 dead_lettered,
45 retries,
46 skipped,
47 })
48 }
49
50 pub fn registry(&self) -> &Registry {
52 &self.registry
53 }
54
55 pub fn update(&self, snap: &MetricsSnapshot) {
60 bump(&self.started, snap.tasks_started);
61 bump(&self.completed, snap.tasks_completed);
62 bump(&self.failed, snap.tasks_failed);
63 bump(&self.dead_lettered, snap.tasks_dead_lettered);
64 bump(&self.retries, snap.retries);
65 bump(&self.skipped, snap.tasks_skipped);
66 }
67
68 pub fn render(&self) -> prometheus::Result<String> {
70 use prometheus::Encoder;
71 let encoder = prometheus::TextEncoder::new();
72 let mut buf = Vec::new();
73 encoder.encode(&self.registry.gather(), &mut buf)?;
74 Ok(String::from_utf8_lossy(&buf).into_owned())
75 }
76}
77
78fn bump(counter: &IntCounter, target: u64) {
79 let current = counter.get();
80 if target > current {
81 counter.inc_by(target - current);
82 }
83}