1use std::sync::atomic::{AtomicU64, Ordering};
5
6pub struct RepStats {
8 pub elections_held: AtomicU64,
10 pub elections_won: AtomicU64,
11 pub elections_lost: AtomicU64,
12
13 pub feeders_created: AtomicU64,
15 pub feeders_shutdown: AtomicU64,
16
17 pub acks_received: AtomicU64,
19 pub ack_timeouts: AtomicU64,
20
21 pub entries_replicated: AtomicU64,
23 pub entries_applied: AtomicU64,
24 pub bytes_replicated: AtomicU64,
25
26 pub max_replica_lag_ms: AtomicU64,
28}
29
30impl RepStats {
31 pub fn new() -> Self {
33 Self {
34 elections_held: AtomicU64::new(0),
35 elections_won: AtomicU64::new(0),
36 elections_lost: AtomicU64::new(0),
37 feeders_created: AtomicU64::new(0),
38 feeders_shutdown: AtomicU64::new(0),
39 acks_received: AtomicU64::new(0),
40 ack_timeouts: AtomicU64::new(0),
41 entries_replicated: AtomicU64::new(0),
42 entries_applied: AtomicU64::new(0),
43 bytes_replicated: AtomicU64::new(0),
44 max_replica_lag_ms: AtomicU64::new(0),
45 }
46 }
47
48 pub fn increment_elections_held(&self) {
49 self.elections_held.fetch_add(1, Ordering::Relaxed);
50 }
51
52 pub fn increment_elections_won(&self) {
53 self.elections_won.fetch_add(1, Ordering::Relaxed);
54 }
55
56 pub fn increment_elections_lost(&self) {
57 self.elections_lost.fetch_add(1, Ordering::Relaxed);
58 }
59
60 pub fn add_entries_replicated(&self, count: u64) {
61 self.entries_replicated.fetch_add(count, Ordering::Relaxed);
62 }
63
64 pub fn add_bytes_replicated(&self, bytes: u64) {
65 self.bytes_replicated.fetch_add(bytes, Ordering::Relaxed);
66 }
67
68 pub fn add_entries_applied(&self, count: u64) {
69 self.entries_applied.fetch_add(count, Ordering::Relaxed);
70 }
71
72 pub fn update_max_lag(&self, lag_ms: u64) {
73 self.max_replica_lag_ms.fetch_max(lag_ms, Ordering::Relaxed);
74 }
75
76 pub fn reset(&self) {
78 self.elections_held.store(0, Ordering::Relaxed);
79 self.elections_won.store(0, Ordering::Relaxed);
80 self.elections_lost.store(0, Ordering::Relaxed);
81 self.feeders_created.store(0, Ordering::Relaxed);
82 self.feeders_shutdown.store(0, Ordering::Relaxed);
83 self.acks_received.store(0, Ordering::Relaxed);
84 self.ack_timeouts.store(0, Ordering::Relaxed);
85 self.entries_replicated.store(0, Ordering::Relaxed);
86 self.entries_applied.store(0, Ordering::Relaxed);
87 self.bytes_replicated.store(0, Ordering::Relaxed);
88 self.max_replica_lag_ms.store(0, Ordering::Relaxed);
89 }
90
91 pub fn summary(&self) -> String {
93 format!(
94 "RepStats {{ elections: held={} won={} lost={}, \
95 feeders: created={} shutdown={}, \
96 acks: received={} timeouts={}, \
97 stream: replicated={} applied={} bytes={}, \
98 max_lag_ms={} }}",
99 self.elections_held.load(Ordering::Relaxed),
100 self.elections_won.load(Ordering::Relaxed),
101 self.elections_lost.load(Ordering::Relaxed),
102 self.feeders_created.load(Ordering::Relaxed),
103 self.feeders_shutdown.load(Ordering::Relaxed),
104 self.acks_received.load(Ordering::Relaxed),
105 self.ack_timeouts.load(Ordering::Relaxed),
106 self.entries_replicated.load(Ordering::Relaxed),
107 self.entries_applied.load(Ordering::Relaxed),
108 self.bytes_replicated.load(Ordering::Relaxed),
109 self.max_replica_lag_ms.load(Ordering::Relaxed),
110 )
111 }
112}
113
114impl Default for RepStats {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123
124 #[test]
125 fn test_new_stats_are_zero() {
126 let stats = RepStats::new();
127 assert_eq!(stats.elections_held.load(Ordering::Relaxed), 0);
128 assert_eq!(stats.entries_replicated.load(Ordering::Relaxed), 0);
129 assert_eq!(stats.max_replica_lag_ms.load(Ordering::Relaxed), 0);
130 }
131
132 #[test]
133 fn test_increment_elections() {
134 let stats = RepStats::new();
135 stats.increment_elections_held();
136 stats.increment_elections_held();
137 stats.increment_elections_won();
138 stats.increment_elections_lost();
139 assert_eq!(stats.elections_held.load(Ordering::Relaxed), 2);
140 assert_eq!(stats.elections_won.load(Ordering::Relaxed), 1);
141 assert_eq!(stats.elections_lost.load(Ordering::Relaxed), 1);
142 }
143
144 #[test]
145 fn test_add_entries() {
146 let stats = RepStats::new();
147 stats.add_entries_replicated(10);
148 stats.add_entries_replicated(5);
149 stats.add_entries_applied(8);
150 stats.add_bytes_replicated(1024);
151 assert_eq!(stats.entries_replicated.load(Ordering::Relaxed), 15);
152 assert_eq!(stats.entries_applied.load(Ordering::Relaxed), 8);
153 assert_eq!(stats.bytes_replicated.load(Ordering::Relaxed), 1024);
154 }
155
156 #[test]
157 fn test_update_max_lag() {
158 let stats = RepStats::new();
159 stats.update_max_lag(100);
160 assert_eq!(stats.max_replica_lag_ms.load(Ordering::Relaxed), 100);
161 stats.update_max_lag(50); assert_eq!(stats.max_replica_lag_ms.load(Ordering::Relaxed), 100);
163 stats.update_max_lag(200);
164 assert_eq!(stats.max_replica_lag_ms.load(Ordering::Relaxed), 200);
165 }
166
167 #[test]
168 fn test_reset() {
169 let stats = RepStats::new();
170 stats.increment_elections_held();
171 stats.add_entries_replicated(100);
172 stats.update_max_lag(500);
173 stats.reset();
174 assert_eq!(stats.elections_held.load(Ordering::Relaxed), 0);
175 assert_eq!(stats.entries_replicated.load(Ordering::Relaxed), 0);
176 assert_eq!(stats.max_replica_lag_ms.load(Ordering::Relaxed), 0);
177 }
178
179 #[test]
180 fn test_summary() {
181 let stats = RepStats::new();
182 stats.increment_elections_held();
183 stats.add_entries_replicated(42);
184 let summary = stats.summary();
185 assert!(summary.contains("held=1"));
186 assert!(summary.contains("replicated=42"));
187 }
188
189 #[test]
190 fn test_default() {
191 let stats = RepStats::default();
192 assert_eq!(stats.elections_held.load(Ordering::Relaxed), 0);
193 }
194}