1#![allow(
2 clippy::unwrap_used,
3 clippy::disallowed_methods,
4 clippy::comparison_chain,
5 clippy::match_same_arms
6)]
7use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::{Arc, Mutex};
40use std::time::Duration;
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum ConnectionState {
45 #[default]
47 Disconnected,
48 Connecting,
50 Connected,
52 Reconnecting,
54 Failed,
56}
57
58impl ConnectionState {
59 #[must_use]
61 pub const fn is_active(&self) -> bool {
62 matches!(self, Self::Connected)
63 }
64
65 #[must_use]
67 pub const fn is_connecting(&self) -> bool {
68 matches!(self, Self::Connecting | Self::Reconnecting)
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "type", rename_all = "snake_case")]
75pub enum StreamMessage {
76 Subscribe {
78 id: String,
80 source: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
84 transform: Option<String>,
85 #[serde(skip_serializing_if = "Option::is_none")]
87 interval_ms: Option<u64>,
88 },
89 Unsubscribe {
91 id: String,
93 },
94 Data {
96 id: String,
98 payload: serde_json::Value,
100 #[serde(default)]
102 seq: u64,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 timestamp: Option<u64>,
106 },
107 Error {
109 #[serde(skip_serializing_if = "Option::is_none")]
111 id: Option<String>,
112 message: String,
114 #[serde(skip_serializing_if = "Option::is_none")]
116 code: Option<i32>,
117 },
118 Ack {
120 id: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
124 status: Option<String>,
125 },
126 Ping {
128 timestamp: u64,
130 },
131 Pong {
133 timestamp: u64,
135 },
136}
137
138impl StreamMessage {
139 #[must_use]
141 pub fn subscribe(id: impl Into<String>, source: impl Into<String>) -> Self {
142 Self::Subscribe {
143 id: id.into(),
144 source: source.into(),
145 transform: None,
146 interval_ms: None,
147 }
148 }
149
150 #[must_use]
152 pub fn subscribe_with_transform(
153 id: impl Into<String>,
154 source: impl Into<String>,
155 transform: impl Into<String>,
156 ) -> Self {
157 Self::Subscribe {
158 id: id.into(),
159 source: source.into(),
160 transform: Some(transform.into()),
161 interval_ms: None,
162 }
163 }
164
165 #[must_use]
167 pub fn unsubscribe(id: impl Into<String>) -> Self {
168 Self::Unsubscribe { id: id.into() }
169 }
170
171 #[must_use]
173 pub fn data(id: impl Into<String>, payload: serde_json::Value, seq: u64) -> Self {
174 Self::Data {
175 id: id.into(),
176 payload,
177 seq,
178 timestamp: None,
179 }
180 }
181
182 #[must_use]
184 pub fn error(message: impl Into<String>) -> Self {
185 Self::Error {
186 id: None,
187 message: message.into(),
188 code: None,
189 }
190 }
191
192 #[must_use]
194 pub fn error_for(id: impl Into<String>, message: impl Into<String>) -> Self {
195 Self::Error {
196 id: Some(id.into()),
197 message: message.into(),
198 code: None,
199 }
200 }
201
202 #[must_use]
204 pub fn ack(id: impl Into<String>) -> Self {
205 Self::Ack {
206 id: id.into(),
207 status: None,
208 }
209 }
210
211 #[must_use]
213 pub fn ping(timestamp: u64) -> Self {
214 Self::Ping { timestamp }
215 }
216
217 #[must_use]
219 pub fn pong(timestamp: u64) -> Self {
220 Self::Pong { timestamp }
221 }
222
223 #[must_use]
225 pub fn subscription_id(&self) -> Option<&str> {
226 match self {
227 Self::Subscribe { id, .. }
228 | Self::Unsubscribe { id }
229 | Self::Data { id, .. }
230 | Self::Ack { id, .. } => Some(id),
231 Self::Error { id, .. } => id.as_deref(),
232 Self::Ping { .. } | Self::Pong { .. } => None,
233 }
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct StreamSubscription {
240 pub id: String,
242 pub source: String,
244 pub transform: Option<String>,
246 pub interval: Option<Duration>,
248 pub active: bool,
250 pub last_seq: u64,
252 pub error_count: u32,
254}
255
256impl StreamSubscription {
257 #[must_use]
259 pub fn new(source: impl Into<String>) -> Self {
260 let source = source.into();
261 let id = format!("sub_{}", Self::hash_source(&source));
262 Self {
263 id,
264 source,
265 transform: None,
266 interval: None,
267 active: false,
268 last_seq: 0,
269 error_count: 0,
270 }
271 }
272
273 #[must_use]
275 pub fn with_id(id: impl Into<String>, source: impl Into<String>) -> Self {
276 Self {
277 id: id.into(),
278 source: source.into(),
279 transform: None,
280 interval: None,
281 active: false,
282 last_seq: 0,
283 error_count: 0,
284 }
285 }
286
287 #[must_use]
289 pub fn with_interval(mut self, ms: u64) -> Self {
290 self.interval = Some(Duration::from_millis(ms));
291 self
292 }
293
294 #[must_use]
296 pub fn with_transform(mut self, transform: impl Into<String>) -> Self {
297 self.transform = Some(transform.into());
298 self
299 }
300
301 #[must_use]
303 pub fn to_message(&self) -> StreamMessage {
304 StreamMessage::Subscribe {
305 id: self.id.clone(),
306 source: self.source.clone(),
307 transform: self.transform.clone(),
308 interval_ms: self.interval.map(|d| d.as_millis() as u64),
309 }
310 }
311
312 fn hash_source(s: &str) -> u64 {
314 let mut hash: u64 = 5381;
315 for byte in s.bytes() {
316 hash = hash.wrapping_mul(33).wrapping_add(u64::from(byte));
317 }
318 hash
319 }
320}
321
322#[derive(Debug, Clone)]
324pub struct StreamConfig {
325 pub url: String,
327 pub reconnect: ReconnectConfig,
329 pub heartbeat_interval: Duration,
331 pub buffer_size: usize,
333}
334
335impl Default for StreamConfig {
336 fn default() -> Self {
337 Self {
338 url: String::new(),
339 reconnect: ReconnectConfig::default(),
340 heartbeat_interval: Duration::from_secs(30),
341 buffer_size: 1024,
342 }
343 }
344}
345
346impl StreamConfig {
347 #[must_use]
349 pub fn new(url: impl Into<String>) -> Self {
350 Self {
351 url: url.into(),
352 ..Default::default()
353 }
354 }
355
356 #[must_use]
358 pub fn with_reconnect(mut self, config: ReconnectConfig) -> Self {
359 self.reconnect = config;
360 self
361 }
362
363 #[must_use]
365 pub fn with_heartbeat(mut self, interval: Duration) -> Self {
366 self.heartbeat_interval = interval;
367 self
368 }
369
370 #[must_use]
372 pub fn with_buffer_size(mut self, size: usize) -> Self {
373 self.buffer_size = size;
374 self
375 }
376}
377
378#[derive(Debug, Clone)]
380pub struct ReconnectConfig {
381 pub enabled: bool,
383 pub initial_delay: Duration,
385 pub max_delay: Duration,
387 pub backoff_multiplier: f32,
389 pub max_attempts: Option<u32>,
391}
392
393impl Default for ReconnectConfig {
394 fn default() -> Self {
395 Self {
396 enabled: true,
397 initial_delay: Duration::from_millis(500),
398 max_delay: Duration::from_secs(30),
399 backoff_multiplier: 2.0,
400 max_attempts: None,
401 }
402 }
403}
404
405impl ReconnectConfig {
406 #[must_use]
408 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
409 if !self.enabled {
410 return Duration::ZERO;
411 }
412
413 let delay_ms = self.initial_delay.as_millis() as f32
414 * self.backoff_multiplier.powi(attempt.min(20) as i32);
415
416 let delay = Duration::from_millis(delay_ms.min(self.max_delay.as_millis() as f32) as u64);
417 delay.min(self.max_delay)
418 }
419
420 #[must_use]
422 pub fn should_reconnect(&self, attempt: u32) -> bool {
423 if !self.enabled {
424 return false;
425 }
426 match self.max_attempts {
427 Some(max) => attempt < max,
428 None => true,
429 }
430 }
431}
432
433pub type DataCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;
435
436pub type ErrorCallback = Box<dyn Fn(&str) + Send + Sync>;
438
439pub type StateCallback = Box<dyn Fn(ConnectionState) + Send + Sync>;
441
442#[derive(Default)]
446pub struct DataStream {
447 subscriptions: Arc<Mutex<HashMap<String, StreamSubscription>>>,
449 state: Arc<Mutex<ConnectionState>>,
451 outbox: Arc<Mutex<Vec<StreamMessage>>>,
453 data_cache: Arc<Mutex<HashMap<String, serde_json::Value>>>,
455 reconnect_attempts: Arc<Mutex<u32>>,
457 config: StreamConfig,
459}
460
461impl DataStream {
462 #[must_use]
464 pub fn new(config: StreamConfig) -> Self {
465 Self {
466 subscriptions: Arc::new(Mutex::new(HashMap::new())),
467 state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
468 outbox: Arc::new(Mutex::new(Vec::new())),
469 data_cache: Arc::new(Mutex::new(HashMap::new())),
470 reconnect_attempts: Arc::new(Mutex::new(0)),
471 config,
472 }
473 }
474
475 #[must_use]
477 pub fn state(&self) -> ConnectionState {
478 *self.state.lock().expect("state mutex not poisoned")
479 }
480
481 pub fn set_state(&self, state: ConnectionState) {
483 *self.state.lock().expect("state mutex not poisoned") = state;
484 }
485
486 pub fn subscribe(&self, subscription: StreamSubscription) -> String {
488 let id = subscription.id.clone();
489 let msg = subscription.to_message();
490
491 self.subscriptions
492 .lock()
493 .expect("subscriptions mutex not poisoned")
494 .insert(id.clone(), subscription);
495
496 self.outbox
497 .lock()
498 .expect("outbox mutex not poisoned")
499 .push(msg);
500 id
501 }
502
503 pub fn unsubscribe(&self, id: &str) {
505 self.subscriptions
506 .lock()
507 .expect("subscriptions mutex not poisoned")
508 .remove(id);
509 self.data_cache
510 .lock()
511 .expect("cache mutex not poisoned")
512 .remove(id);
513 self.outbox
514 .lock()
515 .expect("outbox mutex not poisoned")
516 .push(StreamMessage::unsubscribe(id));
517 }
518
519 #[must_use]
521 pub fn get_subscription(&self, id: &str) -> Option<StreamSubscription> {
522 self.subscriptions
523 .lock()
524 .expect("subscriptions mutex not poisoned")
525 .get(id)
526 .cloned()
527 }
528
529 #[must_use]
531 pub fn subscriptions(&self) -> Vec<StreamSubscription> {
532 self.subscriptions
533 .lock()
534 .expect("subscriptions mutex not poisoned")
535 .values()
536 .cloned()
537 .collect()
538 }
539
540 #[must_use]
542 pub fn get_data(&self, id: &str) -> Option<serde_json::Value> {
543 self.data_cache
544 .lock()
545 .expect("cache mutex not poisoned")
546 .get(id)
547 .cloned()
548 }
549
550 pub fn handle_message(&self, msg: StreamMessage) -> Option<StreamMessage> {
552 match msg {
553 StreamMessage::Data {
554 id, payload, seq, ..
555 } => {
556 if let Some(sub) = self
558 .subscriptions
559 .lock()
560 .expect("subscriptions mutex not poisoned")
561 .get_mut(&id)
562 {
563 sub.last_seq = seq;
564 sub.active = true;
565 sub.error_count = 0;
566 }
567 self.data_cache
569 .lock()
570 .expect("cache mutex not poisoned")
571 .insert(id, payload);
572 None
573 }
574 StreamMessage::Ack { id, .. } => {
575 if let Some(sub) = self
576 .subscriptions
577 .lock()
578 .expect("subscriptions mutex not poisoned")
579 .get_mut(&id)
580 {
581 sub.active = true;
582 }
583 None
584 }
585 StreamMessage::Error { id, .. } => {
586 if let Some(ref id) = id {
587 if let Some(sub) = self
588 .subscriptions
589 .lock()
590 .expect("subscriptions mutex not poisoned")
591 .get_mut(id)
592 {
593 sub.error_count += 1;
594 }
595 }
596 None
597 }
598 StreamMessage::Ping { timestamp } => Some(StreamMessage::pong(timestamp)),
599 StreamMessage::Pong { .. } => None,
600 _ => None,
601 }
602 }
603
604 #[must_use]
606 pub fn take_outbox(&self) -> Vec<StreamMessage> {
607 std::mem::take(&mut *self.outbox.lock().expect("outbox mutex not poisoned"))
608 }
609
610 pub fn send(&self, msg: StreamMessage) {
612 self.outbox
613 .lock()
614 .expect("outbox mutex not poisoned")
615 .push(msg);
616 }
617
618 #[must_use]
620 pub fn reconnect_delay(&self) -> Duration {
621 let attempts = *self
622 .reconnect_attempts
623 .lock()
624 .expect("reconnect mutex not poisoned");
625 self.config.reconnect.delay_for_attempt(attempts)
626 }
627
628 pub fn increment_reconnect_attempts(&self) {
630 *self
631 .reconnect_attempts
632 .lock()
633 .expect("reconnect mutex not poisoned") += 1;
634 }
635
636 pub fn reset_reconnect_attempts(&self) {
638 *self
639 .reconnect_attempts
640 .lock()
641 .expect("reconnect mutex not poisoned") = 0;
642 }
643
644 #[must_use]
646 pub fn should_reconnect(&self) -> bool {
647 let attempts = *self
648 .reconnect_attempts
649 .lock()
650 .expect("reconnect mutex not poisoned");
651 self.config.reconnect.should_reconnect(attempts)
652 }
653
654 pub fn resubscribe_all(&self) {
656 let subs = self
657 .subscriptions
658 .lock()
659 .expect("subscriptions mutex not poisoned")
660 .clone();
661 let mut outbox = self.outbox.lock().expect("outbox mutex not poisoned");
662 for sub in subs.values() {
663 outbox.push(sub.to_message());
664 }
665 }
666
667 #[must_use]
669 pub fn subscription_count(&self) -> usize {
670 self.subscriptions
671 .lock()
672 .expect("subscriptions mutex not poisoned")
673 .len()
674 }
675
676 pub fn clear(&self) {
678 self.subscriptions
679 .lock()
680 .expect("subscriptions mutex not poisoned")
681 .clear();
682 self.data_cache
683 .lock()
684 .expect("cache mutex not poisoned")
685 .clear();
686 self.outbox
687 .lock()
688 .expect("outbox mutex not poisoned")
689 .clear();
690 }
691}
692
693#[derive(Debug)]
695pub struct RateLimiter {
696 max_messages: usize,
698 window: Duration,
700 timestamps: Vec<u64>,
702}
703
704impl RateLimiter {
705 #[must_use]
707 pub fn new(max_messages: usize, window: Duration) -> Self {
708 Self {
709 max_messages,
710 window,
711 timestamps: Vec::with_capacity(max_messages),
712 }
713 }
714
715 pub fn check(&mut self, now: u64) -> bool {
717 let window_start = now.saturating_sub(self.window.as_millis() as u64);
718
719 self.timestamps.retain(|&ts| ts >= window_start);
721
722 if self.timestamps.len() < self.max_messages {
723 self.timestamps.push(now);
724 true
725 } else {
726 false
727 }
728 }
729
730 #[must_use]
732 pub fn current_count(&self) -> usize {
733 self.timestamps.len()
734 }
735
736 pub fn reset(&mut self) {
738 self.timestamps.clear();
739 }
740
741 #[must_use]
743 pub fn is_at_capacity(&self) -> bool {
744 self.timestamps.len() >= self.max_messages
745 }
746}
747
748impl Default for RateLimiter {
749 fn default() -> Self {
750 Self::new(100, Duration::from_secs(1))
751 }
752}
753
754#[derive(Debug, Default)]
756pub struct MessageBuffer {
757 buffers: HashMap<String, SubscriptionBuffer>,
759}
760
761#[derive(Debug, Default)]
762struct SubscriptionBuffer {
763 last_seq: u64,
765 pending: Vec<(u64, serde_json::Value)>,
767}
768
769impl MessageBuffer {
770 #[must_use]
772 pub fn new() -> Self {
773 Self::default()
774 }
775
776 pub fn process(
778 &mut self,
779 id: &str,
780 seq: u64,
781 payload: serde_json::Value,
782 ) -> Option<serde_json::Value> {
783 let buffer = self.buffers.entry(id.to_string()).or_default();
784
785 if seq == buffer.last_seq + 1 {
786 buffer.last_seq = seq;
788
789 let mut result = Some(payload);
791 while let Some(pos) = buffer
792 .pending
793 .iter()
794 .position(|(s, _)| *s == buffer.last_seq + 1)
795 {
796 let (next_seq, next_payload) = buffer.pending.remove(pos);
797 buffer.last_seq = next_seq;
798 result = Some(next_payload);
800 }
801 result
802 } else if seq > buffer.last_seq + 1 {
803 buffer.pending.push((seq, payload));
805 None
806 } else {
807 None
809 }
810 }
811
812 #[must_use]
814 pub fn last_seq(&self, id: &str) -> u64 {
815 self.buffers.get(id).map_or(0, |b| b.last_seq)
816 }
817
818 #[must_use]
820 pub fn pending_count(&self, id: &str) -> usize {
821 self.buffers.get(id).map_or(0, |b| b.pending.len())
822 }
823
824 pub fn clear(&mut self, id: &str) {
826 self.buffers.remove(id);
827 }
828
829 pub fn clear_all(&mut self) {
831 self.buffers.clear();
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use super::*;
838
839 #[test]
844 fn test_connection_state_default() {
845 let state = ConnectionState::default();
846 assert_eq!(state, ConnectionState::Disconnected);
847 }
848
849 #[test]
850 fn test_connection_state_is_active() {
851 assert!(!ConnectionState::Disconnected.is_active());
852 assert!(!ConnectionState::Connecting.is_active());
853 assert!(ConnectionState::Connected.is_active());
854 assert!(!ConnectionState::Reconnecting.is_active());
855 assert!(!ConnectionState::Failed.is_active());
856 }
857
858 #[test]
859 fn test_connection_state_is_connecting() {
860 assert!(!ConnectionState::Disconnected.is_connecting());
861 assert!(ConnectionState::Connecting.is_connecting());
862 assert!(!ConnectionState::Connected.is_connecting());
863 assert!(ConnectionState::Reconnecting.is_connecting());
864 assert!(!ConnectionState::Failed.is_connecting());
865 }
866
867 #[test]
872 fn test_stream_message_subscribe() {
873 let msg = StreamMessage::subscribe("sub1", "metrics/cpu");
874 if let StreamMessage::Subscribe { id, source, .. } = msg {
875 assert_eq!(id, "sub1");
876 assert_eq!(source, "metrics/cpu");
877 } else {
878 panic!("Expected Subscribe message");
879 }
880 }
881
882 #[test]
883 fn test_stream_message_subscribe_with_transform() {
884 let msg = StreamMessage::subscribe_with_transform("sub1", "metrics/cpu", "rate()");
885 if let StreamMessage::Subscribe { transform, .. } = msg {
886 assert_eq!(transform, Some("rate()".to_string()));
887 } else {
888 panic!("Expected Subscribe message");
889 }
890 }
891
892 #[test]
893 fn test_stream_message_unsubscribe() {
894 let msg = StreamMessage::unsubscribe("sub1");
895 if let StreamMessage::Unsubscribe { id } = msg {
896 assert_eq!(id, "sub1");
897 } else {
898 panic!("Expected Unsubscribe message");
899 }
900 }
901
902 #[test]
903 fn test_stream_message_data() {
904 let msg = StreamMessage::data("sub1", serde_json::json!({"value": 42}), 5);
905 if let StreamMessage::Data {
906 id, payload, seq, ..
907 } = msg
908 {
909 assert_eq!(id, "sub1");
910 assert_eq!(payload, serde_json::json!({"value": 42}));
911 assert_eq!(seq, 5);
912 } else {
913 panic!("Expected Data message");
914 }
915 }
916
917 #[test]
918 fn test_stream_message_error() {
919 let msg = StreamMessage::error("connection failed");
920 if let StreamMessage::Error { message, id, .. } = msg {
921 assert_eq!(message, "connection failed");
922 assert!(id.is_none());
923 } else {
924 panic!("Expected Error message");
925 }
926 }
927
928 #[test]
929 fn test_stream_message_error_for() {
930 let msg = StreamMessage::error_for("sub1", "invalid source");
931 if let StreamMessage::Error { message, id, .. } = msg {
932 assert_eq!(message, "invalid source");
933 assert_eq!(id, Some("sub1".to_string()));
934 } else {
935 panic!("Expected Error message");
936 }
937 }
938
939 #[test]
940 fn test_stream_message_ack() {
941 let msg = StreamMessage::ack("sub1");
942 if let StreamMessage::Ack { id, .. } = msg {
943 assert_eq!(id, "sub1");
944 } else {
945 panic!("Expected Ack message");
946 }
947 }
948
949 #[test]
950 fn test_stream_message_ping_pong() {
951 let ping = StreamMessage::ping(12345);
952 let pong = StreamMessage::pong(12345);
953
954 if let StreamMessage::Ping { timestamp } = ping {
955 assert_eq!(timestamp, 12345);
956 } else {
957 panic!("Expected Ping");
958 }
959
960 if let StreamMessage::Pong { timestamp } = pong {
961 assert_eq!(timestamp, 12345);
962 } else {
963 panic!("Expected Pong");
964 }
965 }
966
967 #[test]
968 fn test_stream_message_subscription_id() {
969 assert_eq!(
970 StreamMessage::subscribe("sub1", "x").subscription_id(),
971 Some("sub1")
972 );
973 assert_eq!(
974 StreamMessage::unsubscribe("sub2").subscription_id(),
975 Some("sub2")
976 );
977 assert_eq!(
978 StreamMessage::data("sub3", serde_json::json!({}), 0).subscription_id(),
979 Some("sub3")
980 );
981 assert_eq!(StreamMessage::error("msg").subscription_id(), None);
982 assert_eq!(
983 StreamMessage::error_for("sub4", "msg").subscription_id(),
984 Some("sub4")
985 );
986 assert!(StreamMessage::ping(0).subscription_id().is_none());
987 assert!(StreamMessage::pong(0).subscription_id().is_none());
988 }
989
990 #[test]
991 fn test_stream_message_serialize() {
992 let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 42);
993 let json = serde_json::to_string(&msg).unwrap();
994 assert!(json.contains("\"type\":\"data\""));
995 assert!(json.contains("\"id\":\"sub1\""));
996 assert!(json.contains("\"seq\":42"));
997 }
998
999 #[test]
1000 fn test_stream_message_deserialize() {
1001 let json = r#"{"type":"subscribe","id":"s1","source":"data/x"}"#;
1002 let msg: StreamMessage = serde_json::from_str(json).unwrap();
1003 if let StreamMessage::Subscribe { id, source, .. } = msg {
1004 assert_eq!(id, "s1");
1005 assert_eq!(source, "data/x");
1006 } else {
1007 panic!("Expected Subscribe");
1008 }
1009 }
1010
1011 #[test]
1016 fn test_subscription_new() {
1017 let sub = StreamSubscription::new("metrics/cpu");
1018 assert_eq!(sub.source, "metrics/cpu");
1019 assert!(sub.id.starts_with("sub_"));
1020 assert!(!sub.active);
1021 }
1022
1023 #[test]
1024 fn test_subscription_with_id() {
1025 let sub = StreamSubscription::with_id("my-sub", "data/x");
1026 assert_eq!(sub.id, "my-sub");
1027 assert_eq!(sub.source, "data/x");
1028 }
1029
1030 #[test]
1031 fn test_subscription_with_interval() {
1032 let sub = StreamSubscription::new("x").with_interval(1000);
1033 assert_eq!(sub.interval, Some(Duration::from_millis(1000)));
1034 }
1035
1036 #[test]
1037 fn test_subscription_with_transform() {
1038 let sub = StreamSubscription::new("x").with_transform("rate() | limit(10)");
1039 assert_eq!(sub.transform, Some("rate() | limit(10)".to_string()));
1040 }
1041
1042 #[test]
1043 fn test_subscription_to_message() {
1044 let sub = StreamSubscription::with_id("sub1", "metrics")
1045 .with_interval(5000)
1046 .with_transform("mean()");
1047
1048 let msg = sub.to_message();
1049 if let StreamMessage::Subscribe {
1050 id,
1051 source,
1052 transform,
1053 interval_ms,
1054 } = msg
1055 {
1056 assert_eq!(id, "sub1");
1057 assert_eq!(source, "metrics");
1058 assert_eq!(transform, Some("mean()".to_string()));
1059 assert_eq!(interval_ms, Some(5000));
1060 } else {
1061 panic!("Expected Subscribe");
1062 }
1063 }
1064
1065 #[test]
1070 fn test_stream_config_default() {
1071 let config = StreamConfig::default();
1072 assert!(config.url.is_empty());
1073 assert!(config.reconnect.enabled);
1074 assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
1075 }
1076
1077 #[test]
1078 fn test_stream_config_new() {
1079 let config = StreamConfig::new("ws://localhost:8080");
1080 assert_eq!(config.url, "ws://localhost:8080");
1081 }
1082
1083 #[test]
1084 fn test_stream_config_builder() {
1085 let config = StreamConfig::new("ws://x")
1086 .with_heartbeat(Duration::from_secs(10))
1087 .with_buffer_size(2048);
1088
1089 assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
1090 assert_eq!(config.buffer_size, 2048);
1091 }
1092
1093 #[test]
1098 fn test_reconnect_config_default() {
1099 let config = ReconnectConfig::default();
1100 assert!(config.enabled);
1101 assert_eq!(config.initial_delay, Duration::from_millis(500));
1102 assert_eq!(config.max_delay, Duration::from_secs(30));
1103 assert!(config.max_attempts.is_none());
1104 }
1105
1106 #[test]
1107 fn test_reconnect_delay_for_attempt() {
1108 let config = ReconnectConfig {
1109 enabled: true,
1110 initial_delay: Duration::from_millis(100),
1111 max_delay: Duration::from_secs(10),
1112 backoff_multiplier: 2.0,
1113 max_attempts: None,
1114 };
1115
1116 assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100));
1117 assert_eq!(config.delay_for_attempt(1), Duration::from_millis(200));
1118 assert_eq!(config.delay_for_attempt(2), Duration::from_millis(400));
1119 assert_eq!(config.delay_for_attempt(3), Duration::from_millis(800));
1120 }
1121
1122 #[test]
1123 fn test_reconnect_delay_capped() {
1124 let config = ReconnectConfig {
1125 enabled: true,
1126 initial_delay: Duration::from_secs(1),
1127 max_delay: Duration::from_secs(5),
1128 backoff_multiplier: 10.0,
1129 max_attempts: None,
1130 };
1131
1132 assert_eq!(config.delay_for_attempt(5), Duration::from_secs(5));
1134 }
1135
1136 #[test]
1137 fn test_reconnect_disabled() {
1138 let config = ReconnectConfig {
1139 enabled: false,
1140 ..Default::default()
1141 };
1142
1143 assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
1144 assert!(!config.should_reconnect(0));
1145 }
1146
1147 #[test]
1148 fn test_reconnect_max_attempts() {
1149 let config = ReconnectConfig {
1150 enabled: true,
1151 max_attempts: Some(3),
1152 ..Default::default()
1153 };
1154
1155 assert!(config.should_reconnect(0));
1156 assert!(config.should_reconnect(1));
1157 assert!(config.should_reconnect(2));
1158 assert!(!config.should_reconnect(3));
1159 assert!(!config.should_reconnect(4));
1160 }
1161
1162 #[test]
1167 fn test_data_stream_new() {
1168 let stream = DataStream::new(StreamConfig::new("ws://x"));
1169 assert_eq!(stream.state(), ConnectionState::Disconnected);
1170 assert_eq!(stream.subscription_count(), 0);
1171 }
1172
1173 #[test]
1174 fn test_data_stream_subscribe() {
1175 let stream = DataStream::new(StreamConfig::default());
1176 let sub = StreamSubscription::with_id("sub1", "metrics");
1177
1178 let id = stream.subscribe(sub);
1179 assert_eq!(id, "sub1");
1180 assert_eq!(stream.subscription_count(), 1);
1181
1182 let outbox = stream.take_outbox();
1183 assert_eq!(outbox.len(), 1);
1184 assert!(matches!(outbox[0], StreamMessage::Subscribe { .. }));
1185 }
1186
1187 #[test]
1188 fn test_data_stream_unsubscribe() {
1189 let stream = DataStream::new(StreamConfig::default());
1190 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1191 let _ = stream.take_outbox(); stream.unsubscribe("sub1");
1194 assert_eq!(stream.subscription_count(), 0);
1195
1196 let outbox = stream.take_outbox();
1197 assert_eq!(outbox.len(), 1);
1198 assert!(matches!(outbox[0], StreamMessage::Unsubscribe { .. }));
1199 }
1200
1201 #[test]
1202 fn test_data_stream_handle_data() {
1203 let stream = DataStream::new(StreamConfig::default());
1204 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1205
1206 let msg = StreamMessage::data("sub1", serde_json::json!({"val": 42}), 1);
1207 stream.handle_message(msg);
1208
1209 let data = stream.get_data("sub1");
1210 assert_eq!(data, Some(serde_json::json!({"val": 42})));
1211
1212 let sub = stream.get_subscription("sub1").unwrap();
1213 assert!(sub.active);
1214 assert_eq!(sub.last_seq, 1);
1215 }
1216
1217 #[test]
1218 fn test_data_stream_handle_ack() {
1219 let stream = DataStream::new(StreamConfig::default());
1220 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1221
1222 stream.handle_message(StreamMessage::ack("sub1"));
1223
1224 let sub = stream.get_subscription("sub1").unwrap();
1225 assert!(sub.active);
1226 }
1227
1228 #[test]
1229 fn test_data_stream_handle_error() {
1230 let stream = DataStream::new(StreamConfig::default());
1231 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1232
1233 stream.handle_message(StreamMessage::error_for("sub1", "fail"));
1234
1235 let sub = stream.get_subscription("sub1").unwrap();
1236 assert_eq!(sub.error_count, 1);
1237 }
1238
1239 #[test]
1240 fn test_data_stream_handle_ping() {
1241 let stream = DataStream::new(StreamConfig::default());
1242 let response = stream.handle_message(StreamMessage::ping(12345));
1243
1244 assert!(matches!(
1245 response,
1246 Some(StreamMessage::Pong { timestamp: 12345 })
1247 ));
1248 }
1249
1250 #[test]
1251 fn test_data_stream_reconnect_logic() {
1252 let stream = DataStream::new(StreamConfig::default());
1253
1254 assert!(stream.should_reconnect());
1255 assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1256
1257 stream.increment_reconnect_attempts();
1258 assert!(stream.should_reconnect());
1259 assert_eq!(stream.reconnect_delay(), Duration::from_millis(1000));
1260
1261 stream.reset_reconnect_attempts();
1262 assert_eq!(stream.reconnect_delay(), Duration::from_millis(500));
1263 }
1264
1265 #[test]
1266 fn test_data_stream_resubscribe_all() {
1267 let stream = DataStream::new(StreamConfig::default());
1268 stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1269 stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1270 let _ = stream.take_outbox(); stream.resubscribe_all();
1273
1274 let outbox = stream.take_outbox();
1275 assert_eq!(outbox.len(), 2);
1276 }
1277
1278 #[test]
1279 fn test_data_stream_clear() {
1280 let stream = DataStream::new(StreamConfig::default());
1281 stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1282 stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1283
1284 stream.clear();
1285
1286 assert_eq!(stream.subscription_count(), 0);
1287 assert!(stream.get_data("sub1").is_none());
1288 }
1289
1290 #[test]
1295 fn test_rate_limiter_allows_under_limit() {
1296 let mut limiter = RateLimiter::new(5, Duration::from_secs(1));
1297
1298 for i in 0..5 {
1299 assert!(limiter.check(i * 100), "message {} should be allowed", i);
1300 }
1301 }
1302
1303 #[test]
1304 fn test_rate_limiter_blocks_over_limit() {
1305 let mut limiter = RateLimiter::new(3, Duration::from_secs(1));
1306
1307 assert!(limiter.check(0));
1308 assert!(limiter.check(100));
1309 assert!(limiter.check(200));
1310 assert!(!limiter.check(300)); }
1312
1313 #[test]
1314 fn test_rate_limiter_window_expiry() {
1315 let mut limiter = RateLimiter::new(2, Duration::from_millis(100));
1316
1317 assert!(limiter.check(0));
1318 assert!(limiter.check(50));
1319 assert!(!limiter.check(60)); assert!(limiter.check(200)); }
1324
1325 #[test]
1326 fn test_rate_limiter_current_count() {
1327 let mut limiter = RateLimiter::new(10, Duration::from_secs(1));
1328
1329 assert_eq!(limiter.current_count(), 0);
1330 limiter.check(0);
1331 assert_eq!(limiter.current_count(), 1);
1332 limiter.check(100);
1333 assert_eq!(limiter.current_count(), 2);
1334 }
1335
1336 #[test]
1337 fn test_rate_limiter_reset() {
1338 let mut limiter = RateLimiter::new(2, Duration::from_secs(1));
1339
1340 limiter.check(0);
1341 limiter.check(100);
1342 assert!(limiter.is_at_capacity());
1343
1344 limiter.reset();
1345 assert_eq!(limiter.current_count(), 0);
1346 assert!(!limiter.is_at_capacity());
1347 }
1348
1349 #[test]
1350 fn test_rate_limiter_default() {
1351 let limiter = RateLimiter::default();
1352 assert_eq!(limiter.max_messages, 100);
1353 }
1354
1355 #[test]
1360 fn test_message_buffer_in_order() {
1361 let mut buffer = MessageBuffer::new();
1362
1363 let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1364 let r2 = buffer.process("sub1", 2, serde_json::json!(2));
1365 let r3 = buffer.process("sub1", 3, serde_json::json!(3));
1366
1367 assert_eq!(r1, Some(serde_json::json!(1)));
1368 assert_eq!(r2, Some(serde_json::json!(2)));
1369 assert_eq!(r3, Some(serde_json::json!(3)));
1370 }
1371
1372 #[test]
1373 fn test_message_buffer_out_of_order() {
1374 let mut buffer = MessageBuffer::new();
1375
1376 let r1 = buffer.process("sub1", 1, serde_json::json!(1));
1378 let r3 = buffer.process("sub1", 3, serde_json::json!(3)); let r2 = buffer.process("sub1", 2, serde_json::json!(2)); assert_eq!(r1, Some(serde_json::json!(1)));
1382 assert!(r3.is_none()); assert_eq!(r2, Some(serde_json::json!(3))); }
1385
1386 #[test]
1387 fn test_message_buffer_duplicate() {
1388 let mut buffer = MessageBuffer::new();
1389
1390 buffer.process("sub1", 1, serde_json::json!(1));
1391 let dup = buffer.process("sub1", 1, serde_json::json!("dup"));
1392
1393 assert!(dup.is_none()); }
1395
1396 #[test]
1397 fn test_message_buffer_last_seq() {
1398 let mut buffer = MessageBuffer::new();
1399
1400 assert_eq!(buffer.last_seq("sub1"), 0);
1401 buffer.process("sub1", 1, serde_json::json!(1));
1402 assert_eq!(buffer.last_seq("sub1"), 1);
1403 buffer.process("sub1", 2, serde_json::json!(2));
1404 assert_eq!(buffer.last_seq("sub1"), 2);
1405 }
1406
1407 #[test]
1408 fn test_message_buffer_pending_count() {
1409 let mut buffer = MessageBuffer::new();
1410
1411 buffer.process("sub1", 1, serde_json::json!(1));
1412 buffer.process("sub1", 3, serde_json::json!(3)); buffer.process("sub1", 4, serde_json::json!(4)); assert_eq!(buffer.pending_count("sub1"), 2);
1416 }
1417
1418 #[test]
1419 fn test_message_buffer_clear() {
1420 let mut buffer = MessageBuffer::new();
1421
1422 buffer.process("sub1", 1, serde_json::json!(1));
1423 buffer.process("sub2", 1, serde_json::json!(2));
1424
1425 buffer.clear("sub1");
1426 assert_eq!(buffer.last_seq("sub1"), 0);
1427 assert_eq!(buffer.last_seq("sub2"), 1);
1428 }
1429
1430 #[test]
1431 fn test_message_buffer_clear_all() {
1432 let mut buffer = MessageBuffer::new();
1433
1434 buffer.process("sub1", 1, serde_json::json!(1));
1435 buffer.process("sub2", 1, serde_json::json!(2));
1436
1437 buffer.clear_all();
1438 assert_eq!(buffer.last_seq("sub1"), 0);
1439 assert_eq!(buffer.last_seq("sub2"), 0);
1440 }
1441
1442 #[test]
1443 fn test_message_buffer_multiple_subscriptions() {
1444 let mut buffer = MessageBuffer::new();
1445
1446 buffer.process("sub1", 1, serde_json::json!("a"));
1447 buffer.process("sub2", 1, serde_json::json!("b"));
1448 buffer.process("sub1", 2, serde_json::json!("c"));
1449
1450 assert_eq!(buffer.last_seq("sub1"), 2);
1451 assert_eq!(buffer.last_seq("sub2"), 1);
1452 }
1453
1454 #[test]
1459 fn test_connection_state_debug() {
1460 assert_eq!(format!("{:?}", ConnectionState::Connected), "Connected");
1461 assert_eq!(format!("{:?}", ConnectionState::Failed), "Failed");
1462 }
1463
1464 #[test]
1465 fn test_connection_state_clone() {
1466 let state = ConnectionState::Reconnecting;
1467 let cloned = state;
1468 assert_eq!(state, cloned);
1469 }
1470
1471 #[test]
1472 fn test_stream_message_debug() {
1473 let msg = StreamMessage::ping(12345);
1474 let debug = format!("{msg:?}");
1475 assert!(debug.contains("Ping"));
1476 }
1477
1478 #[test]
1479 fn test_stream_message_clone() {
1480 let msg = StreamMessage::data("sub1", serde_json::json!({"x": 1}), 5);
1481 let cloned = msg.clone();
1482 assert_eq!(msg, cloned);
1483 }
1484
1485 #[test]
1486 fn test_stream_subscription_clone() {
1487 let sub = StreamSubscription::with_id("sub1", "metrics")
1488 .with_interval(1000)
1489 .with_transform("rate()");
1490 let cloned = sub.clone();
1491 assert_eq!(cloned.id, "sub1");
1492 assert_eq!(cloned.source, "metrics");
1493 assert_eq!(cloned.transform, Some("rate()".to_string()));
1494 }
1495
1496 #[test]
1497 fn test_stream_subscription_debug() {
1498 let sub = StreamSubscription::new("test");
1499 let debug = format!("{sub:?}");
1500 assert!(debug.contains("StreamSubscription"));
1501 }
1502
1503 #[test]
1504 fn test_stream_subscription_hash_consistency() {
1505 let sub1 = StreamSubscription::new("metrics/cpu");
1507 let sub2 = StreamSubscription::new("metrics/cpu");
1508 assert_eq!(sub1.id, sub2.id);
1509 }
1510
1511 #[test]
1512 fn test_stream_subscription_hash_different() {
1513 let sub1 = StreamSubscription::new("metrics/cpu");
1514 let sub2 = StreamSubscription::new("metrics/memory");
1515 assert_ne!(sub1.id, sub2.id);
1516 }
1517
1518 #[test]
1519 fn test_stream_config_debug() {
1520 let config = StreamConfig::default();
1521 let debug = format!("{config:?}");
1522 assert!(debug.contains("StreamConfig"));
1523 }
1524
1525 #[test]
1526 fn test_stream_config_clone() {
1527 let config = StreamConfig::new("ws://test")
1528 .with_buffer_size(2048)
1529 .with_heartbeat(Duration::from_secs(60));
1530 let cloned = config.clone();
1531 assert_eq!(cloned.url, "ws://test");
1532 assert_eq!(cloned.buffer_size, 2048);
1533 }
1534
1535 #[test]
1536 fn test_stream_config_with_reconnect() {
1537 let reconnect = ReconnectConfig {
1538 enabled: false,
1539 max_attempts: Some(5),
1540 ..Default::default()
1541 };
1542 let config = StreamConfig::new("ws://x").with_reconnect(reconnect);
1543 assert!(!config.reconnect.enabled);
1544 assert_eq!(config.reconnect.max_attempts, Some(5));
1545 }
1546
1547 #[test]
1548 fn test_reconnect_config_debug() {
1549 let config = ReconnectConfig::default();
1550 let debug = format!("{config:?}");
1551 assert!(debug.contains("ReconnectConfig"));
1552 }
1553
1554 #[test]
1555 fn test_reconnect_config_clone() {
1556 let config = ReconnectConfig {
1557 max_attempts: Some(10),
1558 ..Default::default()
1559 };
1560 let cloned = config.clone();
1561 assert_eq!(cloned.max_attempts, Some(10));
1562 }
1563
1564 #[test]
1565 fn test_reconnect_delay_large_attempt() {
1566 let config = ReconnectConfig::default();
1567 let delay = config.delay_for_attempt(100);
1569 assert!(delay <= config.max_delay);
1570 }
1571
1572 #[test]
1573 fn test_data_stream_default() {
1574 let stream = DataStream::default();
1575 assert_eq!(stream.state(), ConnectionState::Disconnected);
1576 assert_eq!(stream.subscription_count(), 0);
1577 }
1578
1579 #[test]
1580 fn test_data_stream_set_state() {
1581 let stream = DataStream::default();
1582 stream.set_state(ConnectionState::Connected);
1583 assert_eq!(stream.state(), ConnectionState::Connected);
1584 stream.set_state(ConnectionState::Failed);
1585 assert_eq!(stream.state(), ConnectionState::Failed);
1586 }
1587
1588 #[test]
1589 fn test_data_stream_send() {
1590 let stream = DataStream::default();
1591 stream.send(StreamMessage::ping(100));
1592 stream.send(StreamMessage::ping(200));
1593
1594 let outbox = stream.take_outbox();
1595 assert_eq!(outbox.len(), 2);
1596 }
1597
1598 #[test]
1599 fn test_data_stream_get_nonexistent_subscription() {
1600 let stream = DataStream::default();
1601 assert!(stream.get_subscription("nonexistent").is_none());
1602 }
1603
1604 #[test]
1605 fn test_data_stream_get_nonexistent_data() {
1606 let stream = DataStream::default();
1607 assert!(stream.get_data("nonexistent").is_none());
1608 }
1609
1610 #[test]
1611 fn test_data_stream_subscriptions_list() {
1612 let stream = DataStream::default();
1613 stream.subscribe(StreamSubscription::with_id("sub1", "a"));
1614 stream.subscribe(StreamSubscription::with_id("sub2", "b"));
1615
1616 let subs = stream.subscriptions();
1617 assert_eq!(subs.len(), 2);
1618 }
1619
1620 #[test]
1621 fn test_data_stream_handle_pong() {
1622 let stream = DataStream::default();
1623 let response = stream.handle_message(StreamMessage::pong(12345));
1624 assert!(response.is_none());
1625 }
1626
1627 #[test]
1628 fn test_data_stream_handle_subscribe() {
1629 let stream = DataStream::default();
1630 let response = stream.handle_message(StreamMessage::subscribe("sub1", "metrics"));
1632 assert!(response.is_none());
1633 }
1634
1635 #[test]
1636 fn test_data_stream_handle_error_no_id() {
1637 let stream = DataStream::default();
1638 let response = stream.handle_message(StreamMessage::error("general error"));
1640 assert!(response.is_none());
1641 }
1642
1643 #[test]
1644 fn test_data_stream_handle_error_unknown_id() {
1645 let stream = DataStream::default();
1646 let response = stream.handle_message(StreamMessage::error_for("unknown", "error"));
1648 assert!(response.is_none());
1649 }
1650
1651 #[test]
1652 fn test_data_stream_handle_data_unknown_subscription() {
1653 let stream = DataStream::default();
1654 stream.handle_message(StreamMessage::data("unknown", serde_json::json!(42), 1));
1656 assert_eq!(stream.get_data("unknown"), Some(serde_json::json!(42)));
1657 }
1658
1659 #[test]
1660 fn test_rate_limiter_debug() {
1661 let limiter = RateLimiter::new(10, Duration::from_secs(1));
1662 let debug = format!("{limiter:?}");
1663 assert!(debug.contains("RateLimiter"));
1664 }
1665
1666 #[test]
1667 fn test_rate_limiter_at_boundary() {
1668 let mut limiter = RateLimiter::new(3, Duration::from_millis(100));
1669
1670 assert!(limiter.check(0));
1672 assert!(limiter.check(0));
1673 assert!(limiter.check(0));
1674 assert!(!limiter.check(0)); assert!(!limiter.check(100)); assert!(limiter.check(101)); }
1682
1683 #[test]
1684 fn test_message_buffer_debug() {
1685 let buffer = MessageBuffer::new();
1686 let debug = format!("{buffer:?}");
1687 assert!(debug.contains("MessageBuffer"));
1688 }
1689
1690 #[test]
1691 fn test_message_buffer_old_message() {
1692 let mut buffer = MessageBuffer::new();
1693
1694 buffer.process("sub1", 1, serde_json::json!(1));
1696 buffer.process("sub1", 2, serde_json::json!(2));
1697 buffer.process("sub1", 3, serde_json::json!(3));
1698
1699 let old = buffer.process("sub1", 1, serde_json::json!("old"));
1701 assert!(old.is_none());
1702 assert_eq!(buffer.last_seq("sub1"), 3);
1703 }
1704
1705 #[test]
1706 fn test_message_buffer_large_gap() {
1707 let mut buffer = MessageBuffer::new();
1708
1709 buffer.process("sub1", 1, serde_json::json!(1));
1710 buffer.process("sub1", 100, serde_json::json!(100)); assert_eq!(buffer.last_seq("sub1"), 1);
1714 assert_eq!(buffer.pending_count("sub1"), 1);
1715 }
1716
1717 #[test]
1718 fn test_message_buffer_nonexistent_subscription() {
1719 let buffer = MessageBuffer::new();
1720 assert_eq!(buffer.last_seq("nonexistent"), 0);
1721 assert_eq!(buffer.pending_count("nonexistent"), 0);
1722 }
1723
1724 #[test]
1725 fn test_stream_message_serialize_all_variants() {
1726 let messages = vec![
1727 StreamMessage::subscribe("s1", "source"),
1728 StreamMessage::subscribe_with_transform("s2", "source", "rate()"),
1729 StreamMessage::unsubscribe("s1"),
1730 StreamMessage::data("s1", serde_json::json!({"x": 1}), 5),
1731 StreamMessage::error("msg"),
1732 StreamMessage::error_for("s1", "msg"),
1733 StreamMessage::ack("s1"),
1734 StreamMessage::ping(1000),
1735 StreamMessage::pong(1000),
1736 ];
1737
1738 for msg in messages {
1739 let json = serde_json::to_string(&msg).unwrap();
1740 let parsed: StreamMessage = serde_json::from_str(&json).unwrap();
1741 assert_eq!(msg, parsed);
1742 }
1743 }
1744
1745 #[test]
1746 fn test_stream_subscription_empty_source() {
1747 let sub = StreamSubscription::new("");
1748 assert!(sub.id.starts_with("sub_"));
1749 assert_eq!(sub.source, "");
1750 }
1751
1752 #[test]
1753 fn test_stream_subscription_unicode_source() {
1754 let sub = StreamSubscription::new("数据/指标");
1755 assert!(sub.id.starts_with("sub_"));
1756 assert_eq!(sub.source, "数据/指标");
1757 }
1758
1759 #[test]
1760 fn test_data_stream_multiple_data_updates() {
1761 let stream = DataStream::default();
1762 stream.subscribe(StreamSubscription::with_id("sub1", "x"));
1763
1764 stream.handle_message(StreamMessage::data("sub1", serde_json::json!(1), 1));
1766 assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(1)));
1767
1768 stream.handle_message(StreamMessage::data("sub1", serde_json::json!(2), 2));
1769 assert_eq!(stream.get_data("sub1"), Some(serde_json::json!(2)));
1770
1771 let sub = stream.get_subscription("sub1").unwrap();
1772 assert_eq!(sub.last_seq, 2);
1773 assert_eq!(sub.error_count, 0);
1774 }
1775
1776 #[test]
1777 fn test_reconnect_infinite_attempts() {
1778 let config = ReconnectConfig {
1779 enabled: true,
1780 max_attempts: None,
1781 ..Default::default()
1782 };
1783
1784 assert!(config.should_reconnect(0));
1786 assert!(config.should_reconnect(100));
1787 assert!(config.should_reconnect(1000));
1788 assert!(config.should_reconnect(u32::MAX - 1));
1789 }
1790}