presentar_core/
streaming.rs

1#![allow(
2    clippy::unwrap_used,
3    clippy::disallowed_methods,
4    clippy::comparison_chain,
5    clippy::match_same_arms
6)]
7//! Data streaming and live updates for Presentar.
8//!
9//! This module provides infrastructure for real-time data updates:
10//! - Subscription management for data sources
11//! - Message protocol for updates
12//! - Reconnection and backpressure handling
13//! - Integration with expression executor for live transforms
14//!
15//! # Example
16//!
17//! ```
18//! use presentar_core::streaming::{
19//!     DataStream, StreamConfig, StreamMessage, StreamSubscription,
20//! };
21//!
22//! // Create a subscription
23//! let sub = StreamSubscription::new("metrics/cpu")
24//!     .with_interval(1000)  // 1 second
25//!     .with_transform("rate()");
26//!
27//! // Handle incoming messages
28//! fn handle_message(msg: StreamMessage) {
29//!     match msg {
30//!         StreamMessage::Data { payload, .. } => println!("Got data: {:?}", payload),
31//!         StreamMessage::Error { message, .. } => eprintln!("Error: {}", message),
32//!         _ => {}
33//!     }
34//! }
35//! ```
36
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::{Arc, Mutex};
40use std::time::Duration;
41
42/// Stream connection state.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum ConnectionState {
45    /// Not connected
46    #[default]
47    Disconnected,
48    /// Attempting to connect
49    Connecting,
50    /// Connected and ready
51    Connected,
52    /// Connection lost, attempting reconnection
53    Reconnecting,
54    /// Permanently failed
55    Failed,
56}
57
58impl ConnectionState {
59    /// Check if the stream can send/receive messages.
60    #[must_use]
61    pub const fn is_active(&self) -> bool {
62        matches!(self, Self::Connected)
63    }
64
65    /// Check if the stream is trying to connect.
66    #[must_use]
67    pub const fn is_connecting(&self) -> bool {
68        matches!(self, Self::Connecting | Self::Reconnecting)
69    }
70}
71
72/// Stream message types for the protocol.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum StreamMessage {
76    /// Subscribe to a data source
77    Subscribe {
78        /// Subscription ID
79        id: String,
80        /// Data source path
81        source: String,
82        /// Optional transform expression
83        #[serde(skip_serializing_if = "Option::is_none")]
84        transform: Option<String>,
85        /// Refresh interval in milliseconds
86        #[serde(skip_serializing_if = "Option::is_none")]
87        interval_ms: Option<u64>,
88    },
89    /// Unsubscribe from a data source
90    Unsubscribe {
91        /// Subscription ID
92        id: String,
93    },
94    /// Data update from server
95    Data {
96        /// Subscription ID this data belongs to
97        id: String,
98        /// The payload data
99        payload: serde_json::Value,
100        /// Sequence number for ordering
101        #[serde(default)]
102        seq: u64,
103        /// Timestamp of the data
104        #[serde(skip_serializing_if = "Option::is_none")]
105        timestamp: Option<u64>,
106    },
107    /// Error message
108    Error {
109        /// Related subscription ID (if any)
110        #[serde(skip_serializing_if = "Option::is_none")]
111        id: Option<String>,
112        /// Error message
113        message: String,
114        /// Error code
115        #[serde(skip_serializing_if = "Option::is_none")]
116        code: Option<i32>,
117    },
118    /// Acknowledgment
119    Ack {
120        /// Subscription ID being acknowledged
121        id: String,
122        /// Status message
123        #[serde(skip_serializing_if = "Option::is_none")]
124        status: Option<String>,
125    },
126    /// Heartbeat/ping
127    Ping {
128        /// Timestamp
129        timestamp: u64,
130    },
131    /// Heartbeat response
132    Pong {
133        /// Echo of ping timestamp
134        timestamp: u64,
135    },
136}
137
138impl StreamMessage {
139    /// Create a subscribe message.
140    #[must_use]
141    pub fn subscribe(id: impl Into<String>, source: impl Into<String>) -> Self {
142        Self::Subscribe {
143            id: id.into(),
144            source: source.into(),
145            transform: None,
146            interval_ms: None,
147        }
148    }
149
150    /// Create a subscribe message with transform.
151    #[must_use]
152    pub fn subscribe_with_transform(
153        id: impl Into<String>,
154        source: impl Into<String>,
155        transform: impl Into<String>,
156    ) -> Self {
157        Self::Subscribe {
158            id: id.into(),
159            source: source.into(),
160            transform: Some(transform.into()),
161            interval_ms: None,
162        }
163    }
164
165    /// Create an unsubscribe message.
166    #[must_use]
167    pub fn unsubscribe(id: impl Into<String>) -> Self {
168        Self::Unsubscribe { id: id.into() }
169    }
170
171    /// Create a data message.
172    #[must_use]
173    pub fn data(id: impl Into<String>, payload: serde_json::Value, seq: u64) -> Self {
174        Self::Data {
175            id: id.into(),
176            payload,
177            seq,
178            timestamp: None,
179        }
180    }
181
182    /// Create an error message.
183    #[must_use]
184    pub fn error(message: impl Into<String>) -> Self {
185        Self::Error {
186            id: None,
187            message: message.into(),
188            code: None,
189        }
190    }
191
192    /// Create an error message for a subscription.
193    #[must_use]
194    pub fn error_for(id: impl Into<String>, message: impl Into<String>) -> Self {
195        Self::Error {
196            id: Some(id.into()),
197            message: message.into(),
198            code: None,
199        }
200    }
201
202    /// Create an ack message.
203    #[must_use]
204    pub fn ack(id: impl Into<String>) -> Self {
205        Self::Ack {
206            id: id.into(),
207            status: None,
208        }
209    }
210
211    /// Create a ping message.
212    #[must_use]
213    pub fn ping(timestamp: u64) -> Self {
214        Self::Ping { timestamp }
215    }
216
217    /// Create a pong message.
218    #[must_use]
219    pub fn pong(timestamp: u64) -> Self {
220        Self::Pong { timestamp }
221    }
222
223    /// Get the subscription ID if this message has one.
224    #[must_use]
225    pub fn subscription_id(&self) -> Option<&str> {
226        match self {
227            Self::Subscribe { id, .. }
228            | Self::Unsubscribe { id }
229            | Self::Data { id, .. }
230            | Self::Ack { id, .. } => Some(id),
231            Self::Error { id, .. } => id.as_deref(),
232            Self::Ping { .. } | Self::Pong { .. } => None,
233        }
234    }
235}
236
237/// Subscription to a data source.
238#[derive(Debug, Clone)]
239pub struct StreamSubscription {
240    /// Unique subscription ID
241    pub id: String,
242    /// Data source path
243    pub source: String,
244    /// Transform expression to apply
245    pub transform: Option<String>,
246    /// Refresh interval
247    pub interval: Option<Duration>,
248    /// Whether this subscription is active
249    pub active: bool,
250    /// Last received sequence number
251    pub last_seq: u64,
252    /// Error count for backoff
253    pub error_count: u32,
254}
255
256impl StreamSubscription {
257    /// Create a new subscription.
258    #[must_use]
259    pub fn new(source: impl Into<String>) -> Self {
260        let source = source.into();
261        let id = format!("sub_{}", Self::hash_source(&source));
262        Self {
263            id,
264            source,
265            transform: None,
266            interval: None,
267            active: false,
268            last_seq: 0,
269            error_count: 0,
270        }
271    }
272
273    /// Create with explicit ID.
274    #[must_use]
275    pub fn with_id(id: impl Into<String>, source: impl Into<String>) -> Self {
276        Self {
277            id: id.into(),
278            source: source.into(),
279            transform: None,
280            interval: None,
281            active: false,
282            last_seq: 0,
283            error_count: 0,
284        }
285    }
286
287    /// Set refresh interval in milliseconds.
288    #[must_use]
289    pub fn with_interval(mut self, ms: u64) -> Self {
290        self.interval = Some(Duration::from_millis(ms));
291        self
292    }
293
294    /// Set transform expression.
295    #[must_use]
296    pub fn with_transform(mut self, transform: impl Into<String>) -> Self {
297        self.transform = Some(transform.into());
298        self
299    }
300
301    /// Convert to subscribe message.
302    #[must_use]
303    pub fn to_message(&self) -> StreamMessage {
304        StreamMessage::Subscribe {
305            id: self.id.clone(),
306            source: self.source.clone(),
307            transform: self.transform.clone(),
308            interval_ms: self.interval.map(|d| d.as_millis() as u64),
309        }
310    }
311
312    /// Simple hash for generating IDs.
313    fn hash_source(s: &str) -> u64 {
314        let mut hash: u64 = 5381;
315        for byte in s.bytes() {
316            hash = hash.wrapping_mul(33).wrapping_add(u64::from(byte));
317        }
318        hash
319    }
320}
321
322/// Configuration for stream connection.
323#[derive(Debug, Clone)]
324pub struct StreamConfig {
325    /// WebSocket URL
326    pub url: String,
327    /// Reconnection settings
328    pub reconnect: ReconnectConfig,
329    /// Heartbeat interval
330    pub heartbeat_interval: Duration,
331    /// Message buffer size
332    pub buffer_size: usize,
333}
334
335impl Default for StreamConfig {
336    fn default() -> Self {
337        Self {
338            url: String::new(),
339            reconnect: ReconnectConfig::default(),
340            heartbeat_interval: Duration::from_secs(30),
341            buffer_size: 1024,
342        }
343    }
344}
345
346impl StreamConfig {
347    /// Create with URL.
348    #[must_use]
349    pub fn new(url: impl Into<String>) -> Self {
350        Self {
351            url: url.into(),
352            ..Default::default()
353        }
354    }
355
356    /// Set reconnection config.
357    #[must_use]
358    pub fn with_reconnect(mut self, config: ReconnectConfig) -> Self {
359        self.reconnect = config;
360        self
361    }
362
363    /// Set heartbeat interval.
364    #[must_use]
365    pub fn with_heartbeat(mut self, interval: Duration) -> Self {
366        self.heartbeat_interval = interval;
367        self
368    }
369
370    /// Set buffer size.
371    #[must_use]
372    pub fn with_buffer_size(mut self, size: usize) -> Self {
373        self.buffer_size = size;
374        self
375    }
376}
377
378/// Reconnection configuration.
379#[derive(Debug, Clone)]
380pub struct ReconnectConfig {
381    /// Whether to auto-reconnect
382    pub enabled: bool,
383    /// Initial delay before first reconnect
384    pub initial_delay: Duration,
385    /// Maximum delay between reconnects
386    pub max_delay: Duration,
387    /// Multiplier for exponential backoff
388    pub backoff_multiplier: f32,
389    /// Maximum reconnection attempts (None = infinite)
390    pub max_attempts: Option<u32>,
391}
392
393impl Default for ReconnectConfig {
394    fn default() -> Self {
395        Self {
396            enabled: true,
397            initial_delay: Duration::from_millis(500),
398            max_delay: Duration::from_secs(30),
399            backoff_multiplier: 2.0,
400            max_attempts: None,
401        }
402    }
403}
404
405impl ReconnectConfig {
406    /// Calculate delay for a given attempt number.
407    #[must_use]
408    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
409        if !self.enabled {
410            return Duration::ZERO;
411        }
412
413        let delay_ms = self.initial_delay.as_millis() as f32
414            * self.backoff_multiplier.powi(attempt.min(20) as i32);
415
416        let delay = Duration::from_millis(delay_ms.min(self.max_delay.as_millis() as f32) as u64);
417        delay.min(self.max_delay)
418    }
419
420    /// Check if we should attempt reconnection.
421    #[must_use]
422    pub fn should_reconnect(&self, attempt: u32) -> bool {
423        if !self.enabled {
424            return false;
425        }
426        match self.max_attempts {
427            Some(max) => attempt < max,
428            None => true,
429        }
430    }
431}
432
433/// Callback type for data updates.
434pub type DataCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;
435
436/// Callback type for errors.
437pub type ErrorCallback = Box<dyn Fn(&str) + Send + Sync>;
438
439/// Callback type for connection state changes.
440pub type StateCallback = Box<dyn Fn(ConnectionState) + Send + Sync>;
441
442/// Data stream manager.
443///
444/// Manages subscriptions and handles incoming data.
445#[derive(Default)]
446pub struct DataStream {
447    /// Active subscriptions
448    subscriptions: Arc<Mutex<HashMap<String, StreamSubscription>>>,
449    /// Connection state
450    state: Arc<Mutex<ConnectionState>>,
451    /// Pending outbound messages
452    outbox: Arc<Mutex<Vec<StreamMessage>>>,
453    /// Last received data per subscription
454    data_cache: Arc<Mutex<HashMap<String, serde_json::Value>>>,
455    /// Reconnection attempt count
456    reconnect_attempts: Arc<Mutex<u32>>,
457    /// Config
458    config: StreamConfig,
459}
460
461impl DataStream {
462    /// Create a new data stream.
463    #[must_use]
464    pub fn new(config: StreamConfig) -> Self {
465        Self {
466            subscriptions: Arc::new(Mutex::new(HashMap::new())),
467            state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
468            outbox: Arc::new(Mutex::new(Vec::new())),
469            data_cache: Arc::new(Mutex::new(HashMap::new())),
470            reconnect_attempts: Arc::new(Mutex::new(0)),
471            config,
472        }
473    }
474
475    /// Get connection state.
476    #[must_use]
477    pub fn state(&self) -> ConnectionState {
478        *self.state.lock().expect("state mutex not poisoned")
479    }
480
481    /// Set connection state.
482    pub fn set_state(&self, state: ConnectionState) {
483        *self.state.lock().expect("state mutex not poisoned") = state;
484    }
485
486    /// Subscribe to a data source.
487    pub fn subscribe(&self, subscription: StreamSubscription) -> String {
488        let id = subscription.id.clone();
489        let msg = subscription.to_message();
490
491        self.subscriptions
492            .lock()
493            .expect("subscriptions mutex not poisoned")
494            .insert(id.clone(), subscription);
495
496        self.outbox
497            .lock()
498            .expect("outbox mutex not poisoned")
499            .push(msg);
500        id
501    }
502
503    /// Unsubscribe from a data source.
504    pub fn unsubscribe(&self, id: &str) {
505        self.subscriptions
506            .lock()
507            .expect("subscriptions mutex not poisoned")
508            .remove(id);
509        self.data_cache
510            .lock()
511            .expect("cache mutex not poisoned")
512            .remove(id);
513        self.outbox
514            .lock()
515            .expect("outbox mutex not poisoned")
516            .push(StreamMessage::unsubscribe(id));
517    }
518
519    /// Get subscription by ID.
520    #[must_use]
521    pub fn get_subscription(&self, id: &str) -> Option<StreamSubscription> {
522        self.subscriptions
523            .lock()
524            .expect("subscriptions mutex not poisoned")
525            .get(id)
526            .cloned()
527    }
528
529    /// Get all active subscriptions.
530    #[must_use]
531    pub fn subscriptions(&self) -> Vec<StreamSubscription> {
532        self.subscriptions
533            .lock()
534            .expect("subscriptions mutex not poisoned")
535            .values()
536            .cloned()
537            .collect()
538    }
539
540    /// Get cached data for a subscription.
541    #[must_use]
542    pub fn get_data(&self, id: &str) -> Option<serde_json::Value> {
543        self.data_cache
544            .lock()
545            .expect("cache mutex not poisoned")
546            .get(id)
547            .cloned()
548    }
549
550    /// Handle an incoming message.
551    pub fn handle_message(&self, msg: StreamMessage) -> Option<StreamMessage> {
552        match msg {
553            StreamMessage::Data {
554                id, payload, seq, ..
555            } => {
556                // Update subscription state
557                if let Some(sub) = self
558                    .subscriptions
559                    .lock()
560                    .expect("subscriptions mutex not poisoned")
561                    .get_mut(&id)
562                {
563                    sub.last_seq = seq;
564                    sub.active = true;
565                    sub.error_count = 0;
566                }
567                // Cache data
568                self.data_cache
569                    .lock()
570                    .expect("cache mutex not poisoned")
571                    .insert(id, payload);
572                None
573            }
574            StreamMessage::Ack { id, .. } => {
575                if let Some(sub) = self
576                    .subscriptions
577                    .lock()
578                    .expect("subscriptions mutex not poisoned")
579                    .get_mut(&id)
580                {
581                    sub.active = true;
582                }
583                None
584            }
585            StreamMessage::Error { id, .. } => {
586                if let Some(ref id) = id {
587                    if let Some(sub) = self
588                        .subscriptions
589                        .lock()
590                        .expect("subscriptions mutex not poisoned")
591                        .get_mut(id)
592                    {
593                        sub.error_count += 1;
594                    }
595                }
596                None
597            }
598            StreamMessage::Ping { timestamp } => Some(StreamMessage::pong(timestamp)),
599            StreamMessage::Pong { .. } => None,
600            _ => None,
601        }
602    }
603
604    /// Take pending outbound messages.
605    #[must_use]
606    pub fn take_outbox(&self) -> Vec<StreamMessage> {
607        std::mem::take(&mut *self.outbox.lock().expect("outbox mutex not poisoned"))
608    }
609
610    /// Queue an outbound message.
611    pub fn send(&self, msg: StreamMessage) {
612        self.outbox
613            .lock()
614            .expect("outbox mutex not poisoned")
615            .push(msg);
616    }
617
618    /// Get reconnection delay based on current attempts.
619    #[must_use]
620    pub fn reconnect_delay(&self) -> Duration {
621        let attempts = *self
622            .reconnect_attempts
623            .lock()
624            .expect("reconnect mutex not poisoned");
625        self.config.reconnect.delay_for_attempt(attempts)
626    }
627
628    /// Increment reconnection attempts.
629    pub fn increment_reconnect_attempts(&self) {
630        *self
631            .reconnect_attempts
632            .lock()
633            .expect("reconnect mutex not poisoned") += 1;
634    }
635
636    /// Reset reconnection attempts.
637    pub fn reset_reconnect_attempts(&self) {
638        *self
639            .reconnect_attempts
640            .lock()
641            .expect("reconnect mutex not poisoned") = 0;
642    }
643
644    /// Check if we should try to reconnect.
645    #[must_use]
646    pub fn should_reconnect(&self) -> bool {
647        let attempts = *self
648            .reconnect_attempts
649            .lock()
650            .expect("reconnect mutex not poisoned");
651        self.config.reconnect.should_reconnect(attempts)
652    }
653
654    /// Resubscribe all subscriptions (after reconnect).
655    pub fn resubscribe_all(&self) {
656        let subs = self
657            .subscriptions
658            .lock()
659            .expect("subscriptions mutex not poisoned")
660            .clone();
661        let mut outbox = self.outbox.lock().expect("outbox mutex not poisoned");
662        for sub in subs.values() {
663            outbox.push(sub.to_message());
664        }
665    }
666
667    /// Number of active subscriptions.
668    #[must_use]
669    pub fn subscription_count(&self) -> usize {
670        self.subscriptions
671            .lock()
672            .expect("subscriptions mutex not poisoned")
673            .len()
674    }
675
676    /// Clear all subscriptions and cache.
677    pub fn clear(&self) {
678        self.subscriptions
679            .lock()
680            .expect("subscriptions mutex not poisoned")
681            .clear();
682        self.data_cache
683            .lock()
684            .expect("cache mutex not poisoned")
685            .clear();
686        self.outbox
687            .lock()
688            .expect("outbox mutex not poisoned")
689            .clear();
690    }
691}
692
693/// Rate limiter for backpressure handling.
694#[derive(Debug)]
695pub struct RateLimiter {
696    /// Maximum messages per window
697    max_messages: usize,
698    /// Window duration
699    window: Duration,
700    /// Message timestamps
701    timestamps: Vec<u64>,
702}
703
704impl RateLimiter {
705    /// Create a new rate limiter.
706    #[must_use]
707    pub fn new(max_messages: usize, window: Duration) -> Self {
708        Self {
709            max_messages,
710            window,
711            timestamps: Vec::with_capacity(max_messages),
712        }
713    }
714
715    /// Check if a message is allowed and record it.
716    pub fn check(&mut self, now: u64) -> bool {
717        let window_start = now.saturating_sub(self.window.as_millis() as u64);
718
719        // Remove expired timestamps (>= to keep timestamps at window boundary)
720        self.timestamps.retain(|&ts| ts >= window_start);
721
722        if self.timestamps.len() < self.max_messages {
723            self.timestamps.push(now);
724            true
725        } else {
726            false
727        }
728    }
729
730    /// Get the number of messages in the current window.
731    #[must_use]
732    pub fn current_count(&self) -> usize {
733        self.timestamps.len()
734    }
735
736    /// Reset the rate limiter.
737    pub fn reset(&mut self) {
738        self.timestamps.clear();
739    }
740
741    /// Check if the limiter is at capacity.
742    #[must_use]
743    pub fn is_at_capacity(&self) -> bool {
744        self.timestamps.len() >= self.max_messages
745    }
746}
747
748impl Default for RateLimiter {
749    fn default() -> Self {
750        Self::new(100, Duration::from_secs(1))
751    }
752}
753
754/// Message buffer for ordering and deduplication.
755#[derive(Debug, Default)]
756pub struct MessageBuffer {
757    /// Buffer of messages by subscription ID
758    buffers: HashMap<String, SubscriptionBuffer>,
759}
760
761#[derive(Debug, Default)]
762struct SubscriptionBuffer {
763    /// Last processed sequence number
764    last_seq: u64,
765    /// Buffered out-of-order messages
766    pending: Vec<(u64, serde_json::Value)>,
767}
768
769impl MessageBuffer {
770    /// Create a new message buffer.
771    #[must_use]
772    pub fn new() -> Self {
773        Self::default()
774    }
775
776    /// Process a message and return it if it's the next in sequence.
777    pub fn process(
778        &mut self,
779        id: &str,
780        seq: u64,
781        payload: serde_json::Value,
782    ) -> Option<serde_json::Value> {
783        let buffer = self.buffers.entry(id.to_string()).or_default();
784
785        if seq == buffer.last_seq + 1 {
786            // This is the next expected message
787            buffer.last_seq = seq;
788
789            // Check for any buffered messages that are now in order
790            let mut result = Some(payload);
791            while let Some(pos) = buffer
792                .pending
793                .iter()
794                .position(|(s, _)| *s == buffer.last_seq + 1)
795            {
796                let (next_seq, next_payload) = buffer.pending.remove(pos);
797                buffer.last_seq = next_seq;
798                // Replace result with latest (or could accumulate)
799                result = Some(next_payload);
800            }
801            result
802        } else if seq > buffer.last_seq + 1 {
803            // Out of order - buffer it
804            buffer.pending.push((seq, payload));
805            None
806        } else {
807            // Duplicate or old message - ignore
808            None
809        }
810    }
811
812    /// Get the last processed sequence for a subscription.
813    #[must_use]
814    pub fn last_seq(&self, id: &str) -> u64 {
815        self.buffers.get(id).map_or(0, |b| b.last_seq)
816    }
817
818    /// Get the number of pending messages for a subscription.
819    #[must_use]
820    pub fn pending_count(&self, id: &str) -> usize {
821        self.buffers.get(id).map_or(0, |b| b.pending.len())
822    }
823
824    /// Clear buffer for a subscription.
825    pub fn clear(&mut self, id: &str) {
826        self.buffers.remove(id);
827    }
828
829    /// Clear all buffers.
830    pub fn clear_all(&mut self) {
831        self.buffers.clear();
832    }
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838
839    // =========================================================================
840    // ConnectionState Tests
841    // =========================================================================
842
843    #[test]
844    fn test_connection_state_default() {
845        let state = ConnectionState::default();
846        assert_eq!(state, ConnectionState::Disconnected);
847    }
848
849    #[test]
850    fn test_connection_state_is_active() {
851        assert!(!ConnectionState::Disconnected.is_active());
852        assert!(!ConnectionState::Connecting.is_active());
853        assert!(ConnectionState::Connected.is_active());
854        assert!(!ConnectionState::Reconnecting.is_active());
855        assert!(!ConnectionState::Failed.is_active());
856    }
857
858    #[test]
859    fn test_connection_state_is_connecting() {
860        assert!(!ConnectionState::Disconnected.is_connecting());
861        assert!(ConnectionState::Connecting.is_connecting());
862        assert!(!ConnectionState::Connected.is_connecting());
863        assert!(ConnectionState::Reconnecting.is_connecting());
864        assert!(!ConnectionState::Failed.is_connecting());
865    }
866
867    // =========================================================================
868    // StreamMessage Tests
869    // =========================================================================
870
871    #[test]
872    fn test_stream_message_subscribe() {
873        let msg = StreamMessage::subscribe("sub1", "metrics/cpu");
874        if let StreamMessage::Subscribe { id, source, .. } = msg {
875            assert_eq!(id, "sub1");
876            assert_eq!(source, "metrics/cpu");
877        } else {
878            panic!("Expected Subscribe message");
879        }
880    }
881
882    #[test]
883    fn test_stream_message_subscribe_with_transform() {
884        let msg = StreamMessage::subscribe_with_transform("sub1", "metrics/cpu", "rate()");
885        if let StreamMessage::Subscribe { transform, .. } = msg {
886            assert_eq!(transform, Some("rate()".to_string()));
887        } else {
888            panic!("Expected Subscribe message");
889        }
890    }
891
892    #[test]
893    fn test_stream_message_unsubscribe() {
894        let msg = StreamMessage::unsubscribe("sub1");
895        if let StreamMessage::Unsubscribe { id } = msg {
896            assert_eq!(id, "sub1");
897        } else {
898            panic!("Expected Unsubscribe message");
899        }
900    }
901
902    #[test]
903    fn test_stream_message_data() {
904        let msg = StreamMessage::data("sub1", serde_json::json!({"value": 42}), 5);
905        if let StreamMessage::Data {
906            id, payload, seq, ..
907        } = msg
908        {
909            assert_eq!(id, "sub1");
910            assert_eq!(payload, serde_json::json!({"value": 42}));
911            assert_eq!(seq, 5);
912        } else {
913            panic!("Expected Data message");
914        }
915    }
916
917    #[test]
918    fn test_stream_message_error() {
919        let msg = StreamMessage::error("connection failed");
920        if let StreamMessage::Error { message, id, .. } = msg {
921            assert_eq!(message, "connection failed");
922            assert!(id.is_none());
923        } else {
924            panic!("Expected Error message");
925        }
926    }
927
928    #[test]
929    fn test_stream_message_error_for() {
930        let msg = StreamMessage::error_for("sub1", "invalid source");
931        if let StreamMessage::Error { message, id, .. } = msg {
932            assert_eq!(message, "invalid source");
933            assert_eq!(id, Some("sub1".to_string()));
934        } else {
935            panic!("Expected Error message");
936        }
937    }
938
939    #[test]
940    fn test_stream_message_ack() {
941        let msg = StreamMessage::ack("sub1");
942        if let StreamMessage::Ack { id, .. } = msg {
943            assert_eq!(id, "sub1");
944        } else {
945            panic!("Expected Ack message");
946        }
947    }
948
949    #[test]
950    fn test_stream_message_ping_pong() {
951        let ping = StreamMessage::ping(12345);
952        let pong = StreamMessage::pong(12345);
953
954        if let StreamMessage::Ping { timestamp } = ping {
955            assert_eq!(timestamp, 12345);
956        } else {
957            panic!("Expected Ping");
958        }
959
960        if let StreamMessage::Pong { timestamp } = pong {
961            assert_eq!(timestamp, 12345);
962        } else {
963            panic!("Expected Pong");
964        }
965    }
966
967    #[test]
968    fn test_stream_message_subscription_id() {
969        assert_eq!(
970            StreamMessage::subscribe("sub1", "x").subscription_id(),
971            Some("sub1")
972        );
973        assert_eq!(
974            StreamMessage::unsubscribe("sub2").subscription_id(),
975            Some("sub2")
976        );
977        assert_eq!(
978            StreamMessage::data("sub3", serde_json::json!({}), 0).subscription_id(),
979            Some("sub3")
980        );
981        assert_eq!(StreamMessage::error("msg").subscription_id(), None);
982        assert_eq!(
983            StreamMessage::error_for("sub4", "msg").subscription_id(),
984            Some("sub4")
985        );
986        assert!(StreamMessage::ping(0).subscription_id().is_none());
987        assert!(StreamMessage::pong(0).subscription_id().is_none());
988    }
989
990    #[test]
991    fn test_stream_message_serialize() {
992        let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 42);
993        let json = serde_json::to_string(&msg).unwrap();
994        assert!(json.contains("\"type\":\"data\""));
995        assert!(json.contains("\"id\":\"sub1\""));
996        assert!(json.contains("\"seq\":42"));
997    }
998
999    #[test]
1000    fn test_stream_message_deserialize() {
1001        let json = r#"{"type":"subscribe","id":"s1","source":"data/x"}"#;
1002        let msg: StreamMessage = serde_json::from_str(json).unwrap();
1003        if let StreamMessage::Subscribe { id, source, .. } = msg {
1004            assert_eq!(id, "s1");
1005            assert_eq!(source, "data/x");
1006        } else {
1007            panic!("Expected Subscribe");
1008        }
1009    }
1010
1011    // =========================================================================
1012    // StreamSubscription Tests
1013    // =========================================================================
1014
1015    #[test]
1016    fn test_subscription_new() {
1017        let sub = StreamSubscription::new("metrics/cpu");
1018        assert_eq!(sub.source, "metrics/cpu");
1019        assert!(sub.id.starts_with("sub_"));
1020        assert!(!sub.active);
1021    }
1022
1023    #[test]
1024    fn test_subscription_with_id() {
1025        let sub = StreamSubscription::with_id("my-sub", "data/x");
1026        assert_eq!(sub.id, "my-sub");
1027        assert_eq!(sub.source, "data/x");
1028    }
1029
1030    #[test]
1031    fn test_subscription_with_interval() {
1032        let sub = StreamSubscription::new("x").with_interval(1000);
1033        assert_eq!(sub.interval, Some(Duration::from_millis(1000)));
1034    }
1035
1036    #[test]
1037    fn test_subscription_with_transform() {
1038        let sub = StreamSubscription::new("x").with_transform("rate() | limit(10)");
1039        assert_eq!(sub.transform, Some("rate() | limit(10)".to_string()));
1040    }
1041
1042    #[test]
1043    fn test_subscription_to_message() {
1044        let sub = StreamSubscription::with_id("sub1", "metrics")
1045            .with_interval(5000)
1046            .with_transform("mean()");
1047
1048        let msg = sub.to_message();
1049        if let StreamMessage::Subscribe {
1050            id,
1051            source,
1052            transform,
1053            interval_ms,
1054        } = msg
1055        {
1056            assert_eq!(id, "sub1");
1057            assert_eq!(source, "metrics");
1058            assert_eq!(transform, Some("mean()".to_string()));
1059            assert_eq!(interval_ms, Some(5000));
1060        } else {
1061            panic!("Expected Subscribe");
1062        }
1063    }
1064
1065    // =========================================================================
1066    // StreamConfig Tests
1067    // =========================================================================
1068
1069    #[test]
1070    fn test_stream_config_default() {
1071        let config = StreamConfig::default();
1072        assert!(config.url.is_empty());
1073        assert!(config.reconnect.enabled);
1074        assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
1075    }
1076
1077    #[test]
1078    fn test_stream_config_new() {
1079        let config = StreamConfig::new("ws://localhost:8080");
1080        assert_eq!(config.url, "ws://localhost:8080");
1081    }
1082
1083    #[test]
1084    fn test_stream_config_builder() {
1085        let config = StreamConfig::new("ws://x")
1086            .with_heartbeat(Duration::from_secs(10))
1087            .with_buffer_size(2048);
1088
1089        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
1090        assert_eq!(config.buffer_size, 2048);
1091    }
1092
1093    // =========================================================================
1094    // ReconnectConfig Tests
1095    // =========================================================================
1096
1097    #[test]
1098    fn test_reconnect_config_default() {
1099        let config = ReconnectConfig::default();
1100        assert!(config.enabled);
1101        assert_eq!(config.initial_delay, Duration::from_millis(500));
1102        assert_eq!(config.max_delay, Duration::from_secs(30));
1103        assert!(config.max_attempts.is_none());
1104    }
1105
1106    #[test]
1107    fn test_reconnect_delay_for_attempt() {
1108        let config = ReconnectConfig {
1109            enabled: true,
1110            initial_delay: Duration::from_millis(100),
1111            max_delay: Duration::from_secs(10),
1112            backoff_multiplier: 2.0,
1113            max_attempts: None,
1114        };
1115
1116        assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100));
1117        assert_eq!(config.delay_for_attempt(1), Duration::from_millis(200));
1118        assert_eq!(config.delay_for_attempt(2), Duration::from_millis(400));
1119        assert_eq!(config.delay_for_attempt(3), Duration::from_millis(800));
1120    }
1121
1122    #[test]
1123    fn test_reconnect_delay_capped() {
1124        let config = ReconnectConfig {
1125            enabled: true,
1126            initial_delay: Duration::from_secs(1),
1127            max_delay: Duration::from_secs(5),
1128            backoff_multiplier: 10.0,
1129            max_attempts: None,
1130        };
1131
1132        // After just a couple attempts, should be capped at max
1133        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(5));
1134    }
1135
1136    #[test]
1137    fn test_reconnect_disabled() {
1138        let config = ReconnectConfig {
1139            enabled: false,
1140            ..Default::default()
1141        };
1142
1143        assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
1144        assert!(!config.should_reconnect(0));
1145    }
1146
1147    #[test]
1148    fn test_reconnect_max_attempts() {
1149        let config = ReconnectConfig {
1150            enabled: true,
1151            max_attempts: Some(3),
1152            ..Default::default()
1153        };
1154
1155        assert!(config.should_reconnect(0));
1156        assert!(config.should_reconnect(1));
1157        assert!(config.should_reconnect(2));
1158        assert!(!config.should_reconnect(3));
1159        assert!(!config.should_reconnect(4));
1160    }
1161
1162    // =========================================================================
1163    // DataStream Tests
1164    // =========================================================================
1165
1166    #[test]
1167    fn test_data_stream_new() {
1168        let stream = DataStream::new(StreamConfig::new("ws://x"));
1169        assert_eq!(stream.state(), ConnectionState::Disconnected);
1170        assert_eq!(stream.subscription_count(), 0);
1171    }
1172
1173    #[test]
1174    fn test_data_stream_subscribe() {
1175        let stream = DataStream::new(StreamConfig::default());
1176        let sub = StreamSubscription::with_id("sub1", "metrics");
1177
1178        let id = stream.subscribe(sub);
1179        assert_eq!(id, "sub1");
1180        assert_eq!(stream.subscription_count(), 1);
1181
1182        let outbox = stream.take_outbox();
1183        assert_eq!(outbox.len(), 1);
1184        assert!(matches!(outbox[0], StreamMessage::Subscribe { .. }));
1185    }
1186
1187    #[test]
1188    fn test_data_stream_unsubscribe() {
1189        let stream = DataStream::new(StreamConfig::default());
1190        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1191        let _ = stream.take_outbox(); // Clear
1192
1193        stream.unsubscribe("sub1");
1194        assert_eq!(stream.subscription_count(), 0);
1195
1196        let outbox = stream.take_outbox();
1197        assert_eq!(outbox.len(), 1);
1198        assert!(matches!(outbox[0], StreamMessage::Unsubscribe { .. }));
1199    }
1200
1201    #[test]
1202    fn test_data_stream_handle_data() {
1203        let stream = DataStream::new(StreamConfig::default());
1204        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1205
1206        let msg = StreamMessage::data("sub1", serde_json::json!({"val": 42}), 1);
1207        stream.handle_message(msg);
1208
1209        let data = stream.get_data("sub1");
1210        assert_eq!(data, Some(serde_json::json!({"val": 42})));
1211
1212        let sub = stream.get_subscription("sub1").unwrap();
1213        assert!(sub.active);
1214        assert_eq!(sub.last_seq, 1);
1215    }
1216
1217    #[test]
1218    fn test_data_stream_handle_ack() {
1219        let stream = DataStream::new(StreamConfig::default());
1220        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1221
1222        stream.handle_message(StreamMessage::ack("sub1"));
1223
1224        let sub = stream.get_subscription("sub1").unwrap();
1225        assert!(sub.active);
1226    }
1227
1228    #[test]
1229    fn test_data_stream_handle_error() {
1230        let stream = DataStream::new(StreamConfig::default());
1231        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1232
1233        stream.handle_message(StreamMessage::error_for("sub1", "fail"));
1234
1235        let sub = stream.get_subscription("sub1").unwrap();
1236        assert_eq!(sub.error_count, 1);
1237    }
1238
1239    #[test]
1240    fn test_data_stream_handle_ping() {
1241        let stream = DataStream::new(StreamConfig::default());
1242        let response = stream.handle_message(StreamMessage::ping(12345));
1243
1244        assert!(matches!(
1245            response,
1246            Some(StreamMessage::Pong { timestamp: 12345 })
1247        ));
1248    }
1249
1250    #[test]
1251    fn test_data_stream_reconnect_logic() {
1252        let stream = DataStream::new(StreamConfig::default());
1253
1254        assert!(stream.should_reconnect());
1255        assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1256
1257        stream.increment_reconnect_attempts();
1258        assert!(stream.should_reconnect());
1259        assert_eq!(stream.reconnect_delay(), Duration::from_millis(1000));
1260
1261        stream.reset_reconnect_attempts();
1262        assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1263    }
1264
1265    #[test]
1266    fn test_data_stream_resubscribe_all() {
1267        let stream = DataStream::new(StreamConfig::default());
1268        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1269        stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1270        let _ = stream.take_outbox(); // Clear
1271
1272        stream.resubscribe_all();
1273
1274        let outbox = stream.take_outbox();
1275        assert_eq!(outbox.len(), 2);
1276    }
1277
1278    #[test]
1279    fn test_data_stream_clear() {
1280        let stream = DataStream::new(StreamConfig::default());
1281        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1282        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1283
1284        stream.clear();
1285
1286        assert_eq!(stream.subscription_count(), 0);
1287        assert!(stream.get_data("sub1").is_none());
1288    }
1289
1290    // =========================================================================
1291    // RateLimiter Tests
1292    // =========================================================================
1293
1294    #[test]
1295    fn test_rate_limiter_allows_under_limit() {
1296        let mut limiter = RateLimiter::new(5, Duration::from_secs(1));
1297
1298        for i in 0..5 {
1299            assert!(limiter.check(i * 100), "message {} should be allowed", i);
1300        }
1301    }
1302
1303    #[test]
1304    fn test_rate_limiter_blocks_over_limit() {
1305        let mut limiter = RateLimiter::new(3, Duration::from_secs(1));
1306
1307        assert!(limiter.check(0));
1308        assert!(limiter.check(100));
1309        assert!(limiter.check(200));
1310        assert!(!limiter.check(300)); // Over limit
1311    }
1312
1313    #[test]
1314    fn test_rate_limiter_window_expiry() {
1315        let mut limiter = RateLimiter::new(2, Duration::from_millis(100));
1316
1317        assert!(limiter.check(0));
1318        assert!(limiter.check(50));
1319        assert!(!limiter.check(60)); // Over limit
1320
1321        // After window expires
1322        assert!(limiter.check(200)); // Old messages expired
1323    }
1324
1325    #[test]
1326    fn test_rate_limiter_current_count() {
1327        let mut limiter = RateLimiter::new(10, Duration::from_secs(1));
1328
1329        assert_eq!(limiter.current_count(), 0);
1330        limiter.check(0);
1331        assert_eq!(limiter.current_count(), 1);
1332        limiter.check(100);
1333        assert_eq!(limiter.current_count(), 2);
1334    }
1335
1336    #[test]
1337    fn test_rate_limiter_reset() {
1338        let mut limiter = RateLimiter::new(2, Duration::from_secs(1));
1339
1340        limiter.check(0);
1341        limiter.check(100);
1342        assert!(limiter.is_at_capacity());
1343
1344        limiter.reset();
1345        assert_eq!(limiter.current_count(), 0);
1346        assert!(!limiter.is_at_capacity());
1347    }
1348
1349    #[test]
1350    fn test_rate_limiter_default() {
1351        let limiter = RateLimiter::default();
1352        assert_eq!(limiter.max_messages, 100);
1353    }
1354
1355    // =========================================================================
1356    // MessageBuffer Tests
1357    // =========================================================================
1358
1359    #[test]
1360    fn test_message_buffer_in_order() {
1361        let mut buffer = MessageBuffer::new();
1362
1363        let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1364        let r2 = buffer.process("sub1", 2, serde_json::json!(2));
1365        let r3 = buffer.process("sub1", 3, serde_json::json!(3));
1366
1367        assert_eq!(r1, Some(serde_json::json!(1)));
1368        assert_eq!(r2, Some(serde_json::json!(2)));
1369        assert_eq!(r3, Some(serde_json::json!(3)));
1370    }
1371
1372    #[test]
1373    fn test_message_buffer_out_of_order() {
1374        let mut buffer = MessageBuffer::new();
1375
1376        // Receive seq 1, then 3, then 2
1377        let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1378        let r3 = buffer.process("sub1", 3, serde_json::json!(3)); // Buffered
1379        let r2 = buffer.process("sub1", 2, serde_json::json!(2)); // Triggers flush
1380
1381        assert_eq!(r1, Some(serde_json::json!(1)));
1382        assert!(r3.is_none()); // Buffered
1383        assert_eq!(r2, Some(serde_json::json!(3))); // Returns latest after reorder
1384    }
1385
1386    #[test]
1387    fn test_message_buffer_duplicate() {
1388        let mut buffer = MessageBuffer::new();
1389
1390        buffer.process("sub1", 1, serde_json::json!(1));
1391        let dup = buffer.process("sub1", 1, serde_json::json!("dup"));
1392
1393        assert!(dup.is_none()); // Duplicate ignored
1394    }
1395
1396    #[test]
1397    fn test_message_buffer_last_seq() {
1398        let mut buffer = MessageBuffer::new();
1399
1400        assert_eq!(buffer.last_seq("sub1"), 0);
1401        buffer.process("sub1", 1, serde_json::json!(1));
1402        assert_eq!(buffer.last_seq("sub1"), 1);
1403        buffer.process("sub1", 2, serde_json::json!(2));
1404        assert_eq!(buffer.last_seq("sub1"), 2);
1405    }
1406
1407    #[test]
1408    fn test_message_buffer_pending_count() {
1409        let mut buffer = MessageBuffer::new();
1410
1411        buffer.process("sub1", 1, serde_json::json!(1));
1412        buffer.process("sub1", 3, serde_json::json!(3)); // Skip 2
1413        buffer.process("sub1", 4, serde_json::json!(4)); // Skip 2
1414
1415        assert_eq!(buffer.pending_count("sub1"), 2);
1416    }
1417
1418    #[test]
1419    fn test_message_buffer_clear() {
1420        let mut buffer = MessageBuffer::new();
1421
1422        buffer.process("sub1", 1, serde_json::json!(1));
1423        buffer.process("sub2", 1, serde_json::json!(2));
1424
1425        buffer.clear("sub1");
1426        assert_eq!(buffer.last_seq("sub1"), 0);
1427        assert_eq!(buffer.last_seq("sub2"), 1);
1428    }
1429
1430    #[test]
1431    fn test_message_buffer_clear_all() {
1432        let mut buffer = MessageBuffer::new();
1433
1434        buffer.process("sub1", 1, serde_json::json!(1));
1435        buffer.process("sub2", 1, serde_json::json!(2));
1436
1437        buffer.clear_all();
1438        assert_eq!(buffer.last_seq("sub1"), 0);
1439        assert_eq!(buffer.last_seq("sub2"), 0);
1440    }
1441
1442    #[test]
1443    fn test_message_buffer_multiple_subscriptions() {
1444        let mut buffer = MessageBuffer::new();
1445
1446        buffer.process("sub1", 1, serde_json::json!("a"));
1447        buffer.process("sub2", 1, serde_json::json!("b"));
1448        buffer.process("sub1", 2, serde_json::json!("c"));
1449
1450        assert_eq!(buffer.last_seq("sub1"), 2);
1451        assert_eq!(buffer.last_seq("sub2"), 1);
1452    }
1453
1454    // =========================================================================
1455    // Additional Edge Case Tests
1456    // =========================================================================
1457
1458    #[test]
1459    fn test_connection_state_debug() {
1460        assert_eq!(format!("{:?}", ConnectionState::Connected), "Connected");
1461        assert_eq!(format!("{:?}", ConnectionState::Failed), "Failed");
1462    }
1463
1464    #[test]
1465    fn test_connection_state_clone() {
1466        let state = ConnectionState::Reconnecting;
1467        let cloned = state;
1468        assert_eq!(state, cloned);
1469    }
1470
1471    #[test]
1472    fn test_stream_message_debug() {
1473        let msg = StreamMessage::ping(12345);
1474        let debug = format!("{msg:?}");
1475        assert!(debug.contains("Ping"));
1476    }
1477
1478    #[test]
1479    fn test_stream_message_clone() {
1480        let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 5);
1481        let cloned = msg.clone();
1482        assert_eq!(msg, cloned);
1483    }
1484
1485    #[test]
1486    fn test_stream_subscription_clone() {
1487        let sub = StreamSubscription::with_id("sub1", "metrics")
1488            .with_interval(1000)
1489            .with_transform("rate()");
1490        let cloned = sub.clone();
1491        assert_eq!(cloned.id, "sub1");
1492        assert_eq!(cloned.source, "metrics");
1493        assert_eq!(cloned.transform, Some("rate()".to_string()));
1494    }
1495
1496    #[test]
1497    fn test_stream_subscription_debug() {
1498        let sub = StreamSubscription::new("test");
1499        let debug = format!("{sub:?}");
1500        assert!(debug.contains("StreamSubscription"));
1501    }
1502
1503    #[test]
1504    fn test_stream_subscription_hash_consistency() {
1505        // Same source should produce same hash
1506        let sub1 = StreamSubscription::new("metrics/cpu");
1507        let sub2 = StreamSubscription::new("metrics/cpu");
1508        assert_eq!(sub1.id, sub2.id);
1509    }
1510
1511    #[test]
1512    fn test_stream_subscription_hash_different() {
1513        let sub1 = StreamSubscription::new("metrics/cpu");
1514        let sub2 = StreamSubscription::new("metrics/memory");
1515        assert_ne!(sub1.id, sub2.id);
1516    }
1517
1518    #[test]
1519    fn test_stream_config_debug() {
1520        let config = StreamConfig::default();
1521        let debug = format!("{config:?}");
1522        assert!(debug.contains("StreamConfig"));
1523    }
1524
1525    #[test]
1526    fn test_stream_config_clone() {
1527        let config = StreamConfig::new("ws://test")
1528            .with_buffer_size(2048)
1529            .with_heartbeat(Duration::from_secs(60));
1530        let cloned = config.clone();
1531        assert_eq!(cloned.url, "ws://test");
1532        assert_eq!(cloned.buffer_size, 2048);
1533    }
1534
1535    #[test]
1536    fn test_stream_config_with_reconnect() {
1537        let reconnect = ReconnectConfig {
1538            enabled: false,
1539            max_attempts: Some(5),
1540            ..Default::default()
1541        };
1542        let config = StreamConfig::new("ws://x").with_reconnect(reconnect);
1543        assert!(!config.reconnect.enabled);
1544        assert_eq!(config.reconnect.max_attempts, Some(5));
1545    }
1546
1547    #[test]
1548    fn test_reconnect_config_debug() {
1549        let config = ReconnectConfig::default();
1550        let debug = format!("{config:?}");
1551        assert!(debug.contains("ReconnectConfig"));
1552    }
1553
1554    #[test]
1555    fn test_reconnect_config_clone() {
1556        let config = ReconnectConfig {
1557            max_attempts: Some(10),
1558            ..Default::default()
1559        };
1560        let cloned = config.clone();
1561        assert_eq!(cloned.max_attempts, Some(10));
1562    }
1563
1564    #[test]
1565    fn test_reconnect_delay_large_attempt() {
1566        let config = ReconnectConfig::default();
1567        // Large attempt number should be capped by max(20)
1568        let delay = config.delay_for_attempt(100);
1569        assert!(delay <= config.max_delay);
1570    }
1571
1572    #[test]
1573    fn test_data_stream_default() {
1574        let stream = DataStream::default();
1575        assert_eq!(stream.state(), ConnectionState::Disconnected);
1576        assert_eq!(stream.subscription_count(), 0);
1577    }
1578
1579    #[test]
1580    fn test_data_stream_set_state() {
1581        let stream = DataStream::default();
1582        stream.set_state(ConnectionState::Connected);
1583        assert_eq!(stream.state(), ConnectionState::Connected);
1584        stream.set_state(ConnectionState::Failed);
1585        assert_eq!(stream.state(), ConnectionState::Failed);
1586    }
1587
1588    #[test]
1589    fn test_data_stream_send() {
1590        let stream = DataStream::default();
1591        stream.send(StreamMessage::ping(100));
1592        stream.send(StreamMessage::ping(200));
1593
1594        let outbox = stream.take_outbox();
1595        assert_eq!(outbox.len(), 2);
1596    }
1597
1598    #[test]
1599    fn test_data_stream_get_nonexistent_subscription() {
1600        let stream = DataStream::default();
1601        assert!(stream.get_subscription("nonexistent").is_none());
1602    }
1603
1604    #[test]
1605    fn test_data_stream_get_nonexistent_data() {
1606        let stream = DataStream::default();
1607        assert!(stream.get_data("nonexistent").is_none());
1608    }
1609
1610    #[test]
1611    fn test_data_stream_subscriptions_list() {
1612        let stream = DataStream::default();
1613        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1614        stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1615
1616        let subs = stream.subscriptions();
1617        assert_eq!(subs.len(), 2);
1618    }
1619
1620    #[test]
1621    fn test_data_stream_handle_pong() {
1622        let stream = DataStream::default();
1623        let response = stream.handle_message(StreamMessage::pong(12345));
1624        assert!(response.is_none());
1625    }
1626
1627    #[test]
1628    fn test_data_stream_handle_subscribe() {
1629        let stream = DataStream::default();
1630        // Subscribe messages from server side are ignored
1631        let response = stream.handle_message(StreamMessage::subscribe("sub1", "metrics"));
1632        assert!(response.is_none());
1633    }
1634
1635    #[test]
1636    fn test_data_stream_handle_error_no_id() {
1637        let stream = DataStream::default();
1638        // Error without ID doesn't affect any subscription
1639        let response = stream.handle_message(StreamMessage::error("general error"));
1640        assert!(response.is_none());
1641    }
1642
1643    #[test]
1644    fn test_data_stream_handle_error_unknown_id() {
1645        let stream = DataStream::default();
1646        // Error for unknown subscription
1647        let response = stream.handle_message(StreamMessage::error_for("unknown", "error"));
1648        assert!(response.is_none());
1649    }
1650
1651    #[test]
1652    fn test_data_stream_handle_data_unknown_subscription() {
1653        let stream = DataStream::default();
1654        // Data for unknown subscription still gets cached
1655        stream.handle_message(StreamMessage::data("unknown", serde_json::json!(42), 1));
1656        assert_eq!(stream.get_data("unknown"), Some(serde_json::json!(42)));
1657    }
1658
1659    #[test]
1660    fn test_rate_limiter_debug() {
1661        let limiter = RateLimiter::new(10, Duration::from_secs(1));
1662        let debug = format!("{limiter:?}");
1663        assert!(debug.contains("RateLimiter"));
1664    }
1665
1666    #[test]
1667    fn test_rate_limiter_at_boundary() {
1668        let mut limiter = RateLimiter::new(3, Duration::from_millis(100));
1669
1670        // All at time 0
1671        assert!(limiter.check(0));
1672        assert!(limiter.check(0));
1673        assert!(limiter.check(0));
1674        assert!(!limiter.check(0)); // Over limit at same time
1675
1676        // Exactly at window boundary - should keep messages
1677        assert!(!limiter.check(100)); // At boundary, old ones still valid
1678
1679        // Past window boundary
1680        assert!(limiter.check(101)); // Window expired
1681    }
1682
1683    #[test]
1684    fn test_message_buffer_debug() {
1685        let buffer = MessageBuffer::new();
1686        let debug = format!("{buffer:?}");
1687        assert!(debug.contains("MessageBuffer"));
1688    }
1689
1690    #[test]
1691    fn test_message_buffer_old_message() {
1692        let mut buffer = MessageBuffer::new();
1693
1694        // Process messages 1, 2, 3 in order
1695        buffer.process("sub1", 1, serde_json::json!(1));
1696        buffer.process("sub1", 2, serde_json::json!(2));
1697        buffer.process("sub1", 3, serde_json::json!(3));
1698
1699        // Old message (seq 1 when we're at 3) should be ignored
1700        let old = buffer.process("sub1", 1, serde_json::json!("old"));
1701        assert!(old.is_none());
1702        assert_eq!(buffer.last_seq("sub1"), 3);
1703    }
1704
1705    #[test]
1706    fn test_message_buffer_large_gap() {
1707        let mut buffer = MessageBuffer::new();
1708
1709        buffer.process("sub1", 1, serde_json::json!(1));
1710        // Skip many sequence numbers
1711        buffer.process("sub1", 100, serde_json::json!(100)); // Buffered
1712
1713        assert_eq!(buffer.last_seq("sub1"), 1);
1714        assert_eq!(buffer.pending_count("sub1"), 1);
1715    }
1716
1717    #[test]
1718    fn test_message_buffer_nonexistent_subscription() {
1719        let buffer = MessageBuffer::new();
1720        assert_eq!(buffer.last_seq("nonexistent"), 0);
1721        assert_eq!(buffer.pending_count("nonexistent"), 0);
1722    }
1723
1724    #[test]
1725    fn test_stream_message_serialize_all_variants() {
1726        let messages = vec![
1727            StreamMessage::subscribe("s1", "source"),
1728            StreamMessage::subscribe_with_transform("s2", "source", "rate()"),
1729            StreamMessage::unsubscribe("s1"),
1730            StreamMessage::data("s1", serde_json::json!({"x": 1}), 5),
1731            StreamMessage::error("msg"),
1732            StreamMessage::error_for("s1", "msg"),
1733            StreamMessage::ack("s1"),
1734            StreamMessage::ping(1000),
1735            StreamMessage::pong(1000),
1736        ];
1737
1738        for msg in messages {
1739            let json = serde_json::to_string(&msg).unwrap();
1740            let parsed: StreamMessage = serde_json::from_str(&json).unwrap();
1741            assert_eq!(msg, parsed);
1742        }
1743    }
1744
1745    #[test]
1746    fn test_stream_subscription_empty_source() {
1747        let sub = StreamSubscription::new("");
1748        assert!(sub.id.starts_with("sub_"));
1749        assert_eq!(sub.source, "");
1750    }
1751
1752    #[test]
1753    fn test_stream_subscription_unicode_source() {
1754        let sub = StreamSubscription::new("数据/指标");
1755        assert!(sub.id.starts_with("sub_"));
1756        assert_eq!(sub.source, "数据/指标");
1757    }
1758
1759    #[test]
1760    fn test_data_stream_multiple_data_updates() {
1761        let stream = DataStream::default();
1762        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1763
1764        // Multiple updates should update cache
1765        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1766        assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(1)));
1767
1768        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(2), 2));
1769        assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(2)));
1770
1771        let sub = stream.get_subscription("sub1").unwrap();
1772        assert_eq!(sub.last_seq, 2);
1773        assert_eq!(sub.error_count, 0);
1774    }
1775
1776    #[test]
1777    fn test_reconnect_infinite_attempts() {
1778        let config = ReconnectConfig {
1779            enabled: true,
1780            max_attempts: None,
1781            ..Default::default()
1782        };
1783
1784        // Should always reconnect with infinite attempts
1785        assert!(config.should_reconnect(0));
1786        assert!(config.should_reconnect(100));
1787        assert!(config.should_reconnect(1000));
1788        assert!(config.should_reconnect(u32::MAX - 1));
1789    }
1790}