Skip to main content

actionqueue_daemon/metrics/
wal.rs

1//! Authoritative WAL append metric population from storage telemetry.
2//!
3//! This updater reads append outcome totals emitted by the instrumented WAL
4//! writer and overwrites daemon counters deterministically on each scrape.
5
6/// Recomputes WAL append counters from authoritative storage telemetry.
7///
8/// Update behavior is deterministic for each scrape:
9/// - Reads immutable append-success/failure totals from router state telemetry.
10/// - Resets both Prometheus counters.
11/// - Applies absolute values via `inc_by`.
12pub fn update(state: &crate::http::RouterStateInner) {
13    let collectors = state.metrics.collectors();
14    let snapshot = state.wal_append_telemetry.snapshot();
15
16    collectors.wal_append_total().reset();
17    collectors.wal_append_failures_total().reset();
18
19    collectors.wal_append_total().inc_by(snapshot.append_success_total as f64);
20    collectors.wal_append_failures_total().inc_by(snapshot.append_failure_total as f64);
21}
22
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::writer::{WalWriter, WalWriterError};
    use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};

    use super::update;
    use crate::bootstrap::{ReadyStatus, RouterConfig};
    use crate::metrics::registry::MetricsRegistry;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Inner WAL writer whose every operation succeeds.
    #[derive(Debug)]
    struct SuccessWriter;

    impl WalWriter for SuccessWriter {
        fn append(
            &mut self,
            _event: &actionqueue_storage::wal::event::WalEvent,
        ) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Inner WAL writer whose `append` always fails with an I/O error.
    #[derive(Debug)]
    struct FailureWriter;

    impl WalWriter for FailureWriter {
        fn append(
            &mut self,
            _event: &actionqueue_storage::wal::event::WalEvent,
        ) -> Result<(), WalWriterError> {
            Err(WalWriterError::IoError("append failed".to_string()))
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Builds a minimal `EnginePaused` WAL event for the given sequence number.
    fn sample_event(sequence: u64) -> actionqueue_storage::wal::event::WalEvent {
        actionqueue_storage::wal::event::WalEvent::new(
            sequence,
            actionqueue_storage::wal::event::WalEventType::EnginePaused { timestamp: sequence },
        )
    }

    /// Creates a fresh metrics registry, so tests never share counter state.
    fn new_registry() -> Arc<MetricsRegistry> {
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"))
    }

    /// Builds router state wired to `metrics` and `telemetry`.
    ///
    /// The state uses a mock clock, control disabled, metrics enabled, and ready status.
    fn build_state(
        metrics: Arc<MetricsRegistry>,
        telemetry: WalAppendTelemetry,
    ) -> crate::http::RouterStateInner {
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        crate::http::RouterStateInner::new(
            RouterConfig { control_enabled: false, metrics_enabled: true },
            Arc::new(std::sync::RwLock::new(ReplayReducer::new())),
            crate::http::RouterObservability {
                metrics,
                wal_append_telemetry: telemetry,
                clock,
                recovery_observations: RecoveryObservations::zero(),
            },
            ReadyStatus::ready(),
        )
    }

    #[test]
    fn update_maps_telemetry_totals_to_wal_counters_exactly() {
        let telemetry = WalAppendTelemetry::new();
        let mut success_writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());
        let mut failure_writer = InstrumentedWalWriter::new(FailureWriter, telemetry.clone());

        success_writer.append(&sample_event(1)).expect("append should succeed");
        // Assert the failures instead of discarding them, so a wrapper that stopped
        // surfacing errors would be caught here.
        failure_writer.append(&sample_event(2)).expect_err("append should fail");
        failure_writer.append(&sample_event(3)).expect_err("append should fail");

        let metrics = new_registry();
        let state = build_state(Arc::clone(&metrics), telemetry);
        update(&state);

        assert_eq!(metrics.collectors().wal_append_total().get(), 1.0);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), 2.0);
    }

    #[test]
    fn update_overwrites_prior_scrape_values_deterministically() {
        let telemetry_one = WalAppendTelemetry::new();
        let mut writer_one = InstrumentedWalWriter::new(SuccessWriter, telemetry_one.clone());
        writer_one.append(&sample_event(1)).expect("append should succeed");
        writer_one.append(&sample_event(2)).expect("append should succeed");

        let metrics = new_registry();

        let first_state = build_state(Arc::clone(&metrics), telemetry_one);
        update(&first_state);
        assert_eq!(metrics.collectors().wal_append_total().get(), 2.0);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), 0.0);

        let telemetry_two = WalAppendTelemetry::new();
        let mut writer_two = InstrumentedWalWriter::new(FailureWriter, telemetry_two.clone());
        writer_two.append(&sample_event(10)).expect_err("append should fail");

        let second_state = build_state(Arc::clone(&metrics), telemetry_two);
        update(&second_state);
        assert_eq!(metrics.collectors().wal_append_total().get(), 0.0);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), 1.0);
    }

    #[test]
    fn update_is_idempotent_for_unchanged_telemetry() {
        let telemetry = WalAppendTelemetry::new();
        let mut success_writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());
        let mut failure_writer = InstrumentedWalWriter::new(FailureWriter, telemetry.clone());
        success_writer.append(&sample_event(1)).expect("append should succeed");
        failure_writer.append(&sample_event(2)).expect_err("append should fail");

        let metrics = new_registry();
        let state = build_state(Arc::clone(&metrics), telemetry);

        // Repeated scrapes must not accumulate: each one re-applies absolute totals.
        update(&state);
        update(&state);

        assert_eq!(metrics.collectors().wal_append_total().get(), 1.0);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), 1.0);
    }

    #[test]
    fn update_reflects_appends_recorded_after_state_construction() {
        let telemetry = WalAppendTelemetry::new();
        let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());

        let metrics = new_registry();
        let state = build_state(Arc::clone(&metrics), telemetry);

        update(&state);
        assert_eq!(metrics.collectors().wal_append_total().get(), 0.0);

        // The writer still holds a clone of the telemetry passed into the state,
        // so the next scrape must pick up these appends.
        writer.append(&sample_event(1)).expect("append should succeed");
        writer.append(&sample_event(2)).expect("append should succeed");
        update(&state);

        assert_eq!(metrics.collectors().wal_append_total().get(), 2.0);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), 0.0);
    }

    #[test]
    fn update_does_not_touch_recovery_histogram_collector() {
        let metrics = new_registry();
        let state = build_state(Arc::clone(&metrics), WalAppendTelemetry::new());
        update(&state);

        assert_eq!(metrics.collectors().recovery_time_seconds().get_sample_count(), 0);
        assert_eq!(metrics.collectors().recovery_time_seconds().get_sample_sum(), 0.0);
    }
}