use prometheus::GaugeVec;
use prometheus::core::{Collector, Desc};
use prometheus::proto::MetricFamily;
use solti_core::TaskState;
use solti_model::TaskPhase;
use std::collections::HashMap;
use crate::register::Sub;
/// Phases that the collector exports as values of the `phase` label.
///
/// On every `collect` call each phase listed here gets a gauge sample, set to
/// zero when no task is currently in that phase. The gauges are written in
/// the order given below.
// NOTE(review): `phase_label` carries a wildcard arm, so `TaskPhase` may have
// variants that are not listed here — confirm this list is kept in sync.
const ALL_PHASES: &[TaskPhase] = &[
TaskPhase::Pending,
TaskPhase::Running,
TaskPhase::Succeeded,
TaskPhase::Failed,
TaskPhase::Timeout,
TaskPhase::Canceled,
TaskPhase::Exhausted,
];
/// Returns the string used as the `phase` label value for `phase`.
///
/// Every phase named explicitly below maps to its lowercase name; any other
/// phase maps to `"unknown"`.
#[inline]
fn phase_label(phase: TaskPhase) -> &'static str {
    match phase {
        TaskPhase::Pending => "pending",
        TaskPhase::Running => "running",
        TaskPhase::Succeeded => "succeeded",
        TaskPhase::Failed => "failed",
        TaskPhase::Timeout => "timeout",
        TaskPhase::Canceled => "canceled",
        TaskPhase::Exhausted => "exhausted",
        // `TaskPhase` is defined in `solti_model`; this arm covers any
        // variant that has no explicit label above.
        _ => "unknown",
    }
}
/// Prometheus collector that reports how many tasks are in each phase.
///
/// The counts are not maintained incrementally: they are recomputed from
/// `state` every time `collect` is called.
pub struct PrometheusStateCollector {
/// Task state that is listed and counted on every `collect` call.
state: TaskState,
/// Gauge vector with a single `phase` label; its per-phase values are
/// overwritten on every `collect` call.
gauge: GaugeVec,
}
impl PrometheusStateCollector {
    /// Builds a collector that reads task phases from `state`.
    ///
    /// The backing gauge vector is created through
    /// `Sub::gauge_vec_unregistered` with a single `phase` label. Nothing in
    /// this constructor registers the collector with a registry; the tests in
    /// this file do that explicitly via `Registry::register`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Sub::gauge_vec_unregistered` when
    /// the gauge vector cannot be created.
    pub fn new(state: TaskState) -> Result<Self, prometheus::Error> {
        const SUBSYSTEM: &str = "sv";
        const NAME: &str = "tasks_by_phase";
        const HELP: &str = "Current number of tasks per phase (snapshot at scrape time)";
        const LABELS: [&str; 1] = ["phase"];

        let gauge = Sub::gauge_vec_unregistered(SUBSYSTEM, NAME, HELP, &LABELS)?;
        Ok(Self { gauge, state })
    }
}
impl std::fmt::Debug for PrometheusStateCollector {
    /// Writes only the type name; neither `state` nor `gauge` is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly yields the same output.
        f.write_str("PrometheusStateCollector")
    }
}
impl Collector for PrometheusStateCollector {
    /// Returns the descriptors of the underlying gauge vector.
    fn desc(&self) -> Vec<&Desc> {
        self.gauge.desc()
    }

