actionqueue_daemon/metrics/
wal.rs1pub fn update(state: &crate::http::RouterStateInner) {
13 let collectors = state.metrics.collectors();
14 let snapshot = state.wal_append_telemetry.snapshot();
15
16 collectors.wal_append_total().reset();
17 collectors.wal_append_failures_total().reset();
18
19 collectors.wal_append_total().inc_by(snapshot.append_success_total as f64);
20 collectors.wal_append_failures_total().inc_by(snapshot.append_failure_total as f64);
21}
22
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use actionqueue_storage::wal::writer::{WalWriter, WalWriterError};
    use actionqueue_storage::wal::{InstrumentedWalWriter, WalAppendTelemetry};

    use super::update;
    use crate::bootstrap::{ReadyStatus, RouterConfig};
    use crate::metrics::registry::MetricsRegistry;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// `WalWriter` stub whose `append`, `flush`, and `close` all return `Ok(())`.
    #[derive(Debug)]
    struct SuccessWriter;

    impl WalWriter for SuccessWriter {
        fn append(&mut self, _ignored: &WalEvent) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// `WalWriter` stub whose `append` always returns an `IoError`; `flush` and
    /// `close` still return `Ok(())`.
    #[derive(Debug)]
    struct FailureWriter;

    impl WalWriter for FailureWriter {
        fn append(&mut self, _ignored: &WalEvent) -> Result<(), WalWriterError> {
            Err(WalWriterError::IoError("append failed".to_string()))
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Builds an `EnginePaused` event; `sequence` is used both as the event
    /// sequence number and as the pause timestamp.
    fn sample_event(sequence: u64) -> WalEvent {
        let kind = WalEventType::EnginePaused { timestamp: sequence };
        WalEvent::new(sequence, kind)
    }

    /// Creates a shared metrics registry for a test, panicking if
    /// initialization fails.
    fn fresh_registry() -> Arc<MetricsRegistry> {
        let registry = MetricsRegistry::new(None).expect("metrics registry should initialize");
        Arc::new(registry)
    }

    /// Assembles a router state around the given registry and telemetry, with
    /// metrics enabled, control disabled, a mock clock, an empty reducer, zero
    /// recovery observations, and a ready status.
    fn build_state(
        metrics: Arc<MetricsRegistry>,
        telemetry: WalAppendTelemetry,
    ) -> crate::http::RouterStateInner {
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        let config = RouterConfig { control_enabled: false, metrics_enabled: true };
        let reducer = Arc::new(RwLock::new(ReplayReducer::new()));
        let observability = crate::http::RouterObservability {
            metrics,
            wal_append_telemetry: telemetry,
            clock,
            recovery_observations: RecoveryObservations::zero(),
        };

        crate::http::RouterStateInner::new(config, reducer, observability, ReadyStatus::ready())
    }

    /// Asserts the exact values currently held by the two WAL append counters.
    #[track_caller]
    fn assert_wal_counters(metrics: &MetricsRegistry, appended: f64, failed: f64) {
        assert_eq!(metrics.collectors().wal_append_total().get(), appended);
        assert_eq!(metrics.collectors().wal_append_failures_total().get(), failed);
    }

    #[test]
    fn update_maps_telemetry_totals_to_wal_counters_exactly() {
        let telemetry = WalAppendTelemetry::new();
        let mut succeeding = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());
        let mut failing = InstrumentedWalWriter::new(FailureWriter, telemetry.clone());

        // One successful append followed by two failed ones.
        succeeding.append(&sample_event(1)).expect("append should succeed");
        for sequence in 2..=3 {
            let _ = failing.append(&sample_event(sequence));
        }

        let registry = fresh_registry();
        let state = build_state(Arc::clone(&registry), telemetry);
        update(&state);

        assert_wal_counters(&registry, 1.0, 2.0);
    }

    #[test]
    fn update_overwrites_prior_scrape_values_deterministically() {
        let first_telemetry = WalAppendTelemetry::new();
        let mut succeeding = InstrumentedWalWriter::new(SuccessWriter, first_telemetry.clone());
        for sequence in 1..=2 {
            succeeding.append(&sample_event(sequence)).expect("append should succeed");
        }

        let registry = fresh_registry();

        let state_before = build_state(Arc::clone(&registry), first_telemetry);
        update(&state_before);
        assert_wal_counters(&registry, 2.0, 0.0);

        // A second, independent telemetry source feeding the same registry
        // must replace the earlier totals rather than add to them.
        let second_telemetry = WalAppendTelemetry::new();
        let mut failing = InstrumentedWalWriter::new(FailureWriter, second_telemetry.clone());
        let _ = failing.append(&sample_event(10));

        let state_after = build_state(Arc::clone(&registry), second_telemetry);
        update(&state_after);
        assert_wal_counters(&registry, 0.0, 1.0);
    }

    #[test]
    fn update_does_not_touch_recovery_histogram_collector() {
        let registry = fresh_registry();
        let state = build_state(Arc::clone(&registry), WalAppendTelemetry::new());
        update(&state);

        let recovery_histogram = registry.collectors().recovery_time_seconds();
        assert_eq!(recovery_histogram.get_sample_count(), 0);
        assert_eq!(recovery_histogram.get_sample_sum(), 0.0);
    }
}