absurder_sql/storage/
coordination_metrics.rs1use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12
13#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct CoordinationMetrics {
16 pub leadership_changes: u64,
18 pub write_conflicts: u64,
20 pub follower_refreshes: u64,
22 pub avg_notification_latency_ms: f64,
24 pub total_notifications: u64,
26 pub start_timestamp: f64,
28}
29
30pub struct CoordinationMetricsManager {
32 enabled: bool,
34 metrics: CoordinationMetrics,
36 latency_samples: VecDeque<f64>,
38 max_latency_samples: usize,
40}
41
42impl CoordinationMetricsManager {
43 pub fn new() -> Self {
45 #[cfg(target_arch = "wasm32")]
46 let start_timestamp = js_sys::Date::now();
47
48 #[cfg(not(target_arch = "wasm32"))]
49 let start_timestamp = std::time::SystemTime::now()
50 .duration_since(std::time::UNIX_EPOCH)
51 .unwrap_or_else(|_| {
52 log::warn!("SystemTime before UNIX_EPOCH, using 0 as start_timestamp");
54 std::time::Duration::from_secs(0)
55 })
56 .as_secs_f64()
57 * 1000.0;
58
59 Self {
60 enabled: false,
61 metrics: CoordinationMetrics {
62 leadership_changes: 0,
63 write_conflicts: 0,
64 follower_refreshes: 0,
65 avg_notification_latency_ms: 0.0,
66 total_notifications: 0,
67 start_timestamp,
68 },
69 latency_samples: VecDeque::new(),
70 max_latency_samples: 100, }
72 }
73
74 pub fn set_enabled(&mut self, enabled: bool) {
76 self.enabled = enabled;
77 if !enabled {
78 self.reset();
80 }
81
82 #[cfg(target_arch = "wasm32")]
83 web_sys::console::log_1(
84 &format!(
85 "Coordination metrics {}",
86 if enabled { "enabled" } else { "disabled" }
87 )
88 .into(),
89 );
90 }
91
92 pub fn is_enabled(&self) -> bool {
94 self.enabled
95 }
96
97 pub fn record_leadership_change(&mut self, _became_leader: bool) {
99 if !self.enabled {
100 return;
101 }
102
103 self.metrics.leadership_changes += 1;
104
105 #[cfg(target_arch = "wasm32")]
106 web_sys::console::log_1(
107 &format!(
108 "Leadership change recorded (became_leader: {}). Total: {}",
109 _became_leader, self.metrics.leadership_changes
110 )
111 .into(),
112 );
113 }
114
115 pub fn record_write_conflict(&mut self) {
117 if !self.enabled {
118 return;
119 }
120
121 self.metrics.write_conflicts += 1;
122
123 #[cfg(target_arch = "wasm32")]
124 web_sys::console::log_1(
125 &format!(
126 "Write conflict recorded. Total: {}",
127 self.metrics.write_conflicts
128 )
129 .into(),
130 );
131 }
132
133 pub fn record_follower_refresh(&mut self) {
135 if !self.enabled {
136 return;
137 }
138
139 self.metrics.follower_refreshes += 1;
140
141 #[cfg(target_arch = "wasm32")]
142 web_sys::console::log_1(
143 &format!(
144 "Follower refresh recorded. Total: {}",
145 self.metrics.follower_refreshes
146 )
147 .into(),
148 );
149 }
150
151 pub fn record_notification_latency(&mut self, latency_ms: f64) {
153 if !self.enabled {
154 return;
155 }
156
157 self.latency_samples.push_back(latency_ms);
159
160 if self.latency_samples.len() > self.max_latency_samples {
162 self.latency_samples.pop_front();
163 }
164
165 let sum: f64 = self.latency_samples.iter().sum();
167 self.metrics.avg_notification_latency_ms = sum / self.latency_samples.len() as f64;
168 self.metrics.total_notifications += 1;
169
170 #[cfg(target_arch = "wasm32")]
171 web_sys::console::log_1(
172 &format!(
173 "Notification latency recorded: {:.2}ms. Avg: {:.2}ms",
174 latency_ms, self.metrics.avg_notification_latency_ms
175 )
176 .into(),
177 );
178 }
179
180 pub fn get_metrics(&self) -> &CoordinationMetrics {
182 &self.metrics
183 }
184
185 pub fn get_metrics_json(&self) -> Result<String, String> {
187 serde_json::to_string(&self.metrics)
188 .map_err(|e| format!("Failed to serialize metrics: {}", e))
189 }
190
191 pub fn reset(&mut self) {
193 #[cfg(target_arch = "wasm32")]
194 let start_timestamp = js_sys::Date::now();
195
196 #[cfg(not(target_arch = "wasm32"))]
197 let start_timestamp = std::time::SystemTime::now()
198 .duration_since(std::time::UNIX_EPOCH)
199 .unwrap_or_else(|_| {
200 log::warn!("SystemTime before UNIX_EPOCH in reset, using 0 as start_timestamp");
202 std::time::Duration::from_secs(0)
203 })
204 .as_secs_f64()
205 * 1000.0;
206
207 self.metrics = CoordinationMetrics {
208 leadership_changes: 0,
209 write_conflicts: 0,
210 follower_refreshes: 0,
211 avg_notification_latency_ms: 0.0,
212 total_notifications: 0,
213 start_timestamp,
214 };
215 self.latency_samples.clear();
216
217 #[cfg(target_arch = "wasm32")]
218 web_sys::console::log_1(&"Coordination metrics reset".into());
219 }
220
221 pub fn get_leadership_changes_per_minute(&self) -> f64 {
223 #[cfg(target_arch = "wasm32")]
224 let current_time = js_sys::Date::now();
225
226 #[cfg(not(target_arch = "wasm32"))]
227 let current_time = std::time::SystemTime::now()
228 .duration_since(std::time::UNIX_EPOCH)
229 .unwrap_or_else(|_| {
230 log::warn!(
232 "SystemTime before UNIX_EPOCH in get_leadership_changes_per_minute, using 0"
233 );
234 std::time::Duration::from_secs(0)
235 })
236 .as_secs_f64()
237 * 1000.0;
238
239 let elapsed_minutes = (current_time - self.metrics.start_timestamp) / 60000.0;
240
241 if elapsed_minutes > 0.0 {
242 self.metrics.leadership_changes as f64 / elapsed_minutes
243 } else {
244 0.0
245 }
246 }
247}
248
249impl Default for CoordinationMetricsManager {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use super::*;
258
259 #[test]
260 fn test_enable_disable() {
261 let mut manager = CoordinationMetricsManager::new();
262 assert!(!manager.is_enabled());
263
264 manager.set_enabled(true);
265 assert!(manager.is_enabled());
266
267 manager.set_enabled(false);
268 assert!(!manager.is_enabled());
269 }
270
271 #[test]
272 fn test_record_leadership_change() {
273 let mut manager = CoordinationMetricsManager::new();
274 manager.set_enabled(true);
275
276 manager.record_leadership_change(true);
277 manager.record_leadership_change(false);
278
279 assert_eq!(manager.get_metrics().leadership_changes, 2);
280 }
281
282 #[test]
283 fn test_record_write_conflict() {
284 let mut manager = CoordinationMetricsManager::new();
285 manager.set_enabled(true);
286
287 manager.record_write_conflict();
288 manager.record_write_conflict();
289 manager.record_write_conflict();
290
291 assert_eq!(manager.get_metrics().write_conflicts, 3);
292 }
293
294 #[test]
295 fn test_record_follower_refresh() {
296 let mut manager = CoordinationMetricsManager::new();
297 manager.set_enabled(true);
298
299 manager.record_follower_refresh();
300
301 assert_eq!(manager.get_metrics().follower_refreshes, 1);
302 }
303
304 #[test]
305 fn test_record_notification_latency() {
306 let mut manager = CoordinationMetricsManager::new();
307 manager.set_enabled(true);
308
309 manager.record_notification_latency(10.0);
310 manager.record_notification_latency(20.0);
311 manager.record_notification_latency(30.0);
312
313 let metrics = manager.get_metrics();
314 assert_eq!(metrics.total_notifications, 3);
315 assert!((metrics.avg_notification_latency_ms - 20.0).abs() < 0.001);
316 }
317
318 #[test]
319 fn test_reset() {
320 let mut manager = CoordinationMetricsManager::new();
321 manager.set_enabled(true);
322
323 manager.record_leadership_change(true);
324 manager.record_write_conflict();
325 manager.record_follower_refresh();
326
327 manager.reset();
328
329 let metrics = manager.get_metrics();
330 assert_eq!(metrics.leadership_changes, 0);
331 assert_eq!(metrics.write_conflicts, 0);
332 assert_eq!(metrics.follower_refreshes, 0);
333 }
334
335 #[test]
336 fn test_metrics_json() {
337 let mut manager = CoordinationMetricsManager::new();
338 manager.set_enabled(true);
339
340 manager.record_leadership_change(true);
341
342 let json = manager.get_metrics_json().unwrap();
343 assert!(json.contains("leadership_changes"));
344 }
345}