    /// Recomputes the per-phase task counts and returns the gauge families.
    ///
    /// All tasks returned by `TaskState::list_all` are tallied by phase, then
    /// every phase in `ALL_PHASES` has its gauge set (zero when no task is in
    /// that phase). Tasks whose phase is not listed in `ALL_PHASES` are
    /// tallied but never exported.
    fn collect(&self) -> Vec<MetricFamily> {
        let mut per_phase: HashMap<TaskPhase, u64> = HashMap::with_capacity(ALL_PHASES.len());

        let snapshot = self.state.list_all();
        for entry in &snapshot {
            per_phase
                .entry(entry.status().phase)
                .and_modify(|n| *n += 1)
                .or_insert(1);
        }

        // Write every known phase so that phases without tasks are reset to
        // zero instead of keeping the value from an earlier scrape.
        for phase in ALL_PHASES.iter().copied() {
            let total = per_phase.get(&phase).map_or(0, |n| *n);
            let sample = self.gauge.with_label_values(&[phase_label(phase)]);
            sample.set(total as f64);
        }

        self.gauge.collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use solti_model::{TaskId, TaskKind, TaskSpec};
    use std::sync::Arc;

    /// Name of the metric family the assertions below look up.
    const METRIC: &str = "solti_sv_tasks_by_phase";

    /// Label values that must be present even when no task exists.
    const EXPECTED_LABELS: [&str; 7] = [
        "pending",
        "running",
        "succeeded",
        "failed",
        "timeout",
        "canceled",
        "exhausted",
    ];

    /// Builds the task spec shared by all tests.
    fn spec() -> TaskSpec {
        let built = TaskSpec::builder("slot", TaskKind::Embedded, 5_000_u64).build();
        built.expect("valid spec")
    }

    /// Looks up the gauge value of family `name` for the sample whose `phase`
    /// label equals `phase`; `None` when the family or the sample is missing.
    fn gauge_value(families: &[MetricFamily], name: &str, phase: &str) -> Option<f64> {
        let family = families.iter().find(|f| f.name() == name)?;
        for metric in family.get_metric().iter() {
            let has_phase = metric
                .get_label()
                .iter()
                .any(|l| l.name() == "phase" && l.value() == phase);
            if has_phase {
                return Some(metric.get_gauge().value());
            }
        }
        None
    }

    #[test]
    fn collector_returns_zero_for_all_phases_when_empty() {
        let collector = PrometheusStateCollector::new(TaskState::new()).unwrap();
        let families = collector.collect();

        for phase in EXPECTED_LABELS {
            assert_eq!(
                gauge_value(&families, METRIC, phase),
                Some(0.0),
                "phase {phase} must be zero on empty state",
            );
        }
    }

    #[test]
    fn collector_counts_pending_tasks() {
        let state = TaskState::new();
        for id in ["t1", "t2", "t3"] {
            state.add_task(TaskId::from(id), spec());
        }

        let collector = PrometheusStateCollector::new(state).unwrap();
        let families = collector.collect();

        assert_eq!(gauge_value(&families, METRIC, "pending"), Some(3.0));
        assert_eq!(gauge_value(&families, METRIC, "running"), Some(0.0));
    }

    #[test]
    fn collector_reflects_transitions() {
        let state = TaskState::new();
        for id in ["t1", "t2"] {
            state.add_task(TaskId::from(id), spec());
        }
        state.transition_starting(&TaskId::from("t1"));

        // The collector gets a clone so the test can keep mutating `state`.
        let collector = PrometheusStateCollector::new(state.clone()).unwrap();

        let before = collector.collect();
        assert_eq!(gauge_value(&before, METRIC, "pending"), Some(1.0));
        assert_eq!(gauge_value(&before, METRIC, "running"), Some(1.0));

        state.transition_finished(&TaskId::from("t1"), TaskPhase::Succeeded, None, None);

        let after = collector.collect();
        assert_eq!(gauge_value(&after, METRIC, "running"), Some(0.0));
        assert_eq!(gauge_value(&after, METRIC, "succeeded"), Some(1.0));
    }

    #[test]
    fn collector_registers_into_registry_and_scrapes() {
        let registry = Arc::new(Registry::new());

        let state = TaskState::new();
        let id = TaskId::from("alpha");
        state.add_task(id, spec());
        state.transition_starting(&TaskId::from("alpha"));

        let collector = PrometheusStateCollector::new(state).unwrap();
        registry.register(Box::new(collector)).unwrap();

        let families = registry.gather();
        assert_eq!(gauge_value(&families, METRIC, "running"), Some(1.0));
        assert_eq!(gauge_value(&families, METRIC, "pending"), Some(0.0));
    }
}