actionqueue_daemon/metrics/
registry.rs1use std::net::SocketAddr;
8
9use prometheus::core::Collector;
10use prometheus::{
11 Counter, Encoder, Gauge, GaugeVec, Histogram, HistogramOpts, Opts, Registry, TextEncoder,
12};
13
/// Every value the `state` label of the runs-total gauge family may take.
///
/// `MetricsRegistry::new` touches each of these once so that all series are
/// present in the exposition output from the very first scrape.
pub const RUN_STATE_LABEL_VALUES: [&str; 8] = [
    "scheduled",
    "ready",
    "leased",
    "running",
    "retry_wait",
    "completed",
    "failed",
    "canceled",
];

/// Every value the `result` label of the attempts-total gauge family may take.
///
/// Pre-seeded by `MetricsRegistry::new` in the same way as the run-state labels.
pub const ATTEMPT_RESULT_LABEL_VALUES: [&str; 3] = [
    "success",
    "failure",
    "timeout",
];
20
// Metric family names and their help strings. Each NAME/HELP pair is handed to
// `Opts::new` / `HistogramOpts::new` inside `MetricsRegistry::new`; the name is
// additionally used to tag registration errors.

// --- Run lifecycle gauges ---------------------------------------------------
const METRIC_RUNS_TOTAL: &str = "actionqueue_runs_total";
const HELP_RUNS_TOTAL: &str = "Total runs by current lifecycle state.";

const METRIC_RUNS_READY: &str = "actionqueue_runs_ready";
const HELP_RUNS_READY: &str = "Number of runs currently in the ready state.";

const METRIC_RUNS_RUNNING: &str = "actionqueue_runs_running";
const HELP_RUNS_RUNNING: &str = "Number of runs currently in the running state.";

// --- Attempt outcomes -------------------------------------------------------
const METRIC_ATTEMPTS_TOTAL: &str = "actionqueue_attempts_total";
const HELP_ATTEMPTS_TOTAL: &str = "Total attempt outcomes by result taxonomy.";

// --- Latency histograms -----------------------------------------------------
const METRIC_SCHEDULING_LAG_SECONDS: &str = "actionqueue_scheduling_lag_seconds";
const HELP_SCHEDULING_LAG_SECONDS: &str =
    "Observed scheduling lag in seconds between eligibility and dispatch.";

const METRIC_EXECUTOR_DURATION_SECONDS: &str = "actionqueue_executor_duration_seconds";
const HELP_EXECUTOR_DURATION_SECONDS: &str = "Observed executor attempt duration in seconds.";

// --- Write-ahead log counters -----------------------------------------------
const METRIC_WAL_APPEND_TOTAL: &str = "actionqueue_wal_append_total";
const HELP_WAL_APPEND_TOTAL: &str = "Total successful WAL append operations.";

const METRIC_WAL_APPEND_FAILURES_TOTAL: &str = "actionqueue_wal_append_failures_total";
const HELP_WAL_APPEND_FAILURES_TOTAL: &str = "Total failed WAL append operations.";

// --- Recovery ---------------------------------------------------------------
const METRIC_RECOVERY_TIME_SECONDS: &str = "actionqueue_recovery_time_seconds";
const HELP_RECOVERY_TIME_SECONDS: &str = "Observed recovery replay duration in seconds.";

const METRIC_RECOVERY_EVENTS_APPLIED_TOTAL: &str = "actionqueue_recovery_events_applied_total";
const HELP_RECOVERY_EVENTS_APPLIED_TOTAL: &str =
    "Total recovery-applied events from snapshot hydration plus WAL replay.";
