// Source: actionqueue_daemon/metrics/registry.rs

//! Prometheus metrics registry for daemon observability surfaces.
//!
//! This module defines the daemon-local Prometheus registry, registers the
//! required metric families for Phase 6, and provides a text-encoding export
//! helper for the HTTP `/metrics` route.
6
7use std::net::SocketAddr;
8
9use prometheus::core::Collector;
10use prometheus::{
11    Counter, Encoder, Gauge, GaugeVec, Histogram, HistogramOpts, Opts, Registry, TextEncoder,
12};
13
/// Closed set of values the `state` label takes on `actionqueue_runs_total`.
///
/// The registry constructor touches every entry once so that each labelled
/// series is present in the exposition from the first scrape.
pub const RUN_STATE_LABEL_VALUES: [&str; 8] = [
    "scheduled",
    "ready",
    "leased",
    "running",
    "retry_wait",
    "completed",
    "failed",
    "canceled",
];

/// Closed set of values the `result` label takes on `actionqueue_attempts_total`.
///
/// Pre-seeded by the registry constructor in the same way as the run-state
/// labels.
pub const ATTEMPT_RESULT_LABEL_VALUES: [&str; 3] = [
    "success",
    "failure",
    "timeout",
];
20
// Metric family names and help strings.
//
// Each family keeps its exported name and its help text side by side so the
// same constant is used for collector construction, registration error
// reporting, and test assertions.

// --- Run lifecycle -------------------------------------------------------

const METRIC_RUNS_TOTAL: &str = "actionqueue_runs_total";
const HELP_RUNS_TOTAL: &str = "Total runs by current lifecycle state.";

const METRIC_RUNS_READY: &str = "actionqueue_runs_ready";
const HELP_RUNS_READY: &str = "Number of runs currently in the ready state.";

const METRIC_RUNS_RUNNING: &str = "actionqueue_runs_running";
const HELP_RUNS_RUNNING: &str = "Number of runs currently in the running state.";

// --- Attempts ------------------------------------------------------------

const METRIC_ATTEMPTS_TOTAL: &str = "actionqueue_attempts_total";
const HELP_ATTEMPTS_TOTAL: &str = "Total attempt outcomes by result taxonomy.";

// --- Latency histograms --------------------------------------------------

const METRIC_SCHEDULING_LAG_SECONDS: &str = "actionqueue_scheduling_lag_seconds";
const HELP_SCHEDULING_LAG_SECONDS: &str =
    "Observed scheduling lag in seconds between eligibility and dispatch.";

const METRIC_EXECUTOR_DURATION_SECONDS: &str = "actionqueue_executor_duration_seconds";
const HELP_EXECUTOR_DURATION_SECONDS: &str = "Observed executor attempt duration in seconds.";

// --- Write-ahead log -----------------------------------------------------

const METRIC_WAL_APPEND_TOTAL: &str = "actionqueue_wal_append_total";
const HELP_WAL_APPEND_TOTAL: &str = "Total successful WAL append operations.";

const METRIC_WAL_APPEND_FAILURES_TOTAL: &str = "actionqueue_wal_append_failures_total";
const HELP_WAL_APPEND_FAILURES_TOTAL: &str = "Total failed WAL append operations.";

// --- Recovery ------------------------------------------------------------

const METRIC_RECOVERY_TIME_SECONDS: &str = "actionqueue_recovery_time_seconds";
const HELP_RECOVERY_TIME_SECONDS: &str = "Observed recovery replay duration in seconds.";

const METRIC_RECOVERY_EVENTS_APPLIED_TOTAL: &str = "actionqueue_recovery_events_applied_total";
const HELP_RECOVERY_EVENTS_APPLIED_TOTAL: &str =
    "Total recovery-applied events from snapshot hydration plus WAL replay.";
52
/// Failure modes surfaced by the metrics registry.
///
/// Both variants carry the underlying failure as a rendered `String`, which
/// keeps the type `Clone + Eq` and independent of the upstream error type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricsRegistryError {
    /// A collector could not be constructed or registered.
    Registration {
        /// Exported name of the metric family that failed.
        metric: &'static str,
        /// Rendered text of the underlying error.
        message: String,
    },
    /// Rendering the gathered families to the text exposition failed.
    Encode(String),
}

impl std::fmt::Display for MetricsRegistryError {
    /// Renders a stable, prefix-tagged message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Registration { metric, message } => {
                write!(f, "metrics_registration_failed[{metric}]: {message}")
            }
            Self::Encode(message) => write!(f, "metrics_encode_failed: {message}"),
        }
    }
}

