Skip to main content

binary_options_tools_core_pre/
statistics.rs

1use kanal::{AsyncReceiver, AsyncSender};
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use tokio_tungstenite::tungstenite::Message;
8
9/// Comprehensive connection statistics for WebSocket testing
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ConnectionStats {
12    /// Total number of connection attempts
13    pub connection_attempts: u64,
14    /// Total number of successful connections
15    pub successful_connections: u64,
16    /// Total number of failed connections
17    pub failed_connections: u64,
18    /// Total number of disconnections
19    pub disconnections: u64,
20    /// Total number of reconnections
21    pub reconnections: u64,
22    /// Average connection latency in milliseconds
23    pub avg_connection_latency_ms: f64,
24    /// Last connection latency in milliseconds
25    pub last_connection_latency_ms: f64,
26    /// Total uptime in seconds
27    pub total_uptime_seconds: f64,
28    /// Current connection uptime in seconds (if connected)
29    pub current_uptime_seconds: f64,
30    /// Time since last disconnection in seconds
31    pub time_since_last_disconnection_seconds: f64,
32    /// Messages sent count
33    pub messages_sent: u64,
34    /// Messages received count
35    pub messages_received: u64,
36    /// Total bytes sent
37    pub bytes_sent: u64,
38    /// Total bytes received
39    pub bytes_received: u64,
40    /// Average messages per second (sent)
41    pub avg_messages_sent_per_second: f64,
42    /// Average messages per second (received)
43    pub avg_messages_received_per_second: f64,
44    /// Average bytes per second (sent)
45    pub avg_bytes_sent_per_second: f64,
46    /// Average bytes per second (received)
47    pub avg_bytes_received_per_second: f64,
48    /// Is currently connected
49    pub is_connected: bool,
50    /// Connection history (last 10 connections)
51    pub connection_history: Vec<ConnectionEvent>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ConnectionEvent {
56    pub event_type: ConnectionEventType,
57    pub timestamp: u64,           // Unix timestamp in milliseconds
58    pub duration_ms: Option<u64>, // Duration for connection events
59    pub reason: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum ConnectionEventType {
64    ConnectionAttempt,
65    ConnectionSuccess,
66    ConnectionFailure,
67    Disconnection,
68    Reconnection,
69    MessageSent,
70    MessageReceived,
71}
72
73impl Default for ConnectionStats {
74    fn default() -> Self {
75        Self {
76            connection_attempts: 0,
77            successful_connections: 0,
78            failed_connections: 0,
79            disconnections: 0,
80            reconnections: 0,
81            avg_connection_latency_ms: 0.0,
82            last_connection_latency_ms: 0.0,
83            total_uptime_seconds: 0.0,
84            current_uptime_seconds: 0.0,
85            time_since_last_disconnection_seconds: 0.0,
86            messages_sent: 0,
87            messages_received: 0,
88            bytes_sent: 0,
89            bytes_received: 0,
90            avg_messages_sent_per_second: 0.0,
91            avg_messages_received_per_second: 0.0,
92            avg_bytes_sent_per_second: 0.0,
93            avg_bytes_received_per_second: 0.0,
94            is_connected: false,
95            connection_history: Vec::new(),
96        }
97    }
98}
99
100/// Internal statistics tracker with atomic operations for performance
101pub struct StatisticsTracker {
102    // Atomic counters for thread-safe access
103    connection_attempts: AtomicU64,
104    successful_connections: AtomicU64,
105    failed_connections: AtomicU64,
106    disconnections: AtomicU64,
107    reconnections: AtomicU64,
108    messages_sent: AtomicU64,
109    messages_received: AtomicU64,
110    bytes_sent: AtomicU64,
111    bytes_received: AtomicU64,
112
113    // Connection timing
114    start_time: Instant,
115    last_connection_attempt: RwLock<Option<Instant>>,
116    current_connection_start: RwLock<Option<Instant>>,
117    last_disconnection: RwLock<Option<Instant>>,
118    total_uptime: RwLock<Duration>,
119
120    // Connection latency tracking
121    connection_latencies: RwLock<Vec<Duration>>,
122
123    // Connection state
124    is_connected: AtomicBool,
125
126    // Event history
127    event_history: RwLock<Vec<ConnectionEvent>>,
128}
129
130impl ConnectionStats {
131    /// Generate a comprehensive, user-readable summary of the connection statistics
132    pub fn summary(&self) -> String {
133        let mut summary = String::new();
134
135        // Header
136        summary.push_str(
137            "╔═══════════════════════════════════════════════════════════════════════════════╗\n",
138        );
139        summary.push_str(
140            "║                         WebSocket Connection Summary                          ║\n",
141        );
142        summary.push_str(
143            "╠═══════════════════════════════════════════════════════════════════════════════╣\n",
144        );
145
146        // Connection Status
147        let status = if self.is_connected {
148            "🟢 CONNECTED"
149        } else {
150            "🔴 DISCONNECTED"
151        };
152        summary.push_str(&format!("║ Status: {status:<67} ║\n"));
153
154        // Connection Statistics
155        summary.push_str(
156            "║                                                                               ║\n",
157        );
158        summary.push_str(
159            "║ Connection Statistics:                                                        ║\n",
160        );
161        summary.push_str(&format!(
162            "║   • Total Attempts: {:<57} ║\n",
163            self.connection_attempts
164        ));
165        summary.push_str(&format!(
166            "║   • Successful: {:<61} ║\n",
167            self.successful_connections
168        ));
169        summary.push_str(&format!(
170            "║   • Failed: {:<65} ║\n",
171            self.failed_connections
172        ));
173        summary.push_str(&format!(
174            "║   • Disconnections: {:<57} ║\n",
175            self.disconnections
176        ));
177        summary.push_str(&format!(
178            "║   • Reconnections: {:<58} ║\n",
179            self.reconnections
180        ));
181
182        // Success Rate
183        if self.connection_attempts > 0 {
184            let success_rate =
185                (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0;
186            summary.push_str(&format!(
187                "║   • Success Rate: {:<59} ║\n",
188                format!("{:.1}%", success_rate)
189            ));
190        }
191
192        // Connection Latency
193        if self.avg_connection_latency_ms > 0.0 {
194            summary.push_str("║                                                                               ║\n");
195            summary.push_str("║ Connection Latency:                                                           ║\n");
196            summary.push_str(&format!(
197                "║   • Average: {:<62} ║\n",
198                format!("{:.2}ms", self.avg_connection_latency_ms)
199            ));
200            summary.push_str(&format!(
201                "║   • Last: {:<65} ║\n",
202                format!("{:.2}ms", self.last_connection_latency_ms)
203            ));
204        }
205
206        // Uptime Information
207        summary.push_str(
208            "║                                                                               ║\n",
209        );
210        summary.push_str(
211            "║ Uptime Information:                                                           ║\n",
212        );
213        summary.push_str(&format!(
214            "║   • Total Uptime: {:<57} ║\n",
215            Self::format_duration(self.total_uptime_seconds)
216        ));
217
218        if self.is_connected {
219            summary.push_str(&format!(
220                "║   • Current Connection: {:<51} ║\n",
221                Self::format_duration(self.current_uptime_seconds)
222            ));
223        }
224
225        if self.time_since_last_disconnection_seconds > 0.0 {
226            summary.push_str(&format!(
227                "║   • Since Last Disconnect: {:<46} ║\n",
228                Self::format_duration(self.time_since_last_disconnection_seconds)
229            ));
230        }
231
232        // Message Statistics
233        summary.push_str(
234            "║                                                                               ║\n",
235        );
236        summary.push_str(
237            "║ Message Statistics:                                                           ║\n",
238        );
239        summary.push_str(&format!(
240            "║   • Messages Sent: {:<56} ║\n",
241            format!(
242                "{} ({:.2}/s)",
243                self.messages_sent, self.avg_messages_sent_per_second
244            )
245        ));
246        summary.push_str(&format!(
247            "║   • Messages Received: {:<52} ║\n",
248            format!(
249                "{} ({:.2}/s)",
250                self.messages_received, self.avg_messages_received_per_second
251            )
252        ));
253
254        // Data Transfer Statistics
255        summary.push_str(
256            "║                                                                               ║\n",
257        );
258        summary.push_str(
259            "║ Data Transfer:                                                                ║\n",
260        );
261        summary.push_str(&format!(
262            "║   • Bytes Sent: {:<59} ║\n",
263            format!(
264                "{} ({}/s)",
265                Self::format_bytes(self.bytes_sent),
266                Self::format_bytes(self.avg_bytes_sent_per_second as u64)
267            )
268        ));
269        summary.push_str(&format!(
270            "║   • Bytes Received: {:<55} ║\n",
271            format!(
272                "{} ({}/s)",
273                Self::format_bytes(self.bytes_received),
274                Self::format_bytes(self.avg_bytes_received_per_second as u64)
275            )
276        ));
277
278        // Recent Activity
279        if !self.connection_history.is_empty() {
280            summary.push_str("║                                                                               ║\n");
281            summary.push_str("║ Recent Activity (Last 5 events):                                             ║\n");
282
283            let recent_events: Vec<&ConnectionEvent> =
284                self.connection_history.iter().rev().take(5).collect();
285            for event in recent_events.iter().rev() {
286                let timestamp = Self::format_timestamp(event.timestamp);
287                let event_desc = Self::format_event_description(event);
288                summary.push_str(&format!("║   • {timestamp}: {event_desc:<51} ║\n"));
289            }
290        }
291
292        // Connection Health Assessment
293        summary.push_str(
294            "║                                                                               ║\n",
295        );
296        summary.push_str(
297            "║ Connection Health:                                                            ║\n",
298        );
299        let health_status = self.assess_connection_health();
300        summary.push_str(&format!("║   • Overall Health: {health_status:<55} ║\n"));
301
302        // Performance Metrics
303        if self.total_uptime_seconds > 0.0 {
304            let stability = (self.total_uptime_seconds
305                / (self.total_uptime_seconds + (self.disconnections as f64 * 5.0)))
306                * 100.0;
307            summary.push_str(&format!(
308                "║   • Stability Score: {:<54} ║\n",
309                format!("{:.1}%", stability)
310            ));
311        }
312
313        // Footer
314        summary.push_str(
315            "╚═══════════════════════════════════════════════════════════════════════════════╝\n",
316        );
317
318        summary
319    }
320
321    /// Generate a compact, single-line summary
322    pub fn compact_summary(&self) -> String {
323        let status = if self.is_connected {
324            "CONNECTED"
325        } else {
326            "DISCONNECTED"
327        };
328        let success_rate = if self.connection_attempts > 0 {
329            (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0
330        } else {
331            0.0
332        };
333
334        format!(
335            "Status: {} | Attempts: {} | Success Rate: {:.1}% | Uptime: {} | Messages: {}↑ {}↓ | Data: {}↑ {}↓",
336            status,
337            self.connection_attempts,
338            success_rate,
339            Self::format_duration(self.total_uptime_seconds),
340            self.messages_sent,
341            self.messages_received,
342            Self::format_bytes(self.bytes_sent),
343            Self::format_bytes(self.bytes_received)
344        )
345    }
346
347    /// Assess the overall health of the connection
348    fn assess_connection_health(&self) -> String {
349        let mut health_score = 100.0;
350        let mut issues = Vec::new();
351
352        // Check success rate
353        if self.connection_attempts > 0 {
354            let success_rate =
355                (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0;
356            if success_rate < 50.0 {
357                health_score -= 40.0;
358                issues.push("Low success rate");
359            } else if success_rate < 80.0 {
360                health_score -= 20.0;
361                issues.push("Moderate success rate");
362            }
363        }
364
365        // Check disconnection frequency
366        if self.disconnections > 0 && self.total_uptime_seconds > 0.0 {
367            let disconnections_per_hour =
368                (self.disconnections as f64) / (self.total_uptime_seconds / 3600.0);
369            if disconnections_per_hour > 5.0 {
370                health_score -= 30.0;
371                issues.push("Frequent disconnections");
372            } else if disconnections_per_hour > 2.0 {
373                health_score -= 15.0;
374                issues.push("Occasional disconnections");
375            }
376        }
377
378        // Check connection latency
379        if self.avg_connection_latency_ms > 5000.0 {
380            health_score -= 20.0;
381            issues.push("High latency");
382        } else if self.avg_connection_latency_ms > 2000.0 {
383            health_score -= 10.0;
384            issues.push("Moderate latency");
385        }
386
387        // Check if currently connected
388        if !self.is_connected {
389            health_score -= 25.0;
390            issues.push("Currently disconnected");
391        }
392
393        let health_level = if health_score >= 90.0 {
394            "🟢 Excellent"
395        } else if health_score >= 70.0 {
396            "🟡 Good"
397        } else if health_score >= 50.0 {
398            "🟠 Fair"
399        } else {
400            "🔴 Poor"
401        };
402
403        if issues.is_empty() {
404            format!("{health_level} ({health_score:.0}/100)")
405        } else {
406            format!(
407                "{} ({:.0}/100) - {}",
408                health_level,
409                health_score,
410                issues.join(", ")
411            )
412        }
413    }
414
415    /// Format duration in a human-readable way
416    fn format_duration(seconds: f64) -> String {
417        if seconds < 60.0 {
418            format!("{seconds:.1}s")
419        } else if seconds < 3600.0 {
420            format!("{:.1}m", seconds / 60.0)
421        } else if seconds < 86400.0 {
422            format!("{:.1}h", seconds / 3600.0)
423        } else {
424            format!("{:.1}d", seconds / 86400.0)
425        }
426    }
427
428    /// Format bytes in a human-readable way
429    fn format_bytes(bytes: u64) -> String {
430        const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
431        let mut size = bytes as f64;
432        let mut unit_index = 0;
433
434        while size >= 1024.0 && unit_index < UNITS.len() - 1 {
435            size /= 1024.0;
436            unit_index += 1;
437        }
438
439        if unit_index == 0 {
440            format!("{} {}", bytes, UNITS[unit_index])
441        } else {
442            format!("{:.1} {}", size, UNITS[unit_index])
443        }
444    }
445
446    /// Format timestamp in a readable way
447    fn format_timestamp(timestamp: u64) -> String {
448        // Convert Unix timestamp to readable format
449        let duration = std::time::Duration::from_millis(timestamp);
450        let secs = duration.as_secs();
451        let now = std::time::SystemTime::now()
452            .duration_since(std::time::UNIX_EPOCH)
453            .unwrap_or_default()
454            .as_secs();
455
456        let diff = now.saturating_sub(secs);
457
458        if diff < 60 {
459            format!("{diff}s ago")
460        } else if diff < 3600 {
461            format!("{}m ago", diff / 60)
462        } else if diff < 86400 {
463            format!("{}h ago", diff / 3600)
464        } else {
465            format!("{}d ago", diff / 86400)
466        }
467    }
468
469    /// Format event description
470    fn format_event_description(event: &ConnectionEvent) -> String {
471        match &event.event_type {
472            ConnectionEventType::ConnectionAttempt => "Connection attempt".to_string(),
473            ConnectionEventType::ConnectionSuccess => {
474                if let Some(duration) = event.duration_ms {
475                    format!("Connected ({duration}ms)")
476                } else {
477                    "Connected".to_string()
478                }
479            }
480            ConnectionEventType::ConnectionFailure => {
481                if let Some(reason) = &event.reason {
482                    format!("Connection failed: {reason}")
483                } else {
484                    "Connection failed".to_string()
485                }
486            }
487            ConnectionEventType::Disconnection => {
488                if let Some(reason) = &event.reason {
489                    format!("Disconnected: {reason}")
490                } else {
491                    "Disconnected".to_string()
492                }
493            }
494            ConnectionEventType::Reconnection => "Reconnection attempt".to_string(),
495            ConnectionEventType::MessageSent => "Message sent".to_string(),
496            ConnectionEventType::MessageReceived => "Message received".to_string(),
497        }
498    }
499}
500
501impl StatisticsTracker {
502    pub fn new() -> Self {
503        Self {
504            connection_attempts: AtomicU64::new(0),
505            successful_connections: AtomicU64::new(0),
506            failed_connections: AtomicU64::new(0),
507            disconnections: AtomicU64::new(0),
508            reconnections: AtomicU64::new(0),
509            messages_sent: AtomicU64::new(0),
510            messages_received: AtomicU64::new(0),
511            bytes_sent: AtomicU64::new(0),
512            bytes_received: AtomicU64::new(0),
513            start_time: Instant::now(),
514            last_connection_attempt: RwLock::new(None),
515            current_connection_start: RwLock::new(None),
516            last_disconnection: RwLock::new(None),
517            total_uptime: RwLock::new(Duration::ZERO),
518            connection_latencies: RwLock::new(Vec::new()),
519            is_connected: AtomicBool::new(false),
520            event_history: RwLock::new(Vec::new()),
521        }
522    }
523
524    pub async fn record_connection_attempt(&self) {
525        self.connection_attempts.fetch_add(1, Ordering::SeqCst);
526        *self.last_connection_attempt.write().await = Some(Instant::now());
527
528        self.add_event(ConnectionEvent {
529            event_type: ConnectionEventType::ConnectionAttempt,
530            timestamp: Self::current_timestamp(),
531            duration_ms: None,
532            reason: None,
533        })
534        .await;
535    }
536
537    pub async fn record_connection_success(&self) {
538        self.successful_connections.fetch_add(1, Ordering::SeqCst);
539        self.is_connected.store(true, Ordering::SeqCst);
540
541        let now = Instant::now();
542        *self.current_connection_start.write().await = Some(now);
543
544        // Calculate connection latency
545        let latency = if let Some(attempt_time) = *self.last_connection_attempt.read().await {
546            now.duration_since(attempt_time)
547        } else {
548            Duration::ZERO
549        };
550
551        self.connection_latencies.write().await.push(latency);
552
553        self.add_event(ConnectionEvent {
554            event_type: ConnectionEventType::ConnectionSuccess,
555            timestamp: Self::current_timestamp(),
556            duration_ms: Some(latency.as_millis() as u64),
557            reason: None,
558        })
559        .await;
560    }
561
562    pub async fn record_connection_failure(&self, reason: Option<String>) {
563        self.failed_connections.fetch_add(1, Ordering::SeqCst);
564        self.is_connected.store(false, Ordering::SeqCst);
565
566        let latency = (*self.last_connection_attempt.read().await)
567            .map(|attempt_time| Instant::now().duration_since(attempt_time));
568
569        self.add_event(ConnectionEvent {
570            event_type: ConnectionEventType::ConnectionFailure,
571            timestamp: Self::current_timestamp(),
572            duration_ms: latency.map(|d| d.as_millis() as u64),
573            reason,
574        })
575        .await;
576    }
577
578    pub async fn record_disconnection(&self, reason: Option<String>) {
579        self.disconnections.fetch_add(1, Ordering::SeqCst);
580        self.is_connected.store(false, Ordering::SeqCst);
581
582        let now = Instant::now();
583        *self.last_disconnection.write().await = Some(now);
584
585        // Update total uptime
586        if let Some(connection_start) = *self.current_connection_start.read().await {
587            let uptime = now.duration_since(connection_start);
588            *self.total_uptime.write().await += uptime;
589        }
590
591        *self.current_connection_start.write().await = None;
592
593        self.add_event(ConnectionEvent {
594            event_type: ConnectionEventType::Disconnection,
595            timestamp: Self::current_timestamp(),
596            duration_ms: None,
597            reason,
598        })
599        .await;
600    }
601
602    pub async fn record_reconnection(&self) {
603        self.reconnections.fetch_add(1, Ordering::SeqCst);
604
605        self.add_event(ConnectionEvent {
606            event_type: ConnectionEventType::Reconnection,
607            timestamp: Self::current_timestamp(),
608            duration_ms: None,
609            reason: None,
610        })
611        .await;
612    }
613
614    pub async fn record_message_sent(&self, message: &Message) {
615        self.messages_sent.fetch_add(1, Ordering::SeqCst);
616        self.bytes_sent
617            .fetch_add(Self::message_size(message), Ordering::SeqCst);
618
619        self.add_event(ConnectionEvent {
620            event_type: ConnectionEventType::MessageSent,
621            timestamp: Self::current_timestamp(),
622            duration_ms: None,
623            reason: None,
624        })
625        .await;
626    }
627
628    pub async fn record_message_received(&self, message: &Message) {
629        self.messages_received.fetch_add(1, Ordering::SeqCst);
630        self.bytes_received
631            .fetch_add(Self::message_size(message), Ordering::SeqCst);
632
633        self.add_event(ConnectionEvent {
634            event_type: ConnectionEventType::MessageReceived,
635            timestamp: Self::current_timestamp(),
636            duration_ms: None,
637            reason: None,
638        })
639        .await;
640    }
641
642    pub async fn get_stats(&self) -> ConnectionStats {
643        let now = Instant::now();
644        let elapsed = now.duration_since(self.start_time);
645
646        let connection_latencies = self.connection_latencies.read().await;
647        let avg_latency = if connection_latencies.is_empty() {
648            0.0
649        } else {
650            connection_latencies.iter().sum::<Duration>().as_millis() as f64
651                / connection_latencies.len() as f64
652        };
653
654        let last_latency = connection_latencies
655            .last()
656            .map(|d| d.as_millis() as f64)
657            .unwrap_or(0.0);
658
659        let total_uptime = *self.total_uptime.read().await;
660        let current_uptime =
661            if let Some(connection_start) = *self.current_connection_start.read().await {
662                now.duration_since(connection_start)
663            } else {
664                Duration::ZERO
665            };
666
667        let time_since_last_disconnection =
668            if let Some(last_disc) = *self.last_disconnection.read().await {
669                now.duration_since(last_disc)
670            } else {
671                elapsed
672            };
673
674        let messages_sent = self.messages_sent.load(Ordering::SeqCst);
675        let messages_received = self.messages_received.load(Ordering::SeqCst);
676        let bytes_sent = self.bytes_sent.load(Ordering::SeqCst);
677        let bytes_received = self.bytes_received.load(Ordering::SeqCst);
678
679        let elapsed_seconds = elapsed.as_secs_f64();
680
681        ConnectionStats {
682            connection_attempts: self.connection_attempts.load(Ordering::SeqCst),
683            successful_connections: self.successful_connections.load(Ordering::SeqCst),
684            failed_connections: self.failed_connections.load(Ordering::SeqCst),
685            disconnections: self.disconnections.load(Ordering::SeqCst),
686            reconnections: self.reconnections.load(Ordering::SeqCst),
687            avg_connection_latency_ms: avg_latency,
688            last_connection_latency_ms: last_latency,
689            total_uptime_seconds: total_uptime.as_secs_f64(),
690            current_uptime_seconds: current_uptime.as_secs_f64(),
691            time_since_last_disconnection_seconds: time_since_last_disconnection.as_secs_f64(),
692            messages_sent,
693            messages_received,
694            bytes_sent,
695            bytes_received,
696            avg_messages_sent_per_second: if elapsed_seconds > 0.0 {
697                messages_sent as f64 / elapsed_seconds
698            } else {
699                0.0
700            },
701            avg_messages_received_per_second: if elapsed_seconds > 0.0 {
702                messages_received as f64 / elapsed_seconds
703            } else {
704                0.0
705            },
706            avg_bytes_sent_per_second: if elapsed_seconds > 0.0 {
707                bytes_sent as f64 / elapsed_seconds
708            } else {
709                0.0
710            },
711            avg_bytes_received_per_second: if elapsed_seconds > 0.0 {
712                bytes_received as f64 / elapsed_seconds
713            } else {
714                0.0
715            },
716            is_connected: self.is_connected.load(Ordering::SeqCst),
717            connection_history: self.event_history.read().await.clone(),
718        }
719    }
720
721    fn message_size(message: &Message) -> u64 {
722        match message {
723            Message::Text(text) => text.len() as u64,
724            Message::Binary(data) => data.len() as u64,
725            Message::Ping(data) => data.len() as u64,
726            Message::Pong(data) => data.len() as u64,
727            Message::Close(_) => 0,
728            Message::Frame(_) => 0,
729        }
730    }
731
732    fn current_timestamp() -> u64 {
733        std::time::SystemTime::now()
734            .duration_since(std::time::UNIX_EPOCH)
735            .unwrap_or_default()
736            .as_millis() as u64
737    }
738
739    async fn add_event(&self, event: ConnectionEvent) {
740        let mut history = self.event_history.write().await;
741        history.push(event);
742
743        // Keep only last 100 events to prevent memory growth
744        if history.len() > 100 {
745            history.drain(0..50); // Remove oldest 50 events
746        }
747    }
748}
749
750impl Default for StatisticsTracker {
751    fn default() -> Self {
752        Self::new()
753    }
754}
755
756/// Wrapper around AsyncSender to track message statistics
757pub struct TrackedSender<T> {
758    inner: AsyncSender<T>,
759    stats: Arc<StatisticsTracker>,
760}
761
762impl<T> TrackedSender<T> {
763    pub fn new(sender: AsyncSender<T>, stats: Arc<StatisticsTracker>) -> Self {
764        Self {
765            inner: sender,
766            stats,
767        }
768    }
769
770    pub async fn send(&self, item: T) -> Result<(), kanal::SendError> {
771        let result = self.inner.send(item).await;
772
773        // We'll track all sends for now, regardless of type
774        if result.is_ok() {
775            // Use tokio::spawn for async operation
776            let stats = self.stats.clone();
777            tokio::spawn(async move {
778                // For now, we'll just track the count without message details
779                // In a real implementation, you might want to have a trait for message sizing
780                stats
781                    .messages_sent
782                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
783            });
784        }
785
786        result
787    }
788}
789
790/// Wrapper around AsyncReceiver to track message statistics
791pub struct TrackedReceiver<T> {
792    inner: AsyncReceiver<T>,
793    stats: Arc<StatisticsTracker>,
794}
795
796impl<T> TrackedReceiver<T> {
797    pub fn new(receiver: AsyncReceiver<T>, stats: Arc<StatisticsTracker>) -> Self {
798        Self {
799            inner: receiver,
800            stats,
801        }
802    }
803
804    pub async fn recv(&self) -> Result<T, kanal::ReceiveError> {
805        let result = self.inner.recv().await;
806
807        // We'll track all receives for now, regardless of type
808        if result.is_ok() {
809            // Use tokio::spawn for async operation
810            let stats = self.stats.clone();
811            tokio::spawn(async move {
812                // For now, we'll just track the count without message details
813                // In a real implementation, you might want to have a trait for message sizing
814                stats
815                    .messages_received
816                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
817            });
818        }
819
820        result
821    }
822}