Skip to main content

solti_prometheus/
state.rs

1//! # Supervisor state collector.
2//!
3//! [`PrometheusStateCollector`] is a `prometheus::core::Collector` that exposes
4//! the current number of tasks per [`TaskPhase`] as `solti_sv_tasks_by_phase{phase}`.
5
6use prometheus::GaugeVec;
7use prometheus::core::{Collector, Desc};
8use prometheus::proto::MetricFamily;
9use solti_core::TaskState;
10use solti_model::TaskPhase;
11use std::collections::HashMap;
12
13use crate::register::Sub;
14
/// All phases we want to be represented as gauges, even at zero.
///
/// Kept in code (not derived) because [`TaskPhase`] is `#[non_exhaustive]`:
/// adding a variant upstream should be a conscious decision here too.
///
/// The collector walks this list on every scrape, so a phase that is missing
/// here is never exported — even if tasks are currently in that phase.
const ALL_PHASES: &[TaskPhase] = &[
    TaskPhase::Pending,
    TaskPhase::Running,
    TaskPhase::Succeeded,
    TaskPhase::Failed,
    TaskPhase::Timeout,
    TaskPhase::Canceled,
    TaskPhase::Exhausted,
];
28
29#[inline]
30fn phase_label(phase: TaskPhase) -> &'static str {
31    match phase {
32        TaskPhase::Pending => "pending",
33        TaskPhase::Running => "running",
34        TaskPhase::Succeeded => "succeeded",
35        TaskPhase::Failed => "failed",
36        TaskPhase::Timeout => "timeout",
37        TaskPhase::Canceled => "canceled",
38        TaskPhase::Exhausted => "exhausted",
39        _ => "unknown",
40    }
41}
42
/// Pull-based Prometheus collector for `solti_sv_tasks_by_phase{phase}`.
///
/// Register once with a shared [`prometheus::Registry`] alongside the other solti
/// collectors. On each scrape, every phase listed in `ALL_PHASES` is emitted; empty
/// phases return `0` so dashboards can rely on a stable label set.
///
/// ## Cost
///
/// `O(N)` per scrape where `N` is the current number of tasks in state.
/// With a typical scrape interval of 10–30s and a fleet of <10k tasks this is
/// negligible. If it ever becomes hot, the natural next step is to maintain
/// phase counters inside [`TaskState`] directly.
///
/// ## Example
///
/// ```text
/// use std::sync::Arc;
/// use solti_prometheus::{PrometheusStateCollector, Registry};
///
/// let registry = Arc::new(Registry::new());
/// let collector = PrometheusStateCollector::new(supervisor_api.state())?;
/// registry.register(Box::new(collector))?;
/// ```
pub struct PrometheusStateCollector {
    /// Task state handle that is listed in full on every scrape.
    state: TaskState,
    /// Gauge vector with a single `phase` label; its values are overwritten
    /// in `collect` right before the metric family is returned.
    gauge: GaugeVec,
}
70
71impl PrometheusStateCollector {
72    /// Create a new collector wired to `state`. The collector holds a cheap
73    /// `Arc` clone of [`TaskState`] and will always observe the most recent
74    /// mutations made by the supervisor's state subscriber.
75    pub fn new(state: TaskState) -> Result<Self, prometheus::Error> {
76        // We piggy-back on the existing `Sub` helper only for namespace/subsystem —
77        // the gauge is *not* registered into a registry here, the caller does that
78        // when they register the collector itself.
79        let gauge = Sub::gauge_vec_unregistered(
80            "sv",
81            "tasks_by_phase",
82            "Current number of tasks per phase (snapshot at scrape time)",
83            &["phase"],
84        )?;
85        Ok(Self { state, gauge })
86    }
87}
88
89impl std::fmt::Debug for PrometheusStateCollector {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("PrometheusStateCollector").finish()
92    }
93}
94
95impl Collector for PrometheusStateCollector {
96    fn desc(&self) -> Vec<&Desc> {
97        self.gauge.desc()
98    }
99
100    fn collect(&self) -> Vec<MetricFamily> {
101        let tasks = self.state.list_all();
102        let mut counts: HashMap<TaskPhase, u64> = HashMap::with_capacity(ALL_PHASES.len());
103        for task in &tasks {
104            *counts.entry(task.status().phase).or_insert(0) += 1;
105        }
106        for phase in ALL_PHASES {
107            let count = counts.get(phase).copied().unwrap_or(0);
108            self.gauge
109                .with_label_values(&[phase_label(*phase)])
110                .set(count as f64);
111        }
112        self.gauge.collect()
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use solti_model::{TaskId, TaskKind, TaskSpec};
    use std::sync::Arc;

    /// Fully-qualified name of the metric family under test.
    const METRIC: &str = "solti_sv_tasks_by_phase";

    /// Minimal valid task spec shared by all tests.
    fn spec() -> TaskSpec {
        let built = TaskSpec::builder("slot", TaskKind::Embedded, 5_000_u64).build();
        built.expect("valid spec")
    }

    /// Finds the gauge sample in family `name` whose `phase` label equals `phase`.
    ///
    /// Returns `None` when either the family or the labelled sample is absent.
    fn gauge_value(families: &[MetricFamily], name: &str, phase: &str) -> Option<f64> {
        let family = families.iter().find(|f| f.name() == name)?;
        for metric in family.get_metric().iter() {
            let has_phase = metric
                .get_label()
                .iter()
                .any(|l| l.name() == "phase" && l.value() == phase);
            if has_phase {
                return Some(metric.get_gauge().value());
            }
        }
        None
    }

    /// Shorthand for [`gauge_value`] on the collector's own metric family.
    fn phase_count(families: &[MetricFamily], phase: &str) -> Option<f64> {
        gauge_value(families, METRIC, phase)
    }

    #[test]
    fn collector_returns_zero_for_all_phases_when_empty() {
        const EXPECTED_LABELS: [&str; 7] = [
            "pending",
            "running",
            "succeeded",
            "failed",
            "timeout",
            "canceled",
            "exhausted",
        ];

        let collector = PrometheusStateCollector::new(TaskState::new()).unwrap();
        let families = collector.collect();

        for phase in EXPECTED_LABELS {
            assert_eq!(
                gauge_value(&families, METRIC, phase),
                Some(0.0),
                "phase {phase} must be zero on empty state",
            );
        }
    }

    #[test]
    fn collector_counts_pending_tasks() {
        let state = TaskState::new();
        for id in ["t1", "t2", "t3"] {
            state.add_task(TaskId::from(id), spec());
        }

        let families = PrometheusStateCollector::new(state).unwrap().collect();

        assert_eq!(phase_count(&families, "pending"), Some(3.0));
        assert_eq!(phase_count(&families, "running"), Some(0.0));
    }

    #[test]
    fn collector_reflects_transitions() {
        let state = TaskState::new();
        let first = TaskId::from("t1");
        state.add_task(TaskId::from("t1"), spec());
        state.add_task(TaskId::from("t2"), spec());
        state.transition_starting(&first);

        let collector = PrometheusStateCollector::new(state.clone()).unwrap();

        let before = collector.collect();
        assert_eq!(phase_count(&before, "pending"), Some(1.0));
        assert_eq!(phase_count(&before, "running"), Some(1.0));

        // Finish the running task and scrape again: the counts must follow.
        state.transition_finished(&first, TaskPhase::Succeeded, None, None);

        let after = collector.collect();
        assert_eq!(phase_count(&after, "running"), Some(0.0));
        assert_eq!(phase_count(&after, "succeeded"), Some(1.0));
    }

    #[test]
    fn collector_registers_into_registry_and_scrapes() {
        let registry = Arc::new(Registry::new());

        let state = TaskState::new();
        let alpha = TaskId::from("alpha");
        state.add_task(TaskId::from("alpha"), spec());
        state.transition_starting(&alpha);

        let collector = PrometheusStateCollector::new(state).unwrap();
        registry.register(Box::new(collector)).unwrap();

        let families = registry.gather();
        assert_eq!(phase_count(&families, "running"), Some(1.0));
        assert_eq!(phase_count(&families, "pending"), Some(0.0));
    }
}