async_inspect/integrations/
prometheus.rs

//! Prometheus metrics exporter
//!
//! This module exports async-inspect metrics in the Prometheus text format,
//! allowing integration with Prometheus monitoring and Grafana dashboards.
5
6use crate::inspector::Inspector;
7use crate::task::TaskState;
8use prometheus::{
9    Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
10};
11use std::sync::Arc;
12
13/// Prometheus metrics exporter for async-inspect
14///
15/// Exports task metrics in Prometheus format for monitoring and alerting.
16///
17/// # Example
18///
19/// ```rust,ignore
20/// use async_inspect::integrations::prometheus::PrometheusExporter;
21///
22/// let exporter = PrometheusExporter::new();
23/// exporter.update(); // Update metrics from Inspector
24///
25/// // Get metrics in Prometheus format
26/// let metrics = exporter.gather();
27/// ```
28pub struct PrometheusExporter {
29    registry: Registry,
30
31    // Task counters
32    _tasks_total: Counter,
33    _tasks_completed: Counter,
34    _tasks_failed: Counter,
35
36    // Task state gauges
37    tasks_by_state: GaugeVec,
38
39    // Task duration histogram
40    task_duration: HistogramVec,
41
42    // Event counters
43    events_total: Counter,
44
45    // Poll counters
46    poll_count: CounterVec,
47
48    // Runtime gauges
49    active_tasks: Gauge,
50    blocked_tasks: Gauge,
51}
52
53impl PrometheusExporter {
54    /// Create a new Prometheus exporter
55    pub fn new() -> prometheus::Result<Self> {
56        let registry = Registry::new();
57
58        // Task counters
59        let tasks_total = Counter::with_opts(Opts::new(
60            "async_inspect_tasks_total",
61            "Total number of tasks created",
62        ))?;
63        registry.register(Box::new(tasks_total.clone()))?;
64
65        let tasks_completed = Counter::with_opts(Opts::new(
66            "async_inspect_tasks_completed_total",
67            "Total number of tasks completed",
68        ))?;
69        registry.register(Box::new(tasks_completed.clone()))?;
70
71        let tasks_failed = Counter::with_opts(Opts::new(
72            "async_inspect_tasks_failed_total",
73            "Total number of tasks that failed",
74        ))?;
75        registry.register(Box::new(tasks_failed.clone()))?;
76
77        // Task state gauges
78        let tasks_by_state = GaugeVec::new(
79            Opts::new("async_inspect_tasks_by_state", "Number of tasks by state"),
80            &["state"],
81        )?;
82        registry.register(Box::new(tasks_by_state.clone()))?;
83
84        // Task duration histogram
85        let task_duration = HistogramVec::new(
86            HistogramOpts::new(
87                "async_inspect_task_duration_seconds",
88                "Task execution duration in seconds",
89            )
90            .buckets(vec![
91                0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
92            ]),
93            &["task_name"],
94        )?;
95        registry.register(Box::new(task_duration.clone()))?;
96
97        // Event counter
98        let events_total = Counter::with_opts(Opts::new(
99            "async_inspect_events_total",
100            "Total number of events recorded",
101        ))?;
102        registry.register(Box::new(events_total.clone()))?;
103
104        // Poll counter
105        let poll_count = CounterVec::new(
106            Opts::new(
107                "async_inspect_task_polls_total",
108                "Total number of task polls",
109            ),
110            &["task_name"],
111        )?;
112        registry.register(Box::new(poll_count.clone()))?;
113
114        // Runtime gauges
115        let active_tasks = Gauge::with_opts(Opts::new(
116            "async_inspect_active_tasks",
117            "Number of currently active tasks",
118        ))?;
119        registry.register(Box::new(active_tasks.clone()))?;
120
121        let blocked_tasks = Gauge::with_opts(Opts::new(
122            "async_inspect_blocked_tasks",
123            "Number of currently blocked tasks",
124        ))?;
125        registry.register(Box::new(blocked_tasks.clone()))?;
126
127        Ok(Self {
128            registry,
129            _tasks_total: tasks_total,
130            _tasks_completed: tasks_completed,
131            _tasks_failed: tasks_failed,
132            tasks_by_state,
133            task_duration,
134            events_total,
135            poll_count,
136            active_tasks,
137            blocked_tasks,
138        })
139    }
140
141    /// Update all metrics from the inspector
142    pub fn update(&self) {
143        let stats = Inspector::global().stats();
144
145        // Update counters (these are cumulative, so we need to set them carefully)
146        // Note: Prometheus counters can only increase, so we track changes
147
148        // Update state-based gauges
149        self.tasks_by_state
150            .with_label_values(&["running"])
151            .set(stats.running_tasks as f64);
152        self.tasks_by_state
153            .with_label_values(&["completed"])
154            .set(stats.completed_tasks as f64);
155        self.tasks_by_state
156            .with_label_values(&["failed"])
157            .set(stats.failed_tasks as f64);
158        self.tasks_by_state
159            .with_label_values(&["blocked"])
160            .set(stats.blocked_tasks as f64);
161
162        // Update runtime gauges
163        self.active_tasks.set(stats.running_tasks as f64);
164        self.blocked_tasks.set(stats.blocked_tasks as f64);
165
166        // Update task durations and polls
167        for task in Inspector::global().get_all_tasks() {
168            // Update task duration histogram for completed tasks
169            if matches!(task.state, TaskState::Completed | TaskState::Failed) {
170                self.task_duration
171                    .with_label_values(&[&task.name])
172                    .observe(task.total_run_time.as_secs_f64());
173            }
174
175            // Update poll count
176            self.poll_count
177                .with_label_values(&[&task.name])
178                .inc_by(task.poll_count as f64);
179        }
180
181        // Update event count
182        self.events_total.inc_by(stats.total_events as f64);
183    }
184
185    /// Get the Prometheus registry
186    #[must_use]
187    pub fn registry(&self) -> &Registry {
188        &self.registry
189    }
190
191    /// Gather metrics in Prometheus text format
192    #[must_use]
193    pub fn gather(&self) -> String {
194        use prometheus::Encoder;
195
196        let encoder = prometheus::TextEncoder::new();
197        let metric_families = self.registry.gather();
198
199        let mut buffer = Vec::new();
200        encoder.encode(&metric_families, &mut buffer).unwrap();
201
202        String::from_utf8(buffer).unwrap()
203    }
204
205    /// Start a background metrics updater that updates metrics periodically
206    #[cfg(feature = "tokio")]
207    #[must_use]
208    pub fn start_background_updater(
209        self: Arc<Self>,
210        interval: std::time::Duration,
211    ) -> tokio::task::JoinHandle<()> {
212        tokio::spawn(async move {
213            let mut interval = tokio::time::interval(interval);
214            loop {
215                interval.tick().await;
216                self.update();
217            }
218        })
219    }
220}
221
222impl Default for PrometheusExporter {
223    fn default() -> Self {
224        Self::new().expect("Failed to create Prometheus exporter")
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an exporter can be built, updated and gathered, and the
    /// rendered text contains the metric families it registers.
    #[test]
    fn test_exporter_creation() {
        let exporter = PrometheusExporter::new().unwrap();
        exporter.update();
        let metrics = exporter.gather();

        for family in &[
            "async_inspect_tasks_by_state",
            "async_inspect_events_total",
            "async_inspect_active_tasks",
            "async_inspect_blocked_tasks",
        ] {
            assert!(
                metrics.contains(family),
                "missing metric family `{}` in gathered output",
                family
            );
        }
    }
}