1#![allow(
2 clippy::unwrap_used,
3 clippy::disallowed_methods,
4 clippy::comparison_chain,
5 clippy::match_same_arms
6)]
7use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::{Arc, Mutex};
40use std::time::Duration;
41
/// Connection lifecycle state tracked for a stream.
///
/// The default value is [`ConnectionState::Disconnected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConnectionState {
    /// Not connected; this is the default state.
    #[default]
    Disconnected,
    /// Reported as "connecting" by [`ConnectionState::is_connecting`].
    Connecting,
    /// The only state reported as active by [`ConnectionState::is_active`].
    Connected,
    /// Reported as "connecting" by [`ConnectionState::is_connecting`].
    Reconnecting,
    /// Neither active nor connecting.
    Failed,
}

impl ConnectionState {
    /// Returns `true` only for [`ConnectionState::Connected`].
    #[must_use]
    pub const fn is_active(&self) -> bool {
        match self {
            Self::Connected => true,
            Self::Disconnected | Self::Connecting | Self::Reconnecting | Self::Failed => false,
        }
    }

    /// Returns `true` for [`ConnectionState::Connecting`] and
    /// [`ConnectionState::Reconnecting`], `false` for every other state.
    #[must_use]
    pub const fn is_connecting(&self) -> bool {
        match self {
            Self::Connecting | Self::Reconnecting => true,
            Self::Disconnected | Self::Connected | Self::Failed => false,
        }
    }
}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum StreamMessage {
76 Subscribe {
78 id: String,
80 source: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
84 transform: Option<String>,
85 #[serde(skip_serializing_if = "Option::is_none")]
87 interval_ms: Option<u64>,
88 },
89 Unsubscribe {
91 id: String,
93 },
94 Data {
96 id: String,
98 payload: serde_json::Value,
100 #[serde(default)]
102 seq: u64,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 timestamp: Option<u64>,
106 },
107 Error {
109 #[serde(skip_serializing_if = "Option::is_none")]
111 id: Option<String>,
112 message: String,
114 #[serde(skip_serializing_if = "Option::is_none")]
116 code: Option<i32>,
117 },
118 Ack {
120 id: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
124 status: Option<String>,
125 },
126 Ping {
128 timestamp: u64,
130 },
131 Pong {
133 timestamp: u64,
135 },
136}
137
138impl StreamMessage {
139 #[must_use]
141 pub fn subscribe(id: impl Into<String>, source: impl Into<String>) -> Self {
142 Self::Subscribe {
143 id: id.into(),
144 source: source.into(),
145 transform: None,
146 interval_ms: None,
147 }
148 }
149
150 #[must_use]
152 pub fn subscribe_with_transform(
153 id: impl Into<String>,
154 source: impl Into<String>,
155 transform: impl Into<String>,
156 ) -> Self {
157 Self::Subscribe {
158 id: id.into(),
159 source: source.into(),
160 transform: Some(transform.into()),
161 interval_ms: None,
162 }
163 }
164
165 #[must_use]
167 pub fn unsubscribe(id: impl Into<String>) -> Self {
168 Self::Unsubscribe { id: id.into() }
169 }
170
171 #[must_use]
173 pub fn data(id: impl Into<String>, payload: serde_json::Value, seq: u64) -> Self {
174 Self::Data {
175 id: id.into(),
176 payload,
177 seq,
178 timestamp: None,
179 }
180 }
181
182 #[must_use]
184 pub fn error(message: impl Into<String>) -> Self {
185 Self::Error {
186 id: None,
187 message: message.into(),
188 code: None,
189 }
190 }
191
192 #[must_use]
194 pub fn error_for(id: impl Into<String>, message: impl Into<String>) -> Self {
195 Self::Error {
196 id: Some(id.into()),
197 message: message.into(),
198 code: None,
199 }
200 }
201
202 #[must_use]
204 pub fn ack(id: impl Into<String>) -> Self {
205 Self::Ack {
206 id: id.into(),
207 status: None,
208 }
209 }
210
211 #[must_use]
213 pub fn ping(timestamp: u64) -> Self {
214 Self::Ping { timestamp }
215 }
216
217 #[must_use]
219 pub fn pong(timestamp: u64) -> Self {
220 Self::Pong { timestamp }
221 }
222
223 #[must_use]
225 pub fn subscription_id(&self) -> Option<&str> {
226 match self {
227 Self::Subscribe { id, .. }
228 | Self::Unsubscribe { id }
229 | Self::Data { id, .. }
230 | Self::Ack { id, .. } => Some(id),
231 Self::Error { id, .. } => id.as_deref(),
232 Self::Ping { .. } | Self::Pong { .. } => None,
233 }
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct StreamSubscription {
240 pub id: String,
242 pub source: String,
244 pub transform: Option<String>,
246 pub interval: Option<Duration>,
248 pub active: bool,
250 pub last_seq: u64,
252 pub error_count: u32,
254}
255
256impl StreamSubscription {
257 #[must_use]
259 pub fn new(source: impl Into<String>) -> Self {
260 let source = source.into();
261 let id = format!("sub_{}", Self::hash_source(&source));
262 Self {
263 id,
264 source,
265 transform: None,
266 interval: None,
267 active: false,
268 last_seq: 0,
269 error_count: 0,
270 }
271 }
272
273 #[must_use]
275 pub fn with_id(id: impl Into<String>, source: impl Into<String>) -> Self {
276 Self {
277 id: id.into(),
278 source: source.into(),
279 transform: None,
280 interval: None,
281 active: false,
282 last_seq: 0,
283 error_count: 0,
284 }
285 }
286
287 #[must_use]
289 pub fn with_interval(mut self, ms: u64) -> Self {
290 self.interval = Some(Duration::from_millis(ms));
291 self
292 }
293
294 #[must_use]
296 pub fn with_transform(mut self, transform: impl Into<String>) -> Self {
297 self.transform = Some(transform.into());
298 self
299 }
300
301 #[must_use]
303 pub fn to_message(&self) -> StreamMessage {
304 StreamMessage::Subscribe {
305 id: self.id.clone(),
306 source: self.source.clone(),
307 transform: self.transform.clone(),
308 interval_ms: self.interval.map(|d| d.as_millis() as u64),
309 }
310 }
311
312 fn hash_source(s: &str) -> u64 {
314 let mut hash: u64 = 5381;
315 for byte in s.bytes() {
316 hash = hash.wrapping_mul(33).wrapping_add(u64::from(byte));
317 }
318 hash
319 }
320}
321
322#[derive(Debug, Clone)]
324pub struct StreamConfig {
325 pub url: String,
327 pub reconnect: ReconnectConfig,
329 pub heartbeat_interval: Duration,
331 pub buffer_size: usize,
333}
334
335impl Default for StreamConfig {
336 fn default() -> Self {
337 Self {
338 url: String::new(),
339 reconnect: ReconnectConfig::default(),
340 heartbeat_interval: Duration::from_secs(30),
341 buffer_size: 1024,
342 }
343 }
344}
345
346impl StreamConfig {
347 #[must_use]
349 pub fn new(url: impl Into<String>) -> Self {
350 Self {
351 url: url.into(),
352 ..Default::default()
353 }
354 }
355
356 #[must_use]
358 pub fn with_reconnect(mut self, config: ReconnectConfig) -> Self {
359 self.reconnect = config;
360 self
361 }
362
363 #[must_use]
365 pub fn with_heartbeat(mut self, interval: Duration) -> Self {
366 self.heartbeat_interval = interval;
367 self
368 }
369
370 #[must_use]
372 pub fn with_buffer_size(mut self, size: usize) -> Self {
373 self.buffer_size = size;
374 self
375 }
376}
377
/// Reconnection policy: exponential backoff with a cap and an optional
/// attempt limit.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether reconnection is attempted at all.
    pub enabled: bool,
    /// Delay used for attempt `0`.
    pub initial_delay: Duration,
    /// Upper bound on any computed delay.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by for each further attempt.
    pub backoff_multiplier: f32,
    /// Maximum number of attempts; `None` means unlimited.
    pub max_attempts: Option<u32>,
}

impl Default for ReconnectConfig {
    /// Enabled, 500 ms initial delay, 30 s cap, doubling backoff, no
    /// attempt limit.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            max_attempts: None,
        }
    }
}

