1use crate::error::StreamError;
13use futures_util::{SinkExt, StreamExt};
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tokio::time;
17use tokio_tungstenite::{connect_async, tungstenite::Message};
18use tracing::{debug, info, warn};
19
/// Cumulative receive-side counters for a WebSocket connection.
///
/// Every derived metric on this type is computed on demand from these two
/// counters.
#[derive(Debug, Clone, Copy, Default)]
pub struct WsStats {
    /// Total number of messages counted so far.
    pub total_messages_received: u64,
    /// Total number of payload bytes counted so far.
    pub total_bytes_received: u64,
}

impl WsStats {
    /// Divides `numerator` by `denominator`, yielding `None` when the
    /// denominator is zero.
    fn ratio(numerator: u64, denominator: u64) -> Option<f64> {
        (denominator != 0).then(|| numerator as f64 / denominator as f64)
    }

    /// Converts a cumulative `total` into a per-second rate over `elapsed_ms`
    /// milliseconds. Returns `0.0` when no time has elapsed.
    fn per_second(total: u64, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        total as f64 / (elapsed_ms as f64 / 1000.0)
    }

    /// Messages per second over `elapsed_ms`; `0.0` when `elapsed_ms` is zero.
    pub fn message_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_messages_received, elapsed_ms)
    }

    /// Bytes per second over `elapsed_ms`; `0.0` when `elapsed_ms` is zero.
    pub fn byte_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_bytes_received, elapsed_ms)
    }

    /// Mean message size in bytes, or `None` when no messages were counted.
    ///
    /// Same value as [`WsStats::bytes_per_message`].
    pub fn avg_message_size(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Total bytes expressed in mebibytes (bytes / 1,048,576).
    pub fn total_data_mb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_048_576.0
    }

    /// Total bytes expressed in kibibytes (bytes / 1,024).
    pub fn total_data_kb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_024.0
    }

    /// Bytes per second over `elapsed_ms`; `0.0` when `elapsed_ms` is zero.
    ///
    /// Note that, unlike [`WsStats::bandwidth_kbps`], this figure is in bytes,
    /// not bits.
    pub fn bandwidth_bps(&self, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        self.total_bytes_received as f64 * 1_000.0 / elapsed_ms as f64
    }

    /// Messages per received byte, or `None` when no bytes were counted.
    pub fn messages_per_byte(&self) -> Option<f64> {
        Self::ratio(self.total_messages_received, self.total_bytes_received)
    }

    /// Mean message size in bytes, or `None` when no messages were counted.
    ///
    /// Same value as [`WsStats::bytes_per_message`].
    pub fn avg_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Kilobits per second (bytes × 8 / 1000, per second) over `elapsed_ms`;
    /// `0.0` when `elapsed_ms` is zero.
    pub fn bandwidth_kbps(&self, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        self.total_bytes_received as f64 * 8.0 / 1_000.0 / (elapsed_ms as f64 / 1_000.0)
    }

    /// Returns `true` when the message rate over `elapsed_ms` is strictly
    /// below `min_rate` (messages per second).
    pub fn is_idle(&self, elapsed_ms: u64, min_rate: f64) -> bool {
        self.message_rate(elapsed_ms) < min_rate
    }

    /// Returns `true` once at least one message has been counted.
    pub fn has_traffic(&self) -> bool {
        self.total_messages_received > 0
    }

    /// Returns `true` when the message count has reached `threshold`.
    pub fn is_high_volume(&self, threshold: u64) -> bool {
        self.total_messages_received >= threshold
    }

    /// Mean message size in bytes, or `None` when no messages were counted.
    ///
    /// Same value as [`WsStats::bytes_per_message`].
    pub fn average_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Received bytes divided by received messages, or `None` when no
    /// messages were counted.
    pub fn bytes_per_message(&self) -> Option<f64> {
        Self::ratio(self.total_bytes_received, self.total_messages_received)
    }

    /// Total bytes expressed in gibibytes (bytes / 1,073,741,824).
    pub fn total_data_gb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_073_741_824.0
    }

    /// Returns `true` when the message count has reached `min_messages`.
    pub fn is_active(&self, min_messages: u64) -> bool {
        self.total_messages_received >= min_messages
    }

    /// Returns `true` once at least one byte has been counted.
    pub fn has_received_bytes(&self) -> bool {
        self.total_bytes_received > 0
    }

    /// Messages per received byte, or `None` when no bytes were counted.
    ///
    /// Same value as [`WsStats::messages_per_byte`].
    pub fn efficiency_ratio(&self) -> Option<f64> {
        self.messages_per_byte()
    }

    /// Same value as [`WsStats::message_rate`].
    pub fn message_density(&self, elapsed_ms: u64) -> f64 {
        self.message_rate(elapsed_ms)
    }

    /// Same value as [`WsStats::efficiency_ratio`] (messages per byte).
    ///
    /// NOTE(review): no compressed/uncompressed sizes are tracked here, so
    /// this is not a compression ratio in the usual sense — confirm intent.
    pub fn compression_ratio(&self) -> Option<f64> {
        self.efficiency_ratio()
    }

    /// Returns `total_bytes_received / total_ms`, capped at `1.0`; `0.0` when
    /// `total_ms` is zero.
    ///
    /// NOTE(review): despite the name this is derived from the byte counter,
    /// not from any measured connected time — confirm the intended semantics.
    pub fn uptime_fraction(&self, total_ms: u64) -> f64 {
        if total_ms == 0 {
            return 0.0;
        }
        (self.total_bytes_received as f64 / total_ms as f64).min(1.0)
    }
}
203
204#[derive(Debug, Clone)]
209pub struct ReconnectPolicy {
210 pub max_attempts: u32,
212 pub initial_backoff: Duration,
214 pub max_backoff: Duration,
216 pub multiplier: f64,
218 pub jitter: f64,
221}
222
223impl ReconnectPolicy {
224 pub fn new(
231 max_attempts: u32,
232 initial_backoff: Duration,
233 max_backoff: Duration,
234 multiplier: f64,
235 ) -> Result<Self, StreamError> {
236 if multiplier < 1.0 {
237 return Err(StreamError::ConfigError {
238 reason: format!(
239 "reconnect multiplier must be >= 1.0, got {multiplier}"
240 ),
241 });
242 }
243 if max_attempts == 0 {
244 return Err(StreamError::ConfigError {
245 reason: "max_attempts must be > 0".into(),
246 });
247 }
248 Ok(Self {
249 max_attempts,
250 initial_backoff,
251 max_backoff,
252 multiplier,
253 jitter: 0.0,
254 })
255 }
256
257 pub fn with_max_attempts(mut self, max_attempts: u32) -> Result<Self, StreamError> {
263 if max_attempts == 0 {
264 return Err(StreamError::ConfigError {
265 reason: "max_attempts must be > 0".into(),
266 });
267 }
268 self.max_attempts = max_attempts;
269 Ok(self)
270 }
271
272 pub fn with_multiplier(mut self, multiplier: f64) -> Result<Self, StreamError> {
279 if multiplier < 1.0 {
280 return Err(StreamError::ConfigError {
281 reason: format!("reconnect multiplier must be >= 1.0, got {multiplier}"),
282 });
283 }
284 self.multiplier = multiplier;
285 Ok(self)
286 }
287
288 pub fn with_initial_backoff(mut self, duration: Duration) -> Self {
290 self.initial_backoff = duration;
291 self
292 }
293
294 pub fn with_max_backoff(mut self, duration: Duration) -> Self {
296 self.max_backoff = duration;
297 self
298 }
299
300 pub fn with_jitter(mut self, ratio: f64) -> Result<Self, StreamError> {
310 if !(0.0..=1.0).contains(&ratio) {
311 return Err(StreamError::ConfigError {
312 reason: format!("jitter ratio must be in [0.0, 1.0], got {ratio}"),
313 });
314 }
315 self.jitter = ratio;
316 Ok(self)
317 }
318
319 pub fn total_max_delay(&self) -> Duration {
324 let total_ms: u64 = (0..self.max_attempts)
325 .map(|a| self.backoff_for_attempt(a).as_millis() as u64)
326 .fold(0u64, |acc, ms| acc.saturating_add(ms));
327 Duration::from_millis(total_ms)
328 }
329
330 pub fn max_attempts(&self) -> u32 {
332 self.max_attempts
333 }
334
335 pub fn total_attempts_remaining(&self, current_attempt: u32) -> u32 {
339 self.max_attempts.saturating_sub(current_attempt)
340 }
341
342 pub fn delay_for_next(&self, current_attempt: u32) -> Duration {
348 self.backoff_for_attempt(current_attempt.saturating_add(1))
349 }
350
351 pub fn is_exhausted(&self, attempts: u32) -> bool {
356 attempts >= self.max_attempts
357 }
358
359 pub fn backoff_for_attempt(&self, attempt: u32) -> Duration {
361 let factor = self.multiplier.powi(attempt as i32);
362 let max_ms = self.max_backoff.as_millis() as f64;
368 let base_ms = (self.initial_backoff.as_millis() as f64 * factor).min(max_ms);
369 let ms = if self.jitter > 0.0 {
370 let hash = (attempt as u64)
372 .wrapping_mul(2654435769)
373 .wrapping_add(1013904223);
374 let noise = (hash & 0xFFFF) as f64 / 65535.0; let delta = base_ms * self.jitter * (noise * 2.0 - 1.0); (base_ms + delta).clamp(0.0, max_ms)
377 } else {
378 base_ms
379 };
380 Duration::from_millis(ms as u64)
381 }
382}
383
384impl Default for ReconnectPolicy {
385 fn default() -> Self {
386 Self {
387 max_attempts: 10,
388 initial_backoff: Duration::from_millis(500),
389 max_backoff: Duration::from_secs(30),
390 multiplier: 2.0,
391 jitter: 0.0,
392 }
393 }
394}
395
396#[derive(Debug, Clone)]
398pub struct ConnectionConfig {
399 pub url: String,
401 pub channel_capacity: usize,
403 pub reconnect: ReconnectPolicy,
405 pub ping_interval: Duration,
407}
408
409impl ConnectionConfig {
410 pub fn new(url: impl Into<String>, channel_capacity: usize) -> Result<Self, StreamError> {
418 let url = url.into();
419 if url.is_empty() {
420 return Err(StreamError::ConfigError {
421 reason: "WebSocket URL must not be empty".into(),
422 });
423 }
424 if channel_capacity == 0 {
425 return Err(StreamError::ConfigError {
426 reason: "channel_capacity must be > 0".into(),
427 });
428 }
429 Ok(Self {
430 url,
431 channel_capacity,
432 reconnect: ReconnectPolicy::default(),
433 ping_interval: Duration::from_secs(20),
434 })
435 }
436
437 pub fn with_reconnect(mut self, policy: ReconnectPolicy) -> Self {
439 self.reconnect = policy;
440 self
441 }
442
443 pub fn with_ping_interval(mut self, interval: Duration) -> Self {
445 self.ping_interval = interval;
446 self
447 }
448
449 pub fn with_reconnect_attempts(mut self, n: u32) -> Result<Self, StreamError> {
456 self.reconnect = self.reconnect.with_max_attempts(n)?;
457 Ok(self)
458 }
459
460 pub fn with_channel_capacity(mut self, capacity: usize) -> Result<Self, StreamError> {
466 if capacity == 0 {
467 return Err(StreamError::ConfigError {
468 reason: "channel_capacity must be > 0".into(),
469 });
470 }
471 self.channel_capacity = capacity;
472 Ok(self)
473 }
474}
475
476pub struct WsManager {
482 config: ConnectionConfig,
483 connect_attempts: u32,
484 is_connected: bool,
485 stats: WsStats,
486}
487
488impl WsManager {
489 pub fn new(config: ConnectionConfig) -> Self {
491 Self {
492 config,
493 connect_attempts: 0,
494 is_connected: false,
495 stats: WsStats::default(),
496 }
497 }
498
499 pub async fn run(
516 &mut self,
517 message_tx: mpsc::Sender<String>,
518 mut outbound_rx: Option<mpsc::Receiver<String>>,
519 ) -> Result<(), StreamError> {
520 loop {
521 info!(url = %self.config.url, attempt = self.connect_attempts, "connecting");
522 match self.try_connect(&message_tx, &mut outbound_rx).await {
523 Ok(()) => {
524 self.is_connected = false;
526 debug!(url = %self.config.url, "connection closed cleanly");
527 if message_tx.is_closed() {
528 return Ok(());
529 }
530 }
531 Err(e) => {
532 self.is_connected = false;
533 warn!(url = %self.config.url, error = %e, "connection error");
534 }
535 }
536
537 if !self.can_reconnect() {
538 return Err(StreamError::ReconnectExhausted {
539 url: self.config.url.clone(),
540 attempts: self.connect_attempts,
541 });
542 }
543 let backoff = self.next_reconnect_backoff()?;
544 info!(url = %self.config.url, backoff_ms = backoff.as_millis(), "reconnecting after backoff");
545 tokio::time::sleep(backoff).await;
546 }
547 }
548
549 async fn try_connect(
551 &mut self,
552 message_tx: &mpsc::Sender<String>,
553 outbound_rx: &mut Option<mpsc::Receiver<String>>,
554 ) -> Result<(), StreamError> {
555 let (ws_stream, _response) =
556 connect_async(&self.config.url)
557 .await
558 .map_err(|e| StreamError::ConnectionFailed {
559 url: self.config.url.clone(),
560 reason: e.to_string(),
561 })?;
562
563 self.is_connected = true;
564 self.connect_attempts += 1;
565 info!(url = %self.config.url, "connected");
566
567 let (mut write, mut read) = ws_stream.split();
568 let mut ping_interval = time::interval(self.config.ping_interval);
569 ping_interval.tick().await;
571
572 loop {
573 tokio::select! {
574 msg = read.next() => {
575 match msg {
576 Some(Ok(Message::Text(text))) => {
577 self.stats.total_messages_received += 1;
578 self.stats.total_bytes_received += text.len() as u64;
579 if message_tx.send(text.to_string()).await.is_err() {
580 return Ok(());
582 }
583 }
584 Some(Ok(Message::Binary(bytes))) => {
585 self.stats.total_messages_received += 1;
586 self.stats.total_bytes_received += bytes.len() as u64;
587 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
588 if message_tx.send(text).await.is_err() {
589 return Ok(());
590 }
591 }
592 }
593 Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {
594 }
596 Some(Ok(Message::Close(_))) | None => {
597 return Ok(());
598 }
599 Some(Err(e)) => {
600 return Err(StreamError::WebSocket(e.to_string()));
601 }
602 }
603 }
604 _ = ping_interval.tick() => {
605 debug!(url = %self.config.url, "sending keepalive ping");
606 if write.send(Message::Ping(vec![].into())).await.is_err() {
607 return Ok(());
608 }
609 }
610 outbound = recv_outbound(outbound_rx) => {
611 if let Some(text) = outbound {
612 let _ = write.send(Message::Text(text.into())).await;
613 }
614 }
615 }
616 }
617 }
618
619 pub fn connect_simulated(&mut self) {
622 self.connect_attempts += 1;
623 self.is_connected = true;
624 }
625
626 pub fn disconnect_simulated(&mut self) {
628 self.is_connected = false;
629 }
630
631 pub fn is_connected(&self) -> bool {
633 self.is_connected
634 }
635
636 pub fn connect_attempts(&self) -> u32 {
638 self.connect_attempts
639 }
640
641 pub fn config(&self) -> &ConnectionConfig {
643 &self.config
644 }
645
646 pub fn stats(&self) -> &WsStats {
648 &self.stats
649 }
650
651 pub fn can_reconnect(&self) -> bool {
653 self.connect_attempts < self.config.reconnect.max_attempts
654 }
655
656 pub fn next_reconnect_backoff(&mut self) -> Result<Duration, StreamError> {
658 if !self.can_reconnect() {
659 return Err(StreamError::ReconnectExhausted {
660 url: self.config.url.clone(),
661 attempts: self.connect_attempts,
662 });
663 }
664 let backoff = self
665 .config
666 .reconnect
667 .backoff_for_attempt(self.connect_attempts);
668 self.connect_attempts += 1;
669 Ok(backoff)
670 }
671}
672
673async fn recv_outbound(rx: &mut Option<mpsc::Receiver<String>>) -> Option<String> {
678 match rx {
679 Some(rx) => rx.recv().await,
680 None => std::future::pending().await,
681 }
682}
683
684#[cfg(test)]
685mod tests {
686 use super::*;
687
688 fn default_config() -> ConnectionConfig {
689 ConnectionConfig::new("wss://example.com/ws", 1024).unwrap()
690 }
691
692 #[test]
693 fn test_reconnect_policy_default_values() {
694 let p = ReconnectPolicy::default();
695 assert_eq!(p.max_attempts, 10);
696 assert_eq!(p.multiplier, 2.0);
697 }
698
699 #[test]
700 fn test_reconnect_policy_backoff_exponential() {
701 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
702 .unwrap();
703 assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(100));
704 assert_eq!(p.backoff_for_attempt(1), Duration::from_millis(200));
705 assert_eq!(p.backoff_for_attempt(2), Duration::from_millis(400));
706 }
707
708 #[test]
709 fn test_reconnect_policy_backoff_capped_at_max() {
710 let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(5), 2.0)
711 .unwrap();
712 let backoff = p.backoff_for_attempt(10);
713 assert!(backoff <= Duration::from_secs(5));
714 }
715
716 #[test]
717 fn test_reconnect_policy_multiplier_below_1_rejected() {
718 let result =
719 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 0.5);
720 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
721 }
722
723 #[test]
724 fn test_reconnect_policy_zero_attempts_rejected() {
725 let result =
726 ReconnectPolicy::new(0, Duration::from_millis(100), Duration::from_secs(30), 2.0);
727 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
728 }
729
730 #[test]
731 fn test_connection_config_empty_url_rejected() {
732 let result = ConnectionConfig::new("", 1024);
733 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
734 }
735
736 #[test]
737 fn test_connection_config_zero_capacity_rejected() {
738 let result = ConnectionConfig::new("wss://example.com", 0);
739 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
740 }
741
742 #[test]
743 fn test_connection_config_with_reconnect() {
744 let policy =
745 ReconnectPolicy::new(3, Duration::from_millis(200), Duration::from_secs(10), 2.0)
746 .unwrap();
747 let config = default_config().with_reconnect(policy);
748 assert_eq!(config.reconnect.max_attempts, 3);
749 }
750
751 #[test]
752 fn test_connection_config_with_ping_interval() {
753 let config = default_config().with_ping_interval(Duration::from_secs(30));
754 assert_eq!(config.ping_interval, Duration::from_secs(30));
755 }
756
757 #[test]
758 fn test_ws_manager_initial_state() {
759 let mgr = WsManager::new(default_config());
760 assert!(!mgr.is_connected());
761 assert_eq!(mgr.connect_attempts(), 0);
762 }
763
764 #[test]
765 fn test_ws_manager_connect_simulated() {
766 let mut mgr = WsManager::new(default_config());
767 mgr.connect_simulated();
768 assert!(mgr.is_connected());
769 assert_eq!(mgr.connect_attempts(), 1);
770 }
771
772 #[test]
773 fn test_ws_manager_disconnect_simulated() {
774 let mut mgr = WsManager::new(default_config());
775 mgr.connect_simulated();
776 mgr.disconnect_simulated();
777 assert!(!mgr.is_connected());
778 }
779
780 #[test]
781 fn test_ws_manager_can_reconnect_within_limit() {
782 let mut mgr = WsManager::new(
783 default_config().with_reconnect(
784 ReconnectPolicy::new(3, Duration::from_millis(10), Duration::from_secs(1), 2.0)
785 .unwrap(),
786 ),
787 );
788 assert!(mgr.can_reconnect());
789 mgr.next_reconnect_backoff().unwrap();
790 mgr.next_reconnect_backoff().unwrap();
791 mgr.next_reconnect_backoff().unwrap();
792 assert!(!mgr.can_reconnect());
793 }
794
795 #[test]
796 fn test_ws_manager_reconnect_exhausted_error() {
797 let mut mgr = WsManager::new(
798 default_config().with_reconnect(
799 ReconnectPolicy::new(1, Duration::from_millis(10), Duration::from_secs(1), 2.0)
800 .unwrap(),
801 ),
802 );
803 mgr.next_reconnect_backoff().unwrap();
804 let result = mgr.next_reconnect_backoff();
805 assert!(matches!(
806 result,
807 Err(StreamError::ReconnectExhausted { .. })
808 ));
809 }
810
811 #[test]
812 fn test_ws_manager_backoff_increases() {
813 let mut mgr = WsManager::new(
814 default_config().with_reconnect(
815 ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(30), 2.0)
816 .unwrap(),
817 ),
818 );
819 let b0 = mgr.next_reconnect_backoff().unwrap();
820 let b1 = mgr.next_reconnect_backoff().unwrap();
821 assert!(b1 >= b0);
822 }
823
824 #[test]
825 fn test_ws_manager_config_accessor() {
826 let mgr = WsManager::new(default_config());
827 assert_eq!(mgr.config().url, "wss://example.com/ws");
828 assert_eq!(mgr.config().channel_capacity, 1024);
829 }
830
831 #[tokio::test]
835 async fn test_recv_outbound_none_is_always_pending() {
836 let mut rx: Option<mpsc::Receiver<String>> = None;
837 tokio::select! {
839 _ = recv_outbound(&mut rx) => {
840 panic!("recv_outbound(None) should never resolve");
841 }
842 _ = std::future::ready(()) => {
843 }
845 }
846 }
847
848 #[tokio::test]
850 async fn test_recv_outbound_some_resolves_with_message() {
851 let (tx, mut channel_rx) = mpsc::channel::<String>(1);
852 tx.send("subscribe".into()).await.unwrap();
853 let mut rx: Option<mpsc::Receiver<String>> = Some(channel_rx);
854 let msg = recv_outbound(&mut rx).await;
855 assert_eq!(msg.as_deref(), Some("subscribe"));
856 let _ = rx;
858 }
859
860 #[test]
861 fn test_ws_stats_initial_zero() {
862 let mgr = WsManager::new(default_config());
863 let s = mgr.stats();
864 assert_eq!(s.total_messages_received, 0);
865 assert_eq!(s.total_bytes_received, 0);
866 }
867
868 #[test]
869 fn test_ws_stats_default() {
870 let s = WsStats::default();
871 assert_eq!(s.total_messages_received, 0);
872 assert_eq!(s.total_bytes_received, 0);
873 }
874
875 #[test]
876 fn test_reconnect_policy_with_jitter_valid() {
877 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
878 .unwrap()
879 .with_jitter(0.5)
880 .unwrap();
881 assert_eq!(p.jitter, 0.5);
882 }
883
884 #[test]
885 fn test_reconnect_policy_with_jitter_zero_is_deterministic() {
886 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
887 .unwrap()
888 .with_jitter(0.0)
889 .unwrap();
890 let b0 = p.backoff_for_attempt(0);
892 let b1 = p.backoff_for_attempt(0);
893 assert_eq!(b0, b1);
894 assert_eq!(b0, Duration::from_millis(100));
895 }
896
897 #[test]
898 fn test_reconnect_policy_with_jitter_invalid_ratio() {
899 let result =
900 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
901 .unwrap()
902 .with_jitter(1.5);
903 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
904 }
905
906 #[test]
907 fn test_reconnect_policy_with_jitter_negative_ratio() {
908 let result =
909 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
910 .unwrap()
911 .with_jitter(-0.1);
912 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
913 }
914
915 #[test]
916 fn test_reconnect_policy_with_jitter_stays_within_bounds() {
917 let p = ReconnectPolicy::new(20, Duration::from_millis(100), Duration::from_secs(30), 2.0)
918 .unwrap()
919 .with_jitter(1.0)
920 .unwrap();
921 for attempt in 0..20 {
923 let b = p.backoff_for_attempt(attempt);
924 assert!(b <= Duration::from_secs(30), "attempt {attempt} exceeded max_backoff");
925 }
926 }
927
928 #[test]
929 fn test_reconnect_policy_with_max_attempts_valid() {
930 let p = ReconnectPolicy::default().with_max_attempts(5).unwrap();
931 assert_eq!(p.max_attempts, 5);
932 }
933
934 #[test]
935 fn test_reconnect_policy_with_max_attempts_zero_rejected() {
936 let result = ReconnectPolicy::default().with_max_attempts(0);
937 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
938 }
939
940 #[test]
941 fn test_connection_config_with_reconnect_attempts_valid() {
942 let config = default_config().with_reconnect_attempts(3).unwrap();
943 assert_eq!(config.reconnect.max_attempts, 3);
944 }
945
946 #[test]
947 fn test_connection_config_with_reconnect_attempts_zero_rejected() {
948 let result = default_config().with_reconnect_attempts(0);
949 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
950 }
951
952 #[test]
953 fn test_reconnect_policy_total_max_delay_sum_of_backoffs() {
954 let p = ReconnectPolicy::new(3, Duration::from_millis(100), Duration::from_secs(30), 2.0)
955 .unwrap();
956 assert_eq!(p.total_max_delay(), Duration::from_millis(700));
958 }
959
960 #[test]
961 fn test_reconnect_policy_total_max_delay_capped_by_max_backoff() {
962 let p = ReconnectPolicy::new(5, Duration::from_millis(1000), Duration::from_millis(500), 2.0)
964 .unwrap();
965 assert_eq!(p.total_max_delay(), Duration::from_millis(2500));
967 }
968
969 #[test]
970 fn test_connection_config_with_channel_capacity_valid() {
971 let config = default_config().with_channel_capacity(512).unwrap();
972 assert_eq!(config.channel_capacity, 512);
973 }
974
975 #[test]
976 fn test_connection_config_with_channel_capacity_zero_rejected() {
977 let result = default_config().with_channel_capacity(0);
978 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
979 }
980
981 #[test]
982 fn test_reconnect_policy_with_jitter_varies_across_attempts() {
983 let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(30), 1.0)
984 .unwrap()
985 .with_jitter(0.5)
986 .unwrap();
987 let values: Vec<Duration> = (0..10).map(|a| p.backoff_for_attempt(a)).collect();
990 let unique: std::collections::HashSet<u64> =
991 values.iter().map(|d| d.as_millis() as u64).collect();
992 assert!(unique.len() > 1, "jitter should produce variation across attempts");
993 }
994
995 #[test]
998 fn test_with_initial_backoff_sets_value() {
999 let p = ReconnectPolicy::default()
1000 .with_initial_backoff(Duration::from_secs(2));
1001 assert_eq!(p.initial_backoff, Duration::from_secs(2));
1002 }
1003
1004 #[test]
1005 fn test_with_max_backoff_sets_value() {
1006 let p = ReconnectPolicy::default()
1007 .with_max_backoff(Duration::from_secs(60));
1008 assert_eq!(p.max_backoff, Duration::from_secs(60));
1009 }
1010
1011 #[test]
1012 fn test_with_initial_backoff_affects_first_attempt() {
1013 let p = ReconnectPolicy::default()
1014 .with_initial_backoff(Duration::from_millis(200));
1015 assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(200));
1016 }
1017
1018 #[test]
1021 fn test_with_multiplier_valid() {
1022 let p = ReconnectPolicy::default().with_multiplier(3.0).unwrap();
1023 assert_eq!(p.multiplier, 3.0);
1024 }
1025
1026 #[test]
1027 fn test_with_multiplier_below_one_rejected() {
1028 let result = ReconnectPolicy::default().with_multiplier(0.9);
1029 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
1030 }
1031
1032 #[test]
1033 fn test_with_multiplier_exactly_one_accepted() {
1034 let p = ReconnectPolicy::default().with_multiplier(1.0).unwrap();
1035 assert_eq!(p.multiplier, 1.0);
1036 }
1037
1038 #[test]
1041 fn test_message_rate_zero_elapsed_returns_zero() {
1042 let stats = WsStats {
1043 total_messages_received: 100,
1044 total_bytes_received: 50_000,
1045 };
1046 assert_eq!(stats.message_rate(0), 0.0);
1047 assert_eq!(stats.byte_rate(0), 0.0);
1048 }
1049
1050 #[test]
1051 fn test_message_rate_100_messages_in_1s() {
1052 let stats = WsStats {
1053 total_messages_received: 100,
1054 total_bytes_received: 0,
1055 };
1056 let rate = stats.message_rate(1_000); assert!((rate - 100.0).abs() < 1e-9);
1058 }
1059
1060 #[test]
1061 fn test_byte_rate_1mb_in_1s() {
1062 let stats = WsStats {
1063 total_messages_received: 0,
1064 total_bytes_received: 1_000_000,
1065 };
1066 let rate = stats.byte_rate(1_000); assert!((rate - 1_000_000.0).abs() < 1.0);
1068 }
1069
1070 #[test]
1073 fn test_avg_message_size_none_when_no_messages() {
1074 let stats = WsStats::default();
1075 assert!(stats.avg_message_size().is_none());
1076 }
1077
1078 #[test]
1079 fn test_avg_message_size_basic() {
1080 let stats = WsStats {
1081 total_messages_received: 10,
1082 total_bytes_received: 1_000,
1083 };
1084 let avg = stats.avg_message_size().unwrap();
1085 assert!((avg - 100.0).abs() < 1e-9);
1086 }
1087
1088 #[test]
1091 fn test_total_data_mb_zero_bytes() {
1092 let stats = WsStats::default();
1093 assert!((stats.total_data_mb() - 0.0).abs() < 1e-9);
1094 }
1095
1096 #[test]
1097 fn test_total_data_mb_one_mib() {
1098 let stats = WsStats {
1099 total_messages_received: 1,
1100 total_bytes_received: 1_048_576,
1101 };
1102 assert!((stats.total_data_mb() - 1.0).abs() < 1e-9);
1103 }
1104
1105 #[test]
1106 fn test_max_attempts_getter_matches_field() {
1107 let p = ReconnectPolicy::default();
1108 assert_eq!(p.max_attempts(), p.max_attempts);
1109 }
1110
1111 #[test]
1112 fn test_max_attempts_getter_after_new() {
1113 let p = ReconnectPolicy::new(
1114 7,
1115 std::time::Duration::from_millis(100),
1116 std::time::Duration::from_secs(30),
1117 2.0,
1118 )
1119 .unwrap();
1120 assert_eq!(p.max_attempts(), 7);
1121 }
1122
1123 #[test]
1126 fn test_is_idle_below_min_rate() {
1127 let stats = WsStats {
1128 total_messages_received: 1,
1129 total_bytes_received: 0,
1130 };
1131 assert!(stats.is_idle(10_000, 1.0));
1133 }
1134
1135 #[test]
1136 fn test_is_idle_above_min_rate() {
1137 let stats = WsStats {
1138 total_messages_received: 100,
1139 total_bytes_received: 0,
1140 };
1141 assert!(!stats.is_idle(1_000, 1.0));
1143 }
1144
1145 #[test]
1146 fn test_is_idle_zero_messages_always_idle() {
1147 let stats = WsStats::default();
1148 assert!(stats.is_idle(1_000, 0.001));
1149 }
1150
1151 #[test]
1152 fn test_total_attempts_remaining_full() {
1153 let p = ReconnectPolicy::default(); assert_eq!(p.total_attempts_remaining(0), 10);
1155 }
1156
1157 #[test]
1158 fn test_total_attempts_remaining_partial() {
1159 let p = ReconnectPolicy::default();
1160 assert_eq!(p.total_attempts_remaining(3), 7);
1161 }
1162
1163 #[test]
1164 fn test_total_attempts_remaining_exhausted() {
1165 let p = ReconnectPolicy::default();
1166 assert_eq!(p.total_attempts_remaining(10), 0);
1167 assert_eq!(p.total_attempts_remaining(99), 0);
1168 }
1169
1170 #[test]
1173 fn test_has_traffic_false_when_no_messages() {
1174 let stats = WsStats::default();
1175 assert!(!stats.has_traffic());
1176 }
1177
1178 #[test]
1179 fn test_has_traffic_true_after_one_message() {
1180 let stats = WsStats {
1181 total_messages_received: 1,
1182 total_bytes_received: 0,
1183 };
1184 assert!(stats.has_traffic());
1185 }
1186
1187 #[test]
1188 fn test_has_traffic_true_with_many_messages() {
1189 let stats = WsStats {
1190 total_messages_received: 1_000,
1191 total_bytes_received: 50_000,
1192 };
1193 assert!(stats.has_traffic());
1194 }
1195
1196 #[test]
1199 fn test_is_high_volume_true_at_threshold() {
1200 let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1201 assert!(stats.is_high_volume(1_000));
1202 }
1203
1204 #[test]
1205 fn test_is_high_volume_false_below_threshold() {
1206 let stats = WsStats { total_messages_received: 500, total_bytes_received: 0 };
1207 assert!(!stats.is_high_volume(1_000));
1208 }
1209
1210 #[test]
1211 fn test_is_high_volume_true_above_threshold() {
1212 let stats = WsStats { total_messages_received: 2_000, total_bytes_received: 0 };
1213 assert!(stats.is_high_volume(1_000));
1214 }
1215
1216 #[test]
1219 fn test_bytes_per_message_none_when_no_messages() {
1220 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1221 assert!(stats.bytes_per_message().is_none());
1222 }
1223
1224 #[test]
1225 fn test_bytes_per_message_correct_value() {
1226 let stats = WsStats { total_messages_received: 4, total_bytes_received: 400 };
1227 assert_eq!(stats.bytes_per_message(), Some(100.0));
1228 }
1229
1230 #[test]
1231 fn test_bytes_per_message_fractional() {
1232 let stats = WsStats { total_messages_received: 3, total_bytes_received: 10 };
1233 let bpm = stats.bytes_per_message().unwrap();
1234 assert!((bpm - 10.0 / 3.0).abs() < 1e-10);
1235 }
1236
1237 #[test]
1240 fn test_delay_for_next_is_backoff_for_attempt_plus_one() {
1241 let policy = ReconnectPolicy::new(
1242 10,
1243 Duration::from_millis(100),
1244 Duration::from_secs(60),
1245 2.0,
1246 )
1247 .unwrap();
1248 assert_eq!(
1249 policy.delay_for_next(0),
1250 policy.backoff_for_attempt(1)
1251 );
1252 assert_eq!(
1253 policy.delay_for_next(3),
1254 policy.backoff_for_attempt(4)
1255 );
1256 }
1257
1258 #[test]
1259 fn test_delay_for_next_saturates_at_max_backoff() {
1260 let policy = ReconnectPolicy::new(
1261 10,
1262 Duration::from_millis(100),
1263 Duration::from_secs(1),
1264 2.0,
1265 )
1266 .unwrap();
1267 assert!(policy.delay_for_next(100) <= Duration::from_secs(1));
1269 }
1270
1271 #[test]
1274 fn test_message_rate_zero_when_elapsed_is_zero() {
1275 let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1276 assert_eq!(stats.message_rate(0), 0.0);
1277 }
1278
1279 #[test]
1280 fn test_message_rate_correct_value() {
1281 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1282 assert!((stats.message_rate(10_000) - 10.0).abs() < 1e-10);
1284 }
1285
1286 #[test]
1287 fn test_message_rate_zero_messages() {
1288 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1289 assert_eq!(stats.message_rate(5_000), 0.0);
1290 }
1291
1292 #[test]
1295 fn test_total_data_mb_zero_when_no_bytes() {
1296 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1297 assert_eq!(stats.total_data_mb(), 0.0);
1298 }
1299
1300 #[test]
1301 fn test_total_data_mb_fractional() {
1302 let stats = WsStats { total_messages_received: 1, total_bytes_received: 524_288 };
1303 assert!((stats.total_data_mb() - 0.5).abs() < 1e-10);
1304 }
1305
1306 #[test]
1309 fn test_is_exhausted_true_at_max_attempts() {
1310 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1311 assert!(policy.is_exhausted(5));
1312 }
1313
1314 #[test]
1315 fn test_is_exhausted_true_beyond_max_attempts() {
1316 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1317 assert!(policy.is_exhausted(10));
1318 }
1319
1320 #[test]
1321 fn test_is_exhausted_false_below_max_attempts() {
1322 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1323 assert!(!policy.is_exhausted(4));
1324 }
1325
1326 #[test]
1329 fn test_total_data_kb_zero_when_no_bytes() {
1330 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1331 assert_eq!(stats.total_data_kb(), 0.0);
1332 }
1333
1334 #[test]
1335 fn test_total_data_kb_one_kib() {
1336 let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_024 };
1337 assert!((stats.total_data_kb() - 1.0).abs() < 1e-10);
1338 }
1339
1340 #[test]
1341 fn test_total_data_kb_equals_1024_times_mb() {
1342 let stats = WsStats { total_messages_received: 1, total_bytes_received: 2_097_152 };
1343 assert!((stats.total_data_kb() - stats.total_data_mb() * 1_024.0).abs() < 1e-6);
1344 }
1345
1346 #[test]
1349 fn test_bandwidth_bps_zero_when_elapsed_zero() {
1350 let stats = WsStats { total_messages_received: 0, total_bytes_received: 1_000 };
1351 assert_eq!(stats.bandwidth_bps(0), 0.0);
1352 }
1353
1354 #[test]
1355 fn test_bandwidth_bps_correct_value() {
1356 let stats = WsStats { total_messages_received: 1, total_bytes_received: 10_000 };
1357 assert!((stats.bandwidth_bps(1_000) - 10_000.0).abs() < 1e-6);
1359 }
1360
1361 #[test]
1362 fn test_bandwidth_bps_zero_bytes() {
1363 let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1364 assert_eq!(stats.bandwidth_bps(5_000), 0.0);
1365 }
1366
1367 #[test]
1370 fn test_messages_per_byte_none_when_no_bytes() {
1371 let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1372 assert!(stats.messages_per_byte().is_none());
1373 }
1374
1375 #[test]
1376 fn test_messages_per_byte_correct_value() {
1377 let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1378 assert!((stats.messages_per_byte().unwrap() - 0.01).abs() < 1e-12);
1380 }
1381
1382 #[test]
1383 fn test_messages_per_byte_less_than_one_for_large_messages() {
1384 let stats = WsStats { total_messages_received: 1, total_bytes_received: 500 };
1385 let mpb = stats.messages_per_byte().unwrap();
1386 assert!(mpb < 1.0);
1387 }
1388
1389 #[test]
1391 fn test_avg_message_size_bytes_none_when_no_messages() {
1392 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1393 assert!(stats.avg_message_size_bytes().is_none());
1394 }
1395
1396 #[test]
1397 fn test_avg_message_size_bytes_correct_value() {
1398 let stats = WsStats { total_messages_received: 10, total_bytes_received: 5_000 };
1399 assert!((stats.avg_message_size_bytes().unwrap() - 500.0).abs() < 1e-12);
1400 }
1401
1402 #[test]
1403 fn test_avg_message_size_bytes_one_message() {
1404 let stats = WsStats { total_messages_received: 1, total_bytes_received: 256 };
1405 assert!((stats.avg_message_size_bytes().unwrap() - 256.0).abs() < 1e-12);
1406 }
1407
1408 #[test]
1410 fn test_bandwidth_kbps_zero_when_elapsed_zero() {
1411 let stats = WsStats { total_messages_received: 10, total_bytes_received: 50_000 };
1412 assert_eq!(stats.bandwidth_kbps(0), 0.0);
1413 }
1414
1415 #[test]
1416 fn test_bandwidth_kbps_correct_value() {
1417 let stats = WsStats { total_messages_received: 1, total_bytes_received: 100_000 };
1419 let kbps = stats.bandwidth_kbps(1_000);
1420 assert!((kbps - 800.0).abs() < 1e-10, "got {kbps}");
1421 }
1422
1423 #[test]
1424 fn test_bandwidth_kbps_zero_when_no_data() {
1425 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1426 assert_eq!(stats.bandwidth_kbps(5_000), 0.0);
1427 }
1428
1429 #[test]
1432 fn test_total_data_gb_zero_when_no_bytes() {
1433 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1434 assert_eq!(stats.total_data_gb(), 0.0);
1435 }
1436
1437 #[test]
1438 fn test_total_data_gb_one_gib() {
1439 let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_073_741_824 };
1440 assert!((stats.total_data_gb() - 1.0).abs() < 1e-10);
1441 }
1442
1443 #[test]
1444 fn test_total_data_gb_equals_1024_times_mb() {
1445 let stats = WsStats { total_messages_received: 1, total_bytes_received: 2_147_483_648 };
1446 assert!((stats.total_data_gb() - stats.total_data_mb() / 1_024.0).abs() < 1e-6);
1447 }
1448
1449 #[test]
1450 fn test_is_active_false_when_no_messages() {
1451 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1452 assert!(!stats.is_active(1));
1453 }
1454
1455 #[test]
1456 fn test_is_active_true_at_threshold() {
1457 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1458 assert!(stats.is_active(100));
1459 }
1460
1461 #[test]
1462 fn test_is_active_false_below_threshold() {
1463 let stats = WsStats { total_messages_received: 50, total_bytes_received: 0 };
1464 assert!(!stats.is_active(100));
1465 }
1466
1467 #[test]
1470 fn test_has_received_bytes_false_when_no_bytes() {
1471 let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1472 assert!(!stats.has_received_bytes());
1473 }
1474
1475 #[test]
1476 fn test_has_received_bytes_true_when_bytes_present() {
1477 let stats = WsStats { total_messages_received: 1, total_bytes_received: 100 };
1478 assert!(stats.has_received_bytes());
1479 }
1480
1481 #[test]
1482 fn test_efficiency_ratio_none_when_no_bytes() {
1483 let stats = WsStats { total_messages_received: 10, total_bytes_received: 0 };
1484 assert!(stats.efficiency_ratio().is_none());
1485 }
1486
1487 #[test]
1488 fn test_efficiency_ratio_correct_value() {
1489 let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1490 assert!((stats.efficiency_ratio().unwrap() - 0.01).abs() < 1e-12);
1492 }
1493
1494 #[test]
1495 fn test_efficiency_ratio_less_than_one_for_large_messages() {
1496 let stats = WsStats { total_messages_received: 1, total_bytes_received: 500 };
1497 assert!(stats.efficiency_ratio().unwrap() < 1.0);
1498 }
1499
1500 #[test]
1503 fn test_message_density_same_as_message_rate() {
1504 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1505 assert!((stats.message_density(1_000) - stats.message_rate(1_000)).abs() < 1e-12);
1506 }
1507
1508 #[test]
1509 fn test_message_density_zero_when_elapsed_zero() {
1510 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1511 assert_eq!(stats.message_density(0), 0.0);
1512 }
1513
1514 #[test]
1515 fn test_compression_ratio_none_when_no_bytes() {
1516 let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1517 assert!(stats.compression_ratio().is_none());
1518 }
1519
1520 #[test]
1521 fn test_compression_ratio_same_as_efficiency_ratio() {
1522 let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1523 assert_eq!(stats.compression_ratio(), stats.efficiency_ratio());
1524 }
1525
1526 #[test]
1529 fn test_uptime_fraction_zero_when_elapsed_zero() {
1530 let stats = WsStats { total_messages_received: 100, total_bytes_received: 50_000 };
1531 assert_eq!(stats.uptime_fraction(0), 0.0);
1532 }
1533
1534 #[test]
1535 fn test_uptime_fraction_zero_when_no_bytes() {
1536 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1537 assert_eq!(stats.uptime_fraction(60_000), 0.0);
1538 }
1539
1540 #[test]
1541 fn test_uptime_fraction_nonzero_with_bytes() {
1542 let stats = WsStats { total_messages_received: 100, total_bytes_received: 1_000 };
1543 let f = stats.uptime_fraction(60_000);
1545 assert!(f > 0.0 && f <= 1.0);
1546 }
1547
1548 #[test]
1549 fn test_uptime_fraction_clamped_to_one() {
1550 let stats = WsStats { total_messages_received: 0, total_bytes_received: 1_000_000 };
1552 assert_eq!(stats.uptime_fraction(100), 1.0);
1553 }
1554
1555 #[test]
1558 fn test_is_idle_true_when_no_messages() {
1559 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1560 assert!(stats.is_idle(60_000, 1.0));
1561 }
1562
1563 #[test]
1564 fn test_is_idle_false_when_high_message_rate() {
1565 let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1566 assert!(!stats.is_idle(1_000, 1.0));
1568 }
1569
1570 #[test]
1571 fn test_is_idle_true_when_elapsed_zero() {
1572 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1573 assert!(stats.is_idle(0, 1.0));
1575 }
1576
1577 #[test]
1580 fn test_average_message_size_bytes_none_when_no_messages() {
1581 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1582 assert!(stats.average_message_size_bytes().is_none());
1583 }
1584
1585 #[test]
1586 fn test_average_message_size_bytes_same_as_bytes_per_message() {
1587 let stats = WsStats { total_messages_received: 10, total_bytes_received: 1_000 };
1588 assert_eq!(stats.average_message_size_bytes(), stats.bytes_per_message());
1589 }
1590}