1use kanal::{AsyncReceiver, AsyncSender};
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use tokio_tungstenite::tungstenite::Message;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ConnectionStats {
12 pub connection_attempts: u64,
14 pub successful_connections: u64,
16 pub failed_connections: u64,
18 pub disconnections: u64,
20 pub reconnections: u64,
22 pub avg_connection_latency_ms: f64,
24 pub last_connection_latency_ms: f64,
26 pub total_uptime_seconds: f64,
28 pub current_uptime_seconds: f64,
30 pub time_since_last_disconnection_seconds: f64,
32 pub messages_sent: u64,
34 pub messages_received: u64,
36 pub bytes_sent: u64,
38 pub bytes_received: u64,
40 pub avg_messages_sent_per_second: f64,
42 pub avg_messages_received_per_second: f64,
44 pub avg_bytes_sent_per_second: f64,
46 pub avg_bytes_received_per_second: f64,
48 pub is_connected: bool,
50 pub connection_history: Vec<ConnectionEvent>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ConnectionEvent {
56 pub event_type: ConnectionEventType,
57 pub timestamp: u64, pub duration_ms: Option<u64>, pub reason: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum ConnectionEventType {
64 ConnectionAttempt,
65 ConnectionSuccess,
66 ConnectionFailure,
67 Disconnection,
68 Reconnection,
69 MessageSent,
70 MessageReceived,
71}
72
73impl Default for ConnectionStats {
74 fn default() -> Self {
75 Self {
76 connection_attempts: 0,
77 successful_connections: 0,
78 failed_connections: 0,
79 disconnections: 0,
80 reconnections: 0,
81 avg_connection_latency_ms: 0.0,
82 last_connection_latency_ms: 0.0,
83 total_uptime_seconds: 0.0,
84 current_uptime_seconds: 0.0,
85 time_since_last_disconnection_seconds: 0.0,
86 messages_sent: 0,
87 messages_received: 0,
88 bytes_sent: 0,
89 bytes_received: 0,
90 avg_messages_sent_per_second: 0.0,
91 avg_messages_received_per_second: 0.0,
92 avg_bytes_sent_per_second: 0.0,
93 avg_bytes_received_per_second: 0.0,
94 is_connected: false,
95 connection_history: Vec::new(),
96 }
97 }
98}
99
100pub struct StatisticsTracker {
102 connection_attempts: AtomicU64,
104 successful_connections: AtomicU64,
105 failed_connections: AtomicU64,
106 disconnections: AtomicU64,
107 reconnections: AtomicU64,
108 messages_sent: AtomicU64,
109 messages_received: AtomicU64,
110 bytes_sent: AtomicU64,
111 bytes_received: AtomicU64,
112
113 start_time: Instant,
115 last_connection_attempt: RwLock<Option<Instant>>,
116 current_connection_start: RwLock<Option<Instant>>,
117 last_disconnection: RwLock<Option<Instant>>,
118 total_uptime: RwLock<Duration>,
119
120 connection_latencies: RwLock<Vec<Duration>>,
122
123 is_connected: AtomicBool,
125
126 event_history: RwLock<Vec<ConnectionEvent>>,
128}
129
130impl ConnectionStats {
131 pub fn summary(&self) -> String {
133 let mut summary = String::new();
134
135 summary.push_str(
137 "╔═══════════════════════════════════════════════════════════════════════════════╗\n",
138 );
139 summary.push_str(
140 "║ WebSocket Connection Summary ║\n",
141 );
142 summary.push_str(
143 "╠═══════════════════════════════════════════════════════════════════════════════╣\n",
144 );
145
146 let status = if self.is_connected {
148 "🟢 CONNECTED"
149 } else {
150 "🔴 DISCONNECTED"
151 };
152 summary.push_str(&format!("║ Status: {status:<67} ║\n"));
153
154 summary.push_str(
156 "║ ║\n",
157 );
158 summary.push_str(
159 "║ Connection Statistics: ║\n",
160 );
161 summary.push_str(&format!(
162 "║ • Total Attempts: {:<57} ║\n",
163 self.connection_attempts
164 ));
165 summary.push_str(&format!(
166 "║ • Successful: {:<61} ║\n",
167 self.successful_connections
168 ));
169 summary.push_str(&format!(
170 "║ • Failed: {:<65} ║\n",
171 self.failed_connections
172 ));
173 summary.push_str(&format!(
174 "║ • Disconnections: {:<57} ║\n",
175 self.disconnections
176 ));
177 summary.push_str(&format!(
178 "║ • Reconnections: {:<58} ║\n",
179 self.reconnections
180 ));
181
182 if self.connection_attempts > 0 {
184 let success_rate =
185 (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0;
186 summary.push_str(&format!(
187 "║ • Success Rate: {:<59} ║\n",
188 format!("{:.1}%", success_rate)
189 ));
190 }
191
192 if self.avg_connection_latency_ms > 0.0 {
194 summary.push_str("║ ║\n");
195 summary.push_str("║ Connection Latency: ║\n");
196 summary.push_str(&format!(
197 "║ • Average: {:<62} ║\n",
198 format!("{:.2}ms", self.avg_connection_latency_ms)
199 ));
200 summary.push_str(&format!(
201 "║ • Last: {:<65} ║\n",
202 format!("{:.2}ms", self.last_connection_latency_ms)
203 ));
204 }
205
206 summary.push_str(
208 "║ ║\n",
209 );
210 summary.push_str(
211 "║ Uptime Information: ║\n",
212 );
213 summary.push_str(&format!(
214 "║ • Total Uptime: {:<57} ║\n",
215 Self::format_duration(self.total_uptime_seconds)
216 ));
217
218 if self.is_connected {
219 summary.push_str(&format!(
220 "║ • Current Connection: {:<51} ║\n",
221 Self::format_duration(self.current_uptime_seconds)
222 ));
223 }
224
225 if self.time_since_last_disconnection_seconds > 0.0 {
226 summary.push_str(&format!(
227 "║ • Since Last Disconnect: {:<46} ║\n",
228 Self::format_duration(self.time_since_last_disconnection_seconds)
229 ));
230 }
231
232 summary.push_str(
234 "║ ║\n",
235 );
236 summary.push_str(
237 "║ Message Statistics: ║\n",
238 );
239 summary.push_str(&format!(
240 "║ • Messages Sent: {:<56} ║\n",
241 format!(
242 "{} ({:.2}/s)",
243 self.messages_sent, self.avg_messages_sent_per_second
244 )
245 ));
246 summary.push_str(&format!(
247 "║ • Messages Received: {:<52} ║\n",
248 format!(
249 "{} ({:.2}/s)",
250 self.messages_received, self.avg_messages_received_per_second
251 )
252 ));
253
254 summary.push_str(
256 "║ ║\n",
257 );
258 summary.push_str(
259 "║ Data Transfer: ║\n",
260 );
261 summary.push_str(&format!(
262 "║ • Bytes Sent: {:<59} ║\n",
263 format!(
264 "{} ({}/s)",
265 Self::format_bytes(self.bytes_sent),
266 Self::format_bytes(self.avg_bytes_sent_per_second as u64)
267 )
268 ));
269 summary.push_str(&format!(
270 "║ • Bytes Received: {:<55} ║\n",
271 format!(
272 "{} ({}/s)",
273 Self::format_bytes(self.bytes_received),
274 Self::format_bytes(self.avg_bytes_received_per_second as u64)
275 )
276 ));
277
278 if !self.connection_history.is_empty() {
280 summary.push_str("║ ║\n");
281 summary.push_str("║ Recent Activity (Last 5 events): ║\n");
282
283 let recent_events: Vec<&ConnectionEvent> =
284 self.connection_history.iter().rev().take(5).collect();
285 for event in recent_events.iter().rev() {
286 let timestamp = Self::format_timestamp(event.timestamp);
287 let event_desc = Self::format_event_description(event);
288 summary.push_str(&format!("║ • {timestamp}: {event_desc:<51} ║\n"));
289 }
290 }
291
292 summary.push_str(
294 "║ ║\n",
295 );
296 summary.push_str(
297 "║ Connection Health: ║\n",
298 );
299 let health_status = self.assess_connection_health();
300 summary.push_str(&format!("║ • Overall Health: {health_status:<55} ║\n"));
301
302 if self.total_uptime_seconds > 0.0 {
304 let stability = (self.total_uptime_seconds
305 / (self.total_uptime_seconds + (self.disconnections as f64 * 5.0)))
306 * 100.0;
307 summary.push_str(&format!(
308 "║ • Stability Score: {:<54} ║\n",
309 format!("{:.1}%", stability)
310 ));
311 }
312
313 summary.push_str(
315 "╚═══════════════════════════════════════════════════════════════════════════════╝\n",
316 );
317
318 summary
319 }
320
321 pub fn compact_summary(&self) -> String {
323 let status = if self.is_connected {
324 "CONNECTED"
325 } else {
326 "DISCONNECTED"
327 };
328 let success_rate = if self.connection_attempts > 0 {
329 (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0
330 } else {
331 0.0
332 };
333
334 format!(
335 "Status: {} | Attempts: {} | Success Rate: {:.1}% | Uptime: {} | Messages: {}↑ {}↓ | Data: {}↑ {}↓",
336 status,
337 self.connection_attempts,
338 success_rate,
339 Self::format_duration(self.total_uptime_seconds),
340 self.messages_sent,
341 self.messages_received,
342 Self::format_bytes(self.bytes_sent),
343 Self::format_bytes(self.bytes_received)
344 )
345 }
346
347 fn assess_connection_health(&self) -> String {
349 let mut health_score = 100.0;
350 let mut issues = Vec::new();
351
352 if self.connection_attempts > 0 {
354 let success_rate =
355 (self.successful_connections as f64 / self.connection_attempts as f64) * 100.0;
356 if success_rate < 50.0 {
357 health_score -= 40.0;
358 issues.push("Low success rate");
359 } else if success_rate < 80.0 {
360 health_score -= 20.0;
361 issues.push("Moderate success rate");
362 }
363 }
364
365 if self.disconnections > 0 && self.total_uptime_seconds > 0.0 {
367 let disconnections_per_hour =
368 (self.disconnections as f64) / (self.total_uptime_seconds / 3600.0);
369 if disconnections_per_hour > 5.0 {
370 health_score -= 30.0;
371 issues.push("Frequent disconnections");
372 } else if disconnections_per_hour > 2.0 {
373 health_score -= 15.0;
374 issues.push("Occasional disconnections");
375 }
376 }
377
378 if self.avg_connection_latency_ms > 5000.0 {
380 health_score -= 20.0;
381 issues.push("High latency");
382 } else if self.avg_connection_latency_ms > 2000.0 {
383 health_score -= 10.0;
384 issues.push("Moderate latency");
385 }
386
387 if !self.is_connected {
389 health_score -= 25.0;
390 issues.push("Currently disconnected");
391 }
392
393 let health_level = if health_score >= 90.0 {
394 "🟢 Excellent"
395 } else if health_score >= 70.0 {
396 "🟡 Good"
397 } else if health_score >= 50.0 {
398 "🟠 Fair"
399 } else {
400 "🔴 Poor"
401 };
402
403 if issues.is_empty() {
404 format!("{health_level} ({health_score:.0}/100)")
405 } else {
406 format!(
407 "{} ({:.0}/100) - {}",
408 health_level,
409 health_score,
410 issues.join(", ")
411 )
412 }
413 }
414
415 fn format_duration(seconds: f64) -> String {
417 if seconds < 60.0 {
418 format!("{seconds:.1}s")
419 } else if seconds < 3600.0 {
420 format!("{:.1}m", seconds / 60.0)
421 } else if seconds < 86400.0 {
422 format!("{:.1}h", seconds / 3600.0)
423 } else {
424 format!("{:.1}d", seconds / 86400.0)
425 }
426 }
427
428 fn format_bytes(bytes: u64) -> String {
430 const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
431 let mut size = bytes as f64;
432 let mut unit_index = 0;
433
434 while size >= 1024.0 && unit_index < UNITS.len() - 1 {
435 size /= 1024.0;
436 unit_index += 1;
437 }
438
439 if unit_index == 0 {
440 format!("{} {}", bytes, UNITS[unit_index])
441 } else {
442 format!("{:.1} {}", size, UNITS[unit_index])
443 }
444 }
445
446 fn format_timestamp(timestamp: u64) -> String {
448 let duration = std::time::Duration::from_millis(timestamp);
450 let secs = duration.as_secs();
451 let now = std::time::SystemTime::now()
452 .duration_since(std::time::UNIX_EPOCH)
453 .unwrap_or_default()
454 .as_secs();
455
456 let diff = now.saturating_sub(secs);
457
458 if diff < 60 {
459 format!("{diff}s ago")
460 } else if diff < 3600 {
461 format!("{}m ago", diff / 60)
462 } else if diff < 86400 {
463 format!("{}h ago", diff / 3600)
464 } else {
465 format!("{}d ago", diff / 86400)
466 }
467 }
468
469 fn format_event_description(event: &ConnectionEvent) -> String {
471 match &event.event_type {
472 ConnectionEventType::ConnectionAttempt => "Connection attempt".to_string(),
473 ConnectionEventType::ConnectionSuccess => {
474 if let Some(duration) = event.duration_ms {
475 format!("Connected ({duration}ms)")
476 } else {
477 "Connected".to_string()
478 }
479 }
480 ConnectionEventType::ConnectionFailure => {
481 if let Some(reason) = &event.reason {
482 format!("Connection failed: {reason}")
483 } else {
484 "Connection failed".to_string()
485 }
486 }
487 ConnectionEventType::Disconnection => {
488 if let Some(reason) = &event.reason {
489 format!("Disconnected: {reason}")
490 } else {
491 "Disconnected".to_string()
492 }
493 }
494 ConnectionEventType::Reconnection => "Reconnection attempt".to_string(),
495 ConnectionEventType::MessageSent => "Message sent".to_string(),
496 ConnectionEventType::MessageReceived => "Message received".to_string(),
497 }
498 }
499}
500
501impl StatisticsTracker {
502 pub fn new() -> Self {
503 Self {
504 connection_attempts: AtomicU64::new(0),
505 successful_connections: AtomicU64::new(0),
506 failed_connections: AtomicU64::new(0),
507 disconnections: AtomicU64::new(0),
508 reconnections: AtomicU64::new(0),
509 messages_sent: AtomicU64::new(0),
510 messages_received: AtomicU64::new(0),
511 bytes_sent: AtomicU64::new(0),
512 bytes_received: AtomicU64::new(0),
513 start_time: Instant::now(),
514 last_connection_attempt: RwLock::new(None),
515 current_connection_start: RwLock::new(None),
516 last_disconnection: RwLock::new(None),
517 total_uptime: RwLock::new(Duration::ZERO),
518 connection_latencies: RwLock::new(Vec::new()),
519 is_connected: AtomicBool::new(false),
520 event_history: RwLock::new(Vec::new()),
521 }
522 }
523
524 pub async fn record_connection_attempt(&self) {
525 self.connection_attempts.fetch_add(1, Ordering::SeqCst);
526 *self.last_connection_attempt.write().await = Some(Instant::now());
527
528 self.add_event(ConnectionEvent {
529 event_type: ConnectionEventType::ConnectionAttempt,
530 timestamp: Self::current_timestamp(),
531 duration_ms: None,
532 reason: None,
533 })
534 .await;
535 }
536
537 pub async fn record_connection_success(&self) {
538 self.successful_connections.fetch_add(1, Ordering::SeqCst);
539 self.is_connected.store(true, Ordering::SeqCst);
540
541 let now = Instant::now();
542 *self.current_connection_start.write().await = Some(now);
543
544 let latency = if let Some(attempt_time) = *self.last_connection_attempt.read().await {
546 now.duration_since(attempt_time)
547 } else {
548 Duration::ZERO
549 };
550
551 self.connection_latencies.write().await.push(latency);
552
553 self.add_event(ConnectionEvent {
554 event_type: ConnectionEventType::ConnectionSuccess,
555 timestamp: Self::current_timestamp(),
556 duration_ms: Some(latency.as_millis() as u64),
557 reason: None,
558 })
559 .await;
560 }
561
562 pub async fn record_connection_failure(&self, reason: Option<String>) {
563 self.failed_connections.fetch_add(1, Ordering::SeqCst);
564 self.is_connected.store(false, Ordering::SeqCst);
565
566 let latency = (*self.last_connection_attempt.read().await)
567 .map(|attempt_time| Instant::now().duration_since(attempt_time));
568
569 self.add_event(ConnectionEvent {
570 event_type: ConnectionEventType::ConnectionFailure,
571 timestamp: Self::current_timestamp(),
572 duration_ms: latency.map(|d| d.as_millis() as u64),
573 reason,
574 })
575 .await;
576 }
577
578 pub async fn record_disconnection(&self, reason: Option<String>) {
579 self.disconnections.fetch_add(1, Ordering::SeqCst);
580 self.is_connected.store(false, Ordering::SeqCst);
581
582 let now = Instant::now();
583 *self.last_disconnection.write().await = Some(now);
584
585 if let Some(connection_start) = *self.current_connection_start.read().await {
587 let uptime = now.duration_since(connection_start);
588 *self.total_uptime.write().await += uptime;
589 }
590
591 *self.current_connection_start.write().await = None;
592
593 self.add_event(ConnectionEvent {
594 event_type: ConnectionEventType::Disconnection,
595 timestamp: Self::current_timestamp(),
596 duration_ms: None,
597 reason,
598 })
599 .await;
600 }
601
602 pub async fn record_reconnection(&self) {
603 self.reconnections.fetch_add(1, Ordering::SeqCst);
604
605 self.add_event(ConnectionEvent {
606 event_type: ConnectionEventType::Reconnection,
607 timestamp: Self::current_timestamp(),
608 duration_ms: None,
609 reason: None,
610 })
611 .await;
612 }
613
614 pub async fn record_message_sent(&self, message: &Message) {
615 self.messages_sent.fetch_add(1, Ordering::SeqCst);
616 self.bytes_sent
617 .fetch_add(Self::message_size(message), Ordering::SeqCst);
618
619 self.add_event(ConnectionEvent {
620 event_type: ConnectionEventType::MessageSent,
621 timestamp: Self::current_timestamp(),
622 duration_ms: None,
623 reason: None,
624 })
625 .await;
626 }
627
628 pub async fn record_message_received(&self, message: &Message) {
629 self.messages_received.fetch_add(1, Ordering::SeqCst);
630 self.bytes_received
631 .fetch_add(Self::message_size(message), Ordering::SeqCst);
632
633 self.add_event(ConnectionEvent {
634 event_type: ConnectionEventType::MessageReceived,
635 timestamp: Self::current_timestamp(),
636 duration_ms: None,
637 reason: None,
638 })
639 .await;
640 }
641
642 pub async fn get_stats(&self) -> ConnectionStats {
643 let now = Instant::now();
644 let elapsed = now.duration_since(self.start_time);
645
646 let connection_latencies = self.connection_latencies.read().await;
647 let avg_latency = if connection_latencies.is_empty() {
648 0.0
649 } else {
650 connection_latencies.iter().sum::<Duration>().as_millis() as f64
651 / connection_latencies.len() as f64
652 };
653
654 let last_latency = connection_latencies
655 .last()
656 .map(|d| d.as_millis() as f64)
657 .unwrap_or(0.0);
658
659 let total_uptime = *self.total_uptime.read().await;
660 let current_uptime =
661 if let Some(connection_start) = *self.current_connection_start.read().await {
662 now.duration_since(connection_start)
663 } else {
664 Duration::ZERO
665 };
666
667 let time_since_last_disconnection =
668 if let Some(last_disc) = *self.last_disconnection.read().await {
669 now.duration_since(last_disc)
670 } else {
671 elapsed
672 };
673
674 let messages_sent = self.messages_sent.load(Ordering::SeqCst);
675 let messages_received = self.messages_received.load(Ordering::SeqCst);
676 let bytes_sent = self.bytes_sent.load(Ordering::SeqCst);
677 let bytes_received = self.bytes_received.load(Ordering::SeqCst);
678
679 let elapsed_seconds = elapsed.as_secs_f64();
680
681 ConnectionStats {
682 connection_attempts: self.connection_attempts.load(Ordering::SeqCst),
683 successful_connections: self.successful_connections.load(Ordering::SeqCst),
684 failed_connections: self.failed_connections.load(Ordering::SeqCst),
685 disconnections: self.disconnections.load(Ordering::SeqCst),
686 reconnections: self.reconnections.load(Ordering::SeqCst),
687 avg_connection_latency_ms: avg_latency,
688 last_connection_latency_ms: last_latency,
689 total_uptime_seconds: total_uptime.as_secs_f64(),
690 current_uptime_seconds: current_uptime.as_secs_f64(),
691 time_since_last_disconnection_seconds: time_since_last_disconnection.as_secs_f64(),
692 messages_sent,
693 messages_received,
694 bytes_sent,
695 bytes_received,
696 avg_messages_sent_per_second: if elapsed_seconds > 0.0 {
697 messages_sent as f64 / elapsed_seconds
698 } else {
699 0.0
700 },
701 avg_messages_received_per_second: if elapsed_seconds > 0.0 {
702 messages_received as f64 / elapsed_seconds
703 } else {
704 0.0
705 },
706 avg_bytes_sent_per_second: if elapsed_seconds > 0.0 {
707 bytes_sent as f64 / elapsed_seconds
708 } else {
709 0.0
710 },
711 avg_bytes_received_per_second: if elapsed_seconds > 0.0 {
712 bytes_received as f64 / elapsed_seconds
713 } else {
714 0.0
715 },
716 is_connected: self.is_connected.load(Ordering::SeqCst),
717 connection_history: self.event_history.read().await.clone(),
718 }
719 }
720
721 fn message_size(message: &Message) -> u64 {
722 match message {
723 Message::Text(text) => text.len() as u64,
724 Message::Binary(data) => data.len() as u64,
725 Message::Ping(data) => data.len() as u64,
726 Message::Pong(data) => data.len() as u64,
727 Message::Close(_) => 0,
728 Message::Frame(_) => 0,
729 }
730 }
731
732 fn current_timestamp() -> u64 {
733 std::time::SystemTime::now()
734 .duration_since(std::time::UNIX_EPOCH)
735 .unwrap_or_default()
736 .as_millis() as u64
737 }
738
739 async fn add_event(&self, event: ConnectionEvent) {
740 let mut history = self.event_history.write().await;
741 history.push(event);
742
743 if history.len() > 100 {
745 history.drain(0..50); }
747 }
748}
749
750impl Default for StatisticsTracker {
751 fn default() -> Self {
752 Self::new()
753 }
754}
755
756pub struct TrackedSender<T> {
758 inner: AsyncSender<T>,
759 stats: Arc<StatisticsTracker>,
760}
761
762impl<T> TrackedSender<T> {
763 pub fn new(sender: AsyncSender<T>, stats: Arc<StatisticsTracker>) -> Self {
764 Self {
765 inner: sender,
766 stats,
767 }
768 }
769
770 pub async fn send(&self, item: T) -> Result<(), kanal::SendError> {
771 let result = self.inner.send(item).await;
772
773 if result.is_ok() {
775 let stats = self.stats.clone();
777 tokio::spawn(async move {
778 stats
781 .messages_sent
782 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
783 });
784 }
785
786 result
787 }
788}
789
790pub struct TrackedReceiver<T> {
792 inner: AsyncReceiver<T>,
793 stats: Arc<StatisticsTracker>,
794}
795
796impl<T> TrackedReceiver<T> {
797 pub fn new(receiver: AsyncReceiver<T>, stats: Arc<StatisticsTracker>) -> Self {
798 Self {
799 inner: receiver,
800 stats,
801 }
802 }
803
804 pub async fn recv(&self) -> Result<T, kanal::ReceiveError> {
805 let result = self.inner.recv().await;
806
807 if result.is_ok() {
809 let stats = self.stats.clone();
811 tokio::spawn(async move {
812 stats
815 .messages_received
816 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
817 });
818 }
819
820 result
821 }
822}