impl ReconnectConfig {
    /// Largest exponent applied to the multiplier; later attempts reuse it.
    const MAX_BACKOFF_EXPONENT: u32 = 20;

    /// Returns the delay to wait before reconnect attempt number `attempt`
    /// (zero-based).
    ///
    /// The delay is `initial_delay * backoff_multiplier^min(attempt, 20)`,
    /// truncated to whole milliseconds and never larger than `max_delay`.
    /// Returns [`Duration::ZERO`] when reconnection is disabled.
    ///
    /// The arithmetic is done in `f64` so that delays above 2^24 ms keep
    /// millisecond precision (an `f32` cannot represent them exactly).
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if !self.enabled {
            return Duration::ZERO;
        }

        // Bounded by MAX_BACKOFF_EXPONENT, so the narrowing cast is lossless.
        let exponent = attempt.min(Self::MAX_BACKOFF_EXPONENT) as i32;
        let growth = f64::from(self.backoff_multiplier).powi(exponent);

        let uncapped_ms = self.initial_delay.as_millis() as f64 * growth;
        let cap_ms = self.max_delay.as_millis() as f64;

        // `f64::min` ignores a NaN operand, so a NaN product falls back to the
        // cap; the float-to-int cast saturates and maps negatives to 0.
        let delay = Duration::from_millis(uncapped_ms.min(cap_ms) as u64);

        // `cap_ms` may have been rounded up by the float conversion or have
        // lost sub-millisecond detail; enforce the exact cap last.
        delay.min(self.max_delay)
    }

    /// Returns whether another reconnect should be tried after `attempt`
    /// attempts have already been made.
    ///
    /// Always `false` when disabled; otherwise `true` while `attempt` is
    /// below `max_attempts`, or always when no limit is configured.
    #[must_use]
    pub fn should_reconnect(&self, attempt: u32) -> bool {
        self.enabled && self.max_attempts.map_or(true, |max| attempt < max)
    }
}
432
/// Boxed callback that takes a string slice and a JSON value and must be
/// `Send + Sync`.
///
/// NOTE(review): the string is presumably the subscription id and the value
/// its payload — confirm against the call site, which is not visible here.
pub type DataCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;

/// Boxed callback that takes a string slice and must be `Send + Sync`.
///
/// NOTE(review): the string is presumably an error message — confirm against
/// the call site, which is not visible here.
pub type ErrorCallback = Box<dyn Fn(&str) + Send + Sync>;

