async_inspect/integrations/
prometheus.rs1use crate::inspector::Inspector;
7use crate::task::TaskState;
8use prometheus::{
9 Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
10};
11use std::sync::Arc;
12
13pub struct PrometheusExporter {
29 registry: Registry,
30
31 _tasks_total: Counter,
33 _tasks_completed: Counter,
34 _tasks_failed: Counter,
35
36 tasks_by_state: GaugeVec,
38
39 task_duration: HistogramVec,
41
42 events_total: Counter,
44
45 poll_count: CounterVec,
47
48 active_tasks: Gauge,
50 blocked_tasks: Gauge,
51}
52
53impl PrometheusExporter {
54 pub fn new() -> prometheus::Result<Self> {
56 let registry = Registry::new();
57
58 let tasks_total = Counter::with_opts(Opts::new(
60 "async_inspect_tasks_total",
61 "Total number of tasks created",
62 ))?;
63 registry.register(Box::new(tasks_total.clone()))?;
64
65 let tasks_completed = Counter::with_opts(Opts::new(
66 "async_inspect_tasks_completed_total",
67 "Total number of tasks completed",
68 ))?;
69 registry.register(Box::new(tasks_completed.clone()))?;
70
71 let tasks_failed = Counter::with_opts(Opts::new(
72 "async_inspect_tasks_failed_total",
73 "Total number of tasks that failed",
74 ))?;
75 registry.register(Box::new(tasks_failed.clone()))?;
76
77 let tasks_by_state = GaugeVec::new(
79 Opts::new("async_inspect_tasks_by_state", "Number of tasks by state"),
80 &["state"],
81 )?;
82 registry.register(Box::new(tasks_by_state.clone()))?;
83
84 let task_duration = HistogramVec::new(
86 HistogramOpts::new(
87 "async_inspect_task_duration_seconds",
88 "Task execution duration in seconds",
89 )
90 .buckets(vec![
91 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
92 ]),
93 &["task_name"],
94 )?;
95 registry.register(Box::new(task_duration.clone()))?;
96
97 let events_total = Counter::with_opts(Opts::new(
99 "async_inspect_events_total",
100 "Total number of events recorded",
101 ))?;
102 registry.register(Box::new(events_total.clone()))?;
103
104 let poll_count = CounterVec::new(
106 Opts::new(
107 "async_inspect_task_polls_total",
108 "Total number of task polls",
109 ),
110 &["task_name"],
111 )?;
112 registry.register(Box::new(poll_count.clone()))?;
113
114 let active_tasks = Gauge::with_opts(Opts::new(
116 "async_inspect_active_tasks",
117 "Number of currently active tasks",
118 ))?;
119 registry.register(Box::new(active_tasks.clone()))?;
120
121 let blocked_tasks = Gauge::with_opts(Opts::new(
122 "async_inspect_blocked_tasks",
123 "Number of currently blocked tasks",
124 ))?;
125 registry.register(Box::new(blocked_tasks.clone()))?;
126
127 Ok(Self {
128 registry,
129 _tasks_total: tasks_total,
130 _tasks_completed: tasks_completed,
131 _tasks_failed: tasks_failed,
132 tasks_by_state,
133 task_duration,
134 events_total,
135 poll_count,
136 active_tasks,
137 blocked_tasks,
138 })
139 }
140
141 pub fn update(&self) {
143 let stats = Inspector::global().stats();
144
145 self.tasks_by_state
150 .with_label_values(&["running"])
151 .set(stats.running_tasks as f64);
152 self.tasks_by_state
153 .with_label_values(&["completed"])
154 .set(stats.completed_tasks as f64);
155 self.tasks_by_state
156 .with_label_values(&["failed"])
157 .set(stats.failed_tasks as f64);
158 self.tasks_by_state
159 .with_label_values(&["blocked"])
160 .set(stats.blocked_tasks as f64);
161
162 self.active_tasks.set(stats.running_tasks as f64);
164 self.blocked_tasks.set(stats.blocked_tasks as f64);
165
166 for task in Inspector::global().get_all_tasks() {
168 if matches!(task.state, TaskState::Completed | TaskState::Failed) {
170 self.task_duration
171 .with_label_values(&[&task.name])
172 .observe(task.total_run_time.as_secs_f64());
173 }
174
175 self.poll_count
177 .with_label_values(&[&task.name])
178 .inc_by(task.poll_count as f64);
179 }
180
181 self.events_total.inc_by(stats.total_events as f64);
183 }
184
185 #[must_use]
187 pub fn registry(&self) -> &Registry {
188 &self.registry
189 }
190
191 #[must_use]
193 pub fn gather(&self) -> String {
194 use prometheus::Encoder;
195
196 let encoder = prometheus::TextEncoder::new();
197 let metric_families = self.registry.gather();
198
199 let mut buffer = Vec::new();
200 encoder.encode(&metric_families, &mut buffer).unwrap();
201
202 String::from_utf8(buffer).unwrap()
203 }
204
205 #[cfg(feature = "tokio")]
207 #[must_use]
208 pub fn start_background_updater(
209 self: Arc<Self>,
210 interval: std::time::Duration,
211 ) -> tokio::task::JoinHandle<()> {
212 tokio::spawn(async move {
213 let mut interval = tokio::time::interval(interval);
214 loop {
215 interval.tick().await;
216 self.update();
217 }
218 })
219 }
220}
221
222impl Default for PrometheusExporter {
223 fn default() -> Self {
224 Self::new().expect("Failed to create Prometheus exporter")
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::PrometheusExporter;

    /// Smoke test: an exporter can be built, refreshed and rendered without
    /// panicking. The rendered text itself is not inspected.
    #[test]
    fn test_exporter_creation() {
        let exporter = PrometheusExporter::new().unwrap();

        exporter.update();

        let _metrics: String = exporter.gather();
    }
}