1use crate::error::StreamError;
13use futures_util::{SinkExt, StreamExt};
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tokio::time;
17use tokio_tungstenite::{connect_async, tungstenite::Message};
18use tracing::{debug, info, warn};
19
/// Cumulative counters describing the traffic received on a WebSocket.
///
/// The struct stores only two running totals; every other figure exposed by
/// the methods below is derived from them on demand.
#[derive(Debug, Clone, Copy, Default)]
pub struct WsStats {
    /// Number of messages counted so far.
    pub total_messages_received: u64,
    /// Sum of the payload lengths, in bytes, of the counted messages.
    pub total_bytes_received: u64,
}

impl WsStats {
    /// Turns a cumulative `total` gathered over `elapsed_ms` milliseconds into
    /// a per-second rate. A zero-length window yields `0.0` instead of
    /// dividing by zero.
    fn per_second(total: u64, elapsed_ms: u64) -> f64 {
        match elapsed_ms {
            0 => 0.0,
            ms => total as f64 / (ms as f64 / 1000.0),
        }
    }

    /// Messages received per second over a window of `elapsed_ms`
    /// milliseconds; `0.0` when the window is empty.
    pub fn message_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_messages_received, elapsed_ms)
    }

    /// Bytes received per second over a window of `elapsed_ms` milliseconds;
    /// `0.0` when the window is empty.
    pub fn byte_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_bytes_received, elapsed_ms)
    }

    /// Deprecated alias that forwards to [`WsStats::bytes_per_message`].
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn avg_message_size(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Total bytes received expressed in mebibytes (bytes / 1,048,576).
    pub fn total_data_mb(&self) -> f64 {
        let bytes = self.total_bytes_received as f64;
        bytes / 1_048_576.0
    }

    /// Total bytes received expressed in kibibytes, derived from
    /// [`WsStats::total_data_mb`].
    pub fn total_data_kb(&self) -> f64 {
        let mebibytes = self.total_data_mb();
        mebibytes * 1_024.0
    }

    /// Same value as [`WsStats::byte_rate`]: bytes per second over the window.
    pub fn bandwidth_bps(&self, elapsed_ms: u64) -> f64 {
        self.byte_rate(elapsed_ms)
    }

    /// Deprecated alias that forwards to [`WsStats::efficiency_ratio`].
    #[deprecated(since = "2.2.0", note = "Use `efficiency_ratio()` instead")]
    pub fn messages_per_byte(&self) -> Option<f64> {
        self.efficiency_ratio()
    }

    /// Deprecated alias that forwards to [`WsStats::bytes_per_message`].
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn avg_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Kilobits per second over the window: the byte rate times eight,
    /// divided by one thousand.
    pub fn bandwidth_kbps(&self, elapsed_ms: u64) -> f64 {
        let bytes_per_second = self.byte_rate(elapsed_ms);
        bytes_per_second * 8.0 / 1_000.0
    }

    /// `true` when the message rate over `elapsed_ms` is strictly below
    /// `min_rate` (messages per second).
    pub fn is_idle(&self, elapsed_ms: u64, min_rate: f64) -> bool {
        let observed = self.message_rate(elapsed_ms);
        observed < min_rate
    }

    /// `true` once at least one message has been counted.
    pub fn has_traffic(&self) -> bool {
        self.total_messages_received != 0
    }

    /// `true` when the message count has reached `threshold` (inclusive).
    pub fn is_high_volume(&self, threshold: u64) -> bool {
        threshold <= self.total_messages_received
    }

    /// Deprecated alias that forwards to [`WsStats::bytes_per_message`].
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn average_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Mean payload size in bytes, or `None` while no message has been
    /// counted.
    pub fn bytes_per_message(&self) -> Option<f64> {
        match self.total_messages_received {
            0 => None,
            messages => Some(self.total_bytes_received as f64 / messages as f64),
        }
    }

    /// Total bytes received expressed in gibibytes, derived from
    /// [`WsStats::total_data_mb`].
    pub fn total_data_gb(&self) -> f64 {
        let mebibytes = self.total_data_mb();
        mebibytes / 1_024.0
    }

    /// Same test as [`WsStats::is_high_volume`], with `min_messages` as the
    /// threshold.
    pub fn is_active(&self, min_messages: u64) -> bool {
        self.is_high_volume(min_messages)
    }

    /// `true` once at least one byte has been counted.
    pub fn has_received_bytes(&self) -> bool {
        self.total_bytes_received != 0
    }

    /// Messages per byte (the inverse of [`WsStats::bytes_per_message`]), or
    /// `None` while no bytes have been counted.
    pub fn efficiency_ratio(&self) -> Option<f64> {
        match self.total_bytes_received {
            0 => None,
            bytes => Some(self.total_messages_received as f64 / bytes as f64),
        }
    }

    /// Deprecated alias that forwards to [`WsStats::message_rate`].
    #[deprecated(since = "2.2.0", note = "Use `message_rate()` instead")]
    pub fn message_density(&self, elapsed_ms: u64) -> f64 {
        self.message_rate(elapsed_ms)
    }

    /// Deprecated alias that forwards to [`WsStats::efficiency_ratio`].
    #[deprecated(since = "2.2.0", note = "Use `efficiency_ratio()` instead")]
    pub fn compression_ratio(&self) -> Option<f64> {
        self.efficiency_ratio()
    }

    /// Returns `total_bytes_received / total_ms`, capped at `1.0`, or `0.0`
    /// when `total_ms` is zero.
    ///
    /// NOTE(review): despite the name, the value is computed from the byte
    /// counter, not from any measurement of connected time — confirm that
    /// this proxy is intended.
    pub fn uptime_fraction(&self, total_ms: u64) -> f64 {
        if total_ms == 0 {
            0.0
        } else {
            let bytes_per_ms = self.total_bytes_received as f64 / total_ms as f64;
            bytes_per_ms.min(1.0)
        }
    }
}
194
195#[derive(Debug, Clone)]
200pub struct ReconnectPolicy {
201 pub max_attempts: u32,
203 pub initial_backoff: Duration,
205 pub max_backoff: Duration,
207 pub multiplier: f64,
209 pub jitter: f64,
212}
213
214impl ReconnectPolicy {
215 pub fn new(
222 max_attempts: u32,
223 initial_backoff: Duration,
224 max_backoff: Duration,
225 multiplier: f64,
226 ) -> Result<Self, StreamError> {
227 if multiplier < 1.0 {
228 return Err(StreamError::ConfigError {
229 reason: format!(
230 "reconnect multiplier must be >= 1.0, got {multiplier}"
231 ),
232 });
233 }
234 if max_attempts == 0 {
235 return Err(StreamError::ConfigError {
236 reason: "max_attempts must be > 0".into(),
237 });
238 }
239 Ok(Self {
240 max_attempts,
241 initial_backoff,
242 max_backoff,
243 multiplier,
244 jitter: 0.0,
245 })
246 }
247
248 pub fn with_max_attempts(mut self, max_attempts: u32) -> Result<Self, StreamError> {
254 if max_attempts == 0 {
255 return Err(StreamError::ConfigError {
256 reason: "max_attempts must be > 0".into(),
257 });
258 }
259 self.max_attempts = max_attempts;
260 Ok(self)
261 }
262
263 pub fn with_multiplier(mut self, multiplier: f64) -> Result<Self, StreamError> {
270 if multiplier < 1.0 {
271 return Err(StreamError::ConfigError {
272 reason: format!("reconnect multiplier must be >= 1.0, got {multiplier}"),
273 });
274 }
275 self.multiplier = multiplier;
276 Ok(self)
277 }
278
279 pub fn with_initial_backoff(mut self, duration: Duration) -> Self {
281 self.initial_backoff = duration;
282 self
283 }
284
285 pub fn with_max_backoff(mut self, duration: Duration) -> Self {
287 self.max_backoff = duration;
288 self
289 }
290
291 pub fn with_jitter(mut self, ratio: f64) -> Result<Self, StreamError> {
301 if !(0.0..=1.0).contains(&ratio) {
302 return Err(StreamError::ConfigError {
303 reason: format!("jitter ratio must be in [0.0, 1.0], got {ratio}"),
304 });
305 }
306 self.jitter = ratio;
307 Ok(self)
308 }
309
310 pub fn total_max_delay(&self) -> Duration {
315 let total_ms: u64 = (0..self.max_attempts)
316 .map(|a| self.backoff_for_attempt(a).as_millis() as u64)
317 .fold(0u64, |acc, ms| acc.saturating_add(ms));
318 Duration::from_millis(total_ms)
319 }
320
321 pub fn max_attempts(&self) -> u32 {
323 self.max_attempts
324 }
325
326 pub fn total_attempts_remaining(&self, current_attempt: u32) -> u32 {
330 self.max_attempts.saturating_sub(current_attempt)
331 }
332
333 pub fn delay_for_next(&self, current_attempt: u32) -> Duration {
339 self.backoff_for_attempt(current_attempt.saturating_add(1))
340 }
341
342 pub fn is_exhausted(&self, attempts: u32) -> bool {
347 attempts >= self.max_attempts
348 }
349
350 pub fn backoff_for_attempt(&self, attempt: u32) -> Duration {
352 let factor = self.multiplier.powi(attempt as i32);
353 let max_ms = self.max_backoff.as_millis() as f64;
359 let base_ms = (self.initial_backoff.as_millis() as f64 * factor).min(max_ms);
360 let ms = if self.jitter > 0.0 {
361 let hash = (attempt as u64)
363 .wrapping_mul(2654435769)
364 .wrapping_add(1013904223);
365 let noise = (hash & 0xFFFF) as f64 / 65535.0; let delta = base_ms * self.jitter * (noise * 2.0 - 1.0); (base_ms + delta).clamp(0.0, max_ms)
368 } else {
369 base_ms
370 };
371 Duration::from_millis(ms as u64)
372 }
373}
374
375impl Default for ReconnectPolicy {
376 fn default() -> Self {
377 Self {
378 max_attempts: 10,
379 initial_backoff: Duration::from_millis(500),
380 max_backoff: Duration::from_secs(30),
381 multiplier: 2.0,
382 jitter: 0.0,
383 }
384 }
385}
386
387#[derive(Debug, Clone)]
389pub struct ConnectionConfig {
390 pub url: String,
392 pub channel_capacity: usize,
394 pub reconnect: ReconnectPolicy,
396 pub ping_interval: Duration,
398}
399
400impl ConnectionConfig {
401 pub fn new(url: impl Into<String>, channel_capacity: usize) -> Result<Self, StreamError> {
409 let url = url.into();
410 if url.is_empty() {
411 return Err(StreamError::ConfigError {
412 reason: "WebSocket URL must not be empty".into(),
413 });
414 }
415 if channel_capacity == 0 {
416 return Err(StreamError::ConfigError {
417 reason: "channel_capacity must be > 0".into(),
418 });
419 }
420 Ok(Self {
421 url,
422 channel_capacity,
423 reconnect: ReconnectPolicy::default(),
424 ping_interval: Duration::from_secs(20),
425 })
426 }
427
428 pub fn with_reconnect(mut self, policy: ReconnectPolicy) -> Self {
430 self.reconnect = policy;
431 self
432 }
433
434 pub fn with_ping_interval(mut self, interval: Duration) -> Self {
436 self.ping_interval = interval;
437 self
438 }
439
440 pub fn with_reconnect_attempts(mut self, n: u32) -> Result<Self, StreamError> {
447 self.reconnect = self.reconnect.with_max_attempts(n)?;
448 Ok(self)
449 }
450
451 pub fn with_channel_capacity(mut self, capacity: usize) -> Result<Self, StreamError> {
457 if capacity == 0 {
458 return Err(StreamError::ConfigError {
459 reason: "channel_capacity must be > 0".into(),
460 });
461 }
462 self.channel_capacity = capacity;
463 Ok(self)
464 }
465}
466
467pub struct WsManager {
473 config: ConnectionConfig,
474 connect_attempts: u32,
475 is_connected: bool,
476 stats: WsStats,
477}
478
479impl WsManager {
480 pub fn new(config: ConnectionConfig) -> Self {
482 Self {
483 config,
484 connect_attempts: 0,
485 is_connected: false,
486 stats: WsStats::default(),
487 }
488 }
489
490 pub async fn run(
507 &mut self,
508 message_tx: mpsc::Sender<String>,
509 mut outbound_rx: Option<mpsc::Receiver<String>>,
510 ) -> Result<(), StreamError> {
511 loop {
512 info!(url = %self.config.url, attempt = self.connect_attempts, "connecting");
513 match self.try_connect(&message_tx, &mut outbound_rx).await {
514 Ok(()) => {
515 self.is_connected = false;
517 debug!(url = %self.config.url, "connection closed cleanly");
518 if message_tx.is_closed() {
519 return Ok(());
520 }
521 }
522 Err(e) => {
523 self.is_connected = false;
524 warn!(url = %self.config.url, error = %e, "connection error");
525 }
526 }
527
528 if !self.can_reconnect() {
529 return Err(StreamError::ReconnectExhausted {
530 url: self.config.url.clone(),
531 attempts: self.connect_attempts,
532 });
533 }
534 let backoff = self.next_reconnect_backoff()?;
535 info!(url = %self.config.url, backoff_ms = backoff.as_millis(), "reconnecting after backoff");
536 tokio::time::sleep(backoff).await;
537 }
538 }
539
540 async fn try_connect(
542 &mut self,
543 message_tx: &mpsc::Sender<String>,
544 outbound_rx: &mut Option<mpsc::Receiver<String>>,
545 ) -> Result<(), StreamError> {
546 let (ws_stream, _response) =
547 connect_async(&self.config.url)
548 .await
549 .map_err(|e| StreamError::ConnectionFailed {
550 url: self.config.url.clone(),
551 reason: e.to_string(),
552 })?;
553
554 self.is_connected = true;
555 self.connect_attempts += 1;
556 info!(url = %self.config.url, "connected");
557
558 let (mut write, mut read) = ws_stream.split();
559 let mut ping_interval = time::interval(self.config.ping_interval);
560 ping_interval.tick().await;
562
563 loop {
564 tokio::select! {
565 msg = read.next() => {
566 match msg {
567 Some(Ok(Message::Text(text))) => {
568 self.stats.total_messages_received += 1;
569 self.stats.total_bytes_received += text.len() as u64;
570 if message_tx.send(text.to_string()).await.is_err() {
571 return Ok(());
573 }
574 }
575 Some(Ok(Message::Binary(bytes))) => {
576 self.stats.total_messages_received += 1;
577 self.stats.total_bytes_received += bytes.len() as u64;
578 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
579 if message_tx.send(text).await.is_err() {
580 return Ok(());
581 }
582 }
583 }
584 Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {
585 }
587 Some(Ok(Message::Close(_))) | None => {
588 return Ok(());
589 }
590 Some(Err(e)) => {
591 return Err(StreamError::WebSocket(e.to_string()));
592 }
593 }
594 }
595 _ = ping_interval.tick() => {
596 debug!(url = %self.config.url, "sending keepalive ping");
597 if write.send(Message::Ping(vec![].into())).await.is_err() {
598 return Ok(());
599 }
600 }
601 outbound = recv_outbound(outbound_rx) => {
602 if let Some(text) = outbound {
603 let _ = write.send(Message::Text(text.into())).await;
604 }
605 }
606 }
607 }
608 }
609
610 pub fn connect_simulated(&mut self) {
613 self.connect_attempts += 1;
614 self.is_connected = true;
615 }
616
617 pub fn disconnect_simulated(&mut self) {
619 self.is_connected = false;
620 }
621
622 pub fn is_connected(&self) -> bool {
624 self.is_connected
625 }
626
627 pub fn connect_attempts(&self) -> u32 {
629 self.connect_attempts
630 }
631
632 pub fn config(&self) -> &ConnectionConfig {
634 &self.config
635 }
636
637 pub fn stats(&self) -> &WsStats {
639 &self.stats
640 }
641
642 pub fn can_reconnect(&self) -> bool {
644 self.connect_attempts < self.config.reconnect.max_attempts
645 }
646
647 pub fn next_reconnect_backoff(&mut self) -> Result<Duration, StreamError> {
649 if !self.can_reconnect() {
650 return Err(StreamError::ReconnectExhausted {
651 url: self.config.url.clone(),
652 attempts: self.connect_attempts,
653 });
654 }
655 let backoff = self
656 .config
657 .reconnect
658 .backoff_for_attempt(self.connect_attempts);
659 self.connect_attempts += 1;
660 Ok(backoff)
661 }
662}
663
664async fn recv_outbound(rx: &mut Option<mpsc::Receiver<String>>) -> Option<String> {
669 match rx {
670 Some(rx) => rx.recv().await,
671 None => std::future::pending().await,
672 }
673}
674
675#[cfg(test)]
676mod tests {
677 use super::*;
678
679 fn default_config() -> ConnectionConfig {
680 ConnectionConfig::new("wss://example.com/ws", 1024).unwrap()
681 }
682
683 #[test]
684 fn test_reconnect_policy_default_values() {
685 let p = ReconnectPolicy::default();
686 assert_eq!(p.max_attempts, 10);
687 assert_eq!(p.multiplier, 2.0);
688 }
689
690 #[test]
691 fn test_reconnect_policy_backoff_exponential() {
692 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
693 .unwrap();
694 assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(100));
695 assert_eq!(p.backoff_for_attempt(1), Duration::from_millis(200));
696 assert_eq!(p.backoff_for_attempt(2), Duration::from_millis(400));
697 }
698
699 #[test]
700 fn test_reconnect_policy_backoff_capped_at_max() {
701 let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(5), 2.0)
702 .unwrap();
703 let backoff = p.backoff_for_attempt(10);
704 assert!(backoff <= Duration::from_secs(5));
705 }
706
707 #[test]
708 fn test_reconnect_policy_multiplier_below_1_rejected() {
709 let result =
710 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 0.5);
711 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
712 }
713
714 #[test]
715 fn test_reconnect_policy_zero_attempts_rejected() {
716 let result =
717 ReconnectPolicy::new(0, Duration::from_millis(100), Duration::from_secs(30), 2.0);
718 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
719 }
720
721 #[test]
722 fn test_connection_config_empty_url_rejected() {
723 let result = ConnectionConfig::new("", 1024);
724 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
725 }
726
727 #[test]
728 fn test_connection_config_zero_capacity_rejected() {
729 let result = ConnectionConfig::new("wss://example.com", 0);
730 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
731 }
732
733 #[test]
734 fn test_connection_config_with_reconnect() {
735 let policy =
736 ReconnectPolicy::new(3, Duration::from_millis(200), Duration::from_secs(10), 2.0)
737 .unwrap();
738 let config = default_config().with_reconnect(policy);
739 assert_eq!(config.reconnect.max_attempts, 3);
740 }
741
742 #[test]
743 fn test_connection_config_with_ping_interval() {
744 let config = default_config().with_ping_interval(Duration::from_secs(30));
745 assert_eq!(config.ping_interval, Duration::from_secs(30));
746 }
747
748 #[test]
749 fn test_ws_manager_initial_state() {
750 let mgr = WsManager::new(default_config());
751 assert!(!mgr.is_connected());
752 assert_eq!(mgr.connect_attempts(), 0);
753 }
754
755 #[test]
756 fn test_ws_manager_connect_simulated() {
757 let mut mgr = WsManager::new(default_config());
758 mgr.connect_simulated();
759 assert!(mgr.is_connected());
760 assert_eq!(mgr.connect_attempts(), 1);
761 }
762
763 #[test]
764 fn test_ws_manager_disconnect_simulated() {
765 let mut mgr = WsManager::new(default_config());
766 mgr.connect_simulated();
767 mgr.disconnect_simulated();
768 assert!(!mgr.is_connected());
769 }
770
771 #[test]
772 fn test_ws_manager_can_reconnect_within_limit() {
773 let mut mgr = WsManager::new(
774 default_config().with_reconnect(
775 ReconnectPolicy::new(3, Duration::from_millis(10), Duration::from_secs(1), 2.0)
776 .unwrap(),
777 ),
778 );
779 assert!(mgr.can_reconnect());
780 mgr.next_reconnect_backoff().unwrap();
781 mgr.next_reconnect_backoff().unwrap();
782 mgr.next_reconnect_backoff().unwrap();
783 assert!(!mgr.can_reconnect());
784 }
785
786 #[test]
787 fn test_ws_manager_reconnect_exhausted_error() {
788 let mut mgr = WsManager::new(
789 default_config().with_reconnect(
790 ReconnectPolicy::new(1, Duration::from_millis(10), Duration::from_secs(1), 2.0)
791 .unwrap(),
792 ),
793 );
794 mgr.next_reconnect_backoff().unwrap();
795 let result = mgr.next_reconnect_backoff();
796 assert!(matches!(
797 result,
798 Err(StreamError::ReconnectExhausted { .. })
799 ));
800 }
801
802 #[test]
803 fn test_ws_manager_backoff_increases() {
804 let mut mgr = WsManager::new(
805 default_config().with_reconnect(
806 ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(30), 2.0)
807 .unwrap(),
808 ),
809 );
810 let b0 = mgr.next_reconnect_backoff().unwrap();
811 let b1 = mgr.next_reconnect_backoff().unwrap();
812 assert!(b1 >= b0);
813 }
814
815 #[test]
816 fn test_ws_manager_config_accessor() {
817 let mgr = WsManager::new(default_config());
818 assert_eq!(mgr.config().url, "wss://example.com/ws");
819 assert_eq!(mgr.config().channel_capacity, 1024);
820 }
821
822 #[tokio::test]
826 async fn test_recv_outbound_none_is_always_pending() {
827 let mut rx: Option<mpsc::Receiver<String>> = None;
828 tokio::select! {
830 _ = recv_outbound(&mut rx) => {
831 panic!("recv_outbound(None) should never resolve");
832 }
833 _ = std::future::ready(()) => {
834 }
836 }
837 }
838
839 #[tokio::test]
841 async fn test_recv_outbound_some_resolves_with_message() {
842 let (tx, mut channel_rx) = mpsc::channel::<String>(1);
843 tx.send("subscribe".into()).await.unwrap();
844 let mut rx: Option<mpsc::Receiver<String>> = Some(channel_rx);
845 let msg = recv_outbound(&mut rx).await;
846 assert_eq!(msg.as_deref(), Some("subscribe"));
847 let _ = rx;
849 }
850
851 #[test]
852 fn test_ws_stats_initial_zero() {
853 let mgr = WsManager::new(default_config());
854 let s = mgr.stats();
855 assert_eq!(s.total_messages_received, 0);
856 assert_eq!(s.total_bytes_received, 0);
857 }
858
859 #[test]
860 fn test_ws_stats_default() {
861 let s = WsStats::default();
862 assert_eq!(s.total_messages_received, 0);
863 assert_eq!(s.total_bytes_received, 0);
864 }
865
866 #[test]
867 fn test_reconnect_policy_with_jitter_valid() {
868 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
869 .unwrap()
870 .with_jitter(0.5)
871 .unwrap();
872 assert_eq!(p.jitter, 0.5);
873 }
874
875 #[test]
876 fn test_reconnect_policy_with_jitter_zero_is_deterministic() {
877 let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
878 .unwrap()
879 .with_jitter(0.0)
880 .unwrap();
881 let b0 = p.backoff_for_attempt(0);
883 let b1 = p.backoff_for_attempt(0);
884 assert_eq!(b0, b1);
885 assert_eq!(b0, Duration::from_millis(100));
886 }
887
888 #[test]
889 fn test_reconnect_policy_with_jitter_invalid_ratio() {
890 let result =
891 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
892 .unwrap()
893 .with_jitter(1.5);
894 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
895 }
896
897 #[test]
898 fn test_reconnect_policy_with_jitter_negative_ratio() {
899 let result =
900 ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
901 .unwrap()
902 .with_jitter(-0.1);
903 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
904 }
905
906 #[test]
907 fn test_reconnect_policy_with_jitter_stays_within_bounds() {
908 let p = ReconnectPolicy::new(20, Duration::from_millis(100), Duration::from_secs(30), 2.0)
909 .unwrap()
910 .with_jitter(1.0)
911 .unwrap();
912 for attempt in 0..20 {
914 let b = p.backoff_for_attempt(attempt);
915 assert!(b <= Duration::from_secs(30), "attempt {attempt} exceeded max_backoff");
916 }
917 }
918
919 #[test]
920 fn test_reconnect_policy_with_max_attempts_valid() {
921 let p = ReconnectPolicy::default().with_max_attempts(5).unwrap();
922 assert_eq!(p.max_attempts, 5);
923 }
924
925 #[test]
926 fn test_reconnect_policy_with_max_attempts_zero_rejected() {
927 let result = ReconnectPolicy::default().with_max_attempts(0);
928 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
929 }
930
931 #[test]
932 fn test_connection_config_with_reconnect_attempts_valid() {
933 let config = default_config().with_reconnect_attempts(3).unwrap();
934 assert_eq!(config.reconnect.max_attempts, 3);
935 }
936
937 #[test]
938 fn test_connection_config_with_reconnect_attempts_zero_rejected() {
939 let result = default_config().with_reconnect_attempts(0);
940 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
941 }
942
943 #[test]
944 fn test_reconnect_policy_total_max_delay_sum_of_backoffs() {
945 let p = ReconnectPolicy::new(3, Duration::from_millis(100), Duration::from_secs(30), 2.0)
946 .unwrap();
947 assert_eq!(p.total_max_delay(), Duration::from_millis(700));
949 }
950
951 #[test]
952 fn test_reconnect_policy_total_max_delay_capped_by_max_backoff() {
953 let p = ReconnectPolicy::new(5, Duration::from_millis(1000), Duration::from_millis(500), 2.0)
955 .unwrap();
956 assert_eq!(p.total_max_delay(), Duration::from_millis(2500));
958 }
959
960 #[test]
961 fn test_connection_config_with_channel_capacity_valid() {
962 let config = default_config().with_channel_capacity(512).unwrap();
963 assert_eq!(config.channel_capacity, 512);
964 }
965
966 #[test]
967 fn test_connection_config_with_channel_capacity_zero_rejected() {
968 let result = default_config().with_channel_capacity(0);
969 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
970 }
971
972 #[test]
973 fn test_reconnect_policy_with_jitter_varies_across_attempts() {
974 let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(30), 1.0)
975 .unwrap()
976 .with_jitter(0.5)
977 .unwrap();
978 let values: Vec<Duration> = (0..10).map(|a| p.backoff_for_attempt(a)).collect();
981 let unique: std::collections::HashSet<u64> =
982 values.iter().map(|d| d.as_millis() as u64).collect();
983 assert!(unique.len() > 1, "jitter should produce variation across attempts");
984 }
985
986 #[test]
989 fn test_with_initial_backoff_sets_value() {
990 let p = ReconnectPolicy::default()
991 .with_initial_backoff(Duration::from_secs(2));
992 assert_eq!(p.initial_backoff, Duration::from_secs(2));
993 }
994
995 #[test]
996 fn test_with_max_backoff_sets_value() {
997 let p = ReconnectPolicy::default()
998 .with_max_backoff(Duration::from_secs(60));
999 assert_eq!(p.max_backoff, Duration::from_secs(60));
1000 }
1001
1002 #[test]
1003 fn test_with_initial_backoff_affects_first_attempt() {
1004 let p = ReconnectPolicy::default()
1005 .with_initial_backoff(Duration::from_millis(200));
1006 assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(200));
1007 }
1008
1009 #[test]
1012 fn test_with_multiplier_valid() {
1013 let p = ReconnectPolicy::default().with_multiplier(3.0).unwrap();
1014 assert_eq!(p.multiplier, 3.0);
1015 }
1016
1017 #[test]
1018 fn test_with_multiplier_below_one_rejected() {
1019 let result = ReconnectPolicy::default().with_multiplier(0.9);
1020 assert!(matches!(result, Err(StreamError::ConfigError { .. })));
1021 }
1022
1023 #[test]
1024 fn test_with_multiplier_exactly_one_accepted() {
1025 let p = ReconnectPolicy::default().with_multiplier(1.0).unwrap();
1026 assert_eq!(p.multiplier, 1.0);
1027 }
1028
1029 #[test]
1032 fn test_message_rate_zero_elapsed_returns_zero() {
1033 let stats = WsStats {
1034 total_messages_received: 100,
1035 total_bytes_received: 50_000,
1036 };
1037 assert_eq!(stats.message_rate(0), 0.0);
1038 assert_eq!(stats.byte_rate(0), 0.0);
1039 }
1040
1041 #[test]
1042 fn test_message_rate_100_messages_in_1s() {
1043 let stats = WsStats {
1044 total_messages_received: 100,
1045 total_bytes_received: 0,
1046 };
1047 let rate = stats.message_rate(1_000); assert!((rate - 100.0).abs() < 1e-9);
1049 }
1050
1051 #[test]
1052 fn test_byte_rate_1mb_in_1s() {
1053 let stats = WsStats {
1054 total_messages_received: 0,
1055 total_bytes_received: 1_000_000,
1056 };
1057 let rate = stats.byte_rate(1_000); assert!((rate - 1_000_000.0).abs() < 1.0);
1059 }
1060
1061 #[test]
1064 fn test_avg_message_size_none_when_no_messages() {
1065 let stats = WsStats::default();
1066 assert!(stats.avg_message_size().is_none());
1067 }
1068
1069 #[test]
1070 fn test_avg_message_size_basic() {
1071 let stats = WsStats {
1072 total_messages_received: 10,
1073 total_bytes_received: 1_000,
1074 };
1075 let avg = stats.avg_message_size().unwrap();
1076 assert!((avg - 100.0).abs() < 1e-9);
1077 }
1078
1079 #[test]
1082 fn test_total_data_mb_zero_bytes() {
1083 let stats = WsStats::default();
1084 assert!((stats.total_data_mb() - 0.0).abs() < 1e-9);
1085 }
1086
1087 #[test]
1088 fn test_total_data_mb_one_mib() {
1089 let stats = WsStats {
1090 total_messages_received: 1,
1091 total_bytes_received: 1_048_576,
1092 };
1093 assert!((stats.total_data_mb() - 1.0).abs() < 1e-9);
1094 }
1095
1096 #[test]
1097 fn test_max_attempts_getter_matches_field() {
1098 let p = ReconnectPolicy::default();
1099 assert_eq!(p.max_attempts(), p.max_attempts);
1100 }
1101
1102 #[test]
1103 fn test_max_attempts_getter_after_new() {
1104 let p = ReconnectPolicy::new(
1105 7,
1106 std::time::Duration::from_millis(100),
1107 std::time::Duration::from_secs(30),
1108 2.0,
1109 )
1110 .unwrap();
1111 assert_eq!(p.max_attempts(), 7);
1112 }
1113
1114 #[test]
1117 fn test_is_idle_below_min_rate() {
1118 let stats = WsStats {
1119 total_messages_received: 1,
1120 total_bytes_received: 0,
1121 };
1122 assert!(stats.is_idle(10_000, 1.0));
1124 }
1125
1126 #[test]
1127 fn test_is_idle_above_min_rate() {
1128 let stats = WsStats {
1129 total_messages_received: 100,
1130 total_bytes_received: 0,
1131 };
1132 assert!(!stats.is_idle(1_000, 1.0));
1134 }
1135
1136 #[test]
1137 fn test_is_idle_zero_messages_always_idle() {
1138 let stats = WsStats::default();
1139 assert!(stats.is_idle(1_000, 0.001));
1140 }
1141
1142 #[test]
1143 fn test_total_attempts_remaining_full() {
1144 let p = ReconnectPolicy::default(); assert_eq!(p.total_attempts_remaining(0), 10);
1146 }
1147
1148 #[test]
1149 fn test_total_attempts_remaining_partial() {
1150 let p = ReconnectPolicy::default();
1151 assert_eq!(p.total_attempts_remaining(3), 7);
1152 }
1153
1154 #[test]
1155 fn test_total_attempts_remaining_exhausted() {
1156 let p = ReconnectPolicy::default();
1157 assert_eq!(p.total_attempts_remaining(10), 0);
1158 assert_eq!(p.total_attempts_remaining(99), 0);
1159 }
1160
1161 #[test]
1164 fn test_has_traffic_false_when_no_messages() {
1165 let stats = WsStats::default();
1166 assert!(!stats.has_traffic());
1167 }
1168
1169 #[test]
1170 fn test_has_traffic_true_after_one_message() {
1171 let stats = WsStats {
1172 total_messages_received: 1,
1173 total_bytes_received: 0,
1174 };
1175 assert!(stats.has_traffic());
1176 }
1177
1178 #[test]
1179 fn test_has_traffic_true_with_many_messages() {
1180 let stats = WsStats {
1181 total_messages_received: 1_000,
1182 total_bytes_received: 50_000,
1183 };
1184 assert!(stats.has_traffic());
1185 }
1186
1187 #[test]
1190 fn test_is_high_volume_true_at_threshold() {
1191 let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1192 assert!(stats.is_high_volume(1_000));
1193 }
1194
1195 #[test]
1196 fn test_is_high_volume_false_below_threshold() {
1197 let stats = WsStats { total_messages_received: 500, total_bytes_received: 0 };
1198 assert!(!stats.is_high_volume(1_000));
1199 }
1200
1201 #[test]
1202 fn test_is_high_volume_true_above_threshold() {
1203 let stats = WsStats { total_messages_received: 2_000, total_bytes_received: 0 };
1204 assert!(stats.is_high_volume(1_000));
1205 }
1206
1207 #[test]
1210 fn test_bytes_per_message_none_when_no_messages() {
1211 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1212 assert!(stats.bytes_per_message().is_none());
1213 }
1214
1215 #[test]
1216 fn test_bytes_per_message_correct_value() {
1217 let stats = WsStats { total_messages_received: 4, total_bytes_received: 400 };
1218 assert_eq!(stats.bytes_per_message(), Some(100.0));
1219 }
1220
1221 #[test]
1222 fn test_bytes_per_message_fractional() {
1223 let stats = WsStats { total_messages_received: 3, total_bytes_received: 10 };
1224 let bpm = stats.bytes_per_message().unwrap();
1225 assert!((bpm - 10.0 / 3.0).abs() < 1e-10);
1226 }
1227
1228 #[test]
1231 fn test_delay_for_next_is_backoff_for_attempt_plus_one() {
1232 let policy = ReconnectPolicy::new(
1233 10,
1234 Duration::from_millis(100),
1235 Duration::from_secs(60),
1236 2.0,
1237 )
1238 .unwrap();
1239 assert_eq!(
1240 policy.delay_for_next(0),
1241 policy.backoff_for_attempt(1)
1242 );
1243 assert_eq!(
1244 policy.delay_for_next(3),
1245 policy.backoff_for_attempt(4)
1246 );
1247 }
1248
1249 #[test]
1250 fn test_delay_for_next_saturates_at_max_backoff() {
1251 let policy = ReconnectPolicy::new(
1252 10,
1253 Duration::from_millis(100),
1254 Duration::from_secs(1),
1255 2.0,
1256 )
1257 .unwrap();
1258 assert!(policy.delay_for_next(100) <= Duration::from_secs(1));
1260 }
1261
1262 #[test]
1265 fn test_message_rate_zero_when_elapsed_is_zero() {
1266 let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1267 assert_eq!(stats.message_rate(0), 0.0);
1268 }
1269
1270 #[test]
1271 fn test_message_rate_correct_value() {
1272 let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1273 assert!((stats.message_rate(10_000) - 10.0).abs() < 1e-10);
1275 }
1276
1277 #[test]
1278 fn test_message_rate_zero_messages() {
1279 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1280 assert_eq!(stats.message_rate(5_000), 0.0);
1281 }
1282
1283 #[test]
1286 fn test_total_data_mb_zero_when_no_bytes() {
1287 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1288 assert_eq!(stats.total_data_mb(), 0.0);
1289 }
1290
1291 #[test]
1292 fn test_total_data_mb_fractional() {
1293 let stats = WsStats { total_messages_received: 1, total_bytes_received: 524_288 };
1294 assert!((stats.total_data_mb() - 0.5).abs() < 1e-10);
1295 }
1296
1297 #[test]
1300 fn test_is_exhausted_true_at_max_attempts() {
1301 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1302 assert!(policy.is_exhausted(5));
1303 }
1304
1305 #[test]
1306 fn test_is_exhausted_true_beyond_max_attempts() {
1307 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1308 assert!(policy.is_exhausted(10));
1309 }
1310
1311 #[test]
1312 fn test_is_exhausted_false_below_max_attempts() {
1313 let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1314 assert!(!policy.is_exhausted(4));
1315 }
1316
1317 #[test]
1320 fn test_total_data_kb_zero_when_no_bytes() {
1321 let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1322 assert_eq!(stats.total_data_kb(), 0.0);
1323 }
1324
1325 #[test]
1326 fn test_total_data_kb_one_kib() {
1327 let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_024 };
1328 assert!((stats.total_data_kb() - 1.0).abs() < 1e-10);
1329 }
1330
1331 #[test]
1332 fn test_total_data_kb_equals_1024_times_mb() {
1333 let stats = WsStats { total_messages_received: 1, total_bytes_received: 2_097_152 };
1334 assert!((stats.total_data_kb() - stats.total_data_mb() * 1_024.0).abs() < 1e-6);
1335 }
1336
1337 #[test]
1340 fn test_bandwidth_bps_zero_when_elapsed_zero() {
1341 let stats = WsStats { total_messages_received: 0, total_bytes_received: 1_000 };
1342 assert_eq!(stats.bandwidth_bps(0), 0.0);
1343 }
1344
/// 10 000 bytes over 1 000 ms must give a `bandwidth_bps` of 10 000 bytes per second.
#[test]
fn test_bandwidth_bps_correct_value() {
    let snapshot = WsStats {
        total_messages_received: 1,
        total_bytes_received: 10_000,
    };
    let rate = snapshot.bandwidth_bps(1_000);
    assert!((rate - 10_000.0).abs() < 1e-6);
}
1351
/// With no bytes received, `bandwidth_bps` must be `0.0` for a non-zero elapsed time.
#[test]
fn test_bandwidth_bps_zero_bytes() {
    let snapshot = WsStats {
        total_messages_received: 5,
        total_bytes_received: 0,
    };
    let rate = snapshot.bandwidth_bps(5_000);
    assert_eq!(rate, 0.0);
}
1357
/// The deprecated `messages_per_byte` alias must return `None` when no bytes were received.
#[test]
fn test_messages_per_byte_none_when_no_bytes() {
    let byteless = WsStats {
        total_messages_received: 5,
        total_bytes_received: 0,
    };
    let ratio = byteless.messages_per_byte();
    assert!(ratio.is_none());
}
1365
/// 100 messages across 10 000 bytes must give a `messages_per_byte` of `0.01`.
#[test]
fn test_messages_per_byte_correct_value() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 10_000,
    };
    let ratio = snapshot.messages_per_byte().unwrap();
    assert!((ratio - 0.01).abs() < 1e-12);
}
1372
/// A single 500-byte message must give a `messages_per_byte` strictly below `1.0`.
#[test]
fn test_messages_per_byte_less_than_one_for_large_messages() {
    let single_large = WsStats {
        total_messages_received: 1,
        total_bytes_received: 500,
    };
    assert!(single_large.messages_per_byte().unwrap() < 1.0);
}
1379
/// The deprecated `avg_message_size_bytes` alias must return `None` for an all-zero snapshot.
#[test]
fn test_avg_message_size_bytes_none_when_no_messages() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let average = empty.avg_message_size_bytes();
    assert!(average.is_none());
}
1386
/// 5 000 bytes across 10 messages must average 500 bytes per message.
#[test]
fn test_avg_message_size_bytes_correct_value() {
    let snapshot = WsStats {
        total_messages_received: 10,
        total_bytes_received: 5_000,
    };
    let average = snapshot.avg_message_size_bytes().unwrap();
    assert!((average - 500.0).abs() < 1e-12);
}
1392
/// With a single 256-byte message, the average message size must be 256 bytes.
#[test]
fn test_avg_message_size_bytes_one_message() {
    let single = WsStats {
        total_messages_received: 1,
        total_bytes_received: 256,
    };
    let average = single.avg_message_size_bytes().unwrap();
    assert!((average - 256.0).abs() < 1e-12);
}
1398
/// A zero elapsed time must yield `0.0` from `bandwidth_kbps`, even with traffic recorded.
#[test]
fn test_bandwidth_kbps_zero_when_elapsed_zero() {
    let snapshot = WsStats {
        total_messages_received: 10,
        total_bytes_received: 50_000,
    };
    let rate = snapshot.bandwidth_kbps(0);
    assert_eq!(rate, 0.0);
}
1405
/// 100 000 bytes over 1 000 ms must give 800 kbit/s from `bandwidth_kbps`.
///
/// 100 000 bytes/s × 8 bits ÷ 1 000 = 800.
#[test]
fn test_bandwidth_kbps_correct_value() {
    let snapshot = WsStats {
        total_messages_received: 1,
        total_bytes_received: 100_000,
    };
    let kbps = snapshot.bandwidth_kbps(1_000);
    let error = (kbps - 800.0).abs();
    assert!(error < 1e-10, "got {kbps}");
}
1413
/// An all-zero snapshot must yield `0.0` from `bandwidth_kbps` for a non-zero elapsed time.
#[test]
fn test_bandwidth_kbps_zero_when_no_data() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let rate = empty.bandwidth_kbps(5_000);
    assert_eq!(rate, 0.0);
}
1419
/// With no bytes received, `total_data_gb` must be exactly `0.0`.
#[test]
fn test_total_data_gb_zero_when_no_bytes() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let gib = empty.total_data_gb();
    assert_eq!(gib, 0.0);
}
1427
/// 1 073 741 824 received bytes must be reported by `total_data_gb` as `1.0`.
#[test]
fn test_total_data_gb_one_gib() {
    let one_gib = WsStats {
        total_messages_received: 1,
        total_bytes_received: 1_073_741_824,
    };
    let delta = (one_gib.total_data_gb() - 1.0).abs();
    assert!(delta < 1e-10);
}
1433
/// `total_data_gb` must agree with `total_data_mb / 1024` (checked on 2 147 483 648 bytes).
///
/// Despite the `1024_times_mb` in the test name, the relation asserted here is
/// a division of the MiB figure by 1 024, not a multiplication.
#[test]
fn test_total_data_gb_equals_1024_times_mb() {
    let two_gib = WsStats {
        total_messages_received: 1,
        total_bytes_received: 2_147_483_648,
    };
    let gib = two_gib.total_data_gb();
    let mib_scaled = two_gib.total_data_mb() / 1_024.0;
    assert!((gib - mib_scaled).abs() < 1e-6);
}
1439
/// With zero messages received, `is_active(1)` must be false.
#[test]
fn test_is_active_false_when_no_messages() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let active = empty.is_active(1);
    assert!(!active);
}
1445
/// With exactly 100 messages received, `is_active(100)` must be true.
#[test]
fn test_is_active_true_at_threshold() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 0,
    };
    let active = snapshot.is_active(100);
    assert!(active);
}
1451
/// With only 50 messages received, `is_active(100)` must be false.
#[test]
fn test_is_active_false_below_threshold() {
    let snapshot = WsStats {
        total_messages_received: 50,
        total_bytes_received: 0,
    };
    let active = snapshot.is_active(100);
    assert!(!active);
}
1457
/// `has_received_bytes` must be false when the byte counter is zero, even with messages counted.
#[test]
fn test_has_received_bytes_false_when_no_bytes() {
    let byteless = WsStats {
        total_messages_received: 5,
        total_bytes_received: 0,
    };
    let received = byteless.has_received_bytes();
    assert!(!received);
}
1465
/// `has_received_bytes` must be true once the byte counter is non-zero.
#[test]
fn test_has_received_bytes_true_when_bytes_present() {
    let snapshot = WsStats {
        total_messages_received: 1,
        total_bytes_received: 100,
    };
    let received = snapshot.has_received_bytes();
    assert!(received);
}
1471
/// `efficiency_ratio` must return `None` when no bytes were received.
#[test]
fn test_efficiency_ratio_none_when_no_bytes() {
    let byteless = WsStats {
        total_messages_received: 10,
        total_bytes_received: 0,
    };
    let ratio = byteless.efficiency_ratio();
    assert!(ratio.is_none());
}
1477
/// 100 messages across 10 000 bytes must give an `efficiency_ratio` of `0.01`.
#[test]
fn test_efficiency_ratio_correct_value() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 10_000,
    };
    let ratio = snapshot.efficiency_ratio().unwrap();
    assert!((ratio - 0.01).abs() < 1e-12);
}
1484
/// A single 500-byte message must give an `efficiency_ratio` strictly below `1.0`.
#[test]
fn test_efficiency_ratio_less_than_one_for_large_messages() {
    let single_large = WsStats {
        total_messages_received: 1,
        total_bytes_received: 500,
    };
    let ratio = single_large.efficiency_ratio().unwrap();
    assert!(ratio < 1.0);
}
1490
/// `message_density` and `message_rate` must agree for the same elapsed time (1 000 ms).
#[test]
fn test_message_density_same_as_message_rate() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 0,
    };
    let density = snapshot.message_density(1_000);
    let rate = snapshot.message_rate(1_000);
    assert!((density - rate).abs() < 1e-12);
}
1498
/// A zero elapsed time must yield `0.0` from `message_density`, even with messages counted.
#[test]
fn test_message_density_zero_when_elapsed_zero() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 0,
    };
    let density = snapshot.message_density(0);
    assert_eq!(density, 0.0);
}
1504
/// `compression_ratio` must return `None` when no bytes were received.
#[test]
fn test_compression_ratio_none_when_no_bytes() {
    let byteless = WsStats {
        total_messages_received: 5,
        total_bytes_received: 0,
    };
    let ratio = byteless.compression_ratio();
    assert!(ratio.is_none());
}
1510
/// `compression_ratio` must return the same value as `efficiency_ratio` for one snapshot.
#[test]
fn test_compression_ratio_same_as_efficiency_ratio() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 10_000,
    };
    let compression = snapshot.compression_ratio();
    let efficiency = snapshot.efficiency_ratio();
    assert_eq!(compression, efficiency);
}
1516
/// A zero elapsed time must yield `0.0` from `uptime_fraction`, even with traffic recorded.
#[test]
fn test_uptime_fraction_zero_when_elapsed_zero() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 50_000,
    };
    let fraction = snapshot.uptime_fraction(0);
    assert_eq!(fraction, 0.0);
}
1524
/// An all-zero snapshot must yield `0.0` from `uptime_fraction` over 60 000 ms.
#[test]
fn test_uptime_fraction_zero_when_no_bytes() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let fraction = empty.uptime_fraction(60_000);
    assert_eq!(fraction, 0.0);
}
1530
/// With 1 000 bytes over 60 000 ms, `uptime_fraction` must fall in the half-open range `(0, 1]`.
#[test]
fn test_uptime_fraction_nonzero_with_bytes() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 1_000,
    };
    let fraction = snapshot.uptime_fraction(60_000);
    let within_unit_range = fraction > 0.0 && fraction <= 1.0;
    assert!(within_unit_range);
}
1538
/// 1 000 000 bytes over just 100 ms must yield exactly `1.0` from `uptime_fraction`.
#[test]
fn test_uptime_fraction_clamped_to_one() {
    let burst = WsStats {
        total_messages_received: 0,
        total_bytes_received: 1_000_000,
    };
    let fraction = burst.uptime_fraction(100);
    assert_eq!(fraction, 1.0);
}
1545
/// With no messages received, the rate is below any positive minimum, so `is_idle` must be true.
#[test]
fn test_is_idle_true_when_no_messages() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let idle = empty.is_idle(60_000, 1.0);
    assert!(idle);
}
1553
/// 1 000 messages in 1 000 ms is 1 000 msg/s, well above a 1.0 minimum, so `is_idle` must be false.
#[test]
fn test_is_idle_false_when_high_message_rate() {
    let busy = WsStats {
        total_messages_received: 1_000,
        total_bytes_received: 0,
    };
    let idle = busy.is_idle(1_000, 1.0);
    assert!(!idle);
}
1560
/// With zero elapsed time, `is_idle` must be true even though messages were counted.
///
/// `message_rate` returns `0.0` when `elapsed_ms` is zero, which is below the
/// 1.0 minimum used here.
#[test]
fn test_is_idle_true_when_elapsed_zero() {
    let snapshot = WsStats {
        total_messages_received: 100,
        total_bytes_received: 0,
    };
    let idle = snapshot.is_idle(0, 1.0);
    assert!(idle);
}
1567
/// `average_message_size_bytes` must return `None` for an all-zero snapshot.
#[test]
fn test_average_message_size_bytes_none_when_no_messages() {
    let empty = WsStats {
        total_messages_received: 0,
        total_bytes_received: 0,
    };
    let average = empty.average_message_size_bytes();
    assert!(average.is_none());
}
1575
/// `average_message_size_bytes` must return the same value as `bytes_per_message`.
#[test]
fn test_average_message_size_bytes_same_as_bytes_per_message() {
    let snapshot = WsStats {
        total_messages_received: 10,
        total_bytes_received: 1_000,
    };
    let average = snapshot.average_message_size_bytes();
    let per_message = snapshot.bytes_per_message();
    assert_eq!(average, per_message);
}
1581}