52
/// Failures surfaced by the metrics registry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricsRegistryError {
    /// A collector could not be constructed or registered.
    Registration {
        /// Name of the metric family whose setup failed.
        metric: &'static str,
        /// Stringified underlying error.
        message: String,
    },
    /// Rendering the gathered metric families to text failed; the payload is
    /// the stringified cause.
    Encode(String),
}
61
62impl std::fmt::Display for MetricsRegistryError {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 match self {
65 MetricsRegistryError::Registration { metric, message } => {
66 write!(f, "metrics_registration_failed[{metric}]: {message}")
67 }
68 MetricsRegistryError::Encode(message) => {
69 write!(f, "metrics_encode_failed: {message}")
70 }
71 }
72 }
73}
74
75impl std::error::Error for MetricsRegistryError {}
76
/// Text rendering of the registry produced by `MetricsRegistry::encode_text`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct EncodedMetrics {
    /// Format identifier reported by the encoder that produced `body`.
    pub content_type: String,
    /// The encoded metric families as UTF-8 text.
    pub body: String,
}
86
87#[derive(Debug, Clone)]
89pub struct RegisteredMetrics {
90 runs_total: GaugeVec,
91 runs_ready: Gauge,
92 runs_running: Gauge,
93 attempts_total: GaugeVec,
94 scheduling_lag_seconds: Histogram,
95 executor_duration_seconds: Histogram,
96 wal_append_total: Counter,
97 wal_append_failures_total: Counter,
98 recovery_time_seconds: Histogram,
99 recovery_events_applied_total: Counter,
100}
101
102impl RegisteredMetrics {
103 pub fn runs_total(&self) -> &GaugeVec {
105 &self.runs_total
106 }
107
108 pub fn runs_ready(&self) -> &Gauge {
110 &self.runs_ready
111 }
112
113 pub fn runs_running(&self) -> &Gauge {
115 &self.runs_running
116 }
117
118 pub fn attempts_total(&self) -> &GaugeVec {
120 &self.attempts_total
121 }
122
123 pub fn scheduling_lag_seconds(&self) -> &Histogram {
125 &self.scheduling_lag_seconds
126 }
127
128 pub fn executor_duration_seconds(&self) -> &Histogram {
130 &self.executor_duration_seconds
131 }
132
133 pub fn wal_append_total(&self) -> &Counter {
135 &self.wal_append_total
136 }
137
138 pub fn wal_append_failures_total(&self) -> &Counter {
140 &self.wal_append_failures_total
141 }
142
143 pub fn recovery_time_seconds(&self) -> &Histogram {
145 &self.recovery_time_seconds
146 }
147
148 pub fn recovery_events_applied_total(&self) -> &Counter {
150 &self.recovery_events_applied_total
151 }
152}
153
154#[derive(Debug, Clone)]
156pub struct MetricsRegistry {
157 metrics_bind: Option<SocketAddr>,
158 registry: Registry,
159 collectors: RegisteredMetrics,
160}
161
162impl MetricsRegistry {
163 pub fn new(metrics_bind: Option<SocketAddr>) -> Result<Self, MetricsRegistryError> {
165 let registry = Registry::new();
166
167 let runs_total = GaugeVec::new(Opts::new(METRIC_RUNS_TOTAL, HELP_RUNS_TOTAL), &["state"])
168 .map_err(|error| MetricsRegistryError::Registration {
169 metric: METRIC_RUNS_TOTAL,
170 message: error.to_string(),
171 })?;
172 register_collector(®istry, METRIC_RUNS_TOTAL, runs_total.clone())?;
173
174 let runs_ready =
175 Gauge::with_opts(Opts::new(METRIC_RUNS_READY, HELP_RUNS_READY)).map_err(|error| {
176 MetricsRegistryError::Registration {
177 metric: METRIC_RUNS_READY,
178 message: error.to_string(),
179 }
180 })?;
181 register_collector(®istry, METRIC_RUNS_READY, runs_ready.clone())?;
182
183 let runs_running = Gauge::with_opts(Opts::new(METRIC_RUNS_RUNNING, HELP_RUNS_RUNNING))
184 .map_err(|error| MetricsRegistryError::Registration {
185 metric: METRIC_RUNS_RUNNING,
186 message: error.to_string(),
187 })?;
188 register_collector(®istry, METRIC_RUNS_RUNNING, runs_running.clone())?;
189
190 let attempts_total =
191 GaugeVec::new(Opts::new(METRIC_ATTEMPTS_TOTAL, HELP_ATTEMPTS_TOTAL), &["result"])
192 .map_err(|error| MetricsRegistryError::Registration {
193 metric: METRIC_ATTEMPTS_TOTAL,
194 message: error.to_string(),
195 })?;
196 register_collector(®istry, METRIC_ATTEMPTS_TOTAL, attempts_total.clone())?;
197
198 let scheduling_lag_seconds = Histogram::with_opts(HistogramOpts::new(
199 METRIC_SCHEDULING_LAG_SECONDS,
200 HELP_SCHEDULING_LAG_SECONDS,
201 ))
202 .map_err(|error| MetricsRegistryError::Registration {
203 metric: METRIC_SCHEDULING_LAG_SECONDS,
204 message: error.to_string(),
205 })?;
206 register_collector(
207 ®istry,
208 METRIC_SCHEDULING_LAG_SECONDS,
209 scheduling_lag_seconds.clone(),
210 )?;
211
212 let executor_duration_seconds = Histogram::with_opts(HistogramOpts::new(
213 METRIC_EXECUTOR_DURATION_SECONDS,
214 HELP_EXECUTOR_DURATION_SECONDS,
215 ))
216 .map_err(|error| MetricsRegistryError::Registration {
217 metric: METRIC_EXECUTOR_DURATION_SECONDS,
218 message: error.to_string(),
219 })?;
220 register_collector(
221 ®istry,
222 METRIC_EXECUTOR_DURATION_SECONDS,
223 executor_duration_seconds.clone(),
224 )?;
225
226 let wal_append_total =
227 Counter::with_opts(Opts::new(METRIC_WAL_APPEND_TOTAL, HELP_WAL_APPEND_TOTAL)).map_err(
228 |error| MetricsRegistryError::Registration {
229 metric: METRIC_WAL_APPEND_TOTAL,
230 message: error.to_string(),
231 },
232 )?;
233 register_collector(®istry, METRIC_WAL_APPEND_TOTAL, wal_append_total.clone())?;
234
235 let wal_append_failures_total = Counter::with_opts(Opts::new(
236 METRIC_WAL_APPEND_FAILURES_TOTAL,
237 HELP_WAL_APPEND_FAILURES_TOTAL,
238 ))
239 .map_err(|error| MetricsRegistryError::Registration {
240 metric: METRIC_WAL_APPEND_FAILURES_TOTAL,
241 message: error.to_string(),
242 })?;
243 register_collector(
244 ®istry,
245 METRIC_WAL_APPEND_FAILURES_TOTAL,
246 wal_append_failures_total.clone(),
247 )?;
248
249 let recovery_time_seconds = Histogram::with_opts(HistogramOpts::new(
250 METRIC_RECOVERY_TIME_SECONDS,
251 HELP_RECOVERY_TIME_SECONDS,
252 ))
253 .map_err(|error| MetricsRegistryError::Registration {
254 metric: METRIC_RECOVERY_TIME_SECONDS,
255 message: error.to_string(),
256 })?;
257 register_collector(®istry, METRIC_RECOVERY_TIME_SECONDS, recovery_time_seconds.clone())?;
258
259 let recovery_events_applied_total = Counter::with_opts(Opts::new(
260 METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
261 HELP_RECOVERY_EVENTS_APPLIED_TOTAL,
262 ))
263 .map_err(|error| MetricsRegistryError::Registration {
264 metric: METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
265 message: error.to_string(),
266 })?;
267 register_collector(
268 ®istry,
269 METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
270 recovery_events_applied_total.clone(),
271 )?;
272
273 for label in RUN_STATE_LABEL_VALUES {
274 let _ = runs_total.with_label_values(&[label]);
275 }
276 for label in ATTEMPT_RESULT_LABEL_VALUES {
277 let _ = attempts_total.with_label_values(&[label]);
278 }
279
280 Ok(Self {
281 metrics_bind,
282 registry,
283 collectors: RegisteredMetrics {
284 runs_total,
285 runs_ready,
286 runs_running,
287 attempts_total,
288 scheduling_lag_seconds,
289 executor_duration_seconds,
290 wal_append_total,
291 wal_append_failures_total,
292 recovery_time_seconds,
293 recovery_events_applied_total,
294 },
295 })
296 }
297
298 pub fn bind_address(&self) -> Option<SocketAddr> {
300 self.metrics_bind
301 }
302
303 pub fn is_enabled(&self) -> bool {
305 self.metrics_bind.is_some()
306 }
307
308 pub fn collectors(&self) -> &RegisteredMetrics {
310 &self.collectors
311 }
312
313 pub fn gather(&self) -> Vec<prometheus::proto::MetricFamily> {
315 self.registry.gather()
316 }
317
318 pub fn encode_text(&self) -> Result<EncodedMetrics, MetricsRegistryError> {
320 let metric_families = self.gather();
321 let encoder = TextEncoder::new();
322 let mut buffer = Vec::new();
323 encoder
324 .encode(&metric_families, &mut buffer)
325 .map_err(|error| MetricsRegistryError::Encode(error.to_string()))?;
326 let body = String::from_utf8(buffer).map_err(|error| {
327 MetricsRegistryError::Encode(format!(
328 "metrics encoding produced non-utf8 bytes: {error}"
329 ))
330 })?;
331
332 Ok(EncodedMetrics { content_type: encoder.format_type().to_string(), body })
333 }
334}
335
336fn register_collector<T: Collector + Clone + 'static>(
337 registry: &Registry,
338 metric: &'static str,
339 collector: T,
340) -> Result<(), MetricsRegistryError> {
341 registry
342 .register(Box::new(collector))
343 .map_err(|error| MetricsRegistryError::Registration { metric, message: error.to_string() })
344}
345
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;

    /// A freshly built registry reports its bind address and already exposes
    /// every metric family and every pre-seeded label value in its text output.
    #[test]
    fn registry_constructor_registers_required_families_and_labels() {
        let registry =
            MetricsRegistry::new(Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090)))
                .expect("registry should initialize");

        assert!(registry.is_enabled());
        assert_eq!(
            registry.bind_address(),
            Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090))
        );

        let encoded = registry.encode_text().expect("encoding should succeed");
        assert!(encoded.body.contains(METRIC_RUNS_TOTAL));
        assert!(encoded.body.contains(METRIC_RUNS_READY));
        assert!(encoded.body.contains(METRIC_RUNS_RUNNING));
        assert!(encoded.body.contains(METRIC_ATTEMPTS_TOTAL));
        assert!(encoded.body.contains(METRIC_SCHEDULING_LAG_SECONDS));
        assert!(encoded.body.contains(METRIC_EXECUTOR_DURATION_SECONDS));
        assert!(encoded.body.contains(METRIC_WAL_APPEND_TOTAL));
        assert!(encoded.body.contains(METRIC_WAL_APPEND_FAILURES_TOTAL));
        assert!(encoded.body.contains(METRIC_RECOVERY_TIME_SECONDS));
        assert!(encoded.body.contains(METRIC_RECOVERY_EVENTS_APPLIED_TOTAL));

        for label in RUN_STATE_LABEL_VALUES {
            assert!(
                encoded.body.contains(&format!("state=\"{label}\"")),
                "missing pre-seeded run-state label {label}"
            );
        }
        for label in ATTEMPT_RESULT_LABEL_VALUES {
            assert!(
                encoded.body.contains(&format!("result=\"{label}\"")),
                "missing pre-seeded attempt-result label {label}"
            );
        }
    }

    /// Registering the same collector twice yields a `Registration` error that
    /// carries the metric name passed to `register_collector`.
    #[test]
    fn duplicate_registration_path_returns_typed_deterministic_error() {
        let registry = Registry::new();
        let gauge = Gauge::with_opts(Opts::new(
            "actionqueue_registry_duplicate_test",
            "duplicate test gauge",
        ))
        .expect("gauge should initialize");
        register_collector(&registry, "actionqueue_registry_duplicate_test", gauge.clone())
            .expect("first registration should succeed");

        let error = register_collector(&registry, "actionqueue_registry_duplicate_test", gauge)
            .expect_err("second registration should fail deterministically");

        match error {
            MetricsRegistryError::Registration { metric, .. } => {
                assert_eq!(metric, "actionqueue_registry_duplicate_test");
            }
            other => panic!("unexpected error variant: {other}"),
        }
    }
}