Skip to main content

fin_stream/ws/
mod.rs

1//! WebSocket connection management — auto-reconnect and backpressure.
2//!
3//! ## Responsibility
4//! Manage the lifecycle of a WebSocket feed connection: connect, receive
5//! messages, detect disconnections, apply exponential backoff reconnect,
6//! and propagate backpressure when the downstream channel is full.
7//!
8//! ## Guarantees
9//! - Non-panicking: all operations return Result
10//! - Configurable: reconnect policy and buffer sizes are constructor params
11
12use crate::error::StreamError;
13use futures_util::{SinkExt, StreamExt};
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tokio::time;
17use tokio_tungstenite::{connect_async, tungstenite::Message};
18use tracing::{debug, info, warn};
19
/// Running receive counters for a WebSocket feed.
#[derive(Debug, Clone, Copy, Default)]
pub struct WsStats {
    /// Number of data messages received so far (text and binary combined).
    pub total_messages_received: u64,
    /// Number of payload bytes received across all counted messages.
    pub total_bytes_received: u64,
}

impl WsStats {
    /// Converts a cumulative counter into a per-second rate over `elapsed_ms`.
    ///
    /// A zero-length window yields `0.0` so callers never divide by zero.
    fn per_second(total: u64, elapsed_ms: u64) -> f64 {
        match elapsed_ms {
            0 => 0.0,
            ms => {
                let seconds = ms as f64 / 1000.0;
                total as f64 / seconds
            }
        }
    }