/// Boxed callback that takes a [`ConnectionState`] by value and must be
/// `Send + Sync`.
pub type StateCallback = Box<dyn Fn(ConnectionState) + Send + Sync>;
441
442#[derive(Default)]
446pub struct DataStream {
447 subscriptions: Arc<Mutex<HashMap<String, StreamSubscription>>>,
449 state: Arc<Mutex<ConnectionState>>,
451 outbox: Arc<Mutex<Vec<StreamMessage>>>,
453 data_cache: Arc<Mutex<HashMap<String, serde_json::Value>>>,
455 reconnect_attempts: Arc<Mutex<u32>>,
457 config: StreamConfig,
459}
460
461impl DataStream {
462 #[must_use]
464 pub fn new(config: StreamConfig) -> Self {
465 Self {
466 subscriptions: Arc::new(Mutex::new(HashMap::new())),
467 state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
468 outbox: Arc::new(Mutex::new(Vec::new())),
469 data_cache: Arc::new(Mutex::new(HashMap::new())),
470 reconnect_attempts: Arc::new(Mutex::new(0)),
471 config,
472 }
473 }
474
475 #[must_use]
477 pub fn state(&self) -> ConnectionState {
478 *self.state.lock().unwrap()
479 }
480
481 pub fn set_state(&self, state: ConnectionState) {
483 *self.state.lock().unwrap() = state;
484 }
485
486 pub fn subscribe(&self, subscription: StreamSubscription) -> String {
488 let id = subscription.id.clone();
489 let msg = subscription.to_message();
490
491 self.subscriptions
492 .lock()
493 .unwrap()
494 .insert(id.clone(), subscription);
495
496 self.outbox.lock().unwrap().push(msg);
497 id
498 }
499
500 pub fn unsubscribe(&self, id: &str) {
502 self.subscriptions.lock().unwrap().remove(id);
503 self.data_cache.lock().unwrap().remove(id);
504 self.outbox
505 .lock()
506 .unwrap()
507 .push(StreamMessage::unsubscribe(id));
508 }
509
510 #[must_use]
512 pub fn get_subscription(&self, id: &str) -> Option<StreamSubscription> {
513 self.subscriptions.lock().unwrap().get(id).cloned()
514 }
515
516 #[must_use]
518 pub fn subscriptions(&self) -> Vec<StreamSubscription> {
519 self.subscriptions
520 .lock()
521 .unwrap()
522 .values()
523 .cloned()
524 .collect()
525 }
526
527 #[must_use]
529 pub fn get_data(&self, id: &str) -> Option<serde_json::Value> {
530 self.data_cache.lock().unwrap().get(id).cloned()
531 }
532
533 pub fn handle_message(&self, msg: StreamMessage) -> Option<StreamMessage> {
535 match msg {
536 StreamMessage::Data {
537 id, payload, seq, ..
538 } => {
539 if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(&id) {
541 sub.last_seq = seq;
542 sub.active = true;
543 sub.error_count = 0;
544 }
545 self.data_cache.lock().unwrap().insert(id, payload);
547 None
548 }
549 StreamMessage::Ack { id, .. } => {
550 if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(&id) {
551 sub.active = true;
552 }
553 None
554 }
555 StreamMessage::Error { id, .. } => {
556 if let Some(ref id) = id {
557 if let Some(sub) = self.subscriptions.lock().unwrap().get_mut(id) {
558 sub.error_count += 1;
559 }
560 }
561 None
562 }
563 StreamMessage::Ping { timestamp } => Some(StreamMessage::pong(timestamp)),
564 StreamMessage::Pong { .. } => None,
565 _ => None,
566 }
567 }
568
569 #[must_use]
571 pub fn take_outbox(&self) -> Vec<StreamMessage> {
572 std::mem::take(&mut *self.outbox.lock().unwrap())
573 }
574
575 pub fn send(&self, msg: StreamMessage) {
577 self.outbox.lock().unwrap().push(msg);
578 }
579
580 #[must_use]
582 pub fn reconnect_delay(&self) -> Duration {
583 let attempts = *self.reconnect_attempts.lock().unwrap();
584 self.config.reconnect.delay_for_attempt(attempts)
585 }
586
587 pub fn increment_reconnect_attempts(&self) {
589 *self.reconnect_attempts.lock().unwrap() += 1;
590 }
591
592 pub fn reset_reconnect_attempts(&self) {
594 *self.reconnect_attempts.lock().unwrap() = 0;
595 }
596
597 #[must_use]
599 pub fn should_reconnect(&self) -> bool {
600 let attempts = *self.reconnect_attempts.lock().unwrap();
601 self.config.reconnect.should_reconnect(attempts)
602 }
603
604 pub fn resubscribe_all(&self) {
606 let subs = self.subscriptions.lock().unwrap().clone();
607 let mut outbox = self.outbox.lock().unwrap();
608 for sub in subs.values() {
609 outbox.push(sub.to_message());
610 }
611 }
612
613 #[must_use]
615 pub fn subscription_count(&self) -> usize {
616 self.subscriptions.lock().unwrap().len()
617 }
618
619 pub fn clear(&self) {
621 self.subscriptions.lock().unwrap().clear();
622 self.data_cache.lock().unwrap().clear();
623 self.outbox.lock().unwrap().clear();
624 }
625}
626
/// Sliding-window rate limiter driven by caller-supplied timestamps.
///
/// Timestamps are compared against the window length in milliseconds, so
/// `now` values passed to [`RateLimiter::check`] are expected to be in
/// milliseconds as well.
#[derive(Debug)]
pub struct RateLimiter {
    /// Maximum number of messages allowed inside one window.
    max_messages: usize,
    /// Length of the sliding window.
    window: Duration,
    /// Timestamps of the messages accepted inside the current window.
    timestamps: Vec<u64>,
}

impl RateLimiter {
    /// Upper bound on the up-front allocation; larger limits grow on demand.
    const MAX_PREALLOCATED: usize = 1024;

    /// Creates a limiter allowing `max_messages` per `window`.
    ///
    /// Preallocation is capped, so very large limits (such as `usize::MAX`
    /// used as "effectively unlimited") do not trigger a capacity-overflow
    /// panic or a huge allocation.
    #[must_use]
    pub fn new(max_messages: usize, window: Duration) -> Self {
        Self {
            max_messages,
            window,
            timestamps: Vec::with_capacity(max_messages.min(Self::MAX_PREALLOCATED)),
        }
    }

    /// Records a message at time `now` if the limit allows it.
    ///
    /// Timestamps older than `now - window` are forgotten first; a timestamp
    /// exactly `window` old still counts. Returns `true` and records `now`
    /// when fewer than `max_messages` remain, `false` otherwise.
    pub fn check(&mut self, now: u64) -> bool {
        // `as_millis` returns u128; clamp before narrowing so an oversized
        // window saturates instead of wrapping to a small value.
        let window_ms = self.window.as_millis().min(u128::from(u64::MAX)) as u64;
        let window_start = now.saturating_sub(window_ms);

        self.timestamps.retain(|&ts| ts >= window_start);

        if self.is_at_capacity() {
            return false;
        }
        self.timestamps.push(now);
        true
    }

    /// Returns how many timestamps are currently recorded.
    ///
    /// Expired entries are only dropped by [`RateLimiter::check`].
    #[must_use]
    pub fn current_count(&self) -> usize {
        self.timestamps.len()
    }

    /// Forgets all recorded timestamps.
    pub fn reset(&mut self) {
        self.timestamps.clear();
    }

    /// Returns whether the recorded count has reached `max_messages`.
    #[must_use]
    pub fn is_at_capacity(&self) -> bool {
        self.timestamps.len() >= self.max_messages
    }
}

