absurder_sql/storage/
coordination_metrics.rs

//! Coordination Metrics Module
//!
//! Tracks performance and coordination metrics for multi-tab operations.
//!
//! Key Metrics:
//! - Leadership changes per minute
//! - Notification latency (average time for BroadcastChannel messages)
//! - Write conflicts (when a non-leader attempts a write)
//! - Follower refresh count (how often followers sync from the leader)
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12
13/// Coordination metrics for multi-tab coordination
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct CoordinationMetrics {
16    /// Total number of leadership changes
17    pub leadership_changes: u64,
18    /// Total number of write conflicts (non-leader write attempts)
19    pub write_conflicts: u64,
20    /// Total number of follower refreshes
21    pub follower_refreshes: u64,
22    /// Average notification latency in milliseconds
23    pub avg_notification_latency_ms: f64,
24    /// Total notifications sent/received
25    pub total_notifications: u64,
26    /// Timestamp when metrics started tracking
27    pub start_timestamp: f64,
28}
29
30/// Manager for tracking coordination metrics
31pub struct CoordinationMetricsManager {
32    /// Whether metrics tracking is enabled
33    enabled: bool,
34    /// Current metrics
35    metrics: CoordinationMetrics,
36    /// Recent notification latencies (for calculating average)
37    latency_samples: VecDeque<f64>,
38    /// Maximum number of latency samples to keep
39    max_latency_samples: usize,
40}
41
42impl CoordinationMetricsManager {
43    /// Create a new coordination metrics manager
44    pub fn new() -> Self {
45        #[cfg(target_arch = "wasm32")]
46        let start_timestamp = js_sys::Date::now();
47
48        #[cfg(not(target_arch = "wasm32"))]
49        let start_timestamp = std::time::SystemTime::now()
50            .duration_since(std::time::UNIX_EPOCH)
51            .unwrap_or_else(|_| {
52                // Fallback: if system time is before UNIX_EPOCH, use 0
53                log::warn!("SystemTime before UNIX_EPOCH, using 0 as start_timestamp");
54                std::time::Duration::from_secs(0)
55            })
56            .as_secs_f64()
57            * 1000.0;
58
59        Self {
60            enabled: false,
61            metrics: CoordinationMetrics {
62                leadership_changes: 0,
63                write_conflicts: 0,
64                follower_refreshes: 0,
65                avg_notification_latency_ms: 0.0,
66                total_notifications: 0,
67                start_timestamp,
68            },
69            latency_samples: VecDeque::new(),
70            max_latency_samples: 100, // Keep last 100 samples
71        }
72    }
73
74    /// Enable or disable metrics tracking
75    pub fn set_enabled(&mut self, enabled: bool) {
76        self.enabled = enabled;
77        if !enabled {
78            // Reset metrics when disabled
79            self.reset();
80        }
81
82        #[cfg(target_arch = "wasm32")]
83        web_sys::console::log_1(
84            &format!(
85                "Coordination metrics {}",
86                if enabled { "enabled" } else { "disabled" }
87            )
88            .into(),
89        );
90    }
91
92    /// Check if metrics tracking is enabled
93    pub fn is_enabled(&self) -> bool {
94        self.enabled
95    }
96
97    /// Record a leadership change
98    pub fn record_leadership_change(&mut self, _became_leader: bool) {
99        if !self.enabled {
100            return;
101        }
102
103        self.metrics.leadership_changes += 1;
104
105        #[cfg(target_arch = "wasm32")]
106        web_sys::console::log_1(
107            &format!(
108                "Leadership change recorded (became_leader: {}). Total: {}",
109                _became_leader, self.metrics.leadership_changes
110            )
111            .into(),
112        );
113    }
114
115    /// Record a write conflict
116    pub fn record_write_conflict(&mut self) {
117        if !self.enabled {
118            return;
119        }
120
121        self.metrics.write_conflicts += 1;
122
123        #[cfg(target_arch = "wasm32")]
124        web_sys::console::log_1(
125            &format!(
126                "Write conflict recorded. Total: {}",
127                self.metrics.write_conflicts
128            )
129            .into(),
130        );
131    }
132
133    /// Record a follower refresh
134    pub fn record_follower_refresh(&mut self) {
135        if !self.enabled {
136            return;
137        }
138
139        self.metrics.follower_refreshes += 1;
140
141        #[cfg(target_arch = "wasm32")]
142        web_sys::console::log_1(
143            &format!(
144                "Follower refresh recorded. Total: {}",
145                self.metrics.follower_refreshes
146            )
147            .into(),
148        );
149    }
150
151    /// Record notification latency in milliseconds
152    pub fn record_notification_latency(&mut self, latency_ms: f64) {
153        if !self.enabled {
154            return;
155        }
156
157        // Add to samples
158        self.latency_samples.push_back(latency_ms);
159
160        // Keep only the most recent samples
161        if self.latency_samples.len() > self.max_latency_samples {
162            self.latency_samples.pop_front();
163        }
164
165        // Recalculate average
166        let sum: f64 = self.latency_samples.iter().sum();
167        self.metrics.avg_notification_latency_ms = sum / self.latency_samples.len() as f64;
168        self.metrics.total_notifications += 1;
169
170        #[cfg(target_arch = "wasm32")]
171        web_sys::console::log_1(
172            &format!(
173                "Notification latency recorded: {:.2}ms. Avg: {:.2}ms",
174                latency_ms, self.metrics.avg_notification_latency_ms
175            )
176            .into(),
177        );
178    }
179
180    /// Get current metrics
181    pub fn get_metrics(&self) -> &CoordinationMetrics {
182        &self.metrics
183    }
184
185    /// Get metrics as JSON string
186    pub fn get_metrics_json(&self) -> Result<String, String> {
187        serde_json::to_string(&self.metrics)
188            .map_err(|e| format!("Failed to serialize metrics: {}", e))
189    }
190
191    /// Reset all metrics
192    pub fn reset(&mut self) {
193        #[cfg(target_arch = "wasm32")]
194        let start_timestamp = js_sys::Date::now();
195
196        #[cfg(not(target_arch = "wasm32"))]
197        let start_timestamp = std::time::SystemTime::now()
198            .duration_since(std::time::UNIX_EPOCH)
199            .unwrap_or_else(|_| {
200                // Fallback: if system time is before UNIX_EPOCH, use 0
201                log::warn!("SystemTime before UNIX_EPOCH in reset, using 0 as start_timestamp");
202                std::time::Duration::from_secs(0)
203            })
204            .as_secs_f64()
205            * 1000.0;
206
207        self.metrics = CoordinationMetrics {
208            leadership_changes: 0,
209            write_conflicts: 0,
210            follower_refreshes: 0,
211            avg_notification_latency_ms: 0.0,
212            total_notifications: 0,
213            start_timestamp,
214        };
215        self.latency_samples.clear();
216
217        #[cfg(target_arch = "wasm32")]
218        web_sys::console::log_1(&"Coordination metrics reset".into());
219    }
220
221    /// Get leadership changes per minute
222    pub fn get_leadership_changes_per_minute(&self) -> f64 {
223        #[cfg(target_arch = "wasm32")]
224        let current_time = js_sys::Date::now();
225
226        #[cfg(not(target_arch = "wasm32"))]
227        let current_time = std::time::SystemTime::now()
228            .duration_since(std::time::UNIX_EPOCH)
229            .unwrap_or_else(|_| {
230                // Fallback: if system time is before UNIX_EPOCH, use 0
231                log::warn!(
232                    "SystemTime before UNIX_EPOCH in get_leadership_changes_per_minute, using 0"
233                );
234                std::time::Duration::from_secs(0)
235            })
236            .as_secs_f64()
237            * 1000.0;
238
239        let elapsed_minutes = (current_time - self.metrics.start_timestamp) / 60000.0;
240
241        if elapsed_minutes > 0.0 {
242            self.metrics.leadership_changes as f64 / elapsed_minutes
243        } else {
244            0.0
245        }
246    }
247}
248
249impl Default for CoordinationMetricsManager {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a manager with tracking already switched on.
    fn enabled_manager() -> CoordinationMetricsManager {
        let mut manager = CoordinationMetricsManager::new();
        manager.set_enabled(true);
        manager
    }

    #[test]
    fn test_enable_disable() {
        let mut manager = CoordinationMetricsManager::new();
        assert!(!manager.is_enabled());

        manager.set_enabled(true);
        assert!(manager.is_enabled());

        manager.set_enabled(false);
        assert!(!manager.is_enabled());
    }

    #[test]
    fn test_default_starts_disabled() {
        let manager = CoordinationMetricsManager::default();
        assert!(!manager.is_enabled());
        assert_eq!(manager.get_metrics().leadership_changes, 0);
    }

    #[test]
    fn test_disabled_manager_ignores_records() {
        let mut manager = CoordinationMetricsManager::new();

        manager.record_leadership_change(true);
        manager.record_write_conflict();
        manager.record_follower_refresh();
        manager.record_notification_latency(12.5);

        let metrics = manager.get_metrics();
        assert_eq!(metrics.leadership_changes, 0);
        assert_eq!(metrics.write_conflicts, 0);
        assert_eq!(metrics.follower_refreshes, 0);
        assert_eq!(metrics.total_notifications, 0);
        assert_eq!(metrics.avg_notification_latency_ms, 0.0);
        assert!(manager.latency_samples.is_empty());
    }

    #[test]
    fn test_disable_resets_metrics() {
        let mut manager = enabled_manager();

        manager.record_leadership_change(true);
        manager.record_write_conflict();
        manager.record_notification_latency(5.0);

        manager.set_enabled(false);

        let metrics = manager.get_metrics();
        assert_eq!(metrics.leadership_changes, 0);
        assert_eq!(metrics.write_conflicts, 0);
        assert_eq!(metrics.total_notifications, 0);
        assert_eq!(metrics.avg_notification_latency_ms, 0.0);
    }

    #[test]
    fn test_record_leadership_change() {
        let mut manager = enabled_manager();

        manager.record_leadership_change(true);
        manager.record_leadership_change(false);

        assert_eq!(manager.get_metrics().leadership_changes, 2);
    }

    #[test]
    fn test_record_write_conflict() {
        let mut manager = enabled_manager();

        manager.record_write_conflict();
        manager.record_write_conflict();
        manager.record_write_conflict();

        assert_eq!(manager.get_metrics().write_conflicts, 3);
    }

    #[test]
    fn test_record_follower_refresh() {
        let mut manager = enabled_manager();

        manager.record_follower_refresh();

        assert_eq!(manager.get_metrics().follower_refreshes, 1);
    }

    #[test]
    fn test_record_notification_latency() {
        let mut manager = enabled_manager();

        manager.record_notification_latency(10.0);
        manager.record_notification_latency(20.0);
        manager.record_notification_latency(30.0);

        let metrics = manager.get_metrics();
        assert_eq!(metrics.total_notifications, 3);
        assert!((metrics.avg_notification_latency_ms - 20.0).abs() < 0.001);
    }

    #[test]
    fn test_latency_average_uses_recent_window() {
        let mut manager = enabled_manager();
        let window = manager.max_latency_samples;

        // Fill the window with slow samples, then push them all out with fast ones.
        for _ in 0..window {
            manager.record_notification_latency(1000.0);
        }
        for _ in 0..window {
            manager.record_notification_latency(10.0);
        }

        assert_eq!(manager.latency_samples.len(), window);

        let metrics = manager.get_metrics();
        assert_eq!(metrics.total_notifications, (2 * window) as u64);
        assert!((metrics.avg_notification_latency_ms - 10.0).abs() < 1e-9);
    }

    #[test]
    fn test_reset() {
        let mut manager = enabled_manager();

        manager.record_leadership_change(true);
        manager.record_write_conflict();
        manager.record_follower_refresh();
        manager.record_notification_latency(42.0);

        manager.reset();

        let metrics = manager.get_metrics();
        assert_eq!(metrics.leadership_changes, 0);
        assert_eq!(metrics.write_conflicts, 0);
        assert_eq!(metrics.follower_refreshes, 0);
        assert_eq!(metrics.total_notifications, 0);
        assert_eq!(metrics.avg_notification_latency_ms, 0.0);
        assert!(manager.latency_samples.is_empty());
        // Resetting clears data but does not switch tracking off.
        assert!(manager.is_enabled());
    }

    #[test]
    fn test_leadership_changes_per_minute() {
        let mut manager = enabled_manager();

        // No changes recorded: the rate is zero regardless of elapsed time.
        assert_eq!(manager.get_leadership_changes_per_minute(), 0.0);

        manager.record_leadership_change(true);

        let rate = manager.get_leadership_changes_per_minute();
        assert!(rate >= 0.0);
        assert!(rate.is_finite());
    }

    #[test]
    fn test_metrics_json() {
        let mut manager = enabled_manager();

        manager.record_leadership_change(true);

        let json = manager.get_metrics_json().unwrap();
        assert!(json.contains("leadership_changes"));
        assert!(json.contains("write_conflicts"));
        assert!(json.contains("follower_refreshes"));
        assert!(json.contains("avg_notification_latency_ms"));
        assert!(json.contains("total_notifications"));
        assert!(json.contains("start_timestamp"));
    }
}