presentar_core/
streaming.rs

1#![allow(
2    clippy::unwrap_used,
3    clippy::disallowed_methods,
4    clippy::comparison_chain,
5    clippy::match_same_arms
6)]
7//! Data streaming and live updates for Presentar.
8//!
9//! This module provides infrastructure for real-time data updates:
10//! - Subscription management for data sources
11//! - Message protocol for updates
12//! - Reconnection and backpressure handling
13//! - Integration with expression executor for live transforms
14//!
15//! # Example
16//!
17//! ```
18//! use presentar_core::streaming::{
19//!     DataStream, StreamConfig, StreamMessage, StreamSubscription,
20//! };
21//!
22//! // Create a subscription
23//! let sub = StreamSubscription::new("metrics/cpu")
24//!     .with_interval(1000)  // 1 second
25//!     .with_transform("rate()");
26//!
27//! // Handle incoming messages
28//! fn handle_message(msg: StreamMessage) {
29//!     match msg {
30//!         StreamMessage::Data { payload, .. } => println!("Got data: {:?}", payload),
31//!         StreamMessage::Error { message, .. } => eprintln!("Error: {}", message),
32//!         _ => {}
33//!     }
34//! }
35//! ```
36
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::{Arc, Mutex};
40use std::time::Duration;
41
42/// Stream connection state.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum ConnectionState {
45    /// Not connected
46    #[default]
47    Disconnected,
48    /// Attempting to connect
49    Connecting,
50    /// Connected and ready
51    Connected,
52    /// Connection lost, attempting reconnection
53    Reconnecting,
54    /// Permanently failed
55    Failed,
56}
57
58impl ConnectionState {
59    /// Check if the stream can send/receive messages.
60    #[must_use]
61    pub const fn is_active(&self) -> bool {
62        matches!(self, Self::Connected)
63    }
64
65    /// Check if the stream is trying to connect.
66    #[must_use]
67    pub const fn is_connecting(&self) -> bool {
68        matches!(self, Self::Connecting | Self::Reconnecting)
69    }
70}
71
72/// Stream message types for the protocol.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum StreamMessage {
76    /// Subscribe to a data source
77    Subscribe {
78        /// Subscription ID
79        id: String,
80        /// Data source path
81        source: String,
82        /// Optional transform expression
83        #[serde(skip_serializing_if = "Option::is_none")]
84        transform: Option<String>,
85        /// Refresh interval in milliseconds
86        #[serde(skip_serializing_if = "Option::is_none")]
87        interval_ms: Option<u64>,
88    },
89    /// Unsubscribe from a data source
90    Unsubscribe {
91        /// Subscription ID
92        id: String,
93    },
94    /// Data update from server
95    Data {
96        /// Subscription ID this data belongs to
97        id: String,
98        /// The payload data
99        payload: serde_json::Value,
100        /// Sequence number for ordering
101        #[serde(default)]
102        seq: u64,
103        /// Timestamp of the data
104        #[serde(skip_serializing_if = "Option::is_none")]
105        timestamp: Option<u64>,
106    },
107    /// Error message
108    Error {
109        /// Related subscription ID (if any)
110        #[serde(skip_serializing_if = "Option::is_none")]
111        id: Option<String>,
112        /// Error message
113        message: String,
114        /// Error code
115        #[serde(skip_serializing_if = "Option::is_none")]
116        code: Option<i32>,
117    },
118    /// Acknowledgment
119    Ack {
120        /// Subscription ID being acknowledged
121        id: String,
122        /// Status message
123        #[serde(skip_serializing_if = "Option::is_none")]
124        status: Option<String>,
125    },
126    /// Heartbeat/ping
127    Ping {
128        /// Timestamp
129        timestamp: u64,
130    },
131    /// Heartbeat response
132    Pong {
133        /// Echo of ping timestamp
134        timestamp: u64,
135    },
136}
137
138impl StreamMessage {
139    /// Create a subscribe message.
140    #[must_use]
141    pub fn subscribe(id: impl Into<String>, source: impl Into<String>) -> Self {
142        Self::Subscribe {
143            id: id.into(),
144            source: source.into(),
145            transform: None,
146            interval_ms: None,
147        }
148    }
149
150    /// Create a subscribe message with transform.
151    #[must_use]
152    pub fn subscribe_with_transform(
153        id: impl Into<String>,
154        source: impl Into<String>,
155        transform: impl Into<String>,
156    ) -> Self {
157        Self::Subscribe {
158            id: id.into(),
159            source: source.into(),
160            transform: Some(transform.into()),
161            interval_ms: None,
162        }
163    }
164
165    /// Create an unsubscribe message.
166    #[must_use]
167    pub fn unsubscribe(id: impl Into<String>) -> Self {
168        Self::Unsubscribe { id: id.into() }
169    }
170
171    /// Create a data message.
172    #[must_use]
173    pub fn data(id: impl Into<String>, payload: serde_json::Value, seq: u64) -> Self {
174        Self::Data {
175            id: id.into(),
176            payload,
177            seq,
178            timestamp: None,
179        }
180    }
181
182    /// Create an error message.
183    #[must_use]
184    pub fn error(message: impl Into<String>) -> Self {
185        Self::Error {
186            id: None,
187            message: message.into(),
188            code: None,
189        }
190    }
191
192    /// Create an error message for a subscription.
193    #[must_use]
194    pub fn error_for(id: impl Into<String>, message: impl Into<String>) -> Self {
195        Self::Error {
196            id: Some(id.into()),
197            message: message.into(),
198            code: None,
199        }
200    }
201
202    /// Create an ack message.
203    #[must_use]
204    pub fn ack(id: impl Into<String>) -> Self {
205        Self::Ack {
206            id: id.into(),
207            status: None,
208        }
209    }
210
211    /// Create a ping message.
212    #[must_use]
213    pub fn ping(timestamp: u64) -> Self {
214        Self::Ping { timestamp }
215    }
216
217    /// Create a pong message.
218    #[must_use]
219    pub fn pong(timestamp: u64) -> Self {
220        Self::Pong { timestamp }
221    }
222
223    /// Get the subscription ID if this message has one.
224    #[must_use]
225    pub fn subscription_id(&self) -> Option<&str> {
226        match self {
227            Self::Subscribe { id, .. }
228            | Self::Unsubscribe { id }
229            | Self::Data { id, .. }
230            | Self::Ack { id, .. } => Some(id),
231            Self::Error { id, .. } => id.as_deref(),
232            Self::Ping { .. } | Self::Pong { .. } => None,
233        }
234    }
235}
236
237/// Subscription to a data source.
238#[derive(Debug, Clone)]
239pub struct StreamSubscription {
240    /// Unique subscription ID
241    pub id: String,
242    /// Data source path
243    pub source: String,
244    /// Transform expression to apply
245    pub transform: Option<String>,
246    /// Refresh interval
247    pub interval: Option<Duration>,
248    /// Whether this subscription is active
249    pub active: bool,
250    /// Last received sequence number
251    pub last_seq: u64,
252    /// Error count for backoff
253    pub error_count: u32,
254}
255
256impl StreamSubscription {
257    /// Create a new subscription.
258    #[must_use]
259    pub fn new(source: impl Into<String>) -> Self {
260        let source = source.into();
261        let id = format!("sub_{}", Self::hash_source(&source));
262        Self {
263            id,
264            source,
265            transform: None,
266            interval: None,
267            active: false,
268            last_seq: 0,
269            error_count: 0,
270        }
271    }
272
273    /// Create with explicit ID.
274    #[must_use]
275    pub fn with_id(id: impl Into<String>, source: impl Into<String>) -> Self {
276        Self {
277            id: id.into(),
278            source: source.into(),
279            transform: None,
280            interval: None,
281            active: false,
282            last_seq: 0,
283            error_count: 0,
284        }
285    }
286
287    /// Set refresh interval in milliseconds.
288    #[must_use]
289    pub fn with_interval(mut self, ms: u64) -> Self {
290        self.interval = Some(Duration::from_millis(ms));
291        self
292    }
293
294    /// Set transform expression.
295    #[must_use]
296    pub fn with_transform(mut self, transform: impl Into<String>) -> Self {
297        self.transform = Some(transform.into());
298        self
299    }
300
301    /// Convert to subscribe message.
302    #[must_use]
303    pub fn to_message(&self) -> StreamMessage {
304        StreamMessage::Subscribe {
305            id: self.id.clone(),
306            source: self.source.clone(),
307            transform: self.transform.clone(),
308            interval_ms: self.interval.map(|d| d.as_millis() as u64),
309        }
310    }
311
312    /// Simple hash for generating IDs.
313    fn hash_source(s: &str) -> u64 {
314        let mut hash: u64 = 5381;
315        for byte in s.bytes() {
316            hash = hash.wrapping_mul(33).wrapping_add(u64::from(byte));
317        }
318        hash
319    }
320}
321
322/// Configuration for stream connection.
323#[derive(Debug, Clone)]
324pub struct StreamConfig {
325    /// WebSocket URL
326    pub url: String,
327    /// Reconnection settings
328    pub reconnect: ReconnectConfig,
329    /// Heartbeat interval
330    pub heartbeat_interval: Duration,
331    /// Message buffer size
332    pub buffer_size: usize,
333}
334
335impl Default for StreamConfig {
336    fn default() -> Self {
337        Self {
338            url: String::new(),
339            reconnect: ReconnectConfig::default(),
340            heartbeat_interval: Duration::from_secs(30),
341            buffer_size: 1024,
342        }
343    }
344}
345
346impl StreamConfig {
347    /// Create with URL.
348    #[must_use]
349    pub fn new(url: impl Into<String>) -> Self {
350        Self {
351            url: url.into(),
352            ..Default::default()
353        }
354    }
355
356    /// Set reconnection config.
357    #[must_use]
358    pub fn with_reconnect(mut self, config: ReconnectConfig) -> Self {
359        self.reconnect = config;
360        self
361    }
362
363    /// Set heartbeat interval.
364    #[must_use]
365    pub fn with_heartbeat(mut self, interval: Duration) -> Self {
366        self.heartbeat_interval = interval;
367        self
368    }
369
370    /// Set buffer size.
371    #[must_use]
372    pub fn with_buffer_size(mut self, size: usize) -> Self {
373        self.buffer_size = size;
374        self
375    }
376}
377
378/// Reconnection configuration.
379#[derive(Debug, Clone)]
380pub struct ReconnectConfig {
381    /// Whether to auto-reconnect
382    pub enabled: bool,
383    /// Initial delay before first reconnect
384    pub initial_delay: Duration,
385    /// Maximum delay between reconnects
386    pub max_delay: Duration,
387    /// Multiplier for exponential backoff
388    pub backoff_multiplier: f32,
389    /// Maximum reconnection attempts (None = infinite)
390    pub max_attempts: Option<u32>,
391}
392
393impl Default for ReconnectConfig {
394    fn default() -> Self {
395        Self {
396            enabled: true,
397            initial_delay: Duration::from_millis(500),
398            max_delay: Duration::from_secs(30),
399            backoff_multiplier: 2.0,
400            max_attempts: None,
401        }
402    }
403}
404
405impl ReconnectConfig {
406    /// Calculate delay for a given attempt number.
407    #[must_use]
408    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
409        if !self.enabled {
410            return Duration::ZERO;
411        }
412
413        let delay_ms = self.initial_delay.as_millis() as f32
414            * self.backoff_multiplier.powi(attempt.min(20) as i32);
415
416        let delay = Duration::from_millis(delay_ms.min(self.max_delay.as_millis() as f32) as u64);
417        delay.min(self.max_delay)
418    }
419
420    /// Check if we should attempt reconnection.
421    #[must_use]
422    pub fn should_reconnect(&self, attempt: u32) -> bool {
423        if !self.enabled {
424            return false;
425        }
426        match self.max_attempts {
427            Some(max) => attempt < max,
428            None => true,
429        }
430    }
431}
432
433/// Callback type for data updates.
434pub type DataCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;
435
436/// Callback type for errors.
437pub type ErrorCallback = Box<dyn Fn(&str) + Send + Sync>;
438
439/// Callback type for connection state changes.
440pub type StateCallback = Box<dyn Fn(ConnectionState) + Send + Sync>;
441
442/// Data stream manager.
443///
444/// Manages subscriptions and handles incoming data.
445#[derive(Default)]
446pub struct DataStream {
447    /// Active subscriptions
448    subscriptions: Arc<Mutex<HashMap<String, StreamSubscription>>>,
449    /// Connection state
450    state: Arc<Mutex<ConnectionState>>,
451    /// Pending outbound messages
452    outbox: Arc<Mutex<Vec<StreamMessage>>>,
453    /// Last received data per subscription
454    data_cache: Arc<Mutex<HashMap<String, serde_json::Value>>>,
455    /// Reconnection attempt count
456    reconnect_attempts: Arc<Mutex<u32>>,
457    /// Config
458    config: StreamConfig,
459}
460
461impl DataStream {
462    /// Create a new data stream.
463    #[must_use]
464    pub fn new(config: StreamConfig) -> Self {
465        Self {
466            subscriptions: Arc::new(Mutex::new(HashMap::new())),
467            state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
468            outbox: Arc::new(Mutex::new(Vec::new())),
469            data_cache: Arc::new(Mutex::new(HashMap::new())),
470            reconnect_attempts: Arc::new(Mutex::new(0)),
471            config,
472        }
473    }
474
475    /// Get connection state.
476    #[must_use]
477    pub fn state(&self) -> ConnectionState {
478        *self.state.lock().unwrap()
479    }
480
481    /// Set connection state.
482    pub fn set_state(&self, state: ConnectionState) {
483        *self.state.lock().unwrap() = state;
484    }
485
486    /// Subscribe to a data source.
487    pub fn subscribe(&self, subscription: StreamSubscription) -> String {
488        let id = subscription.id.clone();
489        let msg = subscription.to_message();
490
491        self.subscriptions
492            .lock()
493            .unwrap()
494            .insert(id.clone(), subscription);
495
496        self.outbox.lock().unwrap().push(msg);
497        id
498    }
499
500    /// Unsubscribe from a data source.
501    pub fn unsubscribe(&self, id: &str) {
502        self.subscriptions.lock().unwrap().remove(id);
503        self.data_cache.lock().unwrap().remove(id);
504        self.outbox
505            .lock()
506            .unwrap()
507            .push(StreamMessage::unsubscribe(id));
508    }
509
510    /// Get subscription by ID.
511    #[must_use]
512    pub fn get_subscription(&self, id: &str) -> Option<StreamSubscription> {
513        self.subscriptions.lock().unwrap().get(id).cloned()
514    }
515
516    /// Get all active subscriptions.
517    #[must_use]
518    pub fn subscriptions(&self) -> Vec<StreamSubscription> {
519        self.subscriptions
520            .lock()
521            .unwrap()
522            .values()
523            .cloned()
524            .collect()
525    }
526
527    /// Get cached data for a subscription.
528    #[must_use]
529    pub fn get_data(&self, id: &str) -> Option<serde_json::Value> {
530        self.data_cache.lock().unwrap().get(id).cloned()
531    }
532
533    /// Handle an incoming message.
534    pub fn handle_message(&self, msg: StreamMessage) -> Option<StreamMessage> {
535        match msg {
536            StreamMessage::Data {
537                id, payload, seq, ..
538            } => {
539                // Update subscription state
540                if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(&id) {
541                    sub.last_seq = seq;
542                    sub.active = true;
543                    sub.error_count = 0;
544                }
545                // Cache data
546                self.data_cache.lock().unwrap().insert(id, payload);
547                None
548            }
549            StreamMessage::Ack { id, .. } => {
550                if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(&id) {
551                    sub.active = true;
552                }
553                None
554            }
555            StreamMessage::Error { id, .. } => {
556                if let Some(ref id) = id {
557                    if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(id) {
558                        sub.error_count += 1;
559                    }
560                }
561                None
562            }
563            StreamMessage::Ping { timestamp } => Some(StreamMessage::pong(timestamp)),
564            StreamMessage::Pong { .. } => None,
565            _ => None,
566        }
567    }
568
569    /// Take pending outbound messages.
570    #[must_use]
571    pub fn take_outbox(&self) -> Vec<StreamMessage> {
572        std::mem::take(&mut *self.outbox.lock().unwrap())
573    }
574
575    /// Queue an outbound message.
576    pub fn send(&self, msg: StreamMessage) {
577        self.outbox.lock().unwrap().push(msg);
578    }
579
580    /// Get reconnection delay based on current attempts.
581    #[must_use]
582    pub fn reconnect_delay(&self) -> Duration {
583        let attempts = *self.reconnect_attempts.lock().unwrap();
584        self.config.reconnect.delay_for_attempt(attempts)
585    }
586
587    /// Increment reconnection attempts.
588    pub fn increment_reconnect_attempts(&self) {
589        *self.reconnect_attempts.lock().unwrap() += 1;
590    }
591
592    /// Reset reconnection attempts.
593    pub fn reset_reconnect_attempts(&self) {
594        *self.reconnect_attempts.lock().unwrap() = 0;
595    }
596
597    /// Check if we should try to reconnect.
598    #[must_use]
599    pub fn should_reconnect(&self) -> bool {
600        let attempts = *self.reconnect_attempts.lock().unwrap();
601        self.config.reconnect.should_reconnect(attempts)
602    }
603
604    /// Resubscribe all subscriptions (after reconnect).
605    pub fn resubscribe_all(&self) {
606        let subs = self.subscriptions.lock().unwrap().clone();
607        let mut outbox = self.outbox.lock().unwrap();
608        for sub in subs.values() {
609            outbox.push(sub.to_message());
610        }
611    }
612
613    /// Number of active subscriptions.
614    #[must_use]
615    pub fn subscription_count(&self) -> usize {
616        self.subscriptions.lock().unwrap().len()
617    }
618
619    /// Clear all subscriptions and cache.
620    pub fn clear(&self) {
621        self.subscriptions.lock().unwrap().clear();
622        self.data_cache.lock().unwrap().clear();
623        self.outbox.lock().unwrap().clear();
624    }
625}
626
627/// Rate limiter for backpressure handling.
628#[derive(Debug)]
629pub struct RateLimiter {
630    /// Maximum messages per window
631    max_messages: usize,
632    /// Window duration
633    window: Duration,
634    /// Message timestamps
635    timestamps: Vec<u64>,
636}
637
638impl RateLimiter {
639    /// Create a new rate limiter.
640    #[must_use]
641    pub fn new(max_messages: usize, window: Duration) -> Self {
642        Self {
643            max_messages,
644            window,
645            timestamps: Vec::with_capacity(max_messages),
646        }
647    }
648
649    /// Check if a message is allowed and record it.
650    pub fn check(&mut self, now: u64) -> bool {
651        let window_start = now.saturating_sub(self.window.as_millis() as u64);
652
653        // Remove expired timestamps (>= to keep timestamps at window boundary)
654        self.timestamps.retain(|&ts| ts >= window_start);
655
656        if self.timestamps.len() < self.max_messages {
657            self.timestamps.push(now);
658            true
659        } else {
660            false
661        }
662    }
663
664    /// Get the number of messages in the current window.
665    #[must_use]
666    pub fn current_count(&self) -> usize {
667        self.timestamps.len()
668    }
669
670    /// Reset the rate limiter.
671    pub fn reset(&mut self) {
672        self.timestamps.clear();
673    }
674
675    /// Check if the limiter is at capacity.
676    #[must_use]
677    pub fn is_at_capacity(&self) -> bool {
678        self.timestamps.len() >= self.max_messages
679    }
680}
681
682impl Default for RateLimiter {
683    fn default() -> Self {
684        Self::new(100, Duration::from_secs(1))
685    }
686}
687
688/// Message buffer for ordering and deduplication.
689#[derive(Debug, Default)]
690pub struct MessageBuffer {
691    /// Buffer of messages by subscription ID
692    buffers: HashMap<String, SubscriptionBuffer>,
693}
694
695#[derive(Debug, Default)]
696struct SubscriptionBuffer {
697    /// Last processed sequence number
698    last_seq: u64,
699    /// Buffered out-of-order messages
700    pending: Vec<(u64, serde_json::Value)>,
701}
702
703impl MessageBuffer {
704    /// Create a new message buffer.
705    #[must_use]
706    pub fn new() -> Self {
707        Self::default()
708    }
709
710    /// Process a message and return it if it's the next in sequence.
711    pub fn process(
712        &mut self,
713        id: &str,
714        seq: u64,
715        payload: serde_json::Value,
716    ) -> Option<serde_json::Value> {
717        let buffer = self.buffers.entry(id.to_string()).or_default();
718
719        if seq == buffer.last_seq + 1 {
720            // This is the next expected message
721            buffer.last_seq = seq;
722
723            // Check for any buffered messages that are now in order
724            let mut result = Some(payload);
725            while let Some(pos) = buffer
726                .pending
727                .iter()
728                .position(|(s, _)| *s == buffer.last_seq + 1)
729            {
730                let (next_seq, next_payload) = buffer.pending.remove(pos);
731                buffer.last_seq = next_seq;
732                // Replace result with latest (or could accumulate)
733                result = Some(next_payload);
734            }
735            result
736        } else if seq > buffer.last_seq + 1 {
737            // Out of order - buffer it
738            buffer.pending.push((seq, payload));
739            None
740        } else {
741            // Duplicate or old message - ignore
742            None
743        }
744    }
745
746    /// Get the last processed sequence for a subscription.
747    #[must_use]
748    pub fn last_seq(&self, id: &str) -> u64 {
749        self.buffers.get(id).map_or(0, |b| b.last_seq)
750    }
751
752    /// Get the number of pending messages for a subscription.
753    #[must_use]
754    pub fn pending_count(&self, id: &str) -> usize {
755        self.buffers.get(id).map_or(0, |b| b.pending.len())
756    }
757
758    /// Clear buffer for a subscription.
759    pub fn clear(&mut self, id: &str) {
760        self.buffers.remove(id);
761    }
762
763    /// Clear all buffers.
764    pub fn clear_all(&mut self) {
765        self.buffers.clear();
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772
773    // =========================================================================
774    // ConnectionState Tests
775    // =========================================================================
776
777    #[test]
778    fn test_connection_state_default() {
779        let state = ConnectionState::default();
780        assert_eq!(state, ConnectionState::Disconnected);
781    }
782
783    #[test]
784    fn test_connection_state_is_active() {
785        assert!(!ConnectionState::Disconnected.is_active());
786        assert!(!ConnectionState::Connecting.is_active());
787        assert!(ConnectionState::Connected.is_active());
788        assert!(!ConnectionState::Reconnecting.is_active());
789        assert!(!ConnectionState::Failed.is_active());
790    }
791
792    #[test]
793    fn test_connection_state_is_connecting() {
794        assert!(!ConnectionState::Disconnected.is_connecting());
795        assert!(ConnectionState::Connecting.is_connecting());
796        assert!(!ConnectionState::Connected.is_connecting());
797        assert!(ConnectionState::Reconnecting.is_connecting());
798        assert!(!ConnectionState::Failed.is_connecting());
799    }
800
801    // =========================================================================
802    // StreamMessage Tests
803    // =========================================================================
804
805    #[test]
806    fn test_stream_message_subscribe() {
807        let msg = StreamMessage::subscribe("sub1", "metrics/cpu");
808        if let StreamMessage::Subscribe { id, source, .. } = msg {
809            assert_eq!(id, "sub1");
810            assert_eq!(source, "metrics/cpu");
811        } else {
812            panic!("Expected Subscribe message");
813        }
814    }
815
816    #[test]
817    fn test_stream_message_subscribe_with_transform() {
818        let msg = StreamMessage::subscribe_with_transform("sub1", "metrics/cpu", "rate()");
819        if let StreamMessage::Subscribe { transform, .. } = msg {
820            assert_eq!(transform, Some("rate()".to_string()));
821        } else {
822            panic!("Expected Subscribe message");
823        }
824    }
825
826    #[test]
827    fn test_stream_message_unsubscribe() {
828        let msg = StreamMessage::unsubscribe("sub1");
829        if let StreamMessage::Unsubscribe { id } = msg {
830            assert_eq!(id, "sub1");
831        } else {
832            panic!("Expected Unsubscribe message");
833        }
834    }
835
836    #[test]
837    fn test_stream_message_data() {
838        let msg = StreamMessage::data("sub1", serde_json::json!({"value": 42}), 5);
839        if let StreamMessage::Data {
840            id, payload, seq, ..
841        } = msg
842        {
843            assert_eq!(id, "sub1");
844            assert_eq!(payload, serde_json::json!({"value": 42}));
845            assert_eq!(seq, 5);
846        } else {
847            panic!("Expected Data message");
848        }
849    }
850
851    #[test]
852    fn test_stream_message_error() {
853        let msg = StreamMessage::error("connection failed");
854        if let StreamMessage::Error { message, id, .. } = msg {
855            assert_eq!(message, "connection failed");
856            assert!(id.is_none());
857        } else {
858            panic!("Expected Error message");
859        }
860    }
861
862    #[test]
863    fn test_stream_message_error_for() {
864        let msg = StreamMessage::error_for("sub1", "invalid source");
865        if let StreamMessage::Error { message, id, .. } = msg {
866            assert_eq!(message, "invalid source");
867            assert_eq!(id, Some("sub1".to_string()));
868        } else {
869            panic!("Expected Error message");
870        }
871    }
872
873    #[test]
874    fn test_stream_message_ack() {
875        let msg = StreamMessage::ack("sub1");
876        if let StreamMessage::Ack { id, .. } = msg {
877            assert_eq!(id, "sub1");
878        } else {
879            panic!("Expected Ack message");
880        }
881    }
882
883    #[test]
884    fn test_stream_message_ping_pong() {
885        let ping = StreamMessage::ping(12345);
886        let pong = StreamMessage::pong(12345);
887
888        if let StreamMessage::Ping { timestamp } = ping {
889            assert_eq!(timestamp, 12345);
890        } else {
891            panic!("Expected Ping");
892        }
893
894        if let StreamMessage::Pong { timestamp } = pong {
895            assert_eq!(timestamp, 12345);
896        } else {
897            panic!("Expected Pong");
898        }
899    }
900
901    #[test]
902    fn test_stream_message_subscription_id() {
903        assert_eq!(
904            StreamMessage::subscribe("sub1", "x").subscription_id(),
905            Some("sub1")
906        );
907        assert_eq!(
908            StreamMessage::unsubscribe("sub2").subscription_id(),
909            Some("sub2")
910        );
911        assert_eq!(
912            StreamMessage::data("sub3", serde_json::json!({}), 0).subscription_id(),
913            Some("sub3")
914        );
915        assert_eq!(StreamMessage::error("msg").subscription_id(), None);
916        assert_eq!(
917            StreamMessage::error_for("sub4", "msg").subscription_id(),
918            Some("sub4")
919        );
920        assert!(StreamMessage::ping(0).subscription_id().is_none());
921        assert!(StreamMessage::pong(0).subscription_id().is_none());
922    }
923
924    #[test]
925    fn test_stream_message_serialize() {
926        let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 42);
927        let json = serde_json::to_string(&msg).unwrap();
928        assert!(json.contains("\"type\":\"data\""));
929        assert!(json.contains("\"id\":\"sub1\""));
930        assert!(json.contains("\"seq\":42"));
931    }
932
933    #[test]
934    fn test_stream_message_deserialize() {
935        let json = r#"{"type":"subscribe","id":"s1","source":"data/x"}"#;
936        let msg: StreamMessage = serde_json::from_str(json).unwrap();
937        if let StreamMessage::Subscribe { id, source, .. } = msg {
938            assert_eq!(id, "s1");
939            assert_eq!(source, "data/x");
940        } else {
941            panic!("Expected Subscribe");
942        }
943    }
944
945    // =========================================================================
946    // StreamSubscription Tests
947    // =========================================================================
948
949    #[test]
950    fn test_subscription_new() {
951        let sub = StreamSubscription::new("metrics/cpu");
952        assert_eq!(sub.source, "metrics/cpu");
953        assert!(sub.id.starts_with("sub_"));
954        assert!(!sub.active);
955    }
956
957    #[test]
958    fn test_subscription_with_id() {
959        let sub = StreamSubscription::with_id("my-sub", "data/x");
960        assert_eq!(sub.id, "my-sub");
961        assert_eq!(sub.source, "data/x");
962    }
963
964    #[test]
965    fn test_subscription_with_interval() {
966        let sub = StreamSubscription::new("x").with_interval(1000);
967        assert_eq!(sub.interval, Some(Duration::from_millis(1000)));
968    }
969
970    #[test]
971    fn test_subscription_with_transform() {
972        let sub = StreamSubscription::new("x").with_transform("rate() | limit(10)");
973        assert_eq!(sub.transform, Some("rate() | limit(10)".to_string()));
974    }
975
976    #[test]
977    fn test_subscription_to_message() {
978        let sub = StreamSubscription::with_id("sub1", "metrics")
979            .with_interval(5000)
980            .with_transform("mean()");
981
982        let msg = sub.to_message();
983        if let StreamMessage::Subscribe {
984            id,
985            source,
986            transform,
987            interval_ms,
988        } = msg
989        {
990            assert_eq!(id, "sub1");
991            assert_eq!(source, "metrics");
992            assert_eq!(transform, Some("mean()".to_string()));
993            assert_eq!(interval_ms, Some(5000));
994        } else {
995            panic!("Expected Subscribe");
996        }
997    }
998
999    // =========================================================================
1000    // StreamConfig Tests
1001    // =========================================================================
1002
1003    #[test]
1004    fn test_stream_config_default() {
1005        let config = StreamConfig::default();
1006        assert!(config.url.is_empty());
1007        assert!(config.reconnect.enabled);
1008        assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
1009    }
1010
1011    #[test]
1012    fn test_stream_config_new() {
1013        let config = StreamConfig::new("ws://localhost:8080");
1014        assert_eq!(config.url, "ws://localhost:8080");
1015    }
1016
1017    #[test]
1018    fn test_stream_config_builder() {
1019        let config = StreamConfig::new("ws://x")
1020            .with_heartbeat(Duration::from_secs(10))
1021            .with_buffer_size(2048);
1022
1023        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
1024        assert_eq!(config.buffer_size, 2048);
1025    }
1026
1027    // =========================================================================
1028    // ReconnectConfig Tests
1029    // =========================================================================
1030
1031    #[test]
1032    fn test_reconnect_config_default() {
1033        let config = ReconnectConfig::default();
1034        assert!(config.enabled);
1035        assert_eq!(config.initial_delay, Duration::from_millis(500));
1036        assert_eq!(config.max_delay, Duration::from_secs(30));
1037        assert!(config.max_attempts.is_none());
1038    }
1039
1040    #[test]
1041    fn test_reconnect_delay_for_attempt() {
1042        let config = ReconnectConfig {
1043            enabled: true,
1044            initial_delay: Duration::from_millis(100),
1045            max_delay: Duration::from_secs(10),
1046            backoff_multiplier: 2.0,
1047            max_attempts: None,
1048        };
1049
1050        assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100));
1051        assert_eq!(config.delay_for_attempt(1), Duration::from_millis(200));
1052        assert_eq!(config.delay_for_attempt(2), Duration::from_millis(400));
1053        assert_eq!(config.delay_for_attempt(3), Duration::from_millis(800));
1054    }
1055
1056    #[test]
1057    fn test_reconnect_delay_capped() {
1058        let config = ReconnectConfig {
1059            enabled: true,
1060            initial_delay: Duration::from_secs(1),
1061            max_delay: Duration::from_secs(5),
1062            backoff_multiplier: 10.0,
1063            max_attempts: None,
1064        };
1065
1066        // After just a couple attempts, should be capped at max
1067        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(5));
1068    }
1069
1070    #[test]
1071    fn test_reconnect_disabled() {
1072        let config = ReconnectConfig {
1073            enabled: false,
1074            ..Default::default()
1075        };
1076
1077        assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
1078        assert!(!config.should_reconnect(0));
1079    }
1080
1081    #[test]
1082    fn test_reconnect_max_attempts() {
1083        let config = ReconnectConfig {
1084            enabled: true,
1085            max_attempts: Some(3),
1086            ..Default::default()
1087        };
1088
1089        assert!(config.should_reconnect(0));
1090        assert!(config.should_reconnect(1));
1091        assert!(config.should_reconnect(2));
1092        assert!(!config.should_reconnect(3));
1093        assert!(!config.should_reconnect(4));
1094    }
1095
1096    // =========================================================================
1097    // DataStream Tests
1098    // =========================================================================
1099
1100    #[test]
1101    fn test_data_stream_new() {
1102        let stream = DataStream::new(StreamConfig::new("ws://x"));
1103        assert_eq!(stream.state(), ConnectionState::Disconnected);
1104        assert_eq!(stream.subscription_count(), 0);
1105    }
1106
1107    #[test]
1108    fn test_data_stream_subscribe() {
1109        let stream = DataStream::new(StreamConfig::default());
1110        let sub = StreamSubscription::with_id("sub1", "metrics");
1111
1112        let id = stream.subscribe(sub);
1113        assert_eq!(id, "sub1");
1114        assert_eq!(stream.subscription_count(), 1);
1115
1116        let outbox = stream.take_outbox();
1117        assert_eq!(outbox.len(), 1);
1118        assert!(matches!(outbox[0], StreamMessage::Subscribe { .. }));
1119    }
1120
1121    #[test]
1122    fn test_data_stream_unsubscribe() {
1123        let stream = DataStream::new(StreamConfig::default());
1124        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1125        let _ = stream.take_outbox(); // Clear
1126
1127        stream.unsubscribe("sub1");
1128        assert_eq!(stream.subscription_count(), 0);
1129
1130        let outbox = stream.take_outbox();
1131        assert_eq!(outbox.len(), 1);
1132        assert!(matches!(outbox[0], StreamMessage::Unsubscribe { .. }));
1133    }
1134
1135    #[test]
1136    fn test_data_stream_handle_data() {
1137        let stream = DataStream::new(StreamConfig::default());
1138        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1139
1140        let msg = StreamMessage::data("sub1", serde_json::json!({"val": 42}), 1);
1141        stream.handle_message(msg);
1142
1143        let data = stream.get_data("sub1");
1144        assert_eq!(data, Some(serde_json::json!({"val": 42})));
1145
1146        let sub = stream.get_subscription("sub1").unwrap();
1147        assert!(sub.active);
1148        assert_eq!(sub.last_seq, 1);
1149    }
1150
1151    #[test]
1152    fn test_data_stream_handle_ack() {
1153        let stream = DataStream::new(StreamConfig::default());
1154        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1155
1156        stream.handle_message(StreamMessage::ack("sub1"));
1157
1158        let sub = stream.get_subscription("sub1").unwrap();
1159        assert!(sub.active);
1160    }
1161
1162    #[test]
1163    fn test_data_stream_handle_error() {
1164        let stream = DataStream::new(StreamConfig::default());
1165        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1166
1167        stream.handle_message(StreamMessage::error_for("sub1", "fail"));
1168
1169        let sub = stream.get_subscription("sub1").unwrap();
1170        assert_eq!(sub.error_count, 1);
1171    }
1172
1173    #[test]
1174    fn test_data_stream_handle_ping() {
1175        let stream = DataStream::new(StreamConfig::default());
1176        let response = stream.handle_message(StreamMessage::ping(12345));
1177
1178        assert!(matches!(
1179            response,
1180            Some(StreamMessage::Pong { timestamp: 12345 })
1181        ));
1182    }
1183
1184    #[test]
1185    fn test_data_stream_reconnect_logic() {
1186        let stream = DataStream::new(StreamConfig::default());
1187
1188        assert!(stream.should_reconnect());
1189        assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1190
1191        stream.increment_reconnect_attempts();
1192        assert!(stream.should_reconnect());
1193        assert_eq!(stream.reconnect_delay(), Duration::from_millis(1000));
1194
1195        stream.reset_reconnect_attempts();
1196        assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1197    }
1198
1199    #[test]
1200    fn test_data_stream_resubscribe_all() {
1201        let stream = DataStream::new(StreamConfig::default());
1202        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1203        stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1204        let _ = stream.take_outbox(); // Clear
1205
1206        stream.resubscribe_all();
1207
1208        let outbox = stream.take_outbox();
1209        assert_eq!(outbox.len(), 2);
1210    }
1211
1212    #[test]
1213    fn test_data_stream_clear() {
1214        let stream = DataStream::new(StreamConfig::default());
1215        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1216        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1217
1218        stream.clear();
1219
1220        assert_eq!(stream.subscription_count(), 0);
1221        assert!(stream.get_data("sub1").is_none());
1222    }
1223
1224    // =========================================================================
1225    // RateLimiter Tests
1226    // =========================================================================
1227
1228    #[test]
1229    fn test_rate_limiter_allows_under_limit() {
1230        let mut limiter = RateLimiter::new(5, Duration::from_secs(1));
1231
1232        for i in 0..5 {
1233            assert!(limiter.check(i * 100), "message {} should be allowed", i);
1234        }
1235    }
1236
1237    #[test]
1238    fn test_rate_limiter_blocks_over_limit() {
1239        let mut limiter = RateLimiter::new(3, Duration::from_secs(1));
1240
1241        assert!(limiter.check(0));
1242        assert!(limiter.check(100));
1243        assert!(limiter.check(200));
1244        assert!(!limiter.check(300)); // Over limit
1245    }
1246
1247    #[test]
1248    fn test_rate_limiter_window_expiry() {
1249        let mut limiter = RateLimiter::new(2, Duration::from_millis(100));
1250
1251        assert!(limiter.check(0));
1252        assert!(limiter.check(50));
1253        assert!(!limiter.check(60)); // Over limit
1254
1255        // After window expires
1256        assert!(limiter.check(200)); // Old messages expired
1257    }
1258
1259    #[test]
1260    fn test_rate_limiter_current_count() {
1261        let mut limiter = RateLimiter::new(10, Duration::from_secs(1));
1262
1263        assert_eq!(limiter.current_count(), 0);
1264        limiter.check(0);
1265        assert_eq!(limiter.current_count(), 1);
1266        limiter.check(100);
1267        assert_eq!(limiter.current_count(), 2);
1268    }
1269
1270    #[test]
1271    fn test_rate_limiter_reset() {
1272        let mut limiter = RateLimiter::new(2, Duration::from_secs(1));
1273
1274        limiter.check(0);
1275        limiter.check(100);
1276        assert!(limiter.is_at_capacity());
1277
1278        limiter.reset();
1279        assert_eq!(limiter.current_count(), 0);
1280        assert!(!limiter.is_at_capacity());
1281    }
1282
1283    #[test]
1284    fn test_rate_limiter_default() {
1285        let limiter = RateLimiter::default();
1286        assert_eq!(limiter.max_messages, 100);
1287    }
1288
1289    // =========================================================================
1290    // MessageBuffer Tests
1291    // =========================================================================
1292
1293    #[test]
1294    fn test_message_buffer_in_order() {
1295        let mut buffer = MessageBuffer::new();
1296
1297        let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1298        let r2 = buffer.process("sub1", 2, serde_json::json!(2));
1299        let r3 = buffer.process("sub1", 3, serde_json::json!(3));
1300
1301        assert_eq!(r1, Some(serde_json::json!(1)));
1302        assert_eq!(r2, Some(serde_json::json!(2)));
1303        assert_eq!(r3, Some(serde_json::json!(3)));
1304    }
1305
1306    #[test]
1307    fn test_message_buffer_out_of_order() {
1308        let mut buffer = MessageBuffer::new();
1309
1310        // Receive seq 1, then 3, then 2
1311        let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1312        let r3 = buffer.process("sub1", 3, serde_json::json!(3)); // Buffered
1313        let r2 = buffer.process("sub1", 2, serde_json::json!(2)); // Triggers flush
1314
1315        assert_eq!(r1, Some(serde_json::json!(1)));
1316        assert!(r3.is_none()); // Buffered
1317        assert_eq!(r2, Some(serde_json::json!(3))); // Returns latest after reorder
1318    }
1319
1320    #[test]
1321    fn test_message_buffer_duplicate() {
1322        let mut buffer = MessageBuffer::new();
1323
1324        buffer.process("sub1", 1, serde_json::json!(1));
1325        let dup = buffer.process("sub1", 1, serde_json::json!("dup"));
1326
1327        assert!(dup.is_none()); // Duplicate ignored
1328    }
1329
1330    #[test]
1331    fn test_message_buffer_last_seq() {
1332        let mut buffer = MessageBuffer::new();
1333
1334        assert_eq!(buffer.last_seq("sub1"), 0);
1335        buffer.process("sub1", 1, serde_json::json!(1));
1336        assert_eq!(buffer.last_seq("sub1"), 1);
1337        buffer.process("sub1", 2, serde_json::json!(2));
1338        assert_eq!(buffer.last_seq("sub1"), 2);
1339    }
1340
1341    #[test]
1342    fn test_message_buffer_pending_count() {
1343        let mut buffer = MessageBuffer::new();
1344
1345        buffer.process("sub1", 1, serde_json::json!(1));
1346        buffer.process("sub1", 3, serde_json::json!(3)); // Skip 2
1347        buffer.process("sub1", 4, serde_json::json!(4)); // Skip 2
1348
1349        assert_eq!(buffer.pending_count("sub1"), 2);
1350    }
1351
1352    #[test]
1353    fn test_message_buffer_clear() {
1354        let mut buffer = MessageBuffer::new();
1355
1356        buffer.process("sub1", 1, serde_json::json!(1));
1357        buffer.process("sub2", 1, serde_json::json!(2));
1358
1359        buffer.clear("sub1");
1360        assert_eq!(buffer.last_seq("sub1"), 0);
1361        assert_eq!(buffer.last_seq("sub2"), 1);
1362    }
1363
1364    #[test]
1365    fn test_message_buffer_clear_all() {
1366        let mut buffer = MessageBuffer::new();
1367
1368        buffer.process("sub1", 1, serde_json::json!(1));
1369        buffer.process("sub2", 1, serde_json::json!(2));
1370
1371        buffer.clear_all();
1372        assert_eq!(buffer.last_seq("sub1"), 0);
1373        assert_eq!(buffer.last_seq("sub2"), 0);
1374    }
1375
1376    #[test]
1377    fn test_message_buffer_multiple_subscriptions() {
1378        let mut buffer = MessageBuffer::new();
1379
1380        buffer.process("sub1", 1, serde_json::json!("a"));
1381        buffer.process("sub2", 1, serde_json::json!("b"));
1382        buffer.process("sub1", 2, serde_json::json!("c"));
1383
1384        assert_eq!(buffer.last_seq("sub1"), 2);
1385        assert_eq!(buffer.last_seq("sub2"), 1);
1386    }
1387
1388    // =========================================================================
1389    // Additional Edge Case Tests
1390    // =========================================================================
1391
1392    #[test]
1393    fn test_connection_state_debug() {
1394        assert_eq!(format!("{:?}", ConnectionState::Connected), "Connected");
1395        assert_eq!(format!("{:?}", ConnectionState::Failed), "Failed");
1396    }
1397
1398    #[test]
1399    fn test_connection_state_clone() {
1400        let state = ConnectionState::Reconnecting;
1401        let cloned = state;
1402        assert_eq!(state, cloned);
1403    }
1404
1405    #[test]
1406    fn test_stream_message_debug() {
1407        let msg = StreamMessage::ping(12345);
1408        let debug = format!("{msg:?}");
1409        assert!(debug.contains("Ping"));
1410    }
1411
1412    #[test]
1413    fn test_stream_message_clone() {
1414        let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 5);
1415        let cloned = msg.clone();
1416        assert_eq!(msg, cloned);
1417    }
1418
1419    #[test]
1420    fn test_stream_subscription_clone() {
1421        let sub = StreamSubscription::with_id("sub1", "metrics")
1422            .with_interval(1000)
1423            .with_transform("rate()");
1424        let cloned = sub.clone();
1425        assert_eq!(cloned.id, "sub1");
1426        assert_eq!(cloned.source, "metrics");
1427        assert_eq!(cloned.transform, Some("rate()".to_string()));
1428    }
1429
1430    #[test]
1431    fn test_stream_subscription_debug() {
1432        let sub = StreamSubscription::new("test");
1433        let debug = format!("{sub:?}");
1434        assert!(debug.contains("StreamSubscription"));
1435    }
1436
1437    #[test]
1438    fn test_stream_subscription_hash_consistency() {
1439        // Same source should produce same hash
1440        let sub1 = StreamSubscription::new("metrics/cpu");
1441        let sub2 = StreamSubscription::new("metrics/cpu");
1442        assert_eq!(sub1.id, sub2.id);
1443    }
1444
1445    #[test]
1446    fn test_stream_subscription_hash_different() {
1447        let sub1 = StreamSubscription::new("metrics/cpu");
1448        let sub2 = StreamSubscription::new("metrics/memory");
1449        assert_ne!(sub1.id, sub2.id);
1450    }
1451
1452    #[test]
1453    fn test_stream_config_debug() {
1454        let config = StreamConfig::default();
1455        let debug = format!("{config:?}");
1456        assert!(debug.contains("StreamConfig"));
1457    }
1458
1459    #[test]
1460    fn test_stream_config_clone() {
1461        let config = StreamConfig::new("ws://test")
1462            .with_buffer_size(2048)
1463            .with_heartbeat(Duration::from_secs(60));
1464        let cloned = config.clone();
1465        assert_eq!(cloned.url, "ws://test");
1466        assert_eq!(cloned.buffer_size, 2048);
1467    }
1468
1469    #[test]
1470    fn test_stream_config_with_reconnect() {
1471        let reconnect = ReconnectConfig {
1472            enabled: false,
1473            max_attempts: Some(5),
1474            ..Default::default()
1475        };
1476        let config = StreamConfig::new("ws://x").with_reconnect(reconnect);
1477        assert!(!config.reconnect.enabled);
1478        assert_eq!(config.reconnect.max_attempts, Some(5));
1479    }
1480
1481    #[test]
1482    fn test_reconnect_config_debug() {
1483        let config = ReconnectConfig::default();
1484        let debug = format!("{config:?}");
1485        assert!(debug.contains("ReconnectConfig"));
1486    }
1487
1488    #[test]
1489    fn test_reconnect_config_clone() {
1490        let config = ReconnectConfig {
1491            max_attempts: Some(10),
1492            ..Default::default()
1493        };
1494        let cloned = config.clone();
1495        assert_eq!(cloned.max_attempts, Some(10));
1496    }
1497
1498    #[test]
1499    fn test_reconnect_delay_large_attempt() {
1500        let config = ReconnectConfig::default();
1501        // Large attempt number should be capped by max(20)
1502        let delay = config.delay_for_attempt(100);
1503        assert!(delay <= config.max_delay);
1504    }
1505
1506    #[test]
1507    fn test_data_stream_default() {
1508        let stream = DataStream::default();
1509        assert_eq!(stream.state(), ConnectionState::Disconnected);
1510        assert_eq!(stream.subscription_count(), 0);
1511    }
1512
1513    #[test]
1514    fn test_data_stream_set_state() {
1515        let stream = DataStream::default();
1516        stream.set_state(ConnectionState::Connected);
1517        assert_eq!(stream.state(), ConnectionState::Connected);
1518        stream.set_state(ConnectionState::Failed);
1519        assert_eq!(stream.state(), ConnectionState::Failed);
1520    }
1521
1522    #[test]
1523    fn test_data_stream_send() {
1524        let stream = DataStream::default();
1525        stream.send(StreamMessage::ping(100));
1526        stream.send(StreamMessage::ping(200));
1527
1528        let outbox = stream.take_outbox();
1529        assert_eq!(outbox.len(), 2);
1530    }
1531
1532    #[test]
1533    fn test_data_stream_get_nonexistent_subscription() {
1534        let stream = DataStream::default();
1535        assert!(stream.get_subscription("nonexistent").is_none());
1536    }
1537
1538    #[test]
1539    fn test_data_stream_get_nonexistent_data() {
1540        let stream = DataStream::default();
1541        assert!(stream.get_data("nonexistent").is_none());
1542    }
1543
1544    #[test]
1545    fn test_data_stream_subscriptions_list() {
1546        let stream = DataStream::default();
1547        stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1548        stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1549
1550        let subs = stream.subscriptions();
1551        assert_eq!(subs.len(), 2);
1552    }
1553
1554    #[test]
1555    fn test_data_stream_handle_pong() {
1556        let stream = DataStream::default();
1557        let response = stream.handle_message(StreamMessage::pong(12345));
1558        assert!(response.is_none());
1559    }
1560
1561    #[test]
1562    fn test_data_stream_handle_subscribe() {
1563        let stream = DataStream::default();
1564        // Subscribe messages from server side are ignored
1565        let response = stream.handle_message(StreamMessage::subscribe("sub1", "metrics"));
1566        assert!(response.is_none());
1567    }
1568
1569    #[test]
1570    fn test_data_stream_handle_error_no_id() {
1571        let stream = DataStream::default();
1572        // Error without ID doesn't affect any subscription
1573        let response = stream.handle_message(StreamMessage::error("general error"));
1574        assert!(response.is_none());
1575    }
1576
1577    #[test]
1578    fn test_data_stream_handle_error_unknown_id() {
1579        let stream = DataStream::default();
1580        // Error for unknown subscription
1581        let response = stream.handle_message(StreamMessage::error_for("unknown", "error"));
1582        assert!(response.is_none());
1583    }
1584
1585    #[test]
1586    fn test_data_stream_handle_data_unknown_subscription() {
1587        let stream = DataStream::default();
1588        // Data for unknown subscription still gets cached
1589        stream.handle_message(StreamMessage::data("unknown", serde_json::json!(42), 1));
1590        assert_eq!(stream.get_data("unknown"), Some(serde_json::json!(42)));
1591    }
1592
1593    #[test]
1594    fn test_rate_limiter_debug() {
1595        let limiter = RateLimiter::new(10, Duration::from_secs(1));
1596        let debug = format!("{limiter:?}");
1597        assert!(debug.contains("RateLimiter"));
1598    }
1599
1600    #[test]
1601    fn test_rate_limiter_at_boundary() {
1602        let mut limiter = RateLimiter::new(3, Duration::from_millis(100));
1603
1604        // All at time 0
1605        assert!(limiter.check(0));
1606        assert!(limiter.check(0));
1607        assert!(limiter.check(0));
1608        assert!(!limiter.check(0)); // Over limit at same time
1609
1610        // Exactly at window boundary - should keep messages
1611        assert!(!limiter.check(100)); // At boundary, old ones still valid
1612
1613        // Past window boundary
1614        assert!(limiter.check(101)); // Window expired
1615    }
1616
1617    #[test]
1618    fn test_message_buffer_debug() {
1619        let buffer = MessageBuffer::new();
1620        let debug = format!("{buffer:?}");
1621        assert!(debug.contains("MessageBuffer"));
1622    }
1623
1624    #[test]
1625    fn test_message_buffer_old_message() {
1626        let mut buffer = MessageBuffer::new();
1627
1628        // Process messages 1, 2, 3 in order
1629        buffer.process("sub1", 1, serde_json::json!(1));
1630        buffer.process("sub1", 2, serde_json::json!(2));
1631        buffer.process("sub1", 3, serde_json::json!(3));
1632
1633        // Old message (seq 1 when we're at 3) should be ignored
1634        let old = buffer.process("sub1", 1, serde_json::json!("old"));
1635        assert!(old.is_none());
1636        assert_eq!(buffer.last_seq("sub1"), 3);
1637    }
1638
1639    #[test]
1640    fn test_message_buffer_large_gap() {
1641        let mut buffer = MessageBuffer::new();
1642
1643        buffer.process("sub1", 1, serde_json::json!(1));
1644        // Skip many sequence numbers
1645        buffer.process("sub1", 100, serde_json::json!(100)); // Buffered
1646
1647        assert_eq!(buffer.last_seq("sub1"), 1);
1648        assert_eq!(buffer.pending_count("sub1"), 1);
1649    }
1650
1651    #[test]
1652    fn test_message_buffer_nonexistent_subscription() {
1653        let buffer = MessageBuffer::new();
1654        assert_eq!(buffer.last_seq("nonexistent"), 0);
1655        assert_eq!(buffer.pending_count("nonexistent"), 0);
1656    }
1657
1658    #[test]
1659    fn test_stream_message_serialize_all_variants() {
1660        let messages = vec![
1661            StreamMessage::subscribe("s1", "source"),
1662            StreamMessage::subscribe_with_transform("s2", "source", "rate()"),
1663            StreamMessage::unsubscribe("s1"),
1664            StreamMessage::data("s1", serde_json::json!({"x": 1}), 5),
1665            StreamMessage::error("msg"),
1666            StreamMessage::error_for("s1", "msg"),
1667            StreamMessage::ack("s1"),
1668            StreamMessage::ping(1000),
1669            StreamMessage::pong(1000),
1670        ];
1671
1672        for msg in messages {
1673            let json = serde_json::to_string(&msg).unwrap();
1674            let parsed: StreamMessage = serde_json::from_str(&json).unwrap();
1675            assert_eq!(msg, parsed);
1676        }
1677    }
1678
1679    #[test]
1680    fn test_stream_subscription_empty_source() {
1681        let sub = StreamSubscription::new("");
1682        assert!(sub.id.starts_with("sub_"));
1683        assert_eq!(sub.source, "");
1684    }
1685
1686    #[test]
1687    fn test_stream_subscription_unicode_source() {
1688        let sub = StreamSubscription::new("数据/指标");
1689        assert!(sub.id.starts_with("sub_"));
1690        assert_eq!(sub.source, "数据/指标");
1691    }
1692
1693    #[test]
1694    fn test_data_stream_multiple_data_updates() {
1695        let stream = DataStream::default();
1696        stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1697
1698        // Multiple updates should update cache
1699        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1700        assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(1)));
1701
1702        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(2), 2));
1703        assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(2)));
1704
1705        let sub = stream.get_subscription("sub1").unwrap();
1706        assert_eq!(sub.last_seq, 2);
1707        assert_eq!(sub.error_count, 0);
1708    }
1709
1710    #[test]
1711    fn test_reconnect_infinite_attempts() {
1712        let config = ReconnectConfig {
1713            enabled: true,
1714            max_attempts: None,
1715            ..Default::default()
1716        };
1717
1718        // Should always reconnect with infinite attempts
1719        assert!(config.should_reconnect(0));
1720        assert!(config.should_reconnect(100));
1721        assert!(config.should_reconnect(1000));
1722        assert!(config.should_reconnect(u32::MAX - 1));
1723    }
1724}