Skip to main content

fin_stream/ws/
mod.rs

//! WebSocket connection management — auto-reconnect and backpressure.
//!
//! ## Responsibility
//! Manage the lifecycle of a WebSocket feed connection: connect, receive
//! messages, detect disconnections, apply exponential backoff reconnect,
//! and propagate backpressure when the downstream channel is full.
//!
//! ## Guarantees
//! - Non-panicking: fallible operations return `Result` instead of panicking
//! - Configurable: reconnect policy and buffer sizes are constructor params
11
12use crate::error::StreamError;
13use futures_util::{SinkExt, StreamExt};
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tokio::time;
17use tokio_tungstenite::{connect_async, tungstenite::Message};
18use tracing::{debug, info, warn};
19
/// Statistics collected during WebSocket operation.
///
/// Both fields are plain counters; every derived metric below is computed on
/// demand from these two values and the elapsed time supplied by the caller.
#[derive(Debug, Clone, Copy, Default)]
pub struct WsStats {
    /// Total number of messages received (text + binary combined).
    pub total_messages_received: u64,
    /// Total bytes received across all messages.
    pub total_bytes_received: u64,
}

impl WsStats {
    /// Converts a cumulative `total` into a per-second rate over `elapsed_ms`.
    ///
    /// Returns `0.0` when `elapsed_ms` is zero so callers never divide by zero.
    fn per_second(total: u64, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        total as f64 / (elapsed_ms as f64 / 1000.0)
    }

