use crate::inspector::Inspector;
use crate::task::TaskState;
use prometheus::{
Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
};
use std::sync::Arc;
/// Publishes async-inspect task statistics as Prometheus metrics.
///
/// Every metric handle stored here is also registered with the exporter's own
/// `registry` (see `new`), so gathering that registry reflects whatever the
/// handles have been set to.
pub struct PrometheusExporter {
/// Registry that all of the metrics below are registered with.
registry: Registry,
/// Counter `async_inspect_tasks_total`. It is registered in `new`, but no
/// code visible in this file updates it.
_tasks_total: Counter,
/// Counter `async_inspect_tasks_completed_total`. Registered in `new`, but
/// no code visible in this file updates it.
_tasks_completed: Counter,
/// Counter `async_inspect_tasks_failed_total`. Registered in `new`, but no
/// code visible in this file updates it.
_tasks_failed: Counter,
/// Gauge family `async_inspect_tasks_by_state`, keyed by a `state` label.
/// `update` writes the `running`, `completed`, `failed` and `blocked` series.
tasks_by_state: GaugeVec,
/// Histogram family `async_inspect_task_duration_seconds`, keyed by
/// `task_name`. `update` feeds it each finished task's `total_run_time`.
task_duration: HistogramVec,
/// Counter `async_inspect_events_total`, fed from `stats.total_events` in
/// `update`.
events_total: Counter,
/// Counter family `async_inspect_task_polls_total`, keyed by `task_name` and
/// fed from each task's `poll_count` in `update`.
poll_count: CounterVec,
/// Gauge `async_inspect_active_tasks`, set to `stats.running_tasks` in `update`.
active_tasks: Gauge,
/// Gauge `async_inspect_blocked_tasks`, set to `stats.blocked_tasks` in `update`.
blocked_tasks: Gauge,
}
impl PrometheusExporter {
pub fn new() -> prometheus::Result<Self> {
let registry = Registry::new();
let tasks_total = Counter::with_opts(Opts::new(
"async_inspect_tasks_total",
"Total number of tasks created",
))?;
registry.register(Box::new(tasks_total.clone()))?;
let tasks_completed = Counter::with_opts(Opts::new(
"async_inspect_tasks_completed_total",
"Total number of tasks completed",
))?;
registry.register(Box::new(tasks_completed.clone()))?;
let tasks_failed = Counter::with_opts(Opts::new(
"async_inspect_tasks_failed_total",
"Total number of tasks that failed",
))?;
registry.register(Box::new(tasks_failed.clone()))?;
let tasks_by_state = GaugeVec::new(
Opts::new("async_inspect_tasks_by_state", "Number of tasks by state"),
&["state"],
)?;
registry.register(Box::new(tasks_by_state.clone()))?;
let task_duration = HistogramVec::new(
HistogramOpts::new(
"async_inspect_task_duration_seconds",
"Task execution duration in seconds",
)
.buckets(vec![
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
]),
&["task_name"],
)?;
registry.register(Box::new(task_duration.clone()))?;
let events_total = Counter::with_opts(Opts::new(
"async_inspect_events_total",
"Total number of events recorded",
))?;
registry.register(Box::new(events_total.clone()))?;
let poll_count = CounterVec::new(
Opts::new(
"async_inspect_task_polls_total",
"Total number of task polls",
),
&["task_name"],
)?;
registry.register(Box::new(poll_count.clone()))?;
let active_tasks = Gauge::with_opts(Opts::new(
"async_inspect_active_tasks",
"Number of currently active tasks",
))?;
registry.register(Box::new(active_tasks.clone()))?;
let blocked_tasks = Gauge::with_opts(Opts::new(
"async_inspect_blocked_tasks",
"Number of currently blocked tasks",
))?;
registry.register(Box::new(blocked_tasks.clone()))?;
Ok(Self {
registry,
_tasks_total: tasks_total,
_tasks_completed: tasks_completed,
_tasks_failed: tasks_failed,
tasks_by_state,
task_duration,
events_total,
poll_count,
active_tasks,
blocked_tasks,
})
}
pub fn update(&self) {
let stats = Inspector::global().stats();
self.tasks_by_state
.with_label_values(&["running"])
.set(stats.running_tasks as f64);
self.tasks_by_state
.with_label_values(&["completed"])
.set(stats.completed_tasks as f64);
self.tasks_by_state
.with_label_values(&["failed"])
.set(stats.failed_tasks as f64);
self.tasks_by_state
.with_label_values(&["blocked"])
.set(stats.blocked_tasks as f64);
self.active_tasks.set(stats.running_tasks as f64);
self.blocked_tasks.set(stats.blocked_tasks as f64);
for task in Inspector::global().get_all_tasks() {
if matches!(task.state, TaskState::Completed | TaskState::Failed) {
self.task_duration
.with_label_values(&[&task.name])
.observe(task.total_run_time.as_secs_f64());
}
self.poll_count
.with_label_values(&[&task.name])
.inc_by(task.poll_count as f64);
}
self.events_total.inc_by(stats.total_events as f64);
}
#[must_use]
pub fn registry(&self) -> &Registry {
&self.registry
}
#[must_use]
pub fn gather(&self) -> String {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let metric_families = self.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
#[cfg(feature = "tokio")]
#[must_use]
pub fn start_background_updater(
self: Arc<Self>,
interval: std::time::Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
self.update();
}
})
}
}
/// Builds the exporter through [`PrometheusExporter::new`].
///
/// # Panics
///
/// Panics if `new` returns an error, because `Default::default` has no way to
/// report one.
impl Default for PrometheusExporter {
    fn default() -> Self {
        match Self::new() {
            Ok(exporter) => exporter,
            Err(err) => panic!("Failed to create Prometheus exporter: {:?}", err),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::PrometheusExporter;

    /// Smoke test: an exporter can be built, refreshed and gathered without
    /// panicking. The encoded output itself is not inspected.
    #[test]
    fn test_exporter_creation() {
        let exporter = PrometheusExporter::new().unwrap();
        exporter.update();
        let _ = exporter.gather();
    }
}