impl Default for RateLimiter {
    /// 100 messages per second.
    fn default() -> Self {
        Self::new(100, Duration::from_secs(1))
    }
}
687
688#[derive(Debug, Default)]
690pub struct MessageBuffer {
691 buffers: HashMap<String, SubscriptionBuffer>,
693}
694
695#[derive(Debug, Default)]
696struct SubscriptionBuffer {
697 last_seq: u64,
699 pending: Vec<(u64, serde_json::Value)>,
701}
702
703impl MessageBuffer {
704 #[must_use]
706 pub fn new() -> Self {
707 Self::default()
708 }
709
710 pub fn process(
712 &mut self,
713 id: &str,
714 seq: u64,
715 payload: serde_json::Value,
716 ) -> Option<serde_json::Value> {
717 let buffer = self.buffers.entry(id.to_string()).or_default();
718
719 if seq == buffer.last_seq + 1 {
720 buffer.last_seq = seq;
722
723 let mut result = Some(payload);
725 while let Some(pos) = buffer
726 .pending
727 .iter()
728 .position(|(s, _)| *s == buffer.last_seq + 1)
729 {
730 let (next_seq, next_payload) = buffer.pending.remove(pos);
731 buffer.last_seq = next_seq;
732 result = Some(next_payload);
734 }
735 result
736 } else if seq > buffer.last_seq + 1 {
737 buffer.pending.push((seq, payload));
739 None
740 } else {
741 None
743 }
744 }
745
746 #[must_use]
748 pub fn last_seq(&self, id: &str) -> u64 {
749 self.buffers.get(id).map_or(0, |b| b.last_seq)
750 }
751
752 #[must_use]
754 pub fn pending_count(&self, id: &str) -> usize {
755 self.buffers.get(id).map_or(0, |b| b.pending.len())
756 }
757
758 pub fn clear(&mut self, id: &str) {
760 self.buffers.remove(id);
761 }
762
763 pub fn clear_all(&mut self) {
765 self.buffers.clear();
766 }
767}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772
773 #[test]
778 fn test_connection_state_default() {
779 let state = ConnectionState::default();
780 assert_eq!(state, ConnectionState::Disconnected);
781 }
782
783 #[test]
784 fn test_connection_state_is_active() {
785 assert!(!ConnectionState::Disconnected.is_active());
786 assert!(!ConnectionState::Connecting.is_active());
787 assert!(ConnectionState::Connected.is_active());
788 assert!(!ConnectionState::Reconnecting.is_active());
789 assert!(!ConnectionState::Failed.is_active());
790 }
791
792 #[test]
793 fn test_connection_state_is_connecting() {
794 assert!(!ConnectionState::Disconnected.is_connecting());
795 assert!(ConnectionState::Connecting.is_connecting());
796 assert!(!ConnectionState::Connected.is_connecting());
797 assert!(ConnectionState::Reconnecting.is_connecting());
798 assert!(!ConnectionState::Failed.is_connecting());
799 }
800
801 #[test]
806 fn test_stream_message_subscribe() {
807 let msg = StreamMessage::subscribe("sub1", "metrics/cpu");
808 if let StreamMessage::Subscribe { id, source, .. } = msg {
809 assert_eq!(id, "sub1");
810 assert_eq!(source, "metrics/cpu");
811 } else {
812 panic!("Expected Subscribe message");
813 }
814 }
815
816 #[test]
817 fn test_stream_message_subscribe_with_transform() {
818 let msg = StreamMessage::subscribe_with_transform("sub1", "metrics/cpu", "rate()");
819 if let StreamMessage::Subscribe { transform, .. } = msg {
820 assert_eq!(transform, Some("rate()".to_string()));
821 } else {
822 panic!("Expected Subscribe message");
823 }
824 }
825
826 #[test]
827 fn test_stream_message_unsubscribe() {
828 let msg = StreamMessage::unsubscribe("sub1");
829 if let StreamMessage::Unsubscribe { id } = msg {
830 assert_eq!(id, "sub1");
831 } else {
832 panic!("Expected Unsubscribe message");
833 }
834 }
835
836 #[test]
837 fn test_stream_message_data() {
838 let msg = StreamMessage::data("sub1", serde_json::json!({"value": 42}), 5);
839 if let StreamMessage::Data {
840 id, payload, seq, ..
841 } = msg
842 {
843 assert_eq!(id, "sub1");
844 assert_eq!(payload, serde_json::json!({"value": 42}));
845 assert_eq!(seq, 5);
846 } else {
847 panic!("Expected Data message");
848 }
849 }
850
851 #[test]
852 fn test_stream_message_error() {
853 let msg = StreamMessage::error("connection failed");
854 if let StreamMessage::Error { message, id, .. } = msg {
855 assert_eq!(message, "connection failed");
856 assert!(id.is_none());
857 } else {
858 panic!("Expected Error message");
859 }
860 }
861
862 #[test]
863 fn test_stream_message_error_for() {
864 let msg = StreamMessage::error_for("sub1", "invalid source");
865 if let StreamMessage::Error { message, id, .. } = msg {
866 assert_eq!(message, "invalid source");
867 assert_eq!(id, Some("sub1".to_string()));
868 } else {
869 panic!("Expected Error message");
870 }
871 }
872
873 #[test]
874 fn test_stream_message_ack() {
875 let msg = StreamMessage::ack("sub1");
876 if let StreamMessage::Ack { id, .. } = msg {
877 assert_eq!(id, "sub1");
878 } else {
879 panic!("Expected Ack message");
880 }
881 }
882
883 #[test]
884 fn test_stream_message_ping_pong() {
885 let ping = StreamMessage::ping(12345);
886 let pong = StreamMessage::pong(12345);
887
888 if let StreamMessage::Ping { timestamp } = ping {
889 assert_eq!(timestamp, 12345);
890 } else {
891 panic!("Expected Ping");
892 }
893
894 if let StreamMessage::Pong { timestamp } = pong {
895 assert_eq!(timestamp, 12345);
896 } else {
897 panic!("Expected Pong");
898 }
899 }
900
901 #[test]
902 fn test_stream_message_subscription_id() {
903 assert_eq!(
904 StreamMessage::subscribe("sub1", "x").subscription_id(),
905 Some("sub1")
906 );
907 assert_eq!(
908 StreamMessage::unsubscribe("sub2").subscription_id(),
909 Some("sub2")
910 );
911 assert_eq!(
912 StreamMessage::data("sub3", serde_json::json!({}), 0).subscription_id(),
913 Some("sub3")
914 );
915 assert_eq!(StreamMessage::error("msg").subscription_id(), None);
916 assert_eq!(
917 StreamMessage::error_for("sub4", "msg").subscription_id(),
918 Some("sub4")
919 );
920 assert!(StreamMessage::ping(0).subscription_id().is_none());
921 assert!(StreamMessage::pong(0).subscription_id().is_none());
922 }
923
924 #[test]
925 fn test_stream_message_serialize() {
926 let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 42);
927 let json = serde_json::to_string(&msg).unwrap();
928 assert!(json.contains("\"type\":\"data\""));
929 assert!(json.contains("\"id\":\"sub1\""));
930 assert!(json.contains("\"seq\":42"));
931 }
932
933 #[test]
934 fn test_stream_message_deserialize() {
935 let json = r#"{"type":"subscribe","id":"s1","source":"data/x"}"#;
936 let msg: StreamMessage = serde_json::from_str(json).unwrap();
937 if let StreamMessage::Subscribe { id, source, .. } = msg {
938 assert_eq!(id, "s1");
939 assert_eq!(source, "data/x");
940 } else {
941 panic!("Expected Subscribe");
942 }
943 }
944
945 #[test]
950 fn test_subscription_new() {
951 let sub = StreamSubscription::new("metrics/cpu");
952 assert_eq!(sub.source, "metrics/cpu");
953 assert!(sub.id.starts_with("sub_"));
954 assert!(!sub.active);
955 }
956
957 #[test]
958 fn test_subscription_with_id() {
959 let sub = StreamSubscription::with_id("my-sub", "data/x");
960 assert_eq!(sub.id, "my-sub");
961 assert_eq!(sub.source, "data/x");
962 }
963
964 #[test]
965 fn test_subscription_with_interval() {
966 let sub = StreamSubscription::new("x").with_interval(1000);
967 assert_eq!(sub.interval, Some(Duration::from_millis(1000)));
968 }
969
970 #[test]
971 fn test_subscription_with_transform() {
972 let sub = StreamSubscription::new("x").with_transform("rate() | limit(10)");
973 assert_eq!(sub.transform, Some("rate() | limit(10)".to_string()));
974 }
975
976 #[test]
977 fn test_subscription_to_message() {
978 let sub = StreamSubscription::with_id("sub1", "metrics")
979 .with_interval(5000)
980 .with_transform("mean()");
981
982 let msg = sub.to_message();
983 if let StreamMessage::Subscribe {
984 id,
985 source,
986 transform,
987 interval_ms,
988 } = msg
989 {
990 assert_eq!(id, "sub1");
991 assert_eq!(source, "metrics");
992 assert_eq!(transform, Some("mean()".to_string()));
993 assert_eq!(interval_ms, Some(5000));
994 } else {
995 panic!("Expected Subscribe");
996 }
997 }
998
999 #[test]
1004 fn test_stream_config_default() {
1005 let config = StreamConfig::default();
1006 assert!(config.url.is_empty());
1007 assert!(config.reconnect.enabled);
1008 assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
1009 }
1010
1011 #[test]
1012 fn test_stream_config_new() {
1013 let config = StreamConfig::new("ws://localhost:8080");
1014 assert_eq!(config.url, "ws://localhost:8080");
1015 }
1016
1017 #[test]
1018 fn test_stream_config_builder() {
1019 let config = StreamConfig::new("ws://x")
1020 .with_heartbeat(Duration::from_secs(10))
1021 .with_buffer_size(2048);
1022
1023 assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
1024 assert_eq!(config.buffer_size, 2048);
1025 }
1026
1027 #[test]
1032 fn test_reconnect_config_default() {
1033 let config = ReconnectConfig::default();
1034 assert!(config.enabled);
1035 assert_eq!(config.initial_delay, Duration::from_millis(500));
1036 assert_eq!(config.max_delay, Duration::from_secs(30));
1037 assert!(config.max_attempts.is_none());
1038 }
1039
1040 #[test]
1041 fn test_reconnect_delay_for_attempt() {
1042 let config = ReconnectConfig {
1043 enabled: true,
1044 initial_delay: Duration::from_millis(100),
1045 max_delay: Duration::from_secs(10),
1046 backoff_multiplier: 2.0,
1047 max_attempts: None,
1048 };
1049
1050 assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100));
1051 assert_eq!(config.delay_for_attempt(1), Duration::from_millis(200));
1052 assert_eq!(config.delay_for_attempt(2), Duration::from_millis(400));
1053 assert_eq!(config.delay_for_attempt(3), Duration::from_millis(800));
1054 }
1055
1056 #[test]
1057 fn test_reconnect_delay_capped() {
1058 let config = ReconnectConfig {
1059 enabled: true,
1060 initial_delay: Duration::from_secs(1),
1061 max_delay: Duration::from_secs(5),
1062 backoff_multiplier: 10.0,
1063 max_attempts: None,
1064 };
1065
1066 assert_eq!(config.delay_for_attempt(5), Duration::from_secs(5));
1068 }
1069
1070 #[test]
1071 fn test_reconnect_disabled() {
1072 let config = ReconnectConfig {
1073 enabled: false,
1074 ..Default::default()
1075 };
1076
1077 assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
1078 assert!(!config.should_reconnect(0));
1079 }
1080
1081 #[test]
1082 fn test_reconnect_max_attempts() {
1083 let config = ReconnectConfig {
1084 enabled: true,
1085 max_attempts: Some(3),
1086 ..Default::default()
1087 };
1088
1089 assert!(config.should_reconnect(0));
1090 assert!(config.should_reconnect(1));
1091 assert!(config.should_reconnect(2));
1092 assert!(!config.should_reconnect(3));
1093 assert!(!config.should_reconnect(4));
1094 }
1095
1096 #[test]
1101 fn test_data_stream_new() {
1102 let stream = DataStream::new(StreamConfig::new("ws://x"));
1103 assert_eq!(stream.state(), ConnectionState::Disconnected);
1104 assert_eq!(stream.subscription_count(), 0);
1105 }
1106
1107 #[test]
1108 fn test_data_stream_subscribe() {
1109 let stream = DataStream::new(StreamConfig::default());
1110 let sub = StreamSubscription::with_id("sub1", "metrics");
1111
1112 let id = stream.subscribe(sub);
1113 assert_eq!(id, "sub1");
1114 assert_eq!(stream.subscription_count(), 1);
1115
1116 let outbox = stream.take_outbox();
1117 assert_eq!(outbox.len(), 1);
1118 assert!(matches!(outbox[0], StreamMessage::Subscribe { .. }));
1119 }
1120
1121 #[test]
1122 fn test_data_stream_unsubscribe() {
1123 let stream = DataStream::new(StreamConfig::default());
1124 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1125 let _ = stream.take_outbox(); stream.unsubscribe("sub1");
1128 assert_eq!(stream.subscription_count(), 0);
1129
1130 let outbox = stream.take_outbox();
1131 assert_eq!(outbox.len(), 1);
1132 assert!(matches!(outbox[0], StreamMessage::Unsubscribe { .. }));
1133 }
1134
1135 #[test]
1136 fn test_data_stream_handle_data() {
1137 let stream = DataStream::new(StreamConfig::default());
1138 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1139
1140 let msg = StreamMessage::data("sub1", serde_json::json!({"val": 42}), 1);
1141 stream.handle_message(msg);
1142
1143 let data = stream.get_data("sub1");
1144 assert_eq!(data, Some(serde_json::json!({"val": 42})));
1145
1146 let sub = stream.get_subscription("sub1").unwrap();
1147 assert!(sub.active);
1148 assert_eq!(sub.last_seq, 1);
1149 }
1150
1151 #[test]
1152 fn test_data_stream_handle_ack() {
1153 let stream = DataStream::new(StreamConfig::default());
1154 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1155
1156 stream.handle_message(StreamMessage::ack("sub1"));
1157
1158 let sub = stream.get_subscription("sub1").unwrap();
1159 assert!(sub.active);
1160 }
1161
1162 #[test]
1163 fn test_data_stream_handle_error() {
1164 let stream = DataStream::new(StreamConfig::default());
1165 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1166
1167 stream.handle_message(StreamMessage::error_for("sub1", "fail"));
1168
1169 let sub = stream.get_subscription("sub1").unwrap();
1170 assert_eq!(sub.error_count, 1);
1171 }
1172
1173 #[test]
1174 fn test_data_stream_handle_ping() {
1175 let stream = DataStream::new(StreamConfig::default());
1176 let response = stream.handle_message(StreamMessage::ping(12345));
1177
1178 assert!(matches!(
1179 response,
1180 Some(StreamMessage::Pong { timestamp: 12345 })
1181 ));
1182 }
1183
1184 #[test]
1185 fn test_data_stream_reconnect_logic() {
1186 let stream = DataStream::new(StreamConfig::default());
1187
1188 assert!(stream.should_reconnect());
1189 assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1190
1191 stream.increment_reconnect_attempts();
1192 assert!(stream.should_reconnect());
1193 assert_eq!(stream.reconnect_delay(), Duration::from_millis(1000));
1194
1195 stream.reset_reconnect_attempts();
1196 assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1197 }
1198
1199 #[test]
1200 fn test_data_stream_resubscribe_all() {
1201 let stream = DataStream::new(StreamConfig::default());
1202 stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1203 stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1204 let _ = stream.take_outbox(); stream.resubscribe_all();
1207
1208 let outbox = stream.take_outbox();
1209 assert_eq!(outbox.len(), 2);
1210 }
1211
1212 #[test]
1213 fn test_data_stream_clear() {
1214 let stream = DataStream::new(StreamConfig::default());
1215 stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1216 stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1217
1218 stream.clear();
1219
1220 assert_eq!(stream.subscription_count(), 0);
1221 assert!(stream.get_data("sub1").is_none());
1222 }
1223
1224 #[test]
1229 fn test_rate_limiter_allows_under_limit() {
1230 let mut limiter = RateLimiter::new(5, Duration::from_secs(1));
1231
1232 for i in 0..5 {
1233 assert!(limiter.check(i * 100), "message {} should be allowed", i);
1234 }
1235 }
1236
1237 #[test]
1238 fn test_rate_limiter_blocks_over_limit() {
1239 let mut limiter = RateLimiter::new(3, Duration::from_secs(1));
1240
1241 assert!(limiter.check(0));
1242 assert!(limiter.check(100));
1243 assert!(limiter.check(200));
1244 assert!(!limiter.check(300)); }
1246
1247 #[test]
1248 fn test_rate_limiter_window_expiry() {
1249 let mut limiter = RateLimiter::new(2, Duration::from_millis(100));
1250
1251 assert!(limiter.check(0));
1252 assert!(limiter.check(50));
1253 assert!(!limiter.check(60)); assert!(limiter.check(200)); }
1258
1259 #[test]
1260 fn test_rate_limiter_current_count() {
1261 let mut limiter = RateLimiter::new(10, Duration::from_secs(1));
1262
1263 assert_eq!(limiter.current_count(), 0);
1264 limiter.check(0);
1265 assert_eq!(limiter.current_count(), 1);
1266 limiter.check(100);
1267 assert_eq!(limiter.current_count(), 2);
1268 }
1269
1270 #[test]
1271 fn test_rate_limiter_reset() {
1272 let mut limiter = RateLimiter::new(2, Duration::from_secs(1));
1273
1274 limiter.check(0);
1275 limiter.check(100);
1276 assert!(limiter.is_at_capacity());
1277
1278 limiter.reset();
1279 assert_eq!(limiter.current_count(), 0);
1280 assert!(!limiter.is_at_capacity());
1281 }
1282
1283 #[test]
1284 fn test_rate_limiter_default() {
1285 let limiter = RateLimiter::default();
1286 assert_eq!(limiter.max_messages, 100);
1287 }
1288
1289 #[test]
1294 fn test_message_buffer_in_order() {
1295 let mut buffer = MessageBuffer::new();
1296
1297 let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1298 let r2 = buffer.process("sub1", 2, serde_json::json!(2));
1299 let r3 = buffer.process("sub1", 3, serde_json::json!(3));
1300
1301 assert_eq!(r1, Some(serde_json::json!(1)));
1302 assert_eq!(r2, Some(serde_json::json!(2)));
1303 assert_eq!(r3, Some(serde_json::json!(3)));
1304 }
1305
1306 #[test]
1307 fn test_message_buffer_out_of_order() {
1308 let mut buffer = MessageBuffer::new();
1309
1310 let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1312 let r3 = buffer.process("sub1", 3, serde_json::json!(3)); let r2 = buffer.process("sub1", 2, serde_json::json!(2)); assert_eq!(r1, Some(serde_json::json!(1)));
1316 assert!(r3.is_none()); assert_eq!(r2, Some(serde_json::json!(3))); }
1319
1320 #[test]
1321 fn test_message_buffer_duplicate() {
1322 let mut buffer = MessageBuffer::new();
1323
1324 buffer.process("sub1", 1, serde_json::json!(1));
1325 let dup = buffer.process("sub1", 1, serde_json::json!("dup"));
1326
1327 assert!(dup.is_none()); }
1329
1330 #[test]
1331 fn test_message_buffer_last_seq() {
1332 let mut buffer = MessageBuffer::new();
1333
1334 assert_eq!(buffer.last_seq("sub1"), 0);
1335 buffer.process("sub1", 1, serde_json::json!(1));
1336 assert_eq!(buffer.last_seq("sub1"), 1);
1337 buffer.process("sub1", 2, serde_json::json!(2));
1338 assert_eq!(buffer.last_seq("sub1"), 2);
1339 }
1340
1341 #[test]
1342 fn test_message_buffer_pending_count() {
1343 let mut buffer = MessageBuffer::new();
1344
1345 buffer.process("sub1", 1, serde_json::json!(1));
1346 buffer.process("sub1", 3, serde_json::json!(3)); buffer.process("sub1", 4, serde_json::json!(4)); assert_eq!(buffer.pending_count("sub1"), 2);
1350 }
1351
1352 #[test]
1353 fn test_message_buffer_clear() {
1354 let mut buffer = MessageBuffer::new();
1355
1356 buffer.process("sub1", 1, serde_json::json!(1));
1357 buffer.process("sub2", 1, serde_json::json!(2));
1358
1359 buffer.clear("sub1");
1360 assert_eq!(buffer.last_seq("sub1"), 0);
1361 assert_eq!(buffer.last_seq("sub2"), 1);
1362 }
1363
1364 #[test]
1365 fn test_message_buffer_clear_all() {
1366 let mut buffer = MessageBuffer::new();
1367
1368 buffer.process("sub1", 1, serde_json::json!(1));
1369 buffer.process("sub2", 1, serde_json::json!(2));
1370
1371 buffer.clear_all();
1372 assert_eq!(buffer.last_seq("sub1"), 0);
1373 assert_eq!(buffer.last_seq("sub2"), 0);
1374 }
1375
1376 #[test]
1377 fn test_message_buffer_multiple_subscriptions() {
1378 let mut buffer = MessageBuffer::new();
1379
1380 buffer.process("sub1", 1, serde_json::json!("a"));
1381 buffer.process("sub2", 1, serde_json::json!("b"));
1382 buffer.process("sub1", 2, serde_json::json!("c"));
1383
1384 assert_eq!(buffer.last_seq("sub1"), 2);
1385 assert_eq!(buffer.last_seq("sub2"), 1);
1386 }
1387
1388 #[test]
1393 fn test_connection_state_debug() {
1394 assert_eq!(format!("{:?}", ConnectionState::Connected), "Connected");
1395 assert_eq!(format!("{:?}", ConnectionState::Failed), "Failed");
1396 }
1397
1398 #[test]
1399 fn test_connection_state_clone() {
1400 let state = ConnectionState::Reconnecting;
1401 let cloned = state;
1402 assert_eq!(state, cloned);
1403 }
1404
/// The `Debug` output of a ping message mentions its variant name.
#[test]
fn test_stream_message_debug() {
    let rendered = format!("{:?}", StreamMessage::ping(12345));

    assert!(rendered.contains("Ping"));
}
1411
/// Cloning a data message yields a value equal to the original.
#[test]
fn test_stream_message_clone() {
    let payload = serde_json::json!({"x": 1});
    let original = StreamMessage::data("sub1", payload, 5);

    let duplicate = original.clone();

    assert_eq!(original, duplicate);
}
1418
/// A cloned subscription carries over the id, source and transform that were
/// set on the original through the builder methods.
#[test]
fn test_stream_subscription_clone() {
    let original = StreamSubscription::with_id("sub1", "metrics")
        .with_interval(1000)
        .with_transform("rate()");

    let duplicate = original.clone();

    assert_eq!(duplicate.id, "sub1");
    assert_eq!(duplicate.source, "metrics");
    assert_eq!(duplicate.transform.as_deref(), Some("rate()"));
}
1429
/// The `Debug` output of a subscription mentions its type name.
#[test]
fn test_stream_subscription_debug() {
    let rendered = format!("{:?}", StreamSubscription::new("test"));

    assert!(rendered.contains("StreamSubscription"));
}
1436
/// Two subscriptions created from the same source end up with the same id.
#[test]
fn test_stream_subscription_hash_consistency() {
    let source = "metrics/cpu";

    let first = StreamSubscription::new(source);
    let second = StreamSubscription::new(source);

    assert_eq!(first.id, second.id);
}
1444
/// Subscriptions created from different sources end up with different ids.
#[test]
fn test_stream_subscription_hash_different() {
    let cpu = StreamSubscription::new("metrics/cpu");
    let memory = StreamSubscription::new("metrics/memory");

    assert_ne!(cpu.id, memory.id);
}
1451
/// The `Debug` output of a default stream config mentions its type name.
#[test]
fn test_stream_config_debug() {
    let rendered = format!("{:?}", StreamConfig::default());

    assert!(rendered.contains("StreamConfig"));
}
1458
/// A cloned stream config keeps the URL and buffer size of the original.
#[test]
fn test_stream_config_clone() {
    let heartbeat = Duration::from_secs(60);
    let original = StreamConfig::new("ws://test")
        .with_buffer_size(2048)
        .with_heartbeat(heartbeat);

    let duplicate = original.clone();

    assert_eq!(duplicate.url, "ws://test");
    assert_eq!(duplicate.buffer_size, 2048);
}
1468
/// `with_reconnect` stores the supplied reconnect settings on the config.
#[test]
fn test_stream_config_with_reconnect() {
    let policy = ReconnectConfig {
        enabled: false,
        max_attempts: Some(5),
        ..Default::default()
    };

    let cfg = StreamConfig::new("ws://x").with_reconnect(policy);

    // Read both overridden fields back through the stored settings.
    let applied = &cfg.reconnect;
    assert!(!applied.enabled);
    assert_eq!(applied.max_attempts, Some(5));
}
1480
/// The `Debug` output of a default reconnect config mentions its type name.
#[test]
fn test_reconnect_config_debug() {
    let rendered = format!("{:?}", ReconnectConfig::default());

    assert!(rendered.contains("ReconnectConfig"));
}
1487
/// A cloned reconnect config keeps the overridden `max_attempts`.
#[test]
fn test_reconnect_config_clone() {
    let original = ReconnectConfig {
        max_attempts: Some(10),
        ..Default::default()
    };

    let duplicate = original.clone();

    assert_eq!(duplicate.max_attempts, Some(10));
}
1497
/// For a large attempt number the computed delay must not exceed the
/// configured `max_delay`.
#[test]
fn test_reconnect_delay_large_attempt() {
    let policy = ReconnectConfig::default();

    let computed = policy.delay_for_attempt(100);

    assert!(computed <= policy.max_delay);
}
1505
/// A default `DataStream` starts disconnected and has no subscriptions.
#[test]
fn test_data_stream_default() {
    let fresh = DataStream::default();

    assert_eq!(fresh.state(), ConnectionState::Disconnected);
    assert_eq!(fresh.subscription_count(), 0);
}
1512
/// `state` reports whatever was most recently passed to `set_state`.
#[test]
fn test_data_stream_set_state() {
    let stream = DataStream::default();

    // Apply the states in order and read each one back immediately.
    for target in [ConnectionState::Connected, ConnectionState::Failed] {
        stream.set_state(target);
        assert_eq!(stream.state(), target);
    }
}
1521
/// Every message passed to `send` shows up in the outbox returned by
/// `take_outbox`.
#[test]
fn test_data_stream_send() {
    let stream = DataStream::default();

    for timestamp in [100, 200] {
        stream.send(StreamMessage::ping(timestamp));
    }

    let queued = stream.take_outbox();
    assert_eq!(queued.len(), 2);
}
1531
/// Looking up a subscription id that was never registered yields nothing.
#[test]
fn test_data_stream_get_nonexistent_subscription() {
    let stream = DataStream::default();

    let lookup = stream.get_subscription("nonexistent");

    assert!(lookup.is_none());
}
1537
/// Looking up data for an id that never received any yields nothing.
#[test]
fn test_data_stream_get_nonexistent_data() {
    let stream = DataStream::default();

    let lookup = stream.get_data("nonexistent");

    assert!(lookup.is_none());
}
1543
/// `subscriptions` lists one entry per registered subscription.
#[test]
fn test_data_stream_subscriptions_list() {
    let stream = DataStream::default();

    // (subscription id, source)
    for (id, source) in [("sub1", "a"), ("sub2", "b")] {
        stream.subscribe(StreamSubscription::with_id(id, source));
    }

    assert_eq!(stream.subscriptions().len(), 2);
}
1553
/// Handling an incoming pong produces no response message.
#[test]
fn test_data_stream_handle_pong() {
    let stream = DataStream::default();
    let incoming = StreamMessage::pong(12345);

    let reply = stream.handle_message(incoming);

    assert!(reply.is_none());
}
1560
/// Handling an incoming subscribe message produces no response message.
#[test]
fn test_data_stream_handle_subscribe() {
    let stream = DataStream::default();
    let incoming = StreamMessage::subscribe("sub1", "metrics");

    let reply = stream.handle_message(incoming);

    assert!(reply.is_none());
}
1568
/// Handling an error built without a subscription id produces no response.
#[test]
fn test_data_stream_handle_error_no_id() {
    let stream = DataStream::default();
    let incoming = StreamMessage::error("general error");

    let reply = stream.handle_message(incoming);

    assert!(reply.is_none());
}
1576
/// Handling an error addressed to an id that was never subscribed produces
/// no response.
#[test]
fn test_data_stream_handle_error_unknown_id() {
    let stream = DataStream::default();
    let incoming = StreamMessage::error_for("unknown", "error");

    let reply = stream.handle_message(incoming);

    assert!(reply.is_none());
}
1584
/// Data addressed to an id that was never subscribed is still expected to be
/// retrievable through `get_data`.
#[test]
fn test_data_stream_handle_data_unknown_subscription() {
    let stream = DataStream::default();
    let payload = serde_json::json!(42);

    stream.handle_message(StreamMessage::data("unknown", payload.clone(), 1));

    assert_eq!(stream.get_data("unknown"), Some(payload));
}
1592
/// The `Debug` output of a rate limiter mentions its type name.
#[test]
fn test_rate_limiter_debug() {
    let rendered = format!("{:?}", RateLimiter::new(10, Duration::from_secs(1)));

    assert!(rendered.contains("RateLimiter"));
}
1599
/// Walks a limiter built with a limit of 3 and a 100 ms window through the
/// edge of that window.
#[test]
fn test_rate_limiter_at_boundary() {
    let mut limiter = RateLimiter::new(3, Duration::from_millis(100));

    // (time passed to `check`, expected result), evaluated in order.
    // NOTE(review): the time argument presumably shares the window's
    // millisecond unit — confirm against `RateLimiter::check`.
    let timeline = [
        (0, true),
        (0, true),
        (0, true),
        (0, false),
        (100, false),
        (101, true),
    ];

    for (now, allowed) in timeline {
        assert_eq!(limiter.check(now), allowed);
    }
}
1616
/// The `Debug` output of a message buffer mentions its type name.
#[test]
fn test_message_buffer_debug() {
    let rendered = format!("{:?}", MessageBuffer::new());

    assert!(rendered.contains("MessageBuffer"));
}
1623
/// A message whose sequence number was already processed yields nothing and
/// leaves `last_seq` unchanged.
#[test]
fn test_message_buffer_old_message() {
    let mut buf = MessageBuffer::new();

    // Deliver sequences 1 through 3 in order.
    for seq in 1..=3 {
        buf.process("sub1", seq, serde_json::json!(seq));
    }

    // Sequence 1 again, this time with a different payload.
    let replayed = buf.process("sub1", 1, serde_json::json!("old"));

    assert!(replayed.is_none());
    assert_eq!(buf.last_seq("sub1"), 3);
}
1638
/// A message far ahead of the expected sequence is counted as pending and
/// does not advance `last_seq`.
#[test]
fn test_message_buffer_large_gap() {
    let mut buf = MessageBuffer::new();
    let sub = "sub1";

    buf.process(sub, 1, serde_json::json!(1));
    // Jump straight to 100, skipping sequences 2 through 99.
    buf.process(sub, 100, serde_json::json!(100));

    assert_eq!(buf.last_seq(sub), 1);
    assert_eq!(buf.pending_count(sub), 1);
}
1650
/// Querying an id the buffer has never seen reports zero for both the last
/// sequence and the pending count.
#[test]
fn test_message_buffer_nonexistent_subscription() {
    let untouched = MessageBuffer::new();
    let unknown = "nonexistent";

    assert_eq!(untouched.last_seq(unknown), 0);
    assert_eq!(untouched.pending_count(unknown), 0);
}
1657
/// Every message built by the constructor helpers survives a JSON
/// serialize/deserialize round trip unchanged.
#[test]
fn test_stream_message_serialize_all_variants() {
    let variants = [
        StreamMessage::subscribe("s1", "source"),
        StreamMessage::subscribe_with_transform("s2", "source", "rate()"),
        StreamMessage::unsubscribe("s1"),
        StreamMessage::data("s1", serde_json::json!({"x": 1}), 5),
        StreamMessage::error("msg"),
        StreamMessage::error_for("s1", "msg"),
        StreamMessage::ack("s1"),
        StreamMessage::ping(1000),
        StreamMessage::pong(1000),
    ];

    for original in variants {
        let wire = serde_json::to_string(&original).unwrap();
        let decoded: StreamMessage = serde_json::from_str(&wire).unwrap();
        assert_eq!(original, decoded);
    }
}
1678
/// An empty source still produces a `sub_`-prefixed id and is stored as-is.
#[test]
fn test_stream_subscription_empty_source() {
    let source = "";

    let sub = StreamSubscription::new(source);

    assert!(sub.id.starts_with("sub_"));
    assert_eq!(sub.source, source);
}
1685
/// A non-ASCII source still produces a `sub_`-prefixed id and is stored
/// unchanged.
#[test]
fn test_stream_subscription_unicode_source() {
    let source = "数据/指标";

    let sub = StreamSubscription::new(source);

    assert!(sub.id.starts_with("sub_"));
    assert_eq!(sub.source, source);
}
1692
/// Successive data messages replace the stored value and advance the
/// subscription's `last_seq` without recording errors.
#[test]
fn test_data_stream_multiple_data_updates() {
    let stream = DataStream::default();
    stream.subscribe(StreamSubscription::with_id("sub1", "x"));

    // Each message carries its own sequence number as the payload; the
    // stored value is checked right after every delivery.
    for seq in [1, 2] {
        stream.handle_message(StreamMessage::data("sub1", serde_json::json!(seq), seq));
        assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(seq)));
    }

    let tracked = stream.get_subscription("sub1").unwrap();
    assert_eq!(tracked.last_seq, 2);
    assert_eq!(tracked.error_count, 0);
}
1709
/// With reconnection enabled and no `max_attempts` cap, `should_reconnect`
/// stays true for every attempt number probed.
#[test]
fn test_reconnect_infinite_attempts() {
    let unbounded = ReconnectConfig {
        enabled: true,
        max_attempts: None,
        ..Default::default()
    };

    // Probe the first attempt, two mid-range values and one just below the
    // `u32` maximum.
    for attempt in [0, 100, 1000, u32::MAX - 1] {
        assert!(unbounded.should_reconnect(attempt));
    }
}
1724}