    /// Messages per second over the given elapsed window.
    ///
    /// Returns `0.0` if `elapsed_ms` is zero (avoids division by zero).
    pub fn message_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_messages_received, elapsed_ms)
    }

    /// Bytes per second over the given elapsed window.
    ///
    /// Returns `0.0` if `elapsed_ms` is zero.
    pub fn byte_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_bytes_received, elapsed_ms)
    }

    /// Average bytes per message: `total_bytes / total_messages`.
    ///
    /// Alias for [`bytes_per_message`](Self::bytes_per_message).
    /// Returns `None` if no messages have been received (avoids division by zero).
    pub fn avg_message_size(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Total bytes received expressed as mebibytes (MiB): `total_bytes / 1_048_576.0`.
    pub fn total_data_mb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_048_576.0
    }

    /// Total bytes received expressed as kibibytes (KiB): `total_bytes / 1_024.0`.
    pub fn total_data_kb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_024.0
    }

    /// Average received bandwidth in bytes per second over `elapsed_ms`.
    ///
    /// Measures the same quantity as [`byte_rate`](Self::byte_rate); the two
    /// use a different order of floating-point operations, so results may
    /// differ in the last bits.
    ///
    /// Returns `0.0` when `elapsed_ms` is zero (avoids division by zero).
    pub fn bandwidth_bps(&self, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        self.total_bytes_received as f64 * 1_000.0 / elapsed_ms as f64
    }

    /// Average number of messages per byte received.
    ///
    /// Alias for [`efficiency_ratio`](Self::efficiency_ratio).
    /// Returns `None` when no bytes have been received (avoids division by
    /// zero). A higher value means smaller average message size.
    pub fn messages_per_byte(&self) -> Option<f64> {
        self.efficiency_ratio()
    }

    /// Average bytes per message received.
    ///
    /// Alias for [`bytes_per_message`](Self::bytes_per_message).
    /// Returns `None` if no messages have been received.
    pub fn avg_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Kilobits per second received over `elapsed_ms`
    /// (`bytes × 8 / 1000 / seconds`, i.e. 1 kbit = 1000 bits).
    ///
    /// Returns `0.0` when `elapsed_ms` is zero.
    pub fn bandwidth_kbps(&self, elapsed_ms: u64) -> f64 {
        if elapsed_ms == 0 {
            return 0.0;
        }
        self.total_bytes_received as f64 * 8.0 / 1_000.0 / (elapsed_ms as f64 / 1_000.0)
    }

    /// Returns `true` if the current message rate is below `min_rate` (msgs/s).
    ///
    /// When `elapsed_ms` is zero the rate is treated as `0.0`, so the result
    /// is `true` only for a positive `min_rate`. A `min_rate` that is zero,
    /// negative or NaN can never be undercut and always yields `false`.
    /// Useful for detecting stalled or silent feeds.
    pub fn is_idle(&self, elapsed_ms: u64, min_rate: f64) -> bool {
        self.message_rate(elapsed_ms) < min_rate
    }

    /// Returns `true` if at least one message has been received.
    pub fn has_traffic(&self) -> bool {
        self.total_messages_received > 0
    }

    /// Returns `true` if `total_messages_received >= threshold`.
    pub fn is_high_volume(&self, threshold: u64) -> bool {
        self.total_messages_received >= threshold
    }

    /// Average message size in bytes: alias for [`bytes_per_message`](Self::bytes_per_message).
    ///
    /// Returns `None` if no messages have been received yet.
    pub fn average_message_size_bytes(&self) -> Option<f64> {
        self.bytes_per_message()
    }

    /// Average bytes per message received so far.
    ///
    /// This is the single implementation behind `avg_message_size`,
    /// `avg_message_size_bytes` and `average_message_size_bytes`.
    ///
    /// Returns `None` if no messages have been received yet.
    pub fn bytes_per_message(&self) -> Option<f64> {
        if self.total_messages_received == 0 {
            return None;
        }
        Some(self.total_bytes_received as f64 / self.total_messages_received as f64)
    }

    /// Total bytes received expressed as gibibytes (GiB): `total_bytes / 1_073_741_824.0`.
    pub fn total_data_gb(&self) -> f64 {
        self.total_bytes_received as f64 / 1_073_741_824.0
    }

    /// Returns `true` if at least `min_messages` have been received.
    ///
    /// Same comparison as [`is_high_volume`](Self::is_high_volume).
    pub fn is_active(&self, min_messages: u64) -> bool {
        self.is_high_volume(min_messages)
    }

    /// Returns `true` if any bytes have been received.
    pub fn has_received_bytes(&self) -> bool {
        self.total_bytes_received > 0
    }

    /// Messages per byte received: `total_messages / total_bytes`.
    ///
    /// This is the single implementation behind `messages_per_byte` and
    /// `compression_ratio`.
    ///
    /// Higher values indicate smaller average message sizes.
    /// Returns `None` if no bytes have been received.
    pub fn efficiency_ratio(&self) -> Option<f64> {
        if self.total_bytes_received == 0 {
            return None;
        }
        Some(self.total_messages_received as f64 / self.total_bytes_received as f64)
    }

    /// Messages received per second over `elapsed_ms`: alias for `message_rate`.
    ///
    /// Returns `0.0` if `elapsed_ms` is zero.
    pub fn message_density(&self, elapsed_ms: u64) -> f64 {
        self.message_rate(elapsed_ms)
    }

    /// Ratio of messages to bytes: alias for [`efficiency_ratio`](Self::efficiency_ratio).
    ///
    /// Despite the name, no compression is measured; the value is simply
    /// `total_messages / total_bytes`, so higher values indicate smaller
    /// messages. Returns `None` if no bytes have been received.
    pub fn compression_ratio(&self) -> Option<f64> {
        self.efficiency_ratio()
    }

    /// Crude activity proxy in `[0.0, 1.0]` over a window of `total_ms`.
    ///
    /// Despite the name, connection uptime is not tracked. The value is the
    /// average number of bytes received per millisecond
    /// (`total_bytes_received / total_ms`), capped at `1.0`. Any feed that
    /// averages one byte per millisecond or more therefore reports `1.0`.
    ///
    /// Returns `0.0` if `total_ms` is zero.
    pub fn uptime_fraction(&self, total_ms: u64) -> f64 {
        if total_ms == 0 {
            return 0.0;
        }
        (self.total_bytes_received as f64 / total_ms as f64).min(1.0)
    }
}
203
204/// Reconnection policy for a WebSocket feed.
205///
206/// Controls exponential-backoff reconnect behaviour. Build with
207/// [`ReconnectPolicy::new`] or use [`Default`] for sensible defaults.
208#[derive(Debug, Clone)]
209pub struct ReconnectPolicy {
210    /// Maximum number of reconnect attempts before giving up.
211    pub max_attempts: u32,
212    /// Initial backoff delay for the first reconnect attempt.
213    pub initial_backoff: Duration,
214    /// Maximum backoff delay (cap for exponential growth).
215    pub max_backoff: Duration,
216    /// Multiplier applied to the backoff on each successive attempt (must be >= 1.0).
217    pub multiplier: f64,
218    /// Jitter ratio in [0.0, 1.0]: the computed backoff is offset by up to
219    /// `±ratio × backoff` using a deterministic per-attempt hash. 0.0 = no jitter.
220    pub jitter: f64,
221}
222
223impl ReconnectPolicy {
224    /// Build a reconnect policy with explicit parameters.
225    ///
226    /// # Errors
227    ///
228    /// Returns [`StreamError::ConfigError`] if `multiplier < 1.0` (which would
229    /// cause backoff to shrink over time) or if `max_attempts == 0`.
230    pub fn new(
231        max_attempts: u32,
232        initial_backoff: Duration,
233        max_backoff: Duration,
234        multiplier: f64,
235    ) -> Result<Self, StreamError> {
236        if multiplier < 1.0 {
237            return Err(StreamError::ConfigError {
238                reason: format!(
239                    "reconnect multiplier must be >= 1.0, got {multiplier}"
240                ),
241            });
242        }
243        if max_attempts == 0 {
244            return Err(StreamError::ConfigError {
245                reason: "max_attempts must be > 0".into(),
246            });
247        }
248        Ok(Self {
249            max_attempts,
250            initial_backoff,
251            max_backoff,
252            multiplier,
253            jitter: 0.0,
254        })
255    }
256
257    /// Set the maximum number of reconnect attempts.
258    ///
259    /// # Errors
260    ///
261    /// Returns [`StreamError::ConfigError`] if `max_attempts` is zero.
262    pub fn with_max_attempts(mut self, max_attempts: u32) -> Result<Self, StreamError> {
263        if max_attempts == 0 {
264            return Err(StreamError::ConfigError {
265                reason: "max_attempts must be > 0".into(),
266            });
267        }
268        self.max_attempts = max_attempts;
269        Ok(self)
270    }
271
272    /// Set the exponential backoff multiplier.
273    ///
274    /// # Errors
275    ///
276    /// Returns [`StreamError::ConfigError`] if `multiplier < 1.0` (which would
277    /// cause the backoff to shrink on each attempt).
278    pub fn with_multiplier(mut self, multiplier: f64) -> Result<Self, StreamError> {
279        if multiplier < 1.0 {
280            return Err(StreamError::ConfigError {
281                reason: format!("reconnect multiplier must be >= 1.0, got {multiplier}"),
282            });
283        }
284        self.multiplier = multiplier;
285        Ok(self)
286    }
287
288    /// Set the initial backoff duration for the first reconnect attempt.
289    pub fn with_initial_backoff(mut self, duration: Duration) -> Self {
290        self.initial_backoff = duration;
291        self
292    }
293
294    /// Set the maximum backoff duration (cap for exponential growth).
295    pub fn with_max_backoff(mut self, duration: Duration) -> Self {
296        self.max_backoff = duration;
297        self
298    }
299
300    /// Apply deterministic per-attempt jitter to the computed backoff.
301    ///
302    /// `ratio` must be in `[0.0, 1.0]`. The effective backoff for attempt N
303    /// will be spread uniformly over `[backoff*(1-ratio), backoff*(1+ratio)]`
304    /// using a hash of the attempt index — no `rand` dependency needed.
305    ///
306    /// # Errors
307    ///
308    /// Returns [`StreamError::ConfigError`] if `ratio` is outside `[0.0, 1.0]`.
309    pub fn with_jitter(mut self, ratio: f64) -> Result<Self, StreamError> {
310        if !(0.0..=1.0).contains(&ratio) {
311            return Err(StreamError::ConfigError {
312                reason: format!("jitter ratio must be in [0.0, 1.0], got {ratio}"),
313            });
314        }
315        self.jitter = ratio;
316        Ok(self)
317    }
318
319    /// Sum of all backoff delays across every reconnect attempt.
320    ///
321    /// Useful for estimating the worst-case time before a client gives up.
322    /// The result is capped at `max_backoff * max_attempts` to avoid overflow.
323    pub fn total_max_delay(&self) -> Duration {
324        let total_ms: u64 = (0..self.max_attempts)
325            .map(|a| self.backoff_for_attempt(a).as_millis() as u64)
326            .fold(0u64, |acc, ms| acc.saturating_add(ms));
327        Duration::from_millis(total_ms)
328    }
329
330    /// Maximum number of reconnect attempts before the client gives up.
331    pub fn max_attempts(&self) -> u32 {
332        self.max_attempts
333    }
334
335    /// Number of attempts remaining starting from `current_attempt` (0-indexed).
336    ///
337    /// Returns `0` if `current_attempt >= max_attempts`.
338    pub fn total_attempts_remaining(&self, current_attempt: u32) -> u32 {
339        self.max_attempts.saturating_sub(current_attempt)
340    }
341
342    /// Backoff delay that will be applied *after* `current_attempt` completes.
343    ///
344    /// Equivalent to `backoff_for_attempt(current_attempt + 1)`, capped at
345    /// `max_backoff`. Saturates rather than wrapping when `current_attempt`
346    /// is `u32::MAX`.
347    pub fn delay_for_next(&self, current_attempt: u32) -> Duration {
348        self.backoff_for_attempt(current_attempt.saturating_add(1))
349    }
350
351    /// Returns `true` if `attempts` has reached or exceeded `max_attempts`.
352    ///
353    /// After this returns `true` the reconnect loop should give up rather than
354    /// scheduling another attempt.
355    pub fn is_exhausted(&self, attempts: u32) -> bool {
356        attempts >= self.max_attempts
357    }
358
359    /// Backoff duration for attempt N (0-indexed).
360    pub fn backoff_for_attempt(&self, attempt: u32) -> Duration {
361        let factor = self.multiplier.powi(attempt as i32);
362        // Cap the f64 value *before* casting to u64. When `attempt` is large
363        // (e.g. 63 with multiplier=2.0), `factor` becomes f64::INFINITY.
364        // Casting f64::INFINITY as u64 is undefined behaviour in Rust — it
365        // saturates to 0 on some targets and panics in debug builds. Clamping
366        // to max_backoff in floating-point space first avoids the UB entirely.
367        let max_ms = self.max_backoff.as_millis() as f64;
368        let base_ms = (self.initial_backoff.as_millis() as f64 * factor).min(max_ms);
369        let ms = if self.jitter > 0.0 {
370            // Deterministic noise via Knuth multiplicative hash of the attempt index.
371            let hash = (attempt as u64)
372                .wrapping_mul(2654435769)
373                .wrapping_add(1013904223);
374            let noise = (hash & 0xFFFF) as f64 / 65535.0; // [0.0, 1.0]
375            let delta = base_ms * self.jitter * (noise * 2.0 - 1.0); // ±jitter×base
376            (base_ms + delta).clamp(0.0, max_ms)
377        } else {
378            base_ms
379        };
380        Duration::from_millis(ms as u64)
381    }
382}
383
384impl Default for ReconnectPolicy {
385    fn default() -> Self {
386        Self {
387            max_attempts: 10,
388            initial_backoff: Duration::from_millis(500),
389            max_backoff: Duration::from_secs(30),
390            multiplier: 2.0,
391            jitter: 0.0,
392        }
393    }
394}
395
396/// Configuration for a WebSocket feed connection.
397#[derive(Debug, Clone)]
398pub struct ConnectionConfig {
399    /// WebSocket URL to connect to (e.g. `"wss://stream.binance.com:9443/ws"`).
400    pub url: String,
401    /// Capacity of the downstream channel that receives incoming messages.
402    pub channel_capacity: usize,
403    /// Reconnect policy applied on disconnection.
404    pub reconnect: ReconnectPolicy,
405    /// Ping interval to keep the connection alive (default: 20 s).
406    pub ping_interval: Duration,
407}
408
409impl ConnectionConfig {
410    /// Build a connection configuration for `url` with the given downstream
411    /// channel capacity.
412    ///
413    /// # Errors
414    ///
415    /// Returns [`StreamError::ConfigError`] if `url` is empty or
416    /// `channel_capacity` is zero.
417    pub fn new(url: impl Into<String>, channel_capacity: usize) -> Result<Self, StreamError> {
418        let url = url.into();
419        if url.is_empty() {
420            return Err(StreamError::ConfigError {
421                reason: "WebSocket URL must not be empty".into(),
422            });
423        }
424        if channel_capacity == 0 {
425            return Err(StreamError::ConfigError {
426                reason: "channel_capacity must be > 0".into(),
427            });
428        }
429        Ok(Self {
430            url,
431            channel_capacity,
432            reconnect: ReconnectPolicy::default(),
433            ping_interval: Duration::from_secs(20),
434        })
435    }
436
437    /// Override the default reconnect policy.
438    pub fn with_reconnect(mut self, policy: ReconnectPolicy) -> Self {
439        self.reconnect = policy;
440        self
441    }
442
443    /// Override the keepalive ping interval (default: 20 s).
444    pub fn with_ping_interval(mut self, interval: Duration) -> Self {
445        self.ping_interval = interval;
446        self
447    }
448
449    /// Shortcut to set only the reconnect attempt limit without replacing the
450    /// entire reconnect policy.
451    ///
452    /// # Errors
453    ///
454    /// Returns [`StreamError::ConfigError`] if `n` is zero.
455    pub fn with_reconnect_attempts(mut self, n: u32) -> Result<Self, StreamError> {
456        self.reconnect = self.reconnect.with_max_attempts(n)?;
457        Ok(self)
458    }
459
460    /// Override the downstream channel capacity.
461    ///
462    /// # Errors
463    ///
464    /// Returns [`StreamError::ConfigError`] if `capacity` is zero.
465    pub fn with_channel_capacity(mut self, capacity: usize) -> Result<Self, StreamError> {
466        if capacity == 0 {
467            return Err(StreamError::ConfigError {
468                reason: "channel_capacity must be > 0".into(),
469            });
470        }
471        self.channel_capacity = capacity;
472        Ok(self)
473    }
474}
475
476/// Manages a single WebSocket feed: connect, receive, reconnect.
477///
478/// Call [`WsManager::run`] to enter the connection loop. Messages are forwarded
479/// to the [`mpsc::Sender`] supplied to `run`; the loop retries on disconnection
480/// according to the [`ReconnectPolicy`] in the [`ConnectionConfig`].
481pub struct WsManager {
482    config: ConnectionConfig,
483    connect_attempts: u32,
484    is_connected: bool,
485    stats: WsStats,
486}
487
488impl WsManager {
489    /// Create a new manager from a validated [`ConnectionConfig`].
490    pub fn new(config: ConnectionConfig) -> Self {
491        Self {
492            config,
493            connect_attempts: 0,
494            is_connected: false,
495            stats: WsStats::default(),
496        }
497    }
498
499    /// Run the WebSocket connection loop, forwarding text messages to `message_tx`.
500    ///
501    /// The loop connects, reads frames until the socket closes or errors, then
502    /// waits the configured backoff and reconnects. Returns when either:
503    /// - `message_tx` is closed (receiver dropped), or
504    /// - reconnect attempts are exhausted ([`StreamError::ReconnectExhausted`]).
505    ///
506    /// `outbound_rx` is an optional channel for sending messages **to** the
507    /// server (e.g., subscription requests). When provided, any string received
508    /// on this channel is forwarded to the WebSocket as a text frame.
509    ///
510    /// # Errors
511    ///
512    /// Returns [`StreamError::ReconnectExhausted`] after all reconnect slots
513    /// are consumed, or the underlying connection error if reconnects are
514    /// exhausted immediately on the first attempt.
515    pub async fn run(
516        &mut self,
517        message_tx: mpsc::Sender<String>,
518        mut outbound_rx: Option<mpsc::Receiver<String>>,
519    ) -> Result<(), StreamError> {
520        loop {
521            info!(url = %self.config.url, attempt = self.connect_attempts, "connecting");
522            match self.try_connect(&message_tx, &mut outbound_rx).await {
523                Ok(()) => {
524                    // Clean close — receiver dropped or server sent Close frame.
525                    self.is_connected = false;
526                    debug!(url = %self.config.url, "connection closed cleanly");
527                    if message_tx.is_closed() {
528                        return Ok(());
529                    }
530                }
531                Err(e) => {
532                    self.is_connected = false;
533                    warn!(url = %self.config.url, error = %e, "connection error");
534                }
535            }
536
537            if !self.can_reconnect() {
538                return Err(StreamError::ReconnectExhausted {
539                    url: self.config.url.clone(),
540                    attempts: self.connect_attempts,
541                });
542            }
543            let backoff = self.next_reconnect_backoff()?;
544            info!(url = %self.config.url, backoff_ms = backoff.as_millis(), "reconnecting after backoff");
545            tokio::time::sleep(backoff).await;
546        }
547    }
548
549    /// Attempt a single connection, reading messages until close or error.
550    async fn try_connect(
551        &mut self,
552        message_tx: &mpsc::Sender<String>,
553        outbound_rx: &mut Option<mpsc::Receiver<String>>,
554    ) -> Result<(), StreamError> {
555        let (ws_stream, _response) =
556            connect_async(&self.config.url)
557                .await
558                .map_err(|e| StreamError::ConnectionFailed {
559                    url: self.config.url.clone(),
560                    reason: e.to_string(),
561                })?;
562
563        self.is_connected = true;
564        self.connect_attempts += 1;
565        info!(url = %self.config.url, "connected");
566
567        let (mut write, mut read) = ws_stream.split();
568        let mut ping_interval = time::interval(self.config.ping_interval);
569        // Skip the first tick so we don't ping immediately on connect.
570        ping_interval.tick().await;
571
572        loop {
573            tokio::select! {
574                msg = read.next() => {
575                    match msg {
576                        Some(Ok(Message::Text(text))) => {
577                            self.stats.total_messages_received += 1;
578                            self.stats.total_bytes_received += text.len() as u64;
579                            if message_tx.send(text.to_string()).await.is_err() {
580                                // Receiver dropped — clean shutdown.
581                                return Ok(());
582                            }
583                        }
584                        Some(Ok(Message::Binary(bytes))) => {
585                            self.stats.total_messages_received += 1;
586                            self.stats.total_bytes_received += bytes.len() as u64;
587                            if let Ok(text) = String::from_utf8(bytes.to_vec()) {
588                                if message_tx.send(text).await.is_err() {
589                                    return Ok(());
590                                }
591                            }
592                        }
593                        Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {
594                            // Control frames handled by tungstenite internally.
595                        }
596                        Some(Ok(Message::Close(_))) | None => {
597                            return Ok(());
598                        }
599                        Some(Err(e)) => {
600                            return Err(StreamError::WebSocket(e.to_string()));
601                        }
602                    }
603                }
604                _ = ping_interval.tick() => {
605                    debug!(url = %self.config.url, "sending keepalive ping");
606                    if write.send(Message::Ping(vec![].into())).await.is_err() {
607                        return Ok(());
608                    }
609                }
610                outbound = recv_outbound(outbound_rx) => {
611                    if let Some(text) = outbound {
612                        let _ = write.send(Message::Text(text.into())).await;
613                    }
614                }
615            }
616        }
617    }
618
619    /// Simulate a connection (for testing without live WebSocket).
620    /// Increments `connect_attempts` to reflect the initial connection slot.
621    pub fn connect_simulated(&mut self) {
622        self.connect_attempts += 1;
623        self.is_connected = true;
624    }
625
626    /// Simulate a disconnection.
627    pub fn disconnect_simulated(&mut self) {
628        self.is_connected = false;
629    }
630
631    /// Whether the managed connection is currently in the connected state.
632    pub fn is_connected(&self) -> bool {
633        self.is_connected
634    }
635
636    /// Total connection attempts made so far (including the initial connect).
637    pub fn connect_attempts(&self) -> u32 {
638        self.connect_attempts
639    }
640
641    /// The configuration this manager was created with.
642    pub fn config(&self) -> &ConnectionConfig {
643        &self.config
644    }
645
646    /// Cumulative receive statistics for this manager.
647    pub fn stats(&self) -> &WsStats {
648        &self.stats
649    }
650
651    /// Check whether the next reconnect attempt is allowed.
652    pub fn can_reconnect(&self) -> bool {
653        self.connect_attempts < self.config.reconnect.max_attempts
654    }
655
656    /// Consume a reconnect slot and return the backoff duration to wait.
657    pub fn next_reconnect_backoff(&mut self) -> Result<Duration, StreamError> {
658        if !self.can_reconnect() {
659            return Err(StreamError::ReconnectExhausted {
660                url: self.config.url.clone(),
661                attempts: self.connect_attempts,
662            });
663        }
664        let backoff = self
665            .config
666            .reconnect
667            .backoff_for_attempt(self.connect_attempts);
668        self.connect_attempts += 1;
669        Ok(backoff)
670    }
671}
672
673/// Helper: receive from an optional mpsc channel, or never resolve if `None`.
674///
675/// Used in `tokio::select!` to make the outbound branch dormant when no
676/// outbound channel was supplied, without allocating or spinning.
677async fn recv_outbound(rx: &mut Option<mpsc::Receiver<String>>) -> Option<String> {
678    match rx {
679        Some(rx) => rx.recv().await,
680        None => std::future::pending().await,
681    }
682}
683
684#[cfg(test)]
685mod tests {
686    use super::*;
687
688    fn default_config() -> ConnectionConfig {
689        ConnectionConfig::new("wss://example.com/ws", 1024).unwrap()
690    }
691
692    #[test]
693    fn test_reconnect_policy_default_values() {
694        let p = ReconnectPolicy::default();
695        assert_eq!(p.max_attempts, 10);
696        assert_eq!(p.multiplier, 2.0);
697    }
698
699    #[test]
700    fn test_reconnect_policy_backoff_exponential() {
701        let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
702            .unwrap();
703        assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(100));
704        assert_eq!(p.backoff_for_attempt(1), Duration::from_millis(200));
705        assert_eq!(p.backoff_for_attempt(2), Duration::from_millis(400));
706    }
707
708    #[test]
709    fn test_reconnect_policy_backoff_capped_at_max() {
710        let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(5), 2.0)
711            .unwrap();
712        let backoff = p.backoff_for_attempt(10);
713        assert!(backoff <= Duration::from_secs(5));
714    }
715
716    #[test]
717    fn test_reconnect_policy_multiplier_below_1_rejected() {
718        let result =
719            ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 0.5);
720        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
721    }
722
723    #[test]
724    fn test_reconnect_policy_zero_attempts_rejected() {
725        let result =
726            ReconnectPolicy::new(0, Duration::from_millis(100), Duration::from_secs(30), 2.0);
727        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
728    }
729
730    #[test]
731    fn test_connection_config_empty_url_rejected() {
732        let result = ConnectionConfig::new("", 1024);
733        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
734    }
735
736    #[test]
737    fn test_connection_config_zero_capacity_rejected() {
738        let result = ConnectionConfig::new("wss://example.com", 0);
739        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
740    }
741
742    #[test]
743    fn test_connection_config_with_reconnect() {
744        let policy =
745            ReconnectPolicy::new(3, Duration::from_millis(200), Duration::from_secs(10), 2.0)
746                .unwrap();
747        let config = default_config().with_reconnect(policy);
748        assert_eq!(config.reconnect.max_attempts, 3);
749    }
750
751    #[test]
752    fn test_connection_config_with_ping_interval() {
753        let config = default_config().with_ping_interval(Duration::from_secs(30));
754        assert_eq!(config.ping_interval, Duration::from_secs(30));
755    }
756
757    #[test]
758    fn test_ws_manager_initial_state() {
759        let mgr = WsManager::new(default_config());
760        assert!(!mgr.is_connected());
761        assert_eq!(mgr.connect_attempts(), 0);
762    }
763
764    #[test]
765    fn test_ws_manager_connect_simulated() {
766        let mut mgr = WsManager::new(default_config());
767        mgr.connect_simulated();
768        assert!(mgr.is_connected());
769        assert_eq!(mgr.connect_attempts(), 1);
770    }
771
772    #[test]
773    fn test_ws_manager_disconnect_simulated() {
774        let mut mgr = WsManager::new(default_config());
775        mgr.connect_simulated();
776        mgr.disconnect_simulated();
777        assert!(!mgr.is_connected());
778    }
779
780    #[test]
781    fn test_ws_manager_can_reconnect_within_limit() {
782        let mut mgr = WsManager::new(
783            default_config().with_reconnect(
784                ReconnectPolicy::new(3, Duration::from_millis(10), Duration::from_secs(1), 2.0)
785                    .unwrap(),
786            ),
787        );
788        assert!(mgr.can_reconnect());
789        mgr.next_reconnect_backoff().unwrap();
790        mgr.next_reconnect_backoff().unwrap();
791        mgr.next_reconnect_backoff().unwrap();
792        assert!(!mgr.can_reconnect());
793    }
794
795    #[test]
796    fn test_ws_manager_reconnect_exhausted_error() {
797        let mut mgr = WsManager::new(
798            default_config().with_reconnect(
799                ReconnectPolicy::new(1, Duration::from_millis(10), Duration::from_secs(1), 2.0)
800                    .unwrap(),
801            ),
802        );
803        mgr.next_reconnect_backoff().unwrap();
804        let result = mgr.next_reconnect_backoff();
805        assert!(matches!(
806            result,
807            Err(StreamError::ReconnectExhausted { .. })
808        ));
809    }
810
811    #[test]
812    fn test_ws_manager_backoff_increases() {
813        let mut mgr = WsManager::new(
814            default_config().with_reconnect(
815                ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(30), 2.0)
816                    .unwrap(),
817            ),
818        );
819        let b0 = mgr.next_reconnect_backoff().unwrap();
820        let b1 = mgr.next_reconnect_backoff().unwrap();
821        assert!(b1 >= b0);
822    }
823
824    #[test]
825    fn test_ws_manager_config_accessor() {
826        let mgr = WsManager::new(default_config());
827        assert_eq!(mgr.config().url, "wss://example.com/ws");
828        assert_eq!(mgr.config().channel_capacity, 1024);
829    }
830
831    /// Verify that `recv_outbound` with `None` never resolves (returns pending).
832    /// We test this by racing it against a resolved future and confirming the
833    /// resolved future always wins.
834    #[tokio::test]
835    async fn test_recv_outbound_none_is_always_pending() {
836        let mut rx: Option<mpsc::Receiver<String>> = None;
837        // Race recv_outbound(None) against an immediately-ready future.
838        tokio::select! {
839            _ = recv_outbound(&mut rx) => {
840                panic!("recv_outbound(None) should never resolve");
841            }
842            _ = std::future::ready(()) => {
843                // Expected: the ready() future wins.
844            }
845        }
846    }
847
848    /// Verify that `recv_outbound` with `Some(rx)` resolves when a message arrives.
849    #[tokio::test]
850    async fn test_recv_outbound_some_resolves_with_message() {
851        let (tx, mut channel_rx) = mpsc::channel::<String>(1);
852        tx.send("subscribe".into()).await.unwrap();
853        let mut rx: Option<mpsc::Receiver<String>> = Some(channel_rx);
854        let msg = recv_outbound(&mut rx).await;
855        assert_eq!(msg.as_deref(), Some("subscribe"));
856        // Re-borrow to confirm the channel holds the receiver
857        let _ = rx;
858    }
859
860    #[test]
861    fn test_ws_stats_initial_zero() {
862        let mgr = WsManager::new(default_config());
863        let s = mgr.stats();
864        assert_eq!(s.total_messages_received, 0);
865        assert_eq!(s.total_bytes_received, 0);
866    }
867
868    #[test]
869    fn test_ws_stats_default() {
870        let s = WsStats::default();
871        assert_eq!(s.total_messages_received, 0);
872        assert_eq!(s.total_bytes_received, 0);
873    }
874
875    #[test]
876    fn test_reconnect_policy_with_jitter_valid() {
877        let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
878            .unwrap()
879            .with_jitter(0.5)
880            .unwrap();
881        assert_eq!(p.jitter, 0.5);
882    }
883
884    #[test]
885    fn test_reconnect_policy_with_jitter_zero_is_deterministic() {
886        let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
887            .unwrap()
888            .with_jitter(0.0)
889            .unwrap();
890        // Zero jitter: backoff must be identical to un-jittered value.
891        let b0 = p.backoff_for_attempt(0);
892        let b1 = p.backoff_for_attempt(0);
893        assert_eq!(b0, b1);
894        assert_eq!(b0, Duration::from_millis(100));
895    }
896
897    #[test]
898    fn test_reconnect_policy_with_jitter_invalid_ratio() {
899        let result =
900            ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
901                .unwrap()
902                .with_jitter(1.5);
903        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
904    }
905
906    #[test]
907    fn test_reconnect_policy_with_jitter_negative_ratio() {
908        let result =
909            ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
910                .unwrap()
911                .with_jitter(-0.1);
912        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
913    }
914
915    #[test]
916    fn test_reconnect_policy_with_jitter_stays_within_bounds() {
917        let p = ReconnectPolicy::new(20, Duration::from_millis(100), Duration::from_secs(30), 2.0)
918            .unwrap()
919            .with_jitter(1.0)
920            .unwrap();
921        // With ratio=1.0 the backoff can range [0, 2×base] but must not exceed max_backoff.
922        for attempt in 0..20 {
923            let b = p.backoff_for_attempt(attempt);
924            assert!(b <= Duration::from_secs(30), "attempt {attempt} exceeded max_backoff");
925        }
926    }
927
928    #[test]
929    fn test_reconnect_policy_with_max_attempts_valid() {
930        let p = ReconnectPolicy::default().with_max_attempts(5).unwrap();
931        assert_eq!(p.max_attempts, 5);
932    }
933
934    #[test]
935    fn test_reconnect_policy_with_max_attempts_zero_rejected() {
936        let result = ReconnectPolicy::default().with_max_attempts(0);
937        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
938    }
939
940    #[test]
941    fn test_connection_config_with_reconnect_attempts_valid() {
942        let config = default_config().with_reconnect_attempts(3).unwrap();
943        assert_eq!(config.reconnect.max_attempts, 3);
944    }
945
946    #[test]
947    fn test_connection_config_with_reconnect_attempts_zero_rejected() {
948        let result = default_config().with_reconnect_attempts(0);
949        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
950    }
951
952    #[test]
953    fn test_reconnect_policy_total_max_delay_sum_of_backoffs() {
954        let p = ReconnectPolicy::new(3, Duration::from_millis(100), Duration::from_secs(30), 2.0)
955            .unwrap();
956        // attempts 0,1,2 → 100ms, 200ms, 400ms = 700ms
957        assert_eq!(p.total_max_delay(), Duration::from_millis(700));
958    }
959
960    #[test]
961    fn test_reconnect_policy_total_max_delay_capped_by_max_backoff() {
962        // With small max_backoff, all delays are capped
963        let p = ReconnectPolicy::new(5, Duration::from_millis(1000), Duration::from_millis(500), 2.0)
964            .unwrap();
965        // All 5 attempts capped at 500ms → total = 2500ms
966        assert_eq!(p.total_max_delay(), Duration::from_millis(2500));
967    }
968
969    #[test]
970    fn test_connection_config_with_channel_capacity_valid() {
971        let config = default_config().with_channel_capacity(512).unwrap();
972        assert_eq!(config.channel_capacity, 512);
973    }
974
975    #[test]
976    fn test_connection_config_with_channel_capacity_zero_rejected() {
977        let result = default_config().with_channel_capacity(0);
978        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
979    }
980
981    #[test]
982    fn test_reconnect_policy_with_jitter_varies_across_attempts() {
983        let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(30), 1.0)
984            .unwrap()
985            .with_jitter(0.5)
986            .unwrap();
987        // With constant multiplier=1.0 all base backoffs are 1000ms; jitter should
988        // produce different values for different attempt indices.
989        let values: Vec<Duration> = (0..10).map(|a| p.backoff_for_attempt(a)).collect();
990        let unique: std::collections::HashSet<u64> =
991            values.iter().map(|d| d.as_millis() as u64).collect();
992        assert!(unique.len() > 1, "jitter should produce variation across attempts");
993    }
994
995    // ── ReconnectPolicy::with_initial_backoff / with_max_backoff ──────────────
996
997    #[test]
998    fn test_with_initial_backoff_sets_value() {
999        let p = ReconnectPolicy::default()
1000            .with_initial_backoff(Duration::from_secs(2));
1001        assert_eq!(p.initial_backoff, Duration::from_secs(2));
1002    }
1003
1004    #[test]
1005    fn test_with_max_backoff_sets_value() {
1006        let p = ReconnectPolicy::default()
1007            .with_max_backoff(Duration::from_secs(60));
1008        assert_eq!(p.max_backoff, Duration::from_secs(60));
1009    }
1010
1011    #[test]
1012    fn test_with_initial_backoff_affects_first_attempt() {
1013        let p = ReconnectPolicy::default()
1014            .with_initial_backoff(Duration::from_millis(200));
1015        assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(200));
1016    }
1017
1018    // ── ReconnectPolicy::with_multiplier ──────────────────────────────────────
1019
1020    #[test]
1021    fn test_with_multiplier_valid() {
1022        let p = ReconnectPolicy::default().with_multiplier(3.0).unwrap();
1023        assert_eq!(p.multiplier, 3.0);
1024    }
1025
1026    #[test]
1027    fn test_with_multiplier_below_one_rejected() {
1028        let result = ReconnectPolicy::default().with_multiplier(0.9);
1029        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
1030    }
1031
1032    #[test]
1033    fn test_with_multiplier_exactly_one_accepted() {
1034        let p = ReconnectPolicy::default().with_multiplier(1.0).unwrap();
1035        assert_eq!(p.multiplier, 1.0);
1036    }
1037
1038    // ── WsStats::message_rate / byte_rate ─────────────────────────────────────
1039
1040    #[test]
1041    fn test_message_rate_zero_elapsed_returns_zero() {
1042        let stats = WsStats {
1043            total_messages_received: 100,
1044            total_bytes_received: 50_000,
1045        };
1046        assert_eq!(stats.message_rate(0), 0.0);
1047        assert_eq!(stats.byte_rate(0), 0.0);
1048    }
1049
1050    #[test]
1051    fn test_message_rate_100_messages_in_1s() {
1052        let stats = WsStats {
1053            total_messages_received: 100,
1054            total_bytes_received: 0,
1055        };
1056        let rate = stats.message_rate(1_000); // 1 second = 1000ms
1057        assert!((rate - 100.0).abs() < 1e-9);
1058    }
1059
1060    #[test]
1061    fn test_byte_rate_1mb_in_1s() {
1062        let stats = WsStats {
1063            total_messages_received: 0,
1064            total_bytes_received: 1_000_000,
1065        };
1066        let rate = stats.byte_rate(1_000); // 1 second
1067        assert!((rate - 1_000_000.0).abs() < 1.0);
1068    }
1069
1070    // ── WsStats::avg_message_size ─────────────────────────────────────────────
1071
1072    #[test]
1073    fn test_avg_message_size_none_when_no_messages() {
1074        let stats = WsStats::default();
1075        assert!(stats.avg_message_size().is_none());
1076    }
1077
1078    #[test]
1079    fn test_avg_message_size_basic() {
1080        let stats = WsStats {
1081            total_messages_received: 10,
1082            total_bytes_received: 1_000,
1083        };
1084        let avg = stats.avg_message_size().unwrap();
1085        assert!((avg - 100.0).abs() < 1e-9);
1086    }
1087
1088    // ── WsStats::total_data_mb ────────────────────────────────────────────────
1089
1090    #[test]
1091    fn test_total_data_mb_zero_bytes() {
1092        let stats = WsStats::default();
1093        assert!((stats.total_data_mb() - 0.0).abs() < 1e-9);
1094    }
1095
1096    #[test]
1097    fn test_total_data_mb_one_mib() {
1098        let stats = WsStats {
1099            total_messages_received: 1,
1100            total_bytes_received: 1_048_576,
1101        };
1102        assert!((stats.total_data_mb() - 1.0).abs() < 1e-9);
1103    }
1104
1105    #[test]
1106    fn test_max_attempts_getter_matches_field() {
1107        let p = ReconnectPolicy::default();
1108        assert_eq!(p.max_attempts(), p.max_attempts);
1109    }
1110
1111    #[test]
1112    fn test_max_attempts_getter_after_new() {
1113        let p = ReconnectPolicy::new(
1114            7,
1115            std::time::Duration::from_millis(100),
1116            std::time::Duration::from_secs(30),
1117            2.0,
1118        )
1119        .unwrap();
1120        assert_eq!(p.max_attempts(), 7);
1121    }
1122
1123    // ── WsStats::is_idle ──────────────────────────────────────────────────────
1124
1125    #[test]
1126    fn test_is_idle_below_min_rate() {
1127        let stats = WsStats {
1128            total_messages_received: 1,
1129            total_bytes_received: 0,
1130        };
1131        // 1 msg / 10s = 0.1 msg/s; min_rate = 1.0 → idle
1132        assert!(stats.is_idle(10_000, 1.0));
1133    }
1134
1135    #[test]
1136    fn test_is_idle_above_min_rate() {
1137        let stats = WsStats {
1138            total_messages_received: 100,
1139            total_bytes_received: 0,
1140        };
1141        // 100 msg / 1s = 100 msg/s; min_rate = 1.0 → not idle
1142        assert!(!stats.is_idle(1_000, 1.0));
1143    }
1144
1145    #[test]
1146    fn test_is_idle_zero_messages_always_idle() {
1147        let stats = WsStats::default();
1148        assert!(stats.is_idle(1_000, 0.001));
1149    }
1150
1151    #[test]
1152    fn test_total_attempts_remaining_full() {
1153        let p = ReconnectPolicy::default(); // max_attempts = 10
1154        assert_eq!(p.total_attempts_remaining(0), 10);
1155    }
1156
1157    #[test]
1158    fn test_total_attempts_remaining_partial() {
1159        let p = ReconnectPolicy::default();
1160        assert_eq!(p.total_attempts_remaining(3), 7);
1161    }
1162
1163    #[test]
1164    fn test_total_attempts_remaining_exhausted() {
1165        let p = ReconnectPolicy::default();
1166        assert_eq!(p.total_attempts_remaining(10), 0);
1167        assert_eq!(p.total_attempts_remaining(99), 0);
1168    }
1169
1170    // ── WsStats::has_traffic ──────────────────────────────────────────────────
1171
1172    #[test]
1173    fn test_has_traffic_false_when_no_messages() {
1174        let stats = WsStats::default();
1175        assert!(!stats.has_traffic());
1176    }
1177
1178    #[test]
1179    fn test_has_traffic_true_after_one_message() {
1180        let stats = WsStats {
1181            total_messages_received: 1,
1182            total_bytes_received: 0,
1183        };
1184        assert!(stats.has_traffic());
1185    }
1186
1187    #[test]
1188    fn test_has_traffic_true_with_many_messages() {
1189        let stats = WsStats {
1190            total_messages_received: 1_000,
1191            total_bytes_received: 50_000,
1192        };
1193        assert!(stats.has_traffic());
1194    }
1195
1196    // ── WsStats::is_high_volume ───────────────────────────────────────────────
1197
1198    #[test]
1199    fn test_is_high_volume_true_at_threshold() {
1200        let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1201        assert!(stats.is_high_volume(1_000));
1202    }
1203
1204    #[test]
1205    fn test_is_high_volume_false_below_threshold() {
1206        let stats = WsStats { total_messages_received: 500, total_bytes_received: 0 };
1207        assert!(!stats.is_high_volume(1_000));
1208    }
1209
1210    #[test]
1211    fn test_is_high_volume_true_above_threshold() {
1212        let stats = WsStats { total_messages_received: 2_000, total_bytes_received: 0 };
1213        assert!(stats.is_high_volume(1_000));
1214    }
1215
1216    // ── WsStats::bytes_per_message ────────────────────────────────────────────
1217
1218    #[test]
1219    fn test_bytes_per_message_none_when_no_messages() {
1220        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1221        assert!(stats.bytes_per_message().is_none());
1222    }
1223
1224    #[test]
1225    fn test_bytes_per_message_correct_value() {
1226        let stats = WsStats { total_messages_received: 4, total_bytes_received: 400 };
1227        assert_eq!(stats.bytes_per_message(), Some(100.0));
1228    }
1229
1230    #[test]
1231    fn test_bytes_per_message_fractional() {
1232        let stats = WsStats { total_messages_received: 3, total_bytes_received: 10 };
1233        let bpm = stats.bytes_per_message().unwrap();
1234        assert!((bpm - 10.0 / 3.0).abs() < 1e-10);
1235    }
1236
1237    // --- delay_for_next ---
1238
1239    #[test]
1240    fn test_delay_for_next_is_backoff_for_attempt_plus_one() {
1241        let policy = ReconnectPolicy::new(
1242            10,
1243            Duration::from_millis(100),
1244            Duration::from_secs(60),
1245            2.0,
1246        )
1247        .unwrap();
1248        assert_eq!(
1249            policy.delay_for_next(0),
1250            policy.backoff_for_attempt(1)
1251        );
1252        assert_eq!(
1253            policy.delay_for_next(3),
1254            policy.backoff_for_attempt(4)
1255        );
1256    }
1257
1258    #[test]
1259    fn test_delay_for_next_saturates_at_max_backoff() {
1260        let policy = ReconnectPolicy::new(
1261            10,
1262            Duration::from_millis(100),
1263            Duration::from_secs(1),
1264            2.0,
1265        )
1266        .unwrap();
1267        // After many attempts the delay is capped at max_backoff
1268        assert!(policy.delay_for_next(100) <= Duration::from_secs(1));
1269    }
1270
1271    // ── WsStats::message_rate ─────────────────────────────────────────────────
1272
1273    #[test]
1274    fn test_message_rate_zero_when_elapsed_is_zero() {
1275        let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1276        assert_eq!(stats.message_rate(0), 0.0);
1277    }
1278
1279    #[test]
1280    fn test_message_rate_correct_value() {
1281        let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1282        // 100 messages in 10_000ms = 10 msg/s
1283        assert!((stats.message_rate(10_000) - 10.0).abs() < 1e-10);
1284    }
1285
1286    #[test]
1287    fn test_message_rate_zero_messages() {
1288        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1289        assert_eq!(stats.message_rate(5_000), 0.0);
1290    }
1291
1292    // ── WsStats::total_data_mb ────────────────────────────────────────────────
1293
1294    #[test]
1295    fn test_total_data_mb_zero_when_no_bytes() {
1296        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1297        assert_eq!(stats.total_data_mb(), 0.0);
1298    }
1299
1300    #[test]
1301    fn test_total_data_mb_fractional() {
1302        let stats = WsStats { total_messages_received: 1, total_bytes_received: 524_288 };
1303        assert!((stats.total_data_mb() - 0.5).abs() < 1e-10);
1304    }
1305
1306    // --- is_exhausted ---
1307
1308    #[test]
1309    fn test_is_exhausted_true_at_max_attempts() {
1310        let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1311        assert!(policy.is_exhausted(5));
1312    }
1313
1314    #[test]
1315    fn test_is_exhausted_true_beyond_max_attempts() {
1316        let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1317        assert!(policy.is_exhausted(10));
1318    }
1319
1320    #[test]
1321    fn test_is_exhausted_false_below_max_attempts() {
1322        let policy = ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0).unwrap();
1323        assert!(!policy.is_exhausted(4));
1324    }
1325
1326    // --- WsStats::total_data_kb ---
1327
1328    #[test]
1329    fn test_total_data_kb_zero_when_no_bytes() {
1330        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1331        assert_eq!(stats.total_data_kb(), 0.0);
1332    }
1333
1334    #[test]
1335    fn test_total_data_kb_one_kib() {
1336        let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_024 };
1337        assert!((stats.total_data_kb() - 1.0).abs() < 1e-10);
1338    }
1339
1340    #[test]
1341    fn test_total_data_kb_equals_1024_times_mb() {
1342        let stats = WsStats { total_messages_received: 1, total_bytes_received: 2_097_152 };
1343        assert!((stats.total_data_kb() - stats.total_data_mb() * 1_024.0).abs() < 1e-6);
1344    }
1345
1346    // --- WsStats::bandwidth_bps ---
1347
1348    #[test]
1349    fn test_bandwidth_bps_zero_when_elapsed_zero() {
1350        let stats = WsStats { total_messages_received: 0, total_bytes_received: 1_000 };
1351        assert_eq!(stats.bandwidth_bps(0), 0.0);
1352    }
1353
1354    #[test]
1355    fn test_bandwidth_bps_correct_value() {
1356        let stats = WsStats { total_messages_received: 1, total_bytes_received: 10_000 };
1357        // 10_000 bytes / 1s = 10_000 bps
1358        assert!((stats.bandwidth_bps(1_000) - 10_000.0).abs() < 1e-6);
1359    }
1360
1361    #[test]
1362    fn test_bandwidth_bps_zero_bytes() {
1363        let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1364        assert_eq!(stats.bandwidth_bps(5_000), 0.0);
1365    }
1366
1367    // --- WsStats::messages_per_byte ---
1368
1369    #[test]
1370    fn test_messages_per_byte_none_when_no_bytes() {
1371        let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1372        assert!(stats.messages_per_byte().is_none());
1373    }
1374
1375    #[test]
1376    fn test_messages_per_byte_correct_value() {
1377        let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1378        // 100 / 10_000 = 0.01
1379        assert!((stats.messages_per_byte().unwrap() - 0.01).abs() < 1e-12);
1380    }
1381
1382    #[test]
1383    fn test_messages_per_byte_less_than_one_for_large_messages() {
1384        let stats = WsStats { total_messages_received: 1, total_bytes_received: 500 };
1385        let mpb = stats.messages_per_byte().unwrap();
1386        assert!(mpb < 1.0);
1387    }
1388
1389    // --- WsStats::avg_message_size_bytes ---
1390    #[test]
1391    fn test_avg_message_size_bytes_none_when_no_messages() {
1392        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1393        assert!(stats.avg_message_size_bytes().is_none());
1394    }
1395
1396    #[test]
1397    fn test_avg_message_size_bytes_correct_value() {
1398        let stats = WsStats { total_messages_received: 10, total_bytes_received: 5_000 };
1399        assert!((stats.avg_message_size_bytes().unwrap() - 500.0).abs() < 1e-12);
1400    }
1401
1402    #[test]
1403    fn test_avg_message_size_bytes_one_message() {
1404        let stats = WsStats { total_messages_received: 1, total_bytes_received: 256 };
1405        assert!((stats.avg_message_size_bytes().unwrap() - 256.0).abs() < 1e-12);
1406    }
1407
1408    // --- WsStats::bandwidth_kbps ---
1409    #[test]
1410    fn test_bandwidth_kbps_zero_when_elapsed_zero() {
1411        let stats = WsStats { total_messages_received: 10, total_bytes_received: 50_000 };
1412        assert_eq!(stats.bandwidth_kbps(0), 0.0);
1413    }
1414
1415    #[test]
1416    fn test_bandwidth_kbps_correct_value() {
1417        // 100_000 bytes in 1_000ms = 100KB/s = 100*8 = 800 kbps
1418        let stats = WsStats { total_messages_received: 1, total_bytes_received: 100_000 };
1419        let kbps = stats.bandwidth_kbps(1_000);
1420        assert!((kbps - 800.0).abs() < 1e-10, "got {kbps}");
1421    }
1422
1423    #[test]
1424    fn test_bandwidth_kbps_zero_when_no_data() {
1425        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1426        assert_eq!(stats.bandwidth_kbps(5_000), 0.0);
1427    }
1428
1429    // ── WsStats::total_data_gb / is_active ─────────────────────────────────
1430
1431    #[test]
1432    fn test_total_data_gb_zero_when_no_bytes() {
1433        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1434        assert_eq!(stats.total_data_gb(), 0.0);
1435    }
1436
1437    #[test]
1438    fn test_total_data_gb_one_gib() {
1439        let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_073_741_824 };
1440        assert!((stats.total_data_gb() - 1.0).abs() < 1e-10);
1441    }
1442
1443    #[test]
1444    fn test_total_data_gb_equals_1024_times_mb() {
1445        let stats = WsStats { total_messages_received: 1, total_bytes_received: 2_147_483_648 };
1446        assert!((stats.total_data_gb() - stats.total_data_mb() / 1_024.0).abs() < 1e-6);
1447    }
1448
1449    #[test]
1450    fn test_is_active_false_when_no_messages() {
1451        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1452        assert!(!stats.is_active(1));
1453    }
1454
1455    #[test]
1456    fn test_is_active_true_at_threshold() {
1457        let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1458        assert!(stats.is_active(100));
1459    }
1460
1461    #[test]
1462    fn test_is_active_false_below_threshold() {
1463        let stats = WsStats { total_messages_received: 50, total_bytes_received: 0 };
1464        assert!(!stats.is_active(100));
1465    }
1466
1467    // ── WsStats::has_received_bytes / efficiency_ratio ──────────────────────
1468
1469    #[test]
1470    fn test_has_received_bytes_false_when_no_bytes() {
1471        let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1472        assert!(!stats.has_received_bytes());
1473    }
1474
1475    #[test]
1476    fn test_has_received_bytes_true_when_bytes_present() {
1477        let stats = WsStats { total_messages_received: 1, total_bytes_received: 100 };
1478        assert!(stats.has_received_bytes());
1479    }
1480
1481    #[test]
1482    fn test_efficiency_ratio_none_when_no_bytes() {
1483        let stats = WsStats { total_messages_received: 10, total_bytes_received: 0 };
1484        assert!(stats.efficiency_ratio().is_none());
1485    }
1486
1487    #[test]
1488    fn test_efficiency_ratio_correct_value() {
1489        let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1490        // 100 / 10_000 = 0.01
1491        assert!((stats.efficiency_ratio().unwrap() - 0.01).abs() < 1e-12);
1492    }
1493
1494    #[test]
1495    fn test_efficiency_ratio_less_than_one_for_large_messages() {
1496        let stats = WsStats { total_messages_received: 1, total_bytes_received: 500 };
1497        assert!(stats.efficiency_ratio().unwrap() < 1.0);
1498    }
1499
1500    // ── WsStats::message_density / compression_ratio ────────────────────────
1501
1502    #[test]
1503    fn test_message_density_same_as_message_rate() {
1504        let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1505        assert!((stats.message_density(1_000) - stats.message_rate(1_000)).abs() < 1e-12);
1506    }
1507
1508    #[test]
1509    fn test_message_density_zero_when_elapsed_zero() {
1510        let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1511        assert_eq!(stats.message_density(0), 0.0);
1512    }
1513
1514    #[test]
1515    fn test_compression_ratio_none_when_no_bytes() {
1516        let stats = WsStats { total_messages_received: 5, total_bytes_received: 0 };
1517        assert!(stats.compression_ratio().is_none());
1518    }
1519
1520    #[test]
1521    fn test_compression_ratio_same_as_efficiency_ratio() {
1522        let stats = WsStats { total_messages_received: 100, total_bytes_received: 10_000 };
1523        assert_eq!(stats.compression_ratio(), stats.efficiency_ratio());
1524    }
1525
1526    // ── WsStats::uptime_fraction ──────────────────────────────────────────────
1527
1528    #[test]
1529    fn test_uptime_fraction_zero_when_elapsed_zero() {
1530        let stats = WsStats { total_messages_received: 100, total_bytes_received: 50_000 };
1531        assert_eq!(stats.uptime_fraction(0), 0.0);
1532    }
1533
1534    #[test]
1535    fn test_uptime_fraction_zero_when_no_bytes() {
1536        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1537        assert_eq!(stats.uptime_fraction(60_000), 0.0);
1538    }
1539
1540    #[test]
1541    fn test_uptime_fraction_nonzero_with_bytes() {
1542        let stats = WsStats { total_messages_received: 100, total_bytes_received: 1_000 };
1543        // bytes/ms = 1000/60000 ≈ 0.0167
1544        let f = stats.uptime_fraction(60_000);
1545        assert!(f > 0.0 && f <= 1.0);
1546    }
1547
1548    #[test]
1549    fn test_uptime_fraction_clamped_to_one() {
1550        // Very high byte count relative to elapsed → capped at 1.0
1551        let stats = WsStats { total_messages_received: 0, total_bytes_received: 1_000_000 };
1552        assert_eq!(stats.uptime_fraction(100), 1.0);
1553    }
1554
1555    // ── WsStats::is_idle ──────────────────────────────────────────────────────
1556
1557    #[test]
1558    fn test_is_idle_true_when_no_messages() {
1559        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1560        assert!(stats.is_idle(60_000, 1.0));
1561    }
1562
1563    #[test]
1564    fn test_is_idle_false_when_high_message_rate() {
1565        let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 0 };
1566        // rate = 1000 msg / 1s = 1000/s, threshold = 1.0
1567        assert!(!stats.is_idle(1_000, 1.0));
1568    }
1569
1570    #[test]
1571    fn test_is_idle_true_when_elapsed_zero() {
1572        let stats = WsStats { total_messages_received: 100, total_bytes_received: 0 };
1573        // message_rate returns 0.0 when elapsed_ms=0 → always idle
1574        assert!(stats.is_idle(0, 1.0));
1575    }
1576
1577    // ── WsStats::average_message_size_bytes ───────────────────────────────────
1578
1579    #[test]
1580    fn test_average_message_size_bytes_none_when_no_messages() {
1581        let stats = WsStats { total_messages_received: 0, total_bytes_received: 0 };
1582        assert!(stats.average_message_size_bytes().is_none());
1583    }
1584
1585    #[test]
1586    fn test_average_message_size_bytes_same_as_bytes_per_message() {
1587        let stats = WsStats { total_messages_received: 10, total_bytes_received: 1_000 };
1588        assert_eq!(stats.average_message_size_bytes(), stats.bytes_per_message());
1589    }
1590}