// No `source()` override: the underlying cause is only kept as text.
impl std::error::Error for MetricsRegistryError {}
76
/// Text-exposition payload paired with the content type to serve it under.
///
/// Marked `#[must_use]` so that an encoded payload is not silently dropped.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct EncodedMetrics {
    /// Value for the HTTP `Content-Type` header, as reported by the encoder.
    pub content_type: String,
    /// Exposition body, already validated as UTF-8.
    pub body: String,
}
86
87/// Registered collector handles for future metric update wiring.
88#[derive(Debug, Clone)]
89pub struct RegisteredMetrics {
90    runs_total: GaugeVec,
91    runs_ready: Gauge,
92    runs_running: Gauge,
93    attempts_total: GaugeVec,
94    scheduling_lag_seconds: Histogram,
95    executor_duration_seconds: Histogram,
96    wal_append_total: Counter,
97    wal_append_failures_total: Counter,
98    recovery_time_seconds: Histogram,
99    recovery_events_applied_total: Counter,
100}
101
102impl RegisteredMetrics {
103    /// Returns the `actionqueue_runs_total` gauge vector.
104    pub fn runs_total(&self) -> &GaugeVec {
105        &self.runs_total
106    }
107
108    /// Returns the `actionqueue_runs_ready` gauge.
109    pub fn runs_ready(&self) -> &Gauge {
110        &self.runs_ready
111    }
112
113    /// Returns the `actionqueue_runs_running` gauge.
114    pub fn runs_running(&self) -> &Gauge {
115        &self.runs_running
116    }
117
118    /// Returns the `actionqueue_attempts_total` gauge vector.
119    pub fn attempts_total(&self) -> &GaugeVec {
120        &self.attempts_total
121    }
122
123    /// Returns the `actionqueue_scheduling_lag_seconds` histogram.
124    pub fn scheduling_lag_seconds(&self) -> &Histogram {
125        &self.scheduling_lag_seconds
126    }
127
128    /// Returns the `actionqueue_executor_duration_seconds` histogram.
129    pub fn executor_duration_seconds(&self) -> &Histogram {
130        &self.executor_duration_seconds
131    }
132
133    /// Returns the `actionqueue_wal_append_total` counter.
134    pub fn wal_append_total(&self) -> &Counter {
135        &self.wal_append_total
136    }
137
138    /// Returns the `actionqueue_wal_append_failures_total` counter.
139    pub fn wal_append_failures_total(&self) -> &Counter {
140        &self.wal_append_failures_total
141    }
142
143    /// Returns the `actionqueue_recovery_time_seconds` histogram.
144    pub fn recovery_time_seconds(&self) -> &Histogram {
145        &self.recovery_time_seconds
146    }
147
148    /// Returns the `actionqueue_recovery_events_applied_total` counter.
149    pub fn recovery_events_applied_total(&self) -> &Counter {
150        &self.recovery_events_applied_total
151    }
152}
153
154/// Daemon-scoped metrics registry and collector handles.
155#[derive(Debug, Clone)]
156pub struct MetricsRegistry {
157    metrics_bind: Option<SocketAddr>,
158    registry: Registry,
159    collectors: RegisteredMetrics,
160}
161
162impl MetricsRegistry {
163    /// Builds a new registry instance and registers all required collector families.
164    pub fn new(metrics_bind: Option<SocketAddr>) -> Result<Self, MetricsRegistryError> {
165        let registry = Registry::new();
166
167        let runs_total = GaugeVec::new(Opts::new(METRIC_RUNS_TOTAL, HELP_RUNS_TOTAL), &["state"])
168            .map_err(|error| MetricsRegistryError::Registration {
169            metric: METRIC_RUNS_TOTAL,
170            message: error.to_string(),
171        })?;
172        register_collector(&registry, METRIC_RUNS_TOTAL, runs_total.clone())?;
173
174        let runs_ready =
175            Gauge::with_opts(Opts::new(METRIC_RUNS_READY, HELP_RUNS_READY)).map_err(|error| {
176                MetricsRegistryError::Registration {
177                    metric: METRIC_RUNS_READY,
178                    message: error.to_string(),
179                }
180            })?;
181        register_collector(&registry, METRIC_RUNS_READY, runs_ready.clone())?;
182
183        let runs_running = Gauge::with_opts(Opts::new(METRIC_RUNS_RUNNING, HELP_RUNS_RUNNING))
184            .map_err(|error| MetricsRegistryError::Registration {
185                metric: METRIC_RUNS_RUNNING,
186                message: error.to_string(),
187            })?;
188        register_collector(&registry, METRIC_RUNS_RUNNING, runs_running.clone())?;
189
190        let attempts_total =
191            GaugeVec::new(Opts::new(METRIC_ATTEMPTS_TOTAL, HELP_ATTEMPTS_TOTAL), &["result"])
192                .map_err(|error| MetricsRegistryError::Registration {
193                    metric: METRIC_ATTEMPTS_TOTAL,
194                    message: error.to_string(),
195                })?;
196        register_collector(&registry, METRIC_ATTEMPTS_TOTAL, attempts_total.clone())?;
197
198        let scheduling_lag_seconds = Histogram::with_opts(HistogramOpts::new(
199            METRIC_SCHEDULING_LAG_SECONDS,
200            HELP_SCHEDULING_LAG_SECONDS,
201        ))
202        .map_err(|error| MetricsRegistryError::Registration {
203            metric: METRIC_SCHEDULING_LAG_SECONDS,
204            message: error.to_string(),
205        })?;
206        register_collector(
207            &registry,
208            METRIC_SCHEDULING_LAG_SECONDS,
209            scheduling_lag_seconds.clone(),
210        )?;
211
212        let executor_duration_seconds = Histogram::with_opts(HistogramOpts::new(
213            METRIC_EXECUTOR_DURATION_SECONDS,
214            HELP_EXECUTOR_DURATION_SECONDS,
215        ))
216        .map_err(|error| MetricsRegistryError::Registration {
217            metric: METRIC_EXECUTOR_DURATION_SECONDS,
218            message: error.to_string(),
219        })?;
220        register_collector(
221            &registry,
222            METRIC_EXECUTOR_DURATION_SECONDS,
223            executor_duration_seconds.clone(),
224        )?;
225
226        let wal_append_total =
227            Counter::with_opts(Opts::new(METRIC_WAL_APPEND_TOTAL, HELP_WAL_APPEND_TOTAL)).map_err(
228                |error| MetricsRegistryError::Registration {
229                    metric: METRIC_WAL_APPEND_TOTAL,
230                    message: error.to_string(),
231                },
232            )?;
233        register_collector(&registry, METRIC_WAL_APPEND_TOTAL, wal_append_total.clone())?;
234
235        let wal_append_failures_total = Counter::with_opts(Opts::new(
236            METRIC_WAL_APPEND_FAILURES_TOTAL,
237            HELP_WAL_APPEND_FAILURES_TOTAL,
238        ))
239        .map_err(|error| MetricsRegistryError::Registration {
240            metric: METRIC_WAL_APPEND_FAILURES_TOTAL,
241            message: error.to_string(),
242        })?;
243        register_collector(
244            &registry,
245            METRIC_WAL_APPEND_FAILURES_TOTAL,
246            wal_append_failures_total.clone(),
247        )?;
248
249        let recovery_time_seconds = Histogram::with_opts(HistogramOpts::new(
250            METRIC_RECOVERY_TIME_SECONDS,
251            HELP_RECOVERY_TIME_SECONDS,
252        ))
253        .map_err(|error| MetricsRegistryError::Registration {
254            metric: METRIC_RECOVERY_TIME_SECONDS,
255            message: error.to_string(),
256        })?;
257        register_collector(&registry, METRIC_RECOVERY_TIME_SECONDS, recovery_time_seconds.clone())?;
258
259        let recovery_events_applied_total = Counter::with_opts(Opts::new(
260            METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
261            HELP_RECOVERY_EVENTS_APPLIED_TOTAL,
262        ))
263        .map_err(|error| MetricsRegistryError::Registration {
264            metric: METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
265            message: error.to_string(),
266        })?;
267        register_collector(
268            &registry,
269            METRIC_RECOVERY_EVENTS_APPLIED_TOTAL,
270            recovery_events_applied_total.clone(),
271        )?;
272
273        for label in RUN_STATE_LABEL_VALUES {
274            let _ = runs_total.with_label_values(&[label]);
275        }
276        for label in ATTEMPT_RESULT_LABEL_VALUES {
277            let _ = attempts_total.with_label_values(&[label]);
278        }
279
280        Ok(Self {
281            metrics_bind,
282            registry,
283            collectors: RegisteredMetrics {
284                runs_total,
285                runs_ready,
286                runs_running,
287                attempts_total,
288                scheduling_lag_seconds,
289                executor_duration_seconds,
290                wal_append_total,
291                wal_append_failures_total,
292                recovery_time_seconds,
293                recovery_events_applied_total,
294            },
295        })
296    }
297
298    /// Returns the configured metrics bind address when enabled.
299    pub fn bind_address(&self) -> Option<SocketAddr> {
300        self.metrics_bind
301    }
302
303    /// Returns `true` if metrics routing is enabled.
304    pub fn is_enabled(&self) -> bool {
305        self.metrics_bind.is_some()
306    }
307
308    /// Returns the collector handle set.
309    pub fn collectors(&self) -> &RegisteredMetrics {
310        &self.collectors
311    }
312
313    /// Gathers metric families from the daemon-local registry.
314    pub fn gather(&self) -> Vec<prometheus::proto::MetricFamily> {
315        self.registry.gather()
316    }
317
318    /// Encodes gathered metric families to Prometheus text exposition.
319    pub fn encode_text(&self) -> Result<EncodedMetrics, MetricsRegistryError> {
320        let metric_families = self.gather();
321        let encoder = TextEncoder::new();
322        let mut buffer = Vec::new();
323        encoder
324            .encode(&metric_families, &mut buffer)
325            .map_err(|error| MetricsRegistryError::Encode(error.to_string()))?;
326        let body = String::from_utf8(buffer).map_err(|error| {
327            MetricsRegistryError::Encode(format!(
328                "metrics encoding produced non-utf8 bytes: {error}"
329            ))
330        })?;
331
332        Ok(EncodedMetrics { content_type: encoder.format_type().to_string(), body })
333    }
334}
335
336fn register_collector<T: Collector + Clone + 'static>(
337    registry: &Registry,
338    metric: &'static str,
339    collector: T,
340) -> Result<(), MetricsRegistryError> {
341    registry
342        .register(Box::new(collector))
343        .map_err(|error| MetricsRegistryError::Registration { metric, message: error.to_string() })
344}
345
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;

    /// A freshly built registry reports its bind address and already exposes
    /// every metric family and every pre-seeded label value.
    #[test]
    fn registry_constructor_registers_required_families_and_labels() {
        let bind = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
        let registry = MetricsRegistry::new(Some(bind)).expect("registry should initialize");

        assert!(registry.is_enabled());
        assert_eq!(registry.bind_address(), Some(bind));

        let encoded = registry.encode_text().expect("encoding should succeed");

        // Every family name must appear in the exposition.
        assert!(encoded.body.contains(METRIC_RUNS_TOTAL));
        assert!(encoded.body.contains(METRIC_RUNS_READY));
        assert!(encoded.body.contains(METRIC_RUNS_RUNNING));
        assert!(encoded.body.contains(METRIC_ATTEMPTS_TOTAL));
        assert!(encoded.body.contains(METRIC_SCHEDULING_LAG_SECONDS));
        assert!(encoded.body.contains(METRIC_EXECUTOR_DURATION_SECONDS));
        assert!(encoded.body.contains(METRIC_WAL_APPEND_TOTAL));
        assert!(encoded.body.contains(METRIC_WAL_APPEND_FAILURES_TOTAL));
        assert!(encoded.body.contains(METRIC_RECOVERY_TIME_SECONDS));
        assert!(encoded.body.contains(METRIC_RECOVERY_EVENTS_APPLIED_TOTAL));

        // Every bounded label value must already have a series.
        for label in RUN_STATE_LABEL_VALUES {
            let needle = format!("state=\"{label}\"");
            assert!(encoded.body.contains(&needle), "missing pre-seeded run-state label {label}");
        }
        for label in ATTEMPT_RESULT_LABEL_VALUES {
            let needle = format!("result=\"{label}\"");
            assert!(
                encoded.body.contains(&needle),
                "missing pre-seeded attempt-result label {label}"
            );
        }
    }

    /// Registering the same collector twice yields a `Registration` error
    /// tagged with the metric name that was passed in.
    #[test]
    fn duplicate_registration_path_returns_typed_deterministic_error() {
        let metric_name = "actionqueue_registry_duplicate_test";
        let registry = Registry::new();
        let gauge = Gauge::with_opts(Opts::new(metric_name, "duplicate test gauge"))
            .expect("gauge should initialize");

        register_collector(&registry, metric_name, gauge.clone())
            .expect("first registration should succeed");
        let error = register_collector(&registry, metric_name, gauge)
            .expect_err("second registration should fail deterministically");

        match error {
            MetricsRegistryError::Registration { metric, .. } => {
                assert_eq!(metric, "actionqueue_registry_duplicate_test");
            }
            other => panic!("unexpected error variant: {other}"),
        }
    }
}