    /// Message throughput, in messages per second, over `elapsed_ms`.
    ///
    /// Returns `0.0` when `elapsed_ms` is zero.
    pub fn message_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_messages_received, elapsed_ms)
    }

    /// Byte throughput, in bytes per second, over `elapsed_ms`.
    ///
    /// Returns `0.0` when `elapsed_ms` is zero.
    pub fn byte_rate(&self, elapsed_ms: u64) -> f64 {
        Self::per_second(self.total_bytes_received, elapsed_ms)
    }

    /// Mean payload size per message; forwards to
    /// [`bytes_per_message`](Self::bytes_per_message).
    ///
    /// Returns `None` while no message has been received.
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn avg_message_size(&self) -> Option<f64> {
        Self::bytes_per_message(self)
    }

    /// Total received volume in mebibytes (MiB): `total_bytes / 1_048_576.0`.
    pub fn total_data_mb(&self) -> f64 {
        let bytes = self.total_bytes_received as f64;
        bytes / 1_048_576.0
    }

    /// Total received volume in kibibytes (KiB), derived from the MiB figure.
    pub fn total_data_kb(&self) -> f64 {
        let mebibytes = self.total_data_mb();
        mebibytes * 1_024.0
    }

    /// Average received bandwidth in **bytes** per second over `elapsed_ms`.
    ///
    /// Identical to [`byte_rate`](Self::byte_rate); returns `0.0` when
    /// `elapsed_ms` is zero.
    pub fn bandwidth_bps(&self, elapsed_ms: u64) -> f64 {
        Self::byte_rate(self, elapsed_ms)
    }

    /// Messages per received byte; forwards to
    /// [`efficiency_ratio`](Self::efficiency_ratio).
    ///
    /// Returns `None` while no bytes have been received.
    #[deprecated(since = "2.2.0", note = "Use `efficiency_ratio()` instead")]
    pub fn messages_per_byte(&self) -> Option<f64> {
        Self::efficiency_ratio(self)
    }

    /// Mean payload size per message; forwards to
    /// [`bytes_per_message`](Self::bytes_per_message).
    ///
    /// Returns `None` while no message has been received.
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn avg_message_size_bytes(&self) -> Option<f64> {
        Self::bytes_per_message(self)
    }

    /// Average received bandwidth in kilobits per second over `elapsed_ms`
    /// (bytes per second × 8 ÷ 1000).
    ///
    /// Returns `0.0` when `elapsed_ms` is zero.
    pub fn bandwidth_kbps(&self, elapsed_ms: u64) -> f64 {
        let bits_per_second = self.byte_rate(elapsed_ms) * 8.0;
        bits_per_second / 1_000.0
    }

    /// Reports whether the message rate over `elapsed_ms` is strictly below
    /// `min_rate` (messages per second).
    ///
    /// With a zero-length window the rate is `0.0`, so the answer is `true`
    /// exactly when `min_rate` is positive. Handy for spotting stalled feeds.
    pub fn is_idle(&self, elapsed_ms: u64, min_rate: f64) -> bool {
        let observed = self.message_rate(elapsed_ms);
        min_rate > observed
    }

    /// Reports whether at least one message has been received.
    pub fn has_traffic(&self) -> bool {
        self.total_messages_received != 0
    }

    /// Reports whether the message count has reached `threshold`.
    pub fn is_high_volume(&self, threshold: u64) -> bool {
        threshold <= self.total_messages_received
    }

    /// Mean payload size per message; forwards to
    /// [`bytes_per_message`](Self::bytes_per_message).
    ///
    /// Returns `None` while no message has been received.
    #[deprecated(since = "2.2.0", note = "Use `bytes_per_message()` instead")]
    pub fn average_message_size_bytes(&self) -> Option<f64> {
        Self::bytes_per_message(self)
    }

    /// Mean payload size per message: `total_bytes / total_messages`.
    ///
    /// Returns `None` while no message has been received.
    pub fn bytes_per_message(&self) -> Option<f64> {
        match self.total_messages_received {
            0 => None,
            messages => Some(self.total_bytes_received as f64 / messages as f64),
        }
    }

    /// Total received volume in gibibytes (GiB), derived from the MiB figure.
    pub fn total_data_gb(&self) -> f64 {
        let mebibytes = self.total_data_mb();
        mebibytes / 1_024.0
    }

    /// Reports whether at least `min_messages` messages have been received.
    pub fn is_active(&self, min_messages: u64) -> bool {
        self.total_messages_received >= min_messages
    }

    /// Reports whether any payload bytes have been received.
    pub fn has_received_bytes(&self) -> bool {
        self.total_bytes_received != 0
    }

    /// Messages per received byte: `total_messages / total_bytes`.
    ///
    /// Larger values mean smaller messages on average. Returns `None` while
    /// no bytes have been received.
    pub fn efficiency_ratio(&self) -> Option<f64> {
        match self.total_bytes_received {
            0 => None,
            bytes => Some(self.total_messages_received as f64 / bytes as f64),
        }
    }

    /// Messages per second over `elapsed_ms`; forwards to
    /// [`message_rate`](Self::message_rate).
    ///
    /// Returns `0.0` when `elapsed_ms` is zero.
    #[deprecated(since = "2.2.0", note = "Use `message_rate()` instead")]
    pub fn message_density(&self, elapsed_ms: u64) -> f64 {
        Self::message_rate(self, elapsed_ms)
    }

    /// Messages per received byte; forwards to
    /// [`efficiency_ratio`](Self::efficiency_ratio).
    ///
    /// Returns `None` while no bytes have been received.
    #[deprecated(since = "2.2.0", note = "Use `efficiency_ratio()` instead")]
    pub fn compression_ratio(&self) -> Option<f64> {
        Self::efficiency_ratio(self)
    }

    /// Rough activity indicator in `[0.0, 1.0]` for a window of `total_ms`.
    ///
    /// Despite the name this is not a measured uptime: it is the number of
    /// received bytes per millisecond of `total_ms`, capped at `1.0`. `0.0`
    /// means nothing was received; any feed averaging at least one byte per
    /// millisecond reports `1.0`.
    ///
    /// Returns `0.0` when `total_ms` is zero.
    pub fn uptime_fraction(&self, total_ms: u64) -> f64 {
        match total_ms {
            0 => 0.0,
            window_ms => {
                let bytes_per_ms = self.total_bytes_received as f64 / window_ms as f64;
                bytes_per_ms.min(1.0)
            }
        }
    }
}
194
195/// Reconnection policy for a WebSocket feed.
196///
197/// Controls exponential-backoff reconnect behaviour. Build with
198/// [`ReconnectPolicy::new`] or use [`Default`] for sensible defaults.
199#[derive(Debug, Clone)]
200pub struct ReconnectPolicy {
201    /// Maximum number of reconnect attempts before giving up.
202    pub max_attempts: u32,
203    /// Initial backoff delay for the first reconnect attempt.
204    pub initial_backoff: Duration,
205    /// Maximum backoff delay (cap for exponential growth).
206    pub max_backoff: Duration,
207    /// Multiplier applied to the backoff on each successive attempt (must be >= 1.0).
208    pub multiplier: f64,
209    /// Jitter ratio in [0.0, 1.0]: the computed backoff is offset by up to
210    /// `±ratio × backoff` using a deterministic per-attempt hash. 0.0 = no jitter.
211    pub jitter: f64,
212}
213
214impl ReconnectPolicy {
215    /// Build a reconnect policy with explicit parameters.
216    ///
217    /// # Errors
218    ///
219    /// Returns [`StreamError::ConfigError`] if `multiplier < 1.0` (which would
220    /// cause backoff to shrink over time) or if `max_attempts == 0`.
221    pub fn new(
222        max_attempts: u32,
223        initial_backoff: Duration,
224        max_backoff: Duration,
225        multiplier: f64,
226    ) -> Result<Self, StreamError> {
227        if multiplier < 1.0 {
228            return Err(StreamError::ConfigError {
229                reason: format!(
230                    "reconnect multiplier must be >= 1.0, got {multiplier}"
231                ),
232            });
233        }
234        if max_attempts == 0 {
235            return Err(StreamError::ConfigError {
236                reason: "max_attempts must be > 0".into(),
237            });
238        }
239        Ok(Self {
240            max_attempts,
241            initial_backoff,
242            max_backoff,
243            multiplier,
244            jitter: 0.0,
245        })
246    }
247
248    /// Set the maximum number of reconnect attempts.
249    ///
250    /// # Errors
251    ///
252    /// Returns [`StreamError::ConfigError`] if `max_attempts` is zero.
253    pub fn with_max_attempts(mut self, max_attempts: u32) -> Result<Self, StreamError> {
254        if max_attempts == 0 {
255            return Err(StreamError::ConfigError {
256                reason: "max_attempts must be > 0".into(),
257            });
258        }
259        self.max_attempts = max_attempts;
260        Ok(self)
261    }
262
263    /// Set the exponential backoff multiplier.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`StreamError::ConfigError`] if `multiplier < 1.0` (which would
268    /// cause the backoff to shrink on each attempt).
269    pub fn with_multiplier(mut self, multiplier: f64) -> Result<Self, StreamError> {
270        if multiplier < 1.0 {
271            return Err(StreamError::ConfigError {
272                reason: format!("reconnect multiplier must be >= 1.0, got {multiplier}"),
273            });
274        }
275        self.multiplier = multiplier;
276        Ok(self)
277    }
278
279    /// Set the initial backoff duration for the first reconnect attempt.
280    pub fn with_initial_backoff(mut self, duration: Duration) -> Self {
281        self.initial_backoff = duration;
282        self
283    }
284
285    /// Set the maximum backoff duration (cap for exponential growth).
286    pub fn with_max_backoff(mut self, duration: Duration) -> Self {
287        self.max_backoff = duration;
288        self
289    }
290
291    /// Apply deterministic per-attempt jitter to the computed backoff.
292    ///
293    /// `ratio` must be in `[0.0, 1.0]`. The effective backoff for attempt N
294    /// will be spread uniformly over `[backoff*(1-ratio), backoff*(1+ratio)]`
295    /// using a hash of the attempt index — no `rand` dependency needed.
296    ///
297    /// # Errors
298    ///
299    /// Returns [`StreamError::ConfigError`] if `ratio` is outside `[0.0, 1.0]`.
300    pub fn with_jitter(mut self, ratio: f64) -> Result<Self, StreamError> {
301        if !(0.0..=1.0).contains(&ratio) {
302            return Err(StreamError::ConfigError {
303                reason: format!("jitter ratio must be in [0.0, 1.0], got {ratio}"),
304            });
305        }
306        self.jitter = ratio;
307        Ok(self)
308    }
309
310    /// Sum of all backoff delays across every reconnect attempt.
311    ///
312    /// Useful for estimating the worst-case time before a client gives up.
313    /// The result is capped at `max_backoff * max_attempts` to avoid overflow.
314    pub fn total_max_delay(&self) -> Duration {
315        let total_ms: u64 = (0..self.max_attempts)
316            .map(|a| self.backoff_for_attempt(a).as_millis() as u64)
317            .fold(0u64, |acc, ms| acc.saturating_add(ms));
318        Duration::from_millis(total_ms)
319    }
320
321    /// Maximum number of reconnect attempts before the client gives up.
322    pub fn max_attempts(&self) -> u32 {
323        self.max_attempts
324    }
325
326    /// Number of attempts remaining starting from `current_attempt` (0-indexed).
327    ///
328    /// Returns `0` if `current_attempt >= max_attempts`.
329    pub fn total_attempts_remaining(&self, current_attempt: u32) -> u32 {
330        self.max_attempts.saturating_sub(current_attempt)
331    }
332
333    /// Backoff delay that will be applied *after* `current_attempt` completes.
334    ///
335    /// Equivalent to `backoff_for_attempt(current_attempt + 1)`, capped at
336    /// `max_backoff`. Saturates rather than wrapping when `current_attempt`
337    /// is `u32::MAX`.
338    pub fn delay_for_next(&self, current_attempt: u32) -> Duration {
339        self.backoff_for_attempt(current_attempt.saturating_add(1))
340    }
341
342    /// Returns `true` if `attempts` has reached or exceeded `max_attempts`.
343    ///
344    /// After this returns `true` the reconnect loop should give up rather than
345    /// scheduling another attempt.
346    pub fn is_exhausted(&self, attempts: u32) -> bool {
347        attempts >= self.max_attempts
348    }
349
350    /// Backoff duration for attempt N (0-indexed).
351    pub fn backoff_for_attempt(&self, attempt: u32) -> Duration {
352        let factor = self.multiplier.powi(attempt as i32);
353        // Cap the f64 value *before* casting to u64. When `attempt` is large
354        // (e.g. 63 with multiplier=2.0), `factor` becomes f64::INFINITY.
355        // Casting f64::INFINITY as u64 is undefined behaviour in Rust — it
356        // saturates to 0 on some targets and panics in debug builds. Clamping
357        // to max_backoff in floating-point space first avoids the UB entirely.
358        let max_ms = self.max_backoff.as_millis() as f64;
359        let base_ms = (self.initial_backoff.as_millis() as f64 * factor).min(max_ms);
360        let ms = if self.jitter > 0.0 {
361            // Deterministic noise via Knuth multiplicative hash of the attempt index.
362            let hash = (attempt as u64)
363                .wrapping_mul(2654435769)
364                .wrapping_add(1013904223);
365            let noise = (hash & 0xFFFF) as f64 / 65535.0; // [0.0, 1.0]
366            let delta = base_ms * self.jitter * (noise * 2.0 - 1.0); // ±jitter×base
367            (base_ms + delta).clamp(0.0, max_ms)
368        } else {
369            base_ms
370        };
371        Duration::from_millis(ms as u64)
372    }
373}
374
375impl Default for ReconnectPolicy {
376    fn default() -> Self {
377        Self {
378            max_attempts: 10,
379            initial_backoff: Duration::from_millis(500),
380            max_backoff: Duration::from_secs(30),
381            multiplier: 2.0,
382            jitter: 0.0,
383        }
384    }
385}
386
387/// Configuration for a WebSocket feed connection.
388#[derive(Debug, Clone)]
389pub struct ConnectionConfig {
390    /// WebSocket URL to connect to (e.g. `"wss://stream.binance.com:9443/ws"`).
391    pub url: String,
392    /// Capacity of the downstream channel that receives incoming messages.
393    pub channel_capacity: usize,
394    /// Reconnect policy applied on disconnection.
395    pub reconnect: ReconnectPolicy,
396    /// Ping interval to keep the connection alive (default: 20 s).
397    pub ping_interval: Duration,
398}
399
400impl ConnectionConfig {
401    /// Build a connection configuration for `url` with the given downstream
402    /// channel capacity.
403    ///
404    /// # Errors
405    ///
406    /// Returns [`StreamError::ConfigError`] if `url` is empty or
407    /// `channel_capacity` is zero.
408    pub fn new(url: impl Into<String>, channel_capacity: usize) -> Result<Self, StreamError> {
409        let url = url.into();
410        if url.is_empty() {
411            return Err(StreamError::ConfigError {
412                reason: "WebSocket URL must not be empty".into(),
413            });
414        }
415        if channel_capacity == 0 {
416            return Err(StreamError::ConfigError {
417                reason: "channel_capacity must be > 0".into(),
418            });
419        }
420        Ok(Self {
421            url,
422            channel_capacity,
423            reconnect: ReconnectPolicy::default(),
424            ping_interval: Duration::from_secs(20),
425        })
426    }
427
428    /// Override the default reconnect policy.
429    pub fn with_reconnect(mut self, policy: ReconnectPolicy) -> Self {
430        self.reconnect = policy;
431        self
432    }
433
434    /// Override the keepalive ping interval (default: 20 s).
435    pub fn with_ping_interval(mut self, interval: Duration) -> Self {
436        self.ping_interval = interval;
437        self
438    }
439
440    /// Shortcut to set only the reconnect attempt limit without replacing the
441    /// entire reconnect policy.
442    ///
443    /// # Errors
444    ///
445    /// Returns [`StreamError::ConfigError`] if `n` is zero.
446    pub fn with_reconnect_attempts(mut self, n: u32) -> Result<Self, StreamError> {
447        self.reconnect = self.reconnect.with_max_attempts(n)?;
448        Ok(self)
449    }
450
451    /// Override the downstream channel capacity.
452    ///
453    /// # Errors
454    ///
455    /// Returns [`StreamError::ConfigError`] if `capacity` is zero.
456    pub fn with_channel_capacity(mut self, capacity: usize) -> Result<Self, StreamError> {
457        if capacity == 0 {
458            return Err(StreamError::ConfigError {
459                reason: "channel_capacity must be > 0".into(),
460            });
461        }
462        self.channel_capacity = capacity;
463        Ok(self)
464    }
465}
466
467/// Manages a single WebSocket feed: connect, receive, reconnect.
468///
469/// Call [`WsManager::run`] to enter the connection loop. Messages are forwarded
470/// to the [`mpsc::Sender`] supplied to `run`; the loop retries on disconnection
471/// according to the [`ReconnectPolicy`] in the [`ConnectionConfig`].
472pub struct WsManager {
473    config: ConnectionConfig,
474    connect_attempts: u32,
475    is_connected: bool,
476    stats: WsStats,
477}
478
479impl WsManager {
480    /// Create a new manager from a validated [`ConnectionConfig`].
481    pub fn new(config: ConnectionConfig) -> Self {
482        Self {
483            config,
484            connect_attempts: 0,
485            is_connected: false,
486            stats: WsStats::default(),
487        }
488    }
489
490    /// Run the WebSocket connection loop, forwarding text messages to `message_tx`.
491    ///
492    /// The loop connects, reads frames until the socket closes or errors, then
493    /// waits the configured backoff and reconnects. Returns when either:
494    /// - `message_tx` is closed (receiver dropped), or
495    /// - reconnect attempts are exhausted ([`StreamError::ReconnectExhausted`]).
496    ///
497    /// `outbound_rx` is an optional channel for sending messages **to** the
498    /// server (e.g., subscription requests). When provided, any string received
499    /// on this channel is forwarded to the WebSocket as a text frame.
500    ///
501    /// # Errors
502    ///
503    /// Returns [`StreamError::ReconnectExhausted`] after all reconnect slots
504    /// are consumed, or the underlying connection error if reconnects are
505    /// exhausted immediately on the first attempt.
506    pub async fn run(
507        &mut self,
508        message_tx: mpsc::Sender<String>,
509        mut outbound_rx: Option<mpsc::Receiver<String>>,
510    ) -> Result<(), StreamError> {
511        loop {
512            info!(url = %self.config.url, attempt = self.connect_attempts, "connecting");
513            match self.try_connect(&message_tx, &mut outbound_rx).await {
514                Ok(()) => {
515                    // Clean close — receiver dropped or server sent Close frame.
516                    self.is_connected = false;
517                    debug!(url = %self.config.url, "connection closed cleanly");
518                    if message_tx.is_closed() {
519                        return Ok(());
520                    }
521                }
522                Err(e) => {
523                    self.is_connected = false;
524                    warn!(url = %self.config.url, error = %e, "connection error");
525                }
526            }
527
528            if !self.can_reconnect() {
529                return Err(StreamError::ReconnectExhausted {
530                    url: self.config.url.clone(),
531                    attempts: self.connect_attempts,
532                });
533            }
534            let backoff = self.next_reconnect_backoff()?;
535            info!(url = %self.config.url, backoff_ms = backoff.as_millis(), "reconnecting after backoff");
536            tokio::time::sleep(backoff).await;
537        }
538    }
539
540    /// Attempt a single connection, reading messages until close or error.
541    async fn try_connect(
542        &mut self,
543        message_tx: &mpsc::Sender<String>,
544        outbound_rx: &mut Option<mpsc::Receiver<String>>,
545    ) -> Result<(), StreamError> {
546        let (ws_stream, _response) =
547            connect_async(&self.config.url)
548                .await
549                .map_err(|e| StreamError::ConnectionFailed {
550                    url: self.config.url.clone(),
551                    reason: e.to_string(),
552                })?;
553
554        self.is_connected = true;
555        self.connect_attempts += 1;
556        info!(url = %self.config.url, "connected");
557
558        let (mut write, mut read) = ws_stream.split();
559        let mut ping_interval = time::interval(self.config.ping_interval);
560        // Skip the first tick so we don't ping immediately on connect.
561        ping_interval.tick().await;
562
563        loop {
564            tokio::select! {
565                msg = read.next() => {
566                    match msg {
567                        Some(Ok(Message::Text(text))) => {
568                            self.stats.total_messages_received += 1;
569                            self.stats.total_bytes_received += text.len() as u64;
570                            if message_tx.send(text.to_string()).await.is_err() {
571                                // Receiver dropped — clean shutdown.
572                                return Ok(());
573                            }
574                        }
575                        Some(Ok(Message::Binary(bytes))) => {
576                            self.stats.total_messages_received += 1;
577                            self.stats.total_bytes_received += bytes.len() as u64;
578                            if let Ok(text) = String::from_utf8(bytes.to_vec()) {
579                                if message_tx.send(text).await.is_err() {
580                                    return Ok(());
581                                }
582                            }
583                        }
584                        Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => {
585                            // Control frames handled by tungstenite internally.
586                        }
587                        Some(Ok(Message::Close(_))) | None => {
588                            return Ok(());
589                        }
590                        Some(Err(e)) => {
591                            return Err(StreamError::WebSocket(e.to_string()));
592                        }
593                    }
594                }
595                _ = ping_interval.tick() => {
596                    debug!(url = %self.config.url, "sending keepalive ping");
597                    if write.send(Message::Ping(vec![].into())).await.is_err() {
598                        return Ok(());
599                    }
600                }
601                outbound = recv_outbound(outbound_rx) => {
602                    if let Some(text) = outbound {
603                        let _ = write.send(Message::Text(text.into())).await;
604                    }
605                }
606            }
607        }
608    }
609
610    /// Simulate a connection (for testing without live WebSocket).
611    /// Increments `connect_attempts` to reflect the initial connection slot.
612    pub fn connect_simulated(&mut self) {
613        self.connect_attempts += 1;
614        self.is_connected = true;
615    }
616
617    /// Simulate a disconnection.
618    pub fn disconnect_simulated(&mut self) {
619        self.is_connected = false;
620    }
621
622    /// Whether the managed connection is currently in the connected state.
623    pub fn is_connected(&self) -> bool {
624        self.is_connected
625    }
626
627    /// Total connection attempts made so far (including the initial connect).
628    pub fn connect_attempts(&self) -> u32 {
629        self.connect_attempts
630    }
631
632    /// The configuration this manager was created with.
633    pub fn config(&self) -> &ConnectionConfig {
634        &self.config
635    }
636
637    /// Cumulative receive statistics for this manager.
638    pub fn stats(&self) -> &WsStats {
639        &self.stats
640    }
641
642    /// Check whether the next reconnect attempt is allowed.
643    pub fn can_reconnect(&self) -> bool {
644        self.connect_attempts < self.config.reconnect.max_attempts
645    }
646
647    /// Consume a reconnect slot and return the backoff duration to wait.
648    pub fn next_reconnect_backoff(&mut self) -> Result<Duration, StreamError> {
649        if !self.can_reconnect() {
650            return Err(StreamError::ReconnectExhausted {
651                url: self.config.url.clone(),
652                attempts: self.connect_attempts,
653            });
654        }
655        let backoff = self
656            .config
657            .reconnect
658            .backoff_for_attempt(self.connect_attempts);
659        self.connect_attempts += 1;
660        Ok(backoff)
661    }
662}
663
664/// Helper: receive from an optional mpsc channel, or never resolve if `None`.
665///
666/// Used in `tokio::select!` to make the outbound branch dormant when no
667/// outbound channel was supplied, without allocating or spinning.
668async fn recv_outbound(rx: &mut Option<mpsc::Receiver<String>>) -> Option<String> {
669    match rx {
670        Some(rx) => rx.recv().await,
671        None => std::future::pending().await,
672    }
673}
674
675#[cfg(test)]
676mod tests {
677    use super::*;
678
679    fn default_config() -> ConnectionConfig {
680        ConnectionConfig::new("wss://example.com/ws", 1024).unwrap()
681    }
682
683    #[test]
684    fn test_reconnect_policy_default_values() {
685        let p = ReconnectPolicy::default();
686        assert_eq!(p.max_attempts, 10);
687        assert_eq!(p.multiplier, 2.0);
688    }
689
690    #[test]
691    fn test_reconnect_policy_backoff_exponential() {
692        let p = ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
693            .unwrap();
694        assert_eq!(p.backoff_for_attempt(0), Duration::from_millis(100));
695        assert_eq!(p.backoff_for_attempt(1), Duration::from_millis(200));
696        assert_eq!(p.backoff_for_attempt(2), Duration::from_millis(400));
697    }
698
699    #[test]
700    fn test_reconnect_policy_backoff_capped_at_max() {
701        let p = ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(5), 2.0)
702            .unwrap();
703        let backoff = p.backoff_for_attempt(10);
704        assert!(backoff <= Duration::from_secs(5));
705    }
706
707    #[test]
708    fn test_reconnect_policy_multiplier_below_1_rejected() {
709        let result =
710            ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 0.5);
711        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
712    }
713
714    #[test]
715    fn test_reconnect_policy_zero_attempts_rejected() {
716        let result =
717            ReconnectPolicy::new(0, Duration::from_millis(100), Duration::from_secs(30), 2.0);
718        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
719    }
720
721    #[test]
722    fn test_connection_config_empty_url_rejected() {
723        let result = ConnectionConfig::new("", 1024);
724        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
725    }
726
727    #[test]
728    fn test_connection_config_zero_capacity_rejected() {
729        let result = ConnectionConfig::new("wss://example.com", 0);
730        assert!(matches!(result, Err(StreamError::ConfigError { .. })));
731    }
732
733    #[test]
734    fn test_connection_config_with_reconnect() {
735        let policy =
736            ReconnectPolicy::new(3, Duration::from_millis(200), Duration::from_secs(10), 2.0)
737                .unwrap();
738        let config = default_config().with_reconnect(policy);
739        assert_eq!(config.reconnect.max_attempts, 3);
740    }
741
742    #[test]
743    fn test_connection_config_with_ping_interval() {
744        let config = default_config().with_ping_interval(Duration::from_secs(30));
745        assert_eq!(config.ping_interval, Duration::from_secs(30));
746    }
747
748    #[test]
749    fn test_ws_manager_initial_state() {
750        let mgr = WsManager::new(default_config());
751        assert!(!mgr.is_connected());
752        assert_eq!(mgr.connect_attempts(), 0);
753    }
754
755    #[test]
756    fn test_ws_manager_connect_simulated() {
757        let mut mgr = WsManager::new(default_config());
758        mgr.connect_simulated();
759        assert!(mgr.is_connected());
760        assert_eq!(mgr.connect_attempts(), 1);
761    }
762
763    #[test]
764    fn test_ws_manager_disconnect_simulated() {
765        let mut mgr = WsManager::new(default_config());
766        mgr.connect_simulated();
767        mgr.disconnect_simulated();
768        assert!(!mgr.is_connected());
769    }
770
771    #[test]
772    fn test_ws_manager_can_reconnect_within_limit() {
773        let mut mgr = WsManager::new(
774            default_config().with_reconnect(
775                ReconnectPolicy::new(3, Duration::from_millis(10), Duration::from_secs(1), 2.0)
776                    .unwrap(),
777            ),
778        );
779        assert!(mgr.can_reconnect());
780        mgr.next_reconnect_backoff().unwrap();
781        mgr.next_reconnect_backoff().unwrap();
782        mgr.next_reconnect_backoff().unwrap();
783        assert!(!mgr.can_reconnect());
784    }
785
786    #[test]
787    fn test_ws_manager_reconnect_exhausted_error() {
788        let mut mgr = WsManager::new(
789            default_config().with_reconnect(
790                ReconnectPolicy::new(1, Duration::from_millis(10), Duration::from_secs(1), 2.0)
791                    .unwrap(),
792            ),
793        );
794        mgr.next_reconnect_backoff().unwrap();
795        let result = mgr.next_reconnect_backoff();
796        assert!(matches!(
797            result,
798            Err(StreamError::ReconnectExhausted { .. })
799        ));
800    }
801
/// The second backoff handed out by the manager is never shorter than the first.
#[test]
fn test_ws_manager_backoff_increases() {
    let policy =
        ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let mut manager = WsManager::new(default_config().with_reconnect(policy));
    let first = manager.next_reconnect_backoff().unwrap();
    let second = manager.next_reconnect_backoff().unwrap();
    assert!(second >= first);
}
814
/// Checks the URL and channel capacity reported by `config()` for a manager
/// built from `default_config()`.
#[test]
fn test_ws_manager_config_accessor() {
    let manager = WsManager::new(default_config());
    let cfg = manager.config();
    assert_eq!(cfg.url, "wss://example.com/ws");
    assert_eq!(cfg.channel_capacity, 1024);
}
821
822    /// Verify that `recv_outbound` with `None` never resolves (returns pending).
823    /// We test this by racing it against a resolved future and confirming the
824    /// resolved future always wins.
825    #[tokio::test]
826    async fn test_recv_outbound_none_is_always_pending() {
827        let mut rx: Option<mpsc::Receiver<String>> = None;
828        // Race recv_outbound(None) against an immediately-ready future.
829        tokio::select! {
830            _ = recv_outbound(&mut rx) => {
831                panic!("recv_outbound(None) should never resolve");
832            }
833            _ = std::future::ready(()) => {
834                // Expected: the ready() future wins.
835            }
836        }
837    }
838
839    /// Verify that `recv_outbound` with `Some(rx)` resolves when a message arrives.
840    #[tokio::test]
841    async fn test_recv_outbound_some_resolves_with_message() {
842        let (tx, mut channel_rx) = mpsc::channel::<String>(1);
843        tx.send("subscribe".into()).await.unwrap();
844        let mut rx: Option<mpsc::Receiver<String>> = Some(channel_rx);
845        let msg = recv_outbound(&mut rx).await;
846        assert_eq!(msg.as_deref(), Some("subscribe"));
847        // Re-borrow to confirm the channel holds the receiver
848        let _ = rx;
849    }
850
/// A freshly constructed manager reports zero messages and zero bytes.
#[test]
fn test_ws_stats_initial_zero() {
    let manager = WsManager::new(default_config());
    let stats = manager.stats();
    assert_eq!(stats.total_messages_received, 0);
    assert_eq!(stats.total_bytes_received, 0);
}
858
/// `WsStats::default()` has both counters at zero.
#[test]
fn test_ws_stats_default() {
    let WsStats { total_messages_received, total_bytes_received } = WsStats::default();
    assert_eq!(total_messages_received, 0);
    assert_eq!(total_bytes_received, 0);
}
865
/// `with_jitter(0.5)` is accepted and stored in the `jitter` field.
#[test]
fn test_reconnect_policy_with_jitter_valid() {
    let base =
        ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let policy = base.with_jitter(0.5).unwrap();
    assert_eq!(policy.jitter, 0.5);
}
874
/// With a jitter ratio of `0.0`, repeated calls for attempt 0 return the same
/// value, and that value equals the 100 ms initial backoff.
#[test]
fn test_reconnect_policy_with_jitter_zero_is_deterministic() {
    let base =
        ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let policy = base.with_jitter(0.0).unwrap();
    let first_call = policy.backoff_for_attempt(0);
    let second_call = policy.backoff_for_attempt(0);
    assert_eq!(first_call, second_call);
    assert_eq!(first_call, Duration::from_millis(100));
}
887
/// A jitter ratio of `1.5` is rejected with `StreamError::ConfigError`.
#[test]
fn test_reconnect_policy_with_jitter_invalid_ratio() {
    let base =
        ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let outcome = base.with_jitter(1.5);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
896
/// A negative jitter ratio (`-0.1`) is rejected with `StreamError::ConfigError`.
#[test]
fn test_reconnect_policy_with_jitter_negative_ratio() {
    let base =
        ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let outcome = base.with_jitter(-0.1);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
905
/// With the jitter ratio set to `1.0`, no backoff for attempts `0..20` may
/// exceed the 30 s `max_backoff`.
#[test]
fn test_reconnect_policy_with_jitter_stays_within_bounds() {
    let ceiling = Duration::from_secs(30);
    let policy = ReconnectPolicy::new(20, Duration::from_millis(100), ceiling, 2.0)
        .unwrap()
        .with_jitter(1.0)
        .unwrap();
    for attempt in 0..20 {
        let backoff = policy.backoff_for_attempt(attempt);
        assert!(backoff <= ceiling, "attempt {attempt} exceeded max_backoff");
    }
}
918
/// `with_max_attempts(5)` is accepted and stored in `max_attempts`.
#[test]
fn test_reconnect_policy_with_max_attempts_valid() {
    let updated = ReconnectPolicy::default().with_max_attempts(5);
    let policy = updated.unwrap();
    assert_eq!(policy.max_attempts, 5);
}
924
/// `with_max_attempts(0)` is rejected with `StreamError::ConfigError`.
#[test]
fn test_reconnect_policy_with_max_attempts_zero_rejected() {
    let policy = ReconnectPolicy::default();
    let outcome = policy.with_max_attempts(0);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
930
/// `with_reconnect_attempts(3)` sets the nested reconnect policy's `max_attempts`.
#[test]
fn test_connection_config_with_reconnect_attempts_valid() {
    let updated = default_config().with_reconnect_attempts(3);
    let cfg = updated.unwrap();
    assert_eq!(cfg.reconnect.max_attempts, 3);
}
936
/// `with_reconnect_attempts(0)` is rejected with `StreamError::ConfigError`.
#[test]
fn test_connection_config_with_reconnect_attempts_zero_rejected() {
    let cfg = default_config();
    let outcome = cfg.with_reconnect_attempts(0);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
942
/// `total_max_delay` for three attempts (100 ms initial, ×2 multiplier) is
/// expected to be 100 + 200 + 400 = 700 ms.
#[test]
fn test_reconnect_policy_total_max_delay_sum_of_backoffs() {
    let policy =
        ReconnectPolicy::new(3, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    let expected = Duration::from_millis(700);
    assert_eq!(policy.total_max_delay(), expected);
}
950
/// When `max_backoff` (500 ms) is below the initial backoff (1000 ms), the
/// total over five attempts is expected to be 5 × 500 ms = 2500 ms.
#[test]
fn test_reconnect_policy_total_max_delay_capped_by_max_backoff() {
    let initial = Duration::from_millis(1000);
    let cap = Duration::from_millis(500);
    let policy = ReconnectPolicy::new(5, initial, cap, 2.0).unwrap();
    let expected = Duration::from_millis(2500);
    assert_eq!(policy.total_max_delay(), expected);
}
959
/// `with_channel_capacity(512)` is accepted and stored in `channel_capacity`.
#[test]
fn test_connection_config_with_channel_capacity_valid() {
    let updated = default_config().with_channel_capacity(512);
    let cfg = updated.unwrap();
    assert_eq!(cfg.channel_capacity, 512);
}
965
/// `with_channel_capacity(0)` is rejected with `StreamError::ConfigError`.
#[test]
fn test_connection_config_with_channel_capacity_zero_rejected() {
    let cfg = default_config();
    let outcome = cfg.with_channel_capacity(0);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
971
/// With a constant multiplier of `1.0` and a jitter ratio of `0.5`, the
/// backoffs for attempts `0..10` must not all land on the same millisecond
/// value.
#[test]
fn test_reconnect_policy_with_jitter_varies_across_attempts() {
    let policy =
        ReconnectPolicy::new(10, Duration::from_millis(1000), Duration::from_secs(30), 1.0)
            .unwrap()
            .with_jitter(0.5)
            .unwrap();
    // Collect the millisecond values straight into a set: no intermediate
    // `Vec`, and `as_millis()` stays `u128` so nothing is narrowed by a cast.
    let distinct: std::collections::HashSet<u128> = (0..10)
        .map(|attempt| policy.backoff_for_attempt(attempt).as_millis())
        .collect();
    assert!(distinct.len() > 1, "jitter should produce variation across attempts");
}
985
986    // ── ReconnectPolicy::with_initial_backoff / with_max_backoff ──────────────
987
/// `with_initial_backoff` stores the given duration in `initial_backoff`.
#[test]
fn test_with_initial_backoff_sets_value() {
    let wanted = Duration::from_secs(2);
    let policy = ReconnectPolicy::default().with_initial_backoff(wanted);
    assert_eq!(policy.initial_backoff, Duration::from_secs(2));
}
994
/// `with_max_backoff` stores the given duration in `max_backoff`.
#[test]
fn test_with_max_backoff_sets_value() {
    let wanted = Duration::from_secs(60);
    let policy = ReconnectPolicy::default().with_max_backoff(wanted);
    assert_eq!(policy.max_backoff, Duration::from_secs(60));
}
1001
/// After `with_initial_backoff(200 ms)` on the default policy, attempt 0
/// backs off for exactly 200 ms.
#[test]
fn test_with_initial_backoff_affects_first_attempt() {
    let initial = Duration::from_millis(200);
    let policy = ReconnectPolicy::default().with_initial_backoff(initial);
    let first = policy.backoff_for_attempt(0);
    assert_eq!(first, Duration::from_millis(200));
}
1008
1009    // ── ReconnectPolicy::with_multiplier ──────────────────────────────────────
1010
/// `with_multiplier(3.0)` is accepted and stored in `multiplier`.
#[test]
fn test_with_multiplier_valid() {
    let updated = ReconnectPolicy::default().with_multiplier(3.0);
    let policy = updated.unwrap();
    assert_eq!(policy.multiplier, 3.0);
}
1016
/// A multiplier of `0.9` is rejected with `StreamError::ConfigError`.
#[test]
fn test_with_multiplier_below_one_rejected() {
    let policy = ReconnectPolicy::default();
    let outcome = policy.with_multiplier(0.9);
    assert!(matches!(outcome, Err(StreamError::ConfigError { .. })));
}
1022
/// A multiplier of exactly `1.0` is accepted and stored unchanged.
#[test]
fn test_with_multiplier_exactly_one_accepted() {
    let updated = ReconnectPolicy::default().with_multiplier(1.0);
    let policy = updated.unwrap();
    assert_eq!(policy.multiplier, 1.0);
}
1028
1029    // ── WsStats::message_rate / byte_rate ─────────────────────────────────────
1030
/// Both `message_rate` and `byte_rate` return `0.0` for a zero-length window,
/// even when the counters are non-zero.
#[test]
fn test_message_rate_zero_elapsed_returns_zero() {
    let stats = WsStats { total_messages_received: 100, total_bytes_received: 50_000 };
    let msg_rate = stats.message_rate(0);
    assert_eq!(msg_rate, 0.0);
    let byte_rate = stats.byte_rate(0);
    assert_eq!(byte_rate, 0.0);
}
1040
/// 100 messages over a 1000 ms window is a rate of 100 messages per second.
#[test]
fn test_message_rate_100_messages_in_1s() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let per_second = stats.message_rate(1_000);
    let error = (per_second - 100.0).abs();
    assert!(error < 1e-9);
}
1050
/// 1 000 000 bytes over a 1000 ms window is a rate of 1 000 000 bytes per second.
#[test]
fn test_byte_rate_1mb_in_1s() {
    let stats = WsStats { total_bytes_received: 1_000_000, ..WsStats::default() };
    let per_second = stats.byte_rate(1_000);
    let error = (per_second - 1_000_000.0).abs();
    assert!(error < 1.0);
}
1060
1061    // ── WsStats::avg_message_size ─────────────────────────────────────────────
1062
/// `avg_message_size` is `None` when no messages have been counted.
#[test]
fn test_avg_message_size_none_when_no_messages() {
    let empty = WsStats::default();
    let average = empty.avg_message_size();
    assert!(average.is_none());
}
1068
/// 1000 bytes across 10 messages averages 100 bytes per message.
#[test]
fn test_avg_message_size_basic() {
    let stats = WsStats { total_messages_received: 10, total_bytes_received: 1_000 };
    let average = stats.avg_message_size().unwrap();
    let error = (average - 100.0).abs();
    assert!(error < 1e-9);
}
1078
1079    // ── WsStats::total_data_mb ────────────────────────────────────────────────
1080
/// Default stats report (approximately) zero megabytes.
#[test]
fn test_total_data_mb_zero_bytes() {
    let empty = WsStats::default();
    let megabytes = empty.total_data_mb();
    assert!((megabytes - 0.0).abs() < 1e-9);
}
1086
/// 1 048 576 bytes is reported as exactly one megabyte (MiB).
#[test]
fn test_total_data_mb_one_mib() {
    let stats = WsStats { total_messages_received: 1, total_bytes_received: 1_048_576 };
    let megabytes = stats.total_data_mb();
    assert!((megabytes - 1.0).abs() < 1e-9);
}
1095
/// The `max_attempts()` getter returns the same value as the public field.
#[test]
fn test_max_attempts_getter_matches_field() {
    let policy = ReconnectPolicy::default();
    let via_getter = policy.max_attempts();
    let via_field = policy.max_attempts;
    assert_eq!(via_getter, via_field);
}
1101
/// `max_attempts()` reflects the attempt count passed to `ReconnectPolicy::new`.
#[test]
fn test_max_attempts_getter_after_new() {
    // Uses the module's imported `Duration`, like the sibling tests, rather
    // than spelling out `std::time::Duration`.
    let policy =
        ReconnectPolicy::new(7, Duration::from_millis(100), Duration::from_secs(30), 2.0)
            .unwrap();
    assert_eq!(policy.max_attempts(), 7);
}
1113
1114    // ── WsStats::is_idle ──────────────────────────────────────────────────────
1115
/// One message in 10 s (0.1 msg/s) is idle against a 1.0 msg/s minimum.
#[test]
fn test_is_idle_below_min_rate() {
    let stats = WsStats { total_messages_received: 1, ..WsStats::default() };
    let idle = stats.is_idle(10_000, 1.0);
    assert!(idle);
}
1125
/// 100 messages in 1 s (100 msg/s) is not idle against a 1.0 msg/s minimum.
#[test]
fn test_is_idle_above_min_rate() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let idle = stats.is_idle(1_000, 1.0);
    assert!(!idle);
}
1135
/// With no messages at all, the stats are idle even for a tiny minimum rate.
#[test]
fn test_is_idle_zero_messages_always_idle() {
    let empty = WsStats::default();
    let idle = empty.is_idle(1_000, 0.001);
    assert!(idle);
}
1141
/// With zero attempts used, the default policy has 10 attempts remaining.
#[test]
fn test_total_attempts_remaining_full() {
    let policy = ReconnectPolicy::default();
    let remaining = policy.total_attempts_remaining(0);
    assert_eq!(remaining, 10);
}
1147
/// With three attempts used, the default policy has 7 attempts remaining.
#[test]
fn test_total_attempts_remaining_partial() {
    let policy = ReconnectPolicy::default();
    let remaining = policy.total_attempts_remaining(3);
    assert_eq!(remaining, 7);
}
1153
/// Once the used count reaches or exceeds the default limit, zero attempts
/// remain (checked at 10 and at 99).
#[test]
fn test_total_attempts_remaining_exhausted() {
    let policy = ReconnectPolicy::default();
    for used in [10, 99] {
        assert_eq!(policy.total_attempts_remaining(used), 0);
    }
}
1160
1161    // ── WsStats::has_traffic ──────────────────────────────────────────────────
1162
/// Default stats report no traffic.
#[test]
fn test_has_traffic_false_when_no_messages() {
    let empty = WsStats::default();
    let traffic = empty.has_traffic();
    assert!(!traffic);
}
1168
/// A single counted message is enough for `has_traffic` to be true, even
/// with zero bytes recorded.
#[test]
fn test_has_traffic_true_after_one_message() {
    let stats = WsStats { total_messages_received: 1, ..WsStats::default() };
    let traffic = stats.has_traffic();
    assert!(traffic);
}
1177
/// `has_traffic` is true for 1000 messages / 50 000 bytes.
#[test]
fn test_has_traffic_true_with_many_messages() {
    let stats = WsStats { total_messages_received: 1_000, total_bytes_received: 50_000 };
    let traffic = stats.has_traffic();
    assert!(traffic);
}
1186
1187    // ── WsStats::is_high_volume ───────────────────────────────────────────────
1188
/// A message count equal to the threshold counts as high volume.
#[test]
fn test_is_high_volume_true_at_threshold() {
    let stats = WsStats { total_messages_received: 1_000, ..WsStats::default() };
    let high = stats.is_high_volume(1_000);
    assert!(high);
}
1194
/// 500 messages against a threshold of 1000 is not high volume.
#[test]
fn test_is_high_volume_false_below_threshold() {
    let stats = WsStats { total_messages_received: 500, ..WsStats::default() };
    let high = stats.is_high_volume(1_000);
    assert!(!high);
}
1200
/// 2000 messages against a threshold of 1000 is high volume.
#[test]
fn test_is_high_volume_true_above_threshold() {
    let stats = WsStats { total_messages_received: 2_000, ..WsStats::default() };
    let high = stats.is_high_volume(1_000);
    assert!(high);
}
1206
1207    // ── WsStats::bytes_per_message ────────────────────────────────────────────
1208
/// `bytes_per_message` is `None` when the message counter is zero.
#[test]
fn test_bytes_per_message_none_when_no_messages() {
    let empty = WsStats::default();
    let per_message = empty.bytes_per_message();
    assert!(per_message.is_none());
}
1214
/// 400 bytes across 4 messages is exactly 100 bytes per message.
#[test]
fn test_bytes_per_message_correct_value() {
    let stats = WsStats { total_bytes_received: 400, total_messages_received: 4 };
    let per_message = stats.bytes_per_message();
    assert_eq!(per_message, Some(100.0));
}
1220
/// 10 bytes across 3 messages gives the fractional value 10/3.
#[test]
fn test_bytes_per_message_fractional() {
    let stats = WsStats { total_bytes_received: 10, total_messages_received: 3 };
    let per_message = stats.bytes_per_message().unwrap();
    let error = (per_message - 10.0 / 3.0).abs();
    assert!(error < 1e-10);
}
1227
1228    // --- delay_for_next ---
1229
/// `delay_for_next(n)` equals `backoff_for_attempt(n + 1)`, checked for
/// `n = 0` and `n = 3`.
#[test]
fn test_delay_for_next_is_backoff_for_attempt_plus_one() {
    let policy =
        ReconnectPolicy::new(10, Duration::from_millis(100), Duration::from_secs(60), 2.0)
            .unwrap();
    for (current, following) in [(0, 1), (3, 4)] {
        assert_eq!(policy.delay_for_next(current), policy.backoff_for_attempt(following));
    }
}
1248
/// For a very large attempt index, `delay_for_next` never exceeds the 1 s
/// `max_backoff`.
#[test]
fn test_delay_for_next_saturates_at_max_backoff() {
    let cap = Duration::from_secs(1);
    let policy = ReconnectPolicy::new(10, Duration::from_millis(100), cap, 2.0).unwrap();
    let delay = policy.delay_for_next(100);
    assert!(delay <= cap);
}
1261
1262    // ── WsStats::message_rate ─────────────────────────────────────────────────
1263
/// `message_rate` returns `0.0` for a zero-length window.
#[test]
fn test_message_rate_zero_when_elapsed_is_zero() {
    let stats = WsStats { total_messages_received: 1_000, ..WsStats::default() };
    let rate = stats.message_rate(0);
    assert_eq!(rate, 0.0);
}
1269
/// 100 messages over 10 000 ms is a rate of 10 messages per second.
#[test]
fn test_message_rate_correct_value() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let rate = stats.message_rate(10_000);
    assert!((rate - 10.0).abs() < 1e-10);
}
1276
/// With no messages counted, the rate over a 5 s window is `0.0`.
#[test]
fn test_message_rate_zero_messages() {
    let empty = WsStats::default();
    let rate = empty.message_rate(5_000);
    assert_eq!(rate, 0.0);
}
1282
1283    // ── WsStats::total_data_mb ────────────────────────────────────────────────
1284
/// With no bytes counted, `total_data_mb` is exactly `0.0`.
#[test]
fn test_total_data_mb_zero_when_no_bytes() {
    let empty = WsStats::default();
    let megabytes = empty.total_data_mb();
    assert_eq!(megabytes, 0.0);
}
1290
/// 524 288 bytes is reported as half a megabyte (MiB).
#[test]
fn test_total_data_mb_fractional() {
    let stats = WsStats { total_bytes_received: 524_288, total_messages_received: 1 };
    let megabytes = stats.total_data_mb();
    assert!((megabytes - 0.5).abs() < 1e-10);
}
1296
1297    // --- is_exhausted ---
1298
/// An attempt count equal to `max_attempts` (5) is exhausted.
#[test]
fn test_is_exhausted_true_at_max_attempts() {
    let limits =
        ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0)
            .unwrap();
    let exhausted = limits.is_exhausted(5);
    assert!(exhausted);
}
1304
/// An attempt count above `max_attempts` (10 vs 5) is exhausted.
#[test]
fn test_is_exhausted_true_beyond_max_attempts() {
    let limits =
        ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0)
            .unwrap();
    let exhausted = limits.is_exhausted(10);
    assert!(exhausted);
}
1310
/// An attempt count below `max_attempts` (4 vs 5) is not exhausted.
#[test]
fn test_is_exhausted_false_below_max_attempts() {
    let limits =
        ReconnectPolicy::new(5, Duration::from_millis(100), Duration::from_secs(10), 2.0)
            .unwrap();
    let exhausted = limits.is_exhausted(4);
    assert!(!exhausted);
}
1316
1317    // --- WsStats::total_data_kb ---
1318
/// With no bytes counted, `total_data_kb` is exactly `0.0`.
#[test]
fn test_total_data_kb_zero_when_no_bytes() {
    let empty = WsStats::default();
    let kilobytes = empty.total_data_kb();
    assert_eq!(kilobytes, 0.0);
}
1324
/// 1024 bytes is reported as one kilobyte (KiB).
#[test]
fn test_total_data_kb_one_kib() {
    let stats = WsStats { total_bytes_received: 1_024, total_messages_received: 1 };
    let kilobytes = stats.total_data_kb();
    assert!((kilobytes - 1.0).abs() < 1e-10);
}
1330
/// The kilobyte figure equals the megabyte figure multiplied by 1024
/// (checked for 2 097 152 bytes).
#[test]
fn test_total_data_kb_equals_1024_times_mb() {
    let stats = WsStats { total_bytes_received: 2_097_152, total_messages_received: 1 };
    let kilobytes = stats.total_data_kb();
    let megabytes = stats.total_data_mb();
    assert!((kilobytes - megabytes * 1_024.0).abs() < 1e-6);
}
1336
1337    // --- WsStats::bandwidth_bps ---
1338
/// `bandwidth_bps` returns `0.0` for a zero-length window, even with bytes counted.
#[test]
fn test_bandwidth_bps_zero_when_elapsed_zero() {
    let stats = WsStats { total_bytes_received: 1_000, ..WsStats::default() };
    let bandwidth = stats.bandwidth_bps(0);
    assert_eq!(bandwidth, 0.0);
}
1344
/// 10 000 bytes over a 1000 ms window yields `bandwidth_bps == 10_000`.
#[test]
fn test_bandwidth_bps_correct_value() {
    let stats = WsStats { total_bytes_received: 10_000, total_messages_received: 1 };
    let bandwidth = stats.bandwidth_bps(1_000);
    assert!((bandwidth - 10_000.0).abs() < 1e-6);
}
1351
/// With zero bytes counted, `bandwidth_bps` is `0.0` regardless of message count.
#[test]
fn test_bandwidth_bps_zero_bytes() {
    let stats = WsStats { total_messages_received: 5, ..WsStats::default() };
    let bandwidth = stats.bandwidth_bps(5_000);
    assert_eq!(bandwidth, 0.0);
}
1357
1358    // --- WsStats::messages_per_byte ---
1359
/// `messages_per_byte` is `None` when the byte counter is zero.
#[test]
fn test_messages_per_byte_none_when_no_bytes() {
    let stats = WsStats { total_messages_received: 5, ..WsStats::default() };
    let ratio = stats.messages_per_byte();
    assert!(ratio.is_none());
}
1365
/// 100 messages over 10 000 bytes is 0.01 messages per byte.
#[test]
fn test_messages_per_byte_correct_value() {
    let stats = WsStats { total_bytes_received: 10_000, total_messages_received: 100 };
    let ratio = stats.messages_per_byte().unwrap();
    assert!((ratio - 0.01).abs() < 1e-12);
}
1372
/// One 500-byte message gives a messages-per-byte ratio below 1.0.
#[test]
fn test_messages_per_byte_less_than_one_for_large_messages() {
    let stats = WsStats { total_bytes_received: 500, total_messages_received: 1 };
    let ratio = stats.messages_per_byte().unwrap();
    assert!(ratio < 1.0);
}
1379
1380    // --- WsStats::avg_message_size_bytes ---
/// `avg_message_size_bytes` is `None` when the message counter is zero.
#[test]
fn test_avg_message_size_bytes_none_when_no_messages() {
    let empty = WsStats::default();
    let average = empty.avg_message_size_bytes();
    assert!(average.is_none());
}
1386
/// 5000 bytes across 10 messages averages 500 bytes.
#[test]
fn test_avg_message_size_bytes_correct_value() {
    let stats = WsStats { total_bytes_received: 5_000, total_messages_received: 10 };
    let average = stats.avg_message_size_bytes().unwrap();
    assert!((average - 500.0).abs() < 1e-12);
}
1392
/// A single 256-byte message averages 256 bytes.
#[test]
fn test_avg_message_size_bytes_one_message() {
    let stats = WsStats { total_bytes_received: 256, total_messages_received: 1 };
    let average = stats.avg_message_size_bytes().unwrap();
    assert!((average - 256.0).abs() < 1e-12);
}
1398
1399    // --- WsStats::bandwidth_kbps ---
/// `bandwidth_kbps` returns `0.0` for a zero-length window, even with data counted.
#[test]
fn test_bandwidth_kbps_zero_when_elapsed_zero() {
    let stats = WsStats { total_bytes_received: 50_000, total_messages_received: 10 };
    let bandwidth = stats.bandwidth_kbps(0);
    assert_eq!(bandwidth, 0.0);
}
1405
/// 100 000 bytes over a 1000 ms window is expected to read as 800 kbps
/// (100 KB/s × 8 bits).
#[test]
fn test_bandwidth_kbps_correct_value() {
    let stats = WsStats { total_bytes_received: 100_000, total_messages_received: 1 };
    let kbps = stats.bandwidth_kbps(1_000);
    let error = (kbps - 800.0).abs();
    assert!(error < 1e-10, "got {kbps}");
}
1413
/// With no data counted, `bandwidth_kbps` over a 5 s window is `0.0`.
#[test]
fn test_bandwidth_kbps_zero_when_no_data() {
    let empty = WsStats::default();
    let bandwidth = empty.bandwidth_kbps(5_000);
    assert_eq!(bandwidth, 0.0);
}
1419
1420    // ── WsStats::total_data_gb / is_active ─────────────────────────────────
1421
/// With no bytes counted, `total_data_gb` is exactly `0.0`.
#[test]
fn test_total_data_gb_zero_when_no_bytes() {
    let empty = WsStats::default();
    let gigabytes = empty.total_data_gb();
    assert_eq!(gigabytes, 0.0);
}
1427
/// 1 073 741 824 bytes is reported as one gigabyte (GiB).
#[test]
fn test_total_data_gb_one_gib() {
    let stats = WsStats { total_bytes_received: 1_073_741_824, total_messages_received: 1 };
    let gigabytes = stats.total_data_gb();
    assert!((gigabytes - 1.0).abs() < 1e-10);
}
1433
/// The gigabyte figure equals the megabyte figure divided by 1024
/// (checked for 2 147 483 648 bytes). Note that, despite the test name, the
/// relation asserted is `gb == mb / 1024`.
#[test]
fn test_total_data_gb_equals_1024_times_mb() {
    let stats = WsStats { total_bytes_received: 2_147_483_648, total_messages_received: 1 };
    let gigabytes = stats.total_data_gb();
    let megabytes = stats.total_data_mb();
    assert!((gigabytes - megabytes / 1_024.0).abs() < 1e-6);
}
1439
/// Default stats are not active against a threshold of 1.
#[test]
fn test_is_active_false_when_no_messages() {
    let empty = WsStats::default();
    let active = empty.is_active(1);
    assert!(!active);
}
1445
/// A message count equal to the threshold (100) counts as active.
#[test]
fn test_is_active_true_at_threshold() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let active = stats.is_active(100);
    assert!(active);
}
1451
/// 50 messages against a threshold of 100 is not active.
#[test]
fn test_is_active_false_below_threshold() {
    let stats = WsStats { total_messages_received: 50, ..WsStats::default() };
    let active = stats.is_active(100);
    assert!(!active);
}
1457
1458    // ── WsStats::has_received_bytes / efficiency_ratio ──────────────────────
1459
/// `has_received_bytes` is false when the byte counter is zero, even with
/// messages counted.
#[test]
fn test_has_received_bytes_false_when_no_bytes() {
    let stats = WsStats { total_messages_received: 5, ..WsStats::default() };
    let received = stats.has_received_bytes();
    assert!(!received);
}
1465
/// `has_received_bytes` is true once any bytes have been counted.
#[test]
fn test_has_received_bytes_true_when_bytes_present() {
    let stats = WsStats { total_bytes_received: 100, total_messages_received: 1 };
    let received = stats.has_received_bytes();
    assert!(received);
}
1471
/// `efficiency_ratio` is `None` when the byte counter is zero.
#[test]
fn test_efficiency_ratio_none_when_no_bytes() {
    let stats = WsStats { total_messages_received: 10, ..WsStats::default() };
    let ratio = stats.efficiency_ratio();
    assert!(ratio.is_none());
}
1477
/// 100 messages over 10 000 bytes gives an efficiency ratio of 0.01.
#[test]
fn test_efficiency_ratio_correct_value() {
    let stats = WsStats { total_bytes_received: 10_000, total_messages_received: 100 };
    let ratio = stats.efficiency_ratio().unwrap();
    assert!((ratio - 0.01).abs() < 1e-12);
}
1484
/// One 500-byte message gives an efficiency ratio below 1.0.
#[test]
fn test_efficiency_ratio_less_than_one_for_large_messages() {
    let stats = WsStats { total_bytes_received: 500, total_messages_received: 1 };
    let ratio = stats.efficiency_ratio().unwrap();
    assert!(ratio < 1.0);
}
1490
1491    // ── WsStats::message_density / compression_ratio ────────────────────────
1492
/// `message_density` and `message_rate` agree for the same window.
#[test]
fn test_message_density_same_as_message_rate() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let density = stats.message_density(1_000);
    let rate = stats.message_rate(1_000);
    assert!((density - rate).abs() < 1e-12);
}
1498
/// `message_density` returns `0.0` for a zero-length window.
#[test]
fn test_message_density_zero_when_elapsed_zero() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let density = stats.message_density(0);
    assert_eq!(density, 0.0);
}
1504
/// `compression_ratio` is `None` when the byte counter is zero.
#[test]
fn test_compression_ratio_none_when_no_bytes() {
    let stats = WsStats { total_messages_received: 5, ..WsStats::default() };
    let ratio = stats.compression_ratio();
    assert!(ratio.is_none());
}
1510
/// `compression_ratio` and `efficiency_ratio` return the same value.
#[test]
fn test_compression_ratio_same_as_efficiency_ratio() {
    let stats = WsStats { total_bytes_received: 10_000, total_messages_received: 100 };
    let compression = stats.compression_ratio();
    let efficiency = stats.efficiency_ratio();
    assert_eq!(compression, efficiency);
}
1516
1517    // ── WsStats::uptime_fraction ──────────────────────────────────────────────
1518
/// `uptime_fraction` returns `0.0` for a zero-length window.
#[test]
fn test_uptime_fraction_zero_when_elapsed_zero() {
    let stats = WsStats { total_bytes_received: 50_000, total_messages_received: 100 };
    let fraction = stats.uptime_fraction(0);
    assert_eq!(fraction, 0.0);
}
1524
/// With no bytes counted, `uptime_fraction` over a 60 s window is `0.0`.
#[test]
fn test_uptime_fraction_zero_when_no_bytes() {
    let empty = WsStats::default();
    let fraction = empty.uptime_fraction(60_000);
    assert_eq!(fraction, 0.0);
}
1530
/// With 1000 bytes over a 60 s window, the fraction lies in `(0.0, 1.0]`.
#[test]
fn test_uptime_fraction_nonzero_with_bytes() {
    let stats = WsStats { total_bytes_received: 1_000, total_messages_received: 100 };
    // NOTE(review): the original comment suggests the value is derived from
    // bytes per millisecond (1000 / 60000 ≈ 0.0167); confirm against the
    // implementation.
    let fraction = stats.uptime_fraction(60_000);
    assert!(fraction > 0.0 && fraction <= 1.0);
}
1538
/// A very large byte count relative to the window (1 000 000 bytes in
/// 100 ms) yields exactly `1.0`.
#[test]
fn test_uptime_fraction_clamped_to_one() {
    let stats = WsStats { total_bytes_received: 1_000_000, ..WsStats::default() };
    let fraction = stats.uptime_fraction(100);
    assert_eq!(fraction, 1.0);
}
1545
1546    // ── WsStats::is_idle ──────────────────────────────────────────────────────
1547
/// With no messages over a 60 s window, the stats are idle at a 1.0 msg/s minimum.
#[test]
fn test_is_idle_true_when_no_messages() {
    let empty = WsStats::default();
    let idle = empty.is_idle(60_000, 1.0);
    assert!(idle);
}
1553
/// 1000 messages in 1 s (1000 msg/s) is not idle against a 1.0 msg/s minimum.
#[test]
fn test_is_idle_false_when_high_message_rate() {
    let stats = WsStats { total_messages_received: 1_000, ..WsStats::default() };
    let idle = stats.is_idle(1_000, 1.0);
    assert!(!idle);
}
1560
/// A zero-length window is always idle: `message_rate` returns `0.0` when
/// `elapsed_ms` is zero, which is below any positive minimum.
#[test]
fn test_is_idle_true_when_elapsed_zero() {
    let stats = WsStats { total_messages_received: 100, ..WsStats::default() };
    let idle = stats.is_idle(0, 1.0);
    assert!(idle);
}
1567
1568    // ── WsStats::average_message_size_bytes ───────────────────────────────────
1569
/// `average_message_size_bytes` is `None` when the message counter is zero.
#[test]
fn test_average_message_size_bytes_none_when_no_messages() {
    let empty = WsStats::default();
    let average = empty.average_message_size_bytes();
    assert!(average.is_none());
}
1575
/// `average_message_size_bytes` and `bytes_per_message` return the same value.
#[test]
fn test_average_message_size_bytes_same_as_bytes_per_message() {
    let stats = WsStats { total_bytes_received: 1_000, total_messages_received: 10 };
    let average = stats.average_message_size_bytes();
    let per_message = stats.bytes_per_message();
    assert_eq!(average, per_message);
}
1581}