actionqueue_daemon/metrics/
recovery.rs1use std::sync::atomic::Ordering;
7
8pub fn update(state: &crate::http::RouterStateInner) {
14 let collectors = state.metrics.collectors();
15
16 collectors.recovery_events_applied_total().reset();
17 collectors
18 .recovery_events_applied_total()
19 .inc_by(state.recovery_observations.events_applied_total as f64);
20
21 if state
22 .recovery_histogram_observed
23 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
24 .is_ok()
25 {
26 collectors
27 .recovery_time_seconds()
28 .observe(state.recovery_observations.recovery_duration_seconds);
29 }
30}
31
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::WalAppendTelemetry;

    use super::update;
    use crate::bootstrap::{ReadyStatus, RouterConfig};
    use crate::metrics::registry::MetricsRegistry;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Builds a shared metrics registry for a single test.
    fn new_registry() -> Arc<MetricsRegistry> {
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"))
    }

    /// Assembles a router state around the given registry and recovery
    /// observations, with metrics enabled, control disabled, a mock clock and
    /// an empty replay reducer.
    fn build_state(
        metrics: Arc<MetricsRegistry>,
        recovery_observations: RecoveryObservations,
    ) -> crate::http::RouterStateInner {
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        let config = RouterConfig { control_enabled: false, metrics_enabled: true };
        let projection = Arc::new(std::sync::RwLock::new(ReplayReducer::new()));
        let observability = crate::http::RouterObservability {
            metrics,
            wal_append_telemetry: WalAppendTelemetry::new(),
            clock,
            recovery_observations,
        };

        crate::http::RouterStateInner::new(config, projection, observability, ReadyStatus::ready())
    }

    /// Two states sharing one registry: after each `update` the counter must
    /// equal that state's total, not the running sum.
    #[test]
    fn update_overwrites_counter_from_authoritative_total() {
        let metrics = new_registry();

        let cases = [
            (
                RecoveryObservations {
                    recovery_duration_seconds: 0.25,
                    events_applied_total: 11,
                    snapshot_events_applied: 7,
                    wal_replay_events_applied: 4,
                },
                11.0,
            ),
            (
                RecoveryObservations {
                    recovery_duration_seconds: 0.5,
                    events_applied_total: 3,
                    snapshot_events_applied: 1,
                    wal_replay_events_applied: 2,
                },
                3.0,
            ),
        ];

        for (observations, expected_total) in cases {
            let state = build_state(Arc::clone(&metrics), observations);
            update(&state);
            assert_eq!(
                metrics.collectors().recovery_events_applied_total().get(),
                expected_total
            );
        }
    }

    /// Calling `update` twice on the same state must add exactly one
    /// histogram sample.
    #[test]
    fn update_observes_histogram_once_per_process_lifetime() {
        let metrics = new_registry();
        let state = build_state(
            Arc::clone(&metrics),
            RecoveryObservations {
                recovery_duration_seconds: 0.125,
                events_applied_total: 1,
                snapshot_events_applied: 1,
                wal_replay_events_applied: 0,
            },
        );

        let sample_count = || metrics.collectors().recovery_time_seconds().get_sample_count();
        let baseline = sample_count();

        for _ in 0..2 {
            update(&state);
        }

        assert_eq!(sample_count(), baseline + 1);
    }
}