Skip to main content

noxu_rep/
rep_stats.rs

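//! Replication statistics.
//!
//! Defines [`RepStats`], a set of atomic counters covering elections, feeders,
//! acknowledgements, the replication stream, and replica lag.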
3
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Statistics for replication operations.
///
/// Every field is an [`AtomicU64`], so counters can be read and updated
/// through a shared reference (`&RepStats`) without additional locking.
/// `Debug` is derived so the whole set of counters can be logged with `{:?}`.
#[derive(Debug)]
pub struct RepStats {
    // Election stats
    /// Count of elections held.
    pub elections_held: AtomicU64,
    /// Count of elections won.
    pub elections_won: AtomicU64,
    /// Count of elections lost.
    pub elections_lost: AtomicU64,

    // Feeder stats
    /// Count of feeders created.
    pub feeders_created: AtomicU64,
    /// Count of feeders shut down.
    pub feeders_shutdown: AtomicU64,

    // Ack stats
    /// Count of acknowledgements received.
    pub acks_received: AtomicU64,
    /// Count of acknowledgement timeouts.
    pub ack_timeouts: AtomicU64,

    // Replication stream stats
    /// Running total of entries replicated.
    pub entries_replicated: AtomicU64,
    /// Running total of entries applied.
    pub entries_applied: AtomicU64,
    /// Running total of bytes replicated.
    pub bytes_replicated: AtomicU64,

    // Lag stats
    /// Largest replica lag recorded so far (presumably milliseconds, per the
    /// `_ms` suffix).
    pub max_replica_lag_ms: AtomicU64,
}
29
30impl RepStats {
31    /// Creates a new stats instance with all counters at zero.
32    pub fn new() -> Self {
33        Self {
34            elections_held: AtomicU64::new(0),
35            elections_won: AtomicU64::new(0),
36            elections_lost: AtomicU64::new(0),
37            feeders_created: AtomicU64::new(0),
38            feeders_shutdown: AtomicU64::new(0),
39            acks_received: AtomicU64::new(0),
40            ack_timeouts: AtomicU64::new(0),
41            entries_replicated: AtomicU64::new(0),
42            entries_applied: AtomicU64::new(0),
43            bytes_replicated: AtomicU64::new(0),
44            max_replica_lag_ms: AtomicU64::new(0),
45        }
46    }
47
48    pub fn increment_elections_held(&self) {
49        self.elections_held.fetch_add(1, Ordering::Relaxed);
50    }
51
52    pub fn increment_elections_won(&self) {
53        self.elections_won.fetch_add(1, Ordering::Relaxed);
54    }
55
56    pub fn increment_elections_lost(&self) {
57        self.elections_lost.fetch_add(1, Ordering::Relaxed);
58    }
59
60    pub fn add_entries_replicated(&self, count: u64) {
61        self.entries_replicated.fetch_add(count, Ordering::Relaxed);
62    }
63
64    pub fn add_bytes_replicated(&self, bytes: u64) {
65        self.bytes_replicated.fetch_add(bytes, Ordering::Relaxed);
66    }
67
68    pub fn add_entries_applied(&self, count: u64) {
69        self.entries_applied.fetch_add(count, Ordering::Relaxed);
70    }
71
72    pub fn update_max_lag(&self, lag_ms: u64) {
73        self.max_replica_lag_ms.fetch_max(lag_ms, Ordering::Relaxed);
74    }
75
76    /// Reset all counters to zero.
77    pub fn reset(&self) {
78        self.elections_held.store(0, Ordering::Relaxed);
79        self.elections_won.store(0, Ordering::Relaxed);
80        self.elections_lost.store(0, Ordering::Relaxed);
81        self.feeders_created.store(0, Ordering::Relaxed);
82        self.feeders_shutdown.store(0, Ordering::Relaxed);
83        self.acks_received.store(0, Ordering::Relaxed);
84        self.ack_timeouts.store(0, Ordering::Relaxed);
85        self.entries_replicated.store(0, Ordering::Relaxed);
86        self.entries_applied.store(0, Ordering::Relaxed);
87        self.bytes_replicated.store(0, Ordering::Relaxed);
88        self.max_replica_lag_ms.store(0, Ordering::Relaxed);
89    }
90
91    /// Get a snapshot of all stats as a formatted string.
92    pub fn summary(&self) -> String {
93        format!(
94            "RepStats {{ elections: held={} won={} lost={}, \
95             feeders: created={} shutdown={}, \
96             acks: received={} timeouts={}, \
97             stream: replicated={} applied={} bytes={}, \
98             max_lag_ms={} }}",
99            self.elections_held.load(Ordering::Relaxed),
100            self.elections_won.load(Ordering::Relaxed),
101            self.elections_lost.load(Ordering::Relaxed),
102            self.feeders_created.load(Ordering::Relaxed),
103            self.feeders_shutdown.load(Ordering::Relaxed),
104            self.acks_received.load(Ordering::Relaxed),
105            self.ack_timeouts.load(Ordering::Relaxed),
106            self.entries_replicated.load(Ordering::Relaxed),
107            self.entries_applied.load(Ordering::Relaxed),
108            self.bytes_replicated.load(Ordering::Relaxed),
109            self.max_replica_lag_ms.load(Ordering::Relaxed),
110        )
111    }
112}
113
114impl Default for RepStats {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the current value of a counter with relaxed ordering.
    fn value(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// A freshly constructed instance reports zero for the sampled counters.
    #[test]
    fn test_new_stats_are_zero() {
        let fresh = RepStats::new();
        let sampled = [
            value(&fresh.elections_held),
            value(&fresh.entries_replicated),
            value(&fresh.max_replica_lag_ms),
        ];
        assert_eq!(sampled, [0, 0, 0]);
    }

    /// Each election incrementer bumps only its own counter by one per call.
    #[test]
    fn test_increment_elections() {
        let s = RepStats::new();
        for _ in 0..2 {
            s.increment_elections_held();
        }
        s.increment_elections_won();
        s.increment_elections_lost();

        let observed = [
            value(&s.elections_held),
            value(&s.elections_won),
            value(&s.elections_lost),
        ];
        assert_eq!(observed, [2, 1, 1]);
    }

    /// The `add_*` methods accumulate the amounts passed in.
    #[test]
    fn test_add_entries() {
        let s = RepStats::new();
        for batch in [10, 5] {
            s.add_entries_replicated(batch);
        }
        s.add_entries_applied(8);
        s.add_bytes_replicated(1024);

        assert_eq!(value(&s.entries_replicated), 15);
        assert_eq!(value(&s.entries_applied), 8);
        assert_eq!(value(&s.bytes_replicated), 1024);
    }

    /// The stored maximum only ever moves upwards.
    #[test]
    fn test_update_max_lag() {
        let s = RepStats::new();
        // (reported lag, expected stored maximum); 50 must not lower it.
        let steps = [(100, 100), (50, 100), (200, 200)];
        for (reported, expected) in steps {
            s.update_max_lag(reported);
            assert_eq!(value(&s.max_replica_lag_ms), expected);
        }
    }

    /// `reset` returns previously updated counters to zero.
    #[test]
    fn test_reset() {
        let s = RepStats::new();
        s.increment_elections_held();
        s.add_entries_replicated(100);
        s.update_max_lag(500);

        s.reset();

        let after = [
            value(&s.elections_held),
            value(&s.entries_replicated),
            value(&s.max_replica_lag_ms),
        ];
        assert_eq!(after, [0, 0, 0]);
    }

    /// The summary string reflects the current counter values.
    #[test]
    fn test_summary() {
        let s = RepStats::new();
        s.increment_elections_held();
        s.add_entries_replicated(42);

        let text = s.summary();
        for fragment in ["held=1", "replicated=42"] {
            assert!(text.contains(fragment));
        }
    }

    /// `Default` produces a zeroed instance.
    #[test]
    fn test_default() {
        let s = RepStats::default();
        assert_eq!(value(&s.elections_held), 0);
    }
}