solti_prometheus/
state.rs1use prometheus::GaugeVec;
7use prometheus::core::{Collector, Desc};
8use prometheus::proto::MetricFamily;
9use solti_core::TaskState;
10use solti_model::TaskPhase;
11use std::collections::HashMap;
12
13use crate::register::Sub;
14
15const ALL_PHASES: &[TaskPhase] = &[
20 TaskPhase::Pending,
21 TaskPhase::Running,
22 TaskPhase::Succeeded,
23 TaskPhase::Failed,
24 TaskPhase::Timeout,
25 TaskPhase::Canceled,
26 TaskPhase::Exhausted,
27];
28
29#[inline]
30fn phase_label(phase: TaskPhase) -> &'static str {
31 match phase {
32 TaskPhase::Pending => "pending",
33 TaskPhase::Running => "running",
34 TaskPhase::Succeeded => "succeeded",
35 TaskPhase::Failed => "failed",
36 TaskPhase::Timeout => "timeout",
37 TaskPhase::Canceled => "canceled",
38 TaskPhase::Exhausted => "exhausted",
39 _ => "unknown",
40 }
41}
42
43pub struct PrometheusStateCollector {
67 state: TaskState,
68 gauge: GaugeVec,
69}
70
71impl PrometheusStateCollector {
72 pub fn new(state: TaskState) -> Result<Self, prometheus::Error> {
76 let gauge = Sub::gauge_vec_unregistered(
80 "sv",
81 "tasks_by_phase",
82 "Current number of tasks per phase (snapshot at scrape time)",
83 &["phase"],
84 )?;
85 Ok(Self { state, gauge })
86 }
87}
88
89impl std::fmt::Debug for PrometheusStateCollector {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("PrometheusStateCollector").finish()
92 }
93}
94
95impl Collector for PrometheusStateCollector {
96 fn desc(&self) -> Vec<&Desc> {
97 self.gauge.desc()
98 }
99
100 fn collect(&self) -> Vec<MetricFamily> {
101 let tasks = self.state.list_all();
102 let mut counts: HashMap<TaskPhase, u64> = HashMap::with_capacity(ALL_PHASES.len());
103 for task in &tasks {
104 *counts.entry(task.status().phase).or_insert(0) += 1;
105 }
106 for phase in ALL_PHASES {
107 let count = counts.get(phase).copied().unwrap_or(0);
108 self.gauge
109 .with_label_values(&[phase_label(*phase)])
110 .set(count as f64);
111 }
112 self.gauge.collect()
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use solti_model::{TaskId, TaskKind, TaskSpec};

    /// Fully-qualified metric name produced by the collector.
    const METRIC: &str = "solti_sv_tasks_by_phase";

    /// Minimal valid embedded task spec shared by every test.
    fn spec() -> TaskSpec {
        TaskSpec::builder("slot", TaskKind::Embedded, 5_000_u64)
            .build()
            .expect("valid spec")
    }

    /// Value of the `phase="<phase>"` series inside the family called `name`,
    /// or `None` when either the family or the series is missing.
    fn gauge_value(families: &[MetricFamily], name: &str, phase: &str) -> Option<f64> {
        let family = families.iter().find(|f| f.name() == name)?;
        let series = family.get_metric().iter().find(|m| {
            m.get_label()
                .iter()
                .any(|l| l.name() == "phase" && l.value() == phase)
        })?;
        Some(series.get_gauge().value())
    }

    /// Asserts that the collector's `phase` series holds `expected`.
    #[track_caller]
    fn assert_phase(families: &[MetricFamily], phase: &str, expected: f64) {
        assert_eq!(gauge_value(families, METRIC, phase), Some(expected));
    }

    #[test]
    fn collector_returns_zero_for_all_phases_when_empty() {
        let collector = PrometheusStateCollector::new(TaskState::new()).unwrap();
        let families = collector.collect();

        let phases = [
            "pending",
            "running",
            "succeeded",
            "failed",
            "timeout",
            "canceled",
            "exhausted",
        ];
        for phase in phases {
            assert_eq!(
                gauge_value(&families, METRIC, phase),
                Some(0.0),
                "phase {phase} must be zero on empty state",
            );
        }
    }

    #[test]
    fn collector_counts_pending_tasks() {
        let state = TaskState::new();
        for id in ["t1", "t2", "t3"] {
            state.add_task(TaskId::from(id), spec());
        }

        let collector = PrometheusStateCollector::new(state).unwrap();
        let families = collector.collect();

        assert_phase(&families, "pending", 3.0);
        assert_phase(&families, "running", 0.0);
    }

    #[test]
    fn collector_reflects_transitions() {
        let state = TaskState::new();
        state.add_task(TaskId::from("t1"), spec());
        state.add_task(TaskId::from("t2"), spec());
        state.transition_starting(&TaskId::from("t1"));

        let collector = PrometheusStateCollector::new(state.clone()).unwrap();

        let before = collector.collect();
        assert_phase(&before, "pending", 1.0);
        assert_phase(&before, "running", 1.0);

        state.transition_finished(&TaskId::from("t1"), TaskPhase::Succeeded, None, None);

        let after = collector.collect();
        assert_phase(&after, "running", 0.0);
        assert_phase(&after, "succeeded", 1.0);
    }

    #[test]
    fn collector_registers_into_registry_and_scrapes() {
        let state = TaskState::new();
        state.add_task(TaskId::from("alpha"), spec());
        state.transition_starting(&TaskId::from("alpha"));

        let registry = Registry::new();
        let collector = PrometheusStateCollector::new(state).unwrap();
        registry.register(Box::new(collector)).unwrap();

        let families = registry.gather();
        assert_phase(&families, "running", 1.0);
        assert_phase(&families, "pending", 0.0);
    }
}