1use rust_decimal::Decimal;
9
10#[derive(Debug, thiserror::Error)]
16pub enum StreamError {
17 #[error("WebSocket connection failed to '{url}': {reason}")]
19 ConnectionFailed {
20 url: String,
22 reason: String,
24 },
25
26 #[error("WebSocket disconnected from '{url}'")]
28 Disconnected {
29 url: String,
31 },
32
33 #[error("Reconnection exhausted after {attempts} attempts to '{url}'")]
35 ReconnectExhausted {
36 url: String,
38 attempts: u32,
40 },
41
42 #[error("Tick parse error from {exchange}: {reason}")]
44 ParseError {
45 exchange: String,
47 reason: String,
49 },
50
51 #[error(
53 "Feed '{feed_id}' is stale: last tick was {elapsed_ms}ms ago (threshold: {threshold_ms}ms)"
54 )]
55 StaleFeed {
56 feed_id: String,
58 elapsed_ms: u64,
60 threshold_ms: u64,
62 },
63
64 #[error("Unknown feed '{feed_id}': not registered with the health monitor")]
66 UnknownFeed {
67 feed_id: String,
69 },
70
71 #[error("Invalid configuration: {reason}")]
77 ConfigError {
78 reason: String,
80 },
81
82 #[error("Order book reconstruction failed for '{symbol}': {reason}")]
84 BookReconstructionFailed {
85 symbol: String,
87 reason: String,
89 },
90
91 #[error("Order book crossed for '{symbol}': best bid {bid} >= best ask {ask}")]
93 BookCrossed {
94 symbol: String,
96 bid: Decimal,
98 ask: Decimal,
100 },
101
102 #[error("Sequence gap for '{symbol}': expected {expected}, got {got}")]
104 SequenceGap {
105 symbol: String,
107 expected: u64,
109 got: u64,
111 },
112
113 #[error("Backpressure on channel '{channel}': {depth}/{capacity} slots used")]
115 Backpressure {
116 channel: String,
118 depth: usize,
120 capacity: usize,
122 },
123
124 #[error("Unknown exchange format: '{0}'")]
126 UnknownExchange(String),
127
128 #[error("I/O error: {0}")]
130 Io(String),
131
132 #[error("WebSocket error: {0}")]
134 WebSocket(String),
135
136 #[error("SPSC ring buffer is full (capacity: {capacity})")]
142 RingBufferFull {
143 capacity: usize,
145 },
146
147 #[error("SPSC ring buffer is empty")]
152 RingBufferEmpty,
153
154 #[error("OHLCV aggregation error: {reason}")]
159 AggregationError {
160 reason: String,
162 },
163
164 #[error("Normalization error: {reason}")]
169 NormalizationError {
170 reason: String,
172 },
173
174 #[error("Invalid tick: {reason}")]
179 InvalidTick {
180 reason: String,
182 },
183
184 #[error("Lorentz config error: {reason}")]
190 LorentzConfigError {
191 reason: String,
193 },
194
195 #[error("fin-primitives error: {0}")]
200 FinPrimitives(String),
201}
202
203impl From<fin_primitives::error::FinError> for StreamError {
204 fn from(e: fin_primitives::error::FinError) -> Self {
205 StreamError::FinPrimitives(e.to_string())
206 }
207}
208
209impl StreamError {
210 pub fn is_fatal(&self) -> bool {
217 matches!(
218 self,
219 StreamError::ConfigError { .. }
220 | StreamError::LorentzConfigError { .. }
221 | StreamError::UnknownExchange(_)
222 | StreamError::ReconnectExhausted { .. }
223 | StreamError::UnknownFeed { .. }
224 | StreamError::BookReconstructionFailed { .. }
225 )
226 }
227
228 pub fn source_module(&self) -> &'static str {
234 match self {
235 StreamError::ConnectionFailed { .. }
236 | StreamError::Disconnected { .. }
237 | StreamError::ReconnectExhausted { .. }
238 | StreamError::WebSocket(_) => "ws",
239 StreamError::BookCrossed { .. }
240 | StreamError::BookReconstructionFailed { .. }
241 | StreamError::SequenceGap { .. } => "book",
242 StreamError::RingBufferFull { .. } | StreamError::RingBufferEmpty => "ring",
243 StreamError::AggregationError { .. } | StreamError::NormalizationError { .. } => {
244 "aggregator"
245 }
246 StreamError::StaleFeed { .. } | StreamError::UnknownFeed { .. } => "session",
247 StreamError::LorentzConfigError { .. } => "lorentz",
248 _ => "other",
249 }
250 }
251
252 pub fn is_recoverable(&self) -> bool {
256 !self.is_fatal()
257 }
258
259 pub fn is_network_error(&self) -> bool {
265 matches!(
266 self,
267 StreamError::ConnectionFailed { .. }
268 | StreamError::Disconnected { .. }
269 | StreamError::ReconnectExhausted { .. }
270 | StreamError::WebSocket(_)
271 | StreamError::Io(_)
272 )
273 }
274
275 pub fn is_book_error(&self) -> bool {
281 matches!(
282 self,
283 StreamError::SequenceGap { .. }
284 | StreamError::BookCrossed { .. }
285 | StreamError::BookReconstructionFailed { .. }
286 )
287 }
288
289 pub fn variant_name(&self) -> &'static str {
294 match self {
295 StreamError::ConnectionFailed { .. } => "ConnectionFailed",
296 StreamError::Disconnected { .. } => "Disconnected",
297 StreamError::ReconnectExhausted { .. } => "ReconnectExhausted",
298 StreamError::ParseError { .. } => "ParseError",
299 StreamError::StaleFeed { .. } => "StaleFeed",
300 StreamError::UnknownFeed { .. } => "UnknownFeed",
301 StreamError::ConfigError { .. } => "ConfigError",
302 StreamError::BookReconstructionFailed { .. } => "BookReconstructionFailed",
303 StreamError::BookCrossed { .. } => "BookCrossed",
304 StreamError::SequenceGap { .. } => "SequenceGap",
305 StreamError::Backpressure { .. } => "Backpressure",
306 StreamError::UnknownExchange(_) => "UnknownExchange",
307 StreamError::Io(_) => "Io",
308 StreamError::WebSocket(_) => "WebSocket",
309 StreamError::RingBufferFull { .. } => "RingBufferFull",
310 StreamError::RingBufferEmpty => "RingBufferEmpty",
311 StreamError::AggregationError { .. } => "AggregationError",
312 StreamError::NormalizationError { .. } => "NormalizationError",
313 StreamError::InvalidTick { .. } => "InvalidTick",
314 StreamError::LorentzConfigError { .. } => "LorentzConfigError",
315 StreamError::FinPrimitives(_) => "FinPrimitives",
316 }
317 }
318
319 pub fn is_config_error(&self) -> bool {
321 matches!(self, StreamError::ConfigError { .. })
322 }
323
324 pub fn is_feed_error(&self) -> bool {
326 matches!(
327 self,
328 StreamError::StaleFeed { .. } | StreamError::UnknownFeed { .. }
329 )
330 }
331
332 pub fn is_parse_error(&self) -> bool {
334 matches!(self, StreamError::ParseError { .. })
335 }
336
337 pub fn is_book_reconstruction_error(&self) -> bool {
339 matches!(self, StreamError::BookReconstructionFailed { .. })
340 }
341
342 pub fn is_computation_error(&self) -> bool {
346 matches!(self, StreamError::LorentzConfigError { .. })
347 }
348
349 pub fn is_ring_error(&self) -> bool {
351 matches!(
352 self,
353 StreamError::RingBufferFull { .. } | StreamError::RingBufferEmpty
354 )
355 }
356
357 pub fn category(&self) -> &'static str {
363 if self.is_connection_error() {
364 "connection"
365 } else if self.is_data_error() {
366 "data"
367 } else if self.is_pipeline_error() {
368 "pipeline"
369 } else if self.is_book_error() {
370 "book"
371 } else if matches!(self, StreamError::ConfigError { .. }) {
372 "config"
373 } else {
374 "other"
375 }
376 }
377
378 pub fn is_pipeline_error(&self) -> bool {
385 matches!(
386 self,
387 StreamError::RingBufferFull { .. }
388 | StreamError::RingBufferEmpty
389 | StreamError::AggregationError { .. }
390 | StreamError::NormalizationError { .. }
391 | StreamError::LorentzConfigError { .. }
392 )
393 }
394
395 pub fn is_connection_error(&self) -> bool {
401 matches!(
402 self,
403 StreamError::ConnectionFailed { .. }
404 | StreamError::Disconnected { .. }
405 | StreamError::ReconnectExhausted { .. }
406 )
407 }
408
409 pub fn is_data_error(&self) -> bool {
415 matches!(
416 self,
417 StreamError::ParseError { .. }
418 | StreamError::InvalidTick { .. }
419 | StreamError::BookCrossed { .. }
420 | StreamError::SequenceGap { .. }
421 | StreamError::BookReconstructionFailed { .. }
422 )
423 }
424
425 pub fn is_transient(&self) -> bool {
443 matches!(
444 self,
445 StreamError::ConnectionFailed { .. }
446 | StreamError::Disconnected { .. }
447 | StreamError::ReconnectExhausted { .. }
448 | StreamError::StaleFeed { .. }
449 | StreamError::Backpressure { .. }
450 | StreamError::RingBufferFull { .. }
451 | StreamError::RingBufferEmpty
452 | StreamError::Io(_)
453 | StreamError::WebSocket(_)
454 )
455 }
456
457 pub fn affected_symbol(&self) -> Option<&str> {
464 match self {
465 StreamError::ConnectionFailed { url, .. }
466 | StreamError::Disconnected { url }
467 | StreamError::ReconnectExhausted { url, .. } => Some(url.as_str()),
468 StreamError::BookCrossed { symbol, .. }
469 | StreamError::BookReconstructionFailed { symbol, .. }
470 | StreamError::SequenceGap { symbol, .. } => Some(symbol.as_str()),
471 StreamError::StaleFeed { feed_id, .. }
472 | StreamError::UnknownFeed { feed_id } => Some(feed_id.as_str()),
473 _ => None,
474 }
475 }
476
477 pub fn is_sequence_gap(&self) -> bool {
479 matches!(self, StreamError::SequenceGap { .. })
480 }
481
482 pub fn is_data_integrity_error(&self) -> bool {
488 matches!(
489 self,
490 StreamError::BookCrossed { .. } | StreamError::SequenceGap { .. }
491 )
492 }
493
494 pub fn is_symbol_error(&self) -> bool {
498 matches!(
499 self,
500 StreamError::BookCrossed { .. }
501 | StreamError::BookReconstructionFailed { .. }
502 | StreamError::SequenceGap { .. }
503 | StreamError::StaleFeed { .. }
504 | StreamError::UnknownFeed { .. }
505 )
506 }
507
508 pub fn to_error_code(&self) -> u32 {
519 match self {
520 StreamError::ConnectionFailed { .. } => 1001,
521 StreamError::Disconnected { .. } => 1002,
522 StreamError::ReconnectExhausted { .. } => 1003,
523 StreamError::Io(_) => 1004,
524 StreamError::WebSocket(_) => 1005,
525 StreamError::ParseError { .. } => 2001,
526 StreamError::UnknownExchange(_) => 2002,
527 StreamError::StaleFeed { .. } => 2003,
528 StreamError::UnknownFeed { .. } => 2004,
529 StreamError::ConfigError { .. } => 3001,
530 StreamError::BookReconstructionFailed { .. } => 4001,
531 StreamError::BookCrossed { .. } => 4002,
532 StreamError::SequenceGap { .. } => 4003,
533 StreamError::Backpressure { .. } => 5001,
534 StreamError::RingBufferFull { .. } => 5002,
535 StreamError::RingBufferEmpty => 5003,
536 StreamError::AggregationError { .. } => 6001,
537 StreamError::NormalizationError { .. } => 6002,
538 StreamError::InvalidTick { .. } => 6003,
539 StreamError::FinPrimitives(_) => 6004,
540 StreamError::LorentzConfigError { .. } => 7001,
541 }
542 }
543
544 pub fn has_url(&self) -> bool {
546 matches!(self, StreamError::ConnectionFailed { .. })
547 }
548
549 pub fn error_category_code(&self) -> u32 {
553 self.to_error_code() / 1000
554 }
555
556 pub fn is_buffer_error(&self) -> bool {
558 matches!(
559 self,
560 StreamError::Backpressure { .. }
561 | StreamError::RingBufferFull { .. }
562 | StreamError::RingBufferEmpty
563 )
564 }
565
566 pub fn is_normalization_error(&self) -> bool {
568 matches!(self, StreamError::NormalizationError { .. })
569 }
570
571 pub fn is_agg_error(&self) -> bool {
573 matches!(self, StreamError::AggregationError { .. })
574 }
575
576 pub fn is_sequence_error(&self) -> bool {
578 matches!(self, StreamError::SequenceGap { .. })
579 }
580
581 pub fn severity_level(&self) -> u8 {
588 match self {
589 StreamError::ReconnectExhausted { .. } => 4,
590 StreamError::ConnectionFailed { .. }
591 | StreamError::Disconnected { .. }
592 | StreamError::BookReconstructionFailed { .. }
593 | StreamError::BookCrossed { .. }
594 | StreamError::LorentzConfigError { .. } => 3,
595 StreamError::StaleFeed { .. }
596 | StreamError::SequenceGap { .. }
597 | StreamError::Backpressure { .. }
598 | StreamError::RingBufferFull { .. }
599 | StreamError::RingBufferEmpty => 2,
600 _ => 1,
601 }
602 }
603
604}
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609
610 #[test]
611 fn test_connection_failed_display() {
612 let e = StreamError::ConnectionFailed {
613 url: "wss://example.com".into(),
614 reason: "timeout".into(),
615 };
616 assert!(e.to_string().contains("example.com"));
617 assert!(e.to_string().contains("timeout"));
618 }
619
620 #[test]
621 fn test_disconnected_display() {
622 let e = StreamError::Disconnected {
623 url: "wss://feed.io".into(),
624 };
625 assert!(e.to_string().contains("feed.io"));
626 }
627
628 #[test]
629 fn test_reconnect_exhausted_display() {
630 let e = StreamError::ReconnectExhausted {
631 url: "wss://x.io".into(),
632 attempts: 5,
633 };
634 assert!(e.to_string().contains("5"));
635 }
636
637 #[test]
638 fn test_parse_error_display() {
639 let e = StreamError::ParseError {
640 exchange: "Binance".into(),
641 reason: "missing field".into(),
642 };
643 assert!(e.to_string().contains("Binance"));
644 }
645
646 #[test]
647 fn test_stale_feed_display() {
648 let e = StreamError::StaleFeed {
649 feed_id: "BTC-USD".into(),
650 elapsed_ms: 5000,
651 threshold_ms: 2000,
652 };
653 assert!(e.to_string().contains("BTC-USD"));
654 assert!(e.to_string().contains("5000"));
655 }
656
657 #[test]
658 fn test_unknown_feed_display() {
659 let e = StreamError::UnknownFeed {
660 feed_id: "ghost-feed".into(),
661 };
662 assert!(e.to_string().contains("ghost-feed"));
663 assert!(e.to_string().contains("not registered"));
664 }
665
666 #[test]
667 fn test_config_error_display() {
668 let e = StreamError::ConfigError {
669 reason: "multiplier must be >= 1.0".into(),
670 };
671 assert!(e.to_string().contains("multiplier"));
672 }
673
674 #[test]
675 fn test_book_reconstruction_failed_display() {
676 let e = StreamError::BookReconstructionFailed {
677 symbol: "ETH-USD".into(),
678 reason: "gap in sequence".into(),
679 };
680 assert!(e.to_string().contains("ETH-USD"));
681 }
682
683 #[test]
684 fn test_book_crossed_display() {
685 let e = StreamError::BookCrossed {
686 symbol: "BTC-USD".into(),
687 bid: Decimal::from(50001u32),
688 ask: Decimal::from(50000u32),
689 };
690 assert!(e.to_string().contains("crossed"));
691 assert!(e.to_string().contains("BTC-USD"));
692 }
693
694 #[test]
695 fn test_sequence_gap_display() {
696 let e = StreamError::SequenceGap {
697 symbol: "BTC-USD".into(),
698 expected: 5,
699 got: 7,
700 };
701 assert!(e.to_string().contains("5"));
702 assert!(e.to_string().contains("7"));
703 }
704
705 #[test]
706 fn test_backpressure_display() {
707 let e = StreamError::Backpressure {
708 channel: "ticks".into(),
709 depth: 1000,
710 capacity: 1000,
711 };
712 assert!(e.to_string().contains("1000"));
713 }
714
715 #[test]
716 fn test_unknown_exchange_display() {
717 let e = StreamError::UnknownExchange("Kraken".into());
718 assert!(e.to_string().contains("Kraken"));
719 }
720
721 #[test]
722 fn test_ring_buffer_full_display() {
723 let e = StreamError::RingBufferFull { capacity: 1024 };
724 assert!(e.to_string().contains("1024"));
725 assert!(e.to_string().contains("full"));
726 }
727
728 #[test]
729 fn test_ring_buffer_empty_display() {
730 let e = StreamError::RingBufferEmpty;
731 assert!(e.to_string().contains("empty"));
732 }
733
734 #[test]
735 fn test_aggregation_error_display() {
736 let e = StreamError::AggregationError {
737 reason: "wrong symbol".into(),
738 };
739 assert!(e.to_string().contains("wrong symbol"));
740 }
741
742 #[test]
743 fn test_normalization_error_display() {
744 let e = StreamError::NormalizationError {
745 reason: "window not seeded".into(),
746 };
747 assert!(e.to_string().contains("window not seeded"));
748 }
749
750 #[test]
751 fn test_invalid_tick_display() {
752 let e = StreamError::InvalidTick {
753 reason: "negative price".into(),
754 };
755 assert!(e.to_string().contains("negative price"));
756 }
757
758 #[test]
759 fn test_lorentz_config_error_display() {
760 let e = StreamError::LorentzConfigError {
761 reason: "beta >= 1".into(),
762 };
763 assert!(e.to_string().contains("beta >= 1"));
764 }
765
766 #[test]
767 fn test_is_transient_websocket_is_transient() {
768 assert!(StreamError::WebSocket("reset".into()).is_transient());
769 assert!(StreamError::ConnectionFailed {
770 url: "wss://x.io".into(),
771 reason: "timeout".into()
772 }
773 .is_transient());
774 assert!(StreamError::RingBufferFull { capacity: 8 }.is_transient());
775 assert!(StreamError::RingBufferEmpty.is_transient());
776 }
777
778 #[test]
779 fn test_is_data_error_parse_and_book_errors() {
780 assert!(StreamError::ParseError {
781 exchange: "Binance".into(),
782 reason: "bad field".into()
783 }
784 .is_data_error());
785 assert!(StreamError::InvalidTick {
786 reason: "neg price".into()
787 }
788 .is_data_error());
789 assert!(StreamError::BookCrossed {
790 symbol: "BTC-USD".into(),
791 bid: Decimal::from(1u32),
792 ask: Decimal::from(1u32)
793 }
794 .is_data_error());
795 assert!(StreamError::SequenceGap {
796 symbol: "BTC-USD".into(),
797 expected: 1,
798 got: 3
799 }
800 .is_data_error());
801 }
802
803 #[test]
804 fn test_is_data_error_connectivity_errors_are_not_data() {
805 assert!(!StreamError::WebSocket("reset".into()).is_data_error());
806 assert!(!StreamError::ConnectionFailed {
807 url: "wss://x.io".into(),
808 reason: "timeout".into()
809 }
810 .is_data_error());
811 assert!(!StreamError::ConfigError {
812 reason: "bad param".into()
813 }
814 .is_data_error());
815 }
816
817 #[test]
818 fn test_is_transient_config_errors_are_not_transient() {
819 assert!(!StreamError::ConfigError {
820 reason: "bad param".into()
821 }
822 .is_transient());
823 assert!(!StreamError::ParseError {
824 exchange: "Binance".into(),
825 reason: "bad field".into()
826 }
827 .is_transient());
828 assert!(!StreamError::InvalidTick {
829 reason: "neg price".into()
830 }
831 .is_transient());
832 assert!(!StreamError::LorentzConfigError {
833 reason: "beta>=1".into()
834 }
835 .is_transient());
836 }
837
838 #[test]
841 fn test_is_connection_error_connection_variants() {
842 assert!(StreamError::ConnectionFailed {
843 url: "wss://x.io".into(),
844 reason: "refused".into()
845 }
846 .is_connection_error());
847 assert!(StreamError::Disconnected {
848 url: "wss://x.io".into()
849 }
850 .is_connection_error());
851 assert!(StreamError::ReconnectExhausted {
852 url: "wss://x.io".into(),
853 attempts: 5
854 }
855 .is_connection_error());
856 }
857
858 #[test]
859 fn test_is_connection_error_data_errors_are_not_connection() {
860 assert!(!StreamError::ParseError {
861 exchange: "Binance".into(),
862 reason: "bad json".into()
863 }
864 .is_connection_error());
865 assert!(!StreamError::ConfigError {
866 reason: "bad config".into()
867 }
868 .is_connection_error());
869 assert!(!StreamError::BookCrossed {
870 symbol: "BTC-USD".into(),
871 bid: rust_decimal_macros::dec!(100),
872 ask: rust_decimal_macros::dec!(99),
873 }
874 .is_connection_error());
875 }
876
877 #[test]
880 fn test_is_pipeline_error_ring_and_aggregation_variants() {
881 assert!(StreamError::RingBufferFull { capacity: 8 }.is_pipeline_error());
882 assert!(StreamError::RingBufferEmpty.is_pipeline_error());
883 assert!(StreamError::AggregationError {
884 reason: "wrong symbol".into()
885 }
886 .is_pipeline_error());
887 assert!(StreamError::NormalizationError {
888 reason: "out of range".into()
889 }
890 .is_pipeline_error());
891 assert!(StreamError::LorentzConfigError {
892 reason: "beta>=1".into()
893 }
894 .is_pipeline_error());
895 }
896
897 #[test]
898 fn test_is_pipeline_error_connection_errors_are_not_pipeline() {
899 assert!(!StreamError::ConnectionFailed {
900 url: "wss://x.io".into(),
901 reason: "refused".into()
902 }
903 .is_pipeline_error());
904 assert!(!StreamError::ParseError {
905 exchange: "Binance".into(),
906 reason: "bad json".into()
907 }
908 .is_pipeline_error());
909 }
910
911 #[test]
914 fn test_category_connection_errors() {
915 assert_eq!(
916 StreamError::ConnectionFailed { url: "u".into(), reason: "r".into() }.category(),
917 "connection"
918 );
919 assert_eq!(
920 StreamError::Disconnected { url: "u".into() }.category(),
921 "connection"
922 );
923 }
924
925 #[test]
926 fn test_category_data_errors() {
927 assert_eq!(
928 StreamError::ParseError { exchange: "B".into(), reason: "r".into() }.category(),
929 "data"
930 );
931 assert_eq!(
932 StreamError::InvalidTick { reason: "neg price".into() }.category(),
933 "data"
934 );
935 }
936
937 #[test]
938 fn test_category_pipeline_errors() {
939 assert_eq!(StreamError::RingBufferFull { capacity: 8 }.category(), "pipeline");
940 assert_eq!(StreamError::RingBufferEmpty.category(), "pipeline");
941 }
942
943 #[test]
944 fn test_category_config_errors() {
945 assert_eq!(
946 StreamError::ConfigError { reason: "bad param".into() }.category(),
947 "config"
948 );
949 }
950
951 #[test]
954 fn test_is_fatal_config_error() {
955 assert!(StreamError::ConfigError { reason: "bad".into() }.is_fatal());
956 }
957
958 #[test]
959 fn test_is_fatal_unknown_exchange() {
960 assert!(StreamError::UnknownExchange("Kraken".into()).is_fatal());
961 }
962
963 #[test]
964 fn test_is_fatal_reconnect_exhausted() {
965 assert!(StreamError::ReconnectExhausted { url: "wss://x".into(), attempts: 5 }.is_fatal());
966 }
967
968 #[test]
969 fn test_is_fatal_lorentz_config() {
970 assert!(StreamError::LorentzConfigError { reason: "beta>=1".into() }.is_fatal());
971 }
972
973 #[test]
974 fn test_is_fatal_parse_error_is_not_fatal() {
975 assert!(!StreamError::ParseError {
976 exchange: "Binance".into(),
977 reason: "bad json".into()
978 }.is_fatal());
979 }
980
981 #[test]
982 fn test_is_fatal_ring_buffer_full_is_not_fatal() {
983 assert!(!StreamError::RingBufferFull { capacity: 8 }.is_fatal());
984 }
985
986 #[test]
989 fn test_is_recoverable_inverse_of_is_fatal() {
990 let fatal = StreamError::ConfigError { reason: "bad".into() };
991 let transient = StreamError::RingBufferFull { capacity: 8 };
992 assert!(fatal.is_fatal() && !fatal.is_recoverable());
993 assert!(!transient.is_fatal() && transient.is_recoverable());
994 }
995
996 #[test]
997 fn test_is_recoverable_connection_failed() {
998 assert!(StreamError::ConnectionFailed {
999 url: "wss://x".into(),
1000 reason: "timeout".into()
1001 }.is_recoverable());
1002 }
1003
1004 #[test]
1007 fn test_source_module_ws_variants() {
1008 assert_eq!(StreamError::WebSocket("err".into()).source_module(), "ws");
1009 assert_eq!(
1010 StreamError::ConnectionFailed { url: "u".into(), reason: "r".into() }.source_module(),
1011 "ws"
1012 );
1013 assert_eq!(
1014 StreamError::Disconnected { url: "u".into() }.source_module(),
1015 "ws"
1016 );
1017 }
1018
1019 #[test]
1020 fn test_source_module_book_variants() {
1021 assert_eq!(
1022 StreamError::BookCrossed {
1023 symbol: "BTC-USD".into(),
1024 bid: Decimal::from(100u32),
1025 ask: Decimal::from(99u32),
1026 }
1027 .source_module(),
1028 "book"
1029 );
1030 assert_eq!(
1031 StreamError::SequenceGap { symbol: "X".into(), expected: 1, got: 3 }.source_module(),
1032 "book"
1033 );
1034 }
1035
1036 #[test]
1037 fn test_source_module_ring_variants() {
1038 assert_eq!(StreamError::RingBufferFull { capacity: 8 }.source_module(), "ring");
1039 assert_eq!(StreamError::RingBufferEmpty.source_module(), "ring");
1040 }
1041
1042 #[test]
1043 fn test_source_module_aggregator_variants() {
1044 assert_eq!(
1045 StreamError::AggregationError { reason: "bad".into() }.source_module(),
1046 "aggregator"
1047 );
1048 assert_eq!(
1049 StreamError::NormalizationError { reason: "bad".into() }.source_module(),
1050 "aggregator"
1051 );
1052 }
1053
1054 #[test]
1055 fn test_source_module_session_variants() {
1056 assert_eq!(
1057 StreamError::StaleFeed { feed_id: "X".into(), elapsed_ms: 1, threshold_ms: 1 }
1058 .source_module(),
1059 "session"
1060 );
1061 assert_eq!(
1062 StreamError::UnknownFeed { feed_id: "X".into() }.source_module(),
1063 "session"
1064 );
1065 }
1066
1067 #[test]
1068 fn test_source_module_lorentz_variant() {
1069 assert_eq!(
1070 StreamError::LorentzConfigError { reason: "bad".into() }.source_module(),
1071 "lorentz"
1072 );
1073 }
1074
1075 #[test]
1076 fn test_source_module_other_fallback() {
1077 assert_eq!(StreamError::UnknownExchange("X".into()).source_module(), "other");
1078 assert_eq!(
1079 StreamError::ConfigError { reason: "bad".into() }.source_module(),
1080 "other"
1081 );
1082 }
1083
1084 #[test]
1085 fn test_is_network_error_ws_variants() {
1086 assert!(StreamError::WebSocket("dropped".into()).is_network_error());
1087 assert!(StreamError::ConnectionFailed {
1088 url: "wss://x".into(),
1089 reason: "timeout".into()
1090 }
1091 .is_network_error());
1092 assert!(StreamError::Disconnected { url: "wss://x".into() }.is_network_error());
1093 }
1094
1095 #[test]
1096 fn test_is_network_error_false_for_config() {
1097 assert!(!StreamError::ConfigError { reason: "bad".into() }.is_network_error());
1098 assert!(!StreamError::RingBufferFull { capacity: 8 }.is_network_error());
1099 assert!(!StreamError::BookCrossed {
1100 symbol: "BTC".into(),
1101 bid: Decimal::from(100u32),
1102 ask: Decimal::from(99u32),
1103 }
1104 .is_network_error());
1105 }
1106
1107 #[test]
1110 fn test_variant_name_connection_failed() {
1111 assert_eq!(
1112 StreamError::ConnectionFailed { url: "u".into(), reason: "r".into() }.variant_name(),
1113 "ConnectionFailed"
1114 );
1115 }
1116
1117 #[test]
1118 fn test_variant_name_ring_buffer_empty() {
1119 assert_eq!(StreamError::RingBufferEmpty.variant_name(), "RingBufferEmpty");
1120 }
1121
1122 #[test]
1123 fn test_variant_name_parse_error() {
1124 assert_eq!(
1125 StreamError::ParseError { exchange: "B".into(), reason: "bad".into() }.variant_name(),
1126 "ParseError"
1127 );
1128 }
1129
1130 #[test]
1131 fn test_variant_name_book_crossed() {
1132 assert_eq!(
1133 StreamError::BookCrossed {
1134 symbol: "X".into(),
1135 bid: Decimal::from(1u32),
1136 ask: Decimal::from(1u32),
1137 }
1138 .variant_name(),
1139 "BookCrossed"
1140 );
1141 }
1142
1143 #[test]
1144 fn test_variant_name_lorentz_config_error() {
1145 assert_eq!(
1146 StreamError::LorentzConfigError { reason: "bad".into() }.variant_name(),
1147 "LorentzConfigError"
1148 );
1149 }
1150
1151 #[test]
1154 fn test_is_config_error_true_for_config_error() {
1155 let e = StreamError::ConfigError { reason: "bad config".into() };
1156 assert!(e.is_config_error());
1157 }
1158
1159 #[test]
1160 fn test_is_config_error_false_for_connection_failed() {
1161 let e = StreamError::ConnectionFailed { url: "ws://x".into(), reason: "timeout".into() };
1162 assert!(!e.is_config_error());
1163 }
1164
1165 #[test]
1166 fn test_is_config_error_false_for_ring_buffer_empty() {
1167 assert!(!StreamError::RingBufferEmpty.is_config_error());
1168 }
1169
1170 #[test]
1173 fn test_is_feed_error_true_for_stale_feed() {
1174 let e = StreamError::StaleFeed {
1175 feed_id: "btc".into(),
1176 elapsed_ms: 5_000,
1177 threshold_ms: 1_000,
1178 };
1179 assert!(e.is_feed_error());
1180 }
1181
1182 #[test]
1183 fn test_is_feed_error_true_for_unknown_feed() {
1184 let e = StreamError::UnknownFeed { feed_id: "xyz".into() };
1185 assert!(e.is_feed_error());
1186 }
1187
1188 #[test]
1189 fn test_is_feed_error_false_for_ring_buffer_empty() {
1190 assert!(!StreamError::RingBufferEmpty.is_feed_error());
1191 }
1192
1193 #[test]
1196 fn test_is_parse_error_true_for_parse_error() {
1197 let e = StreamError::ParseError {
1198 exchange: "binance".into(),
1199 reason: "unexpected field".into(),
1200 };
1201 assert!(e.is_parse_error());
1202 }
1203
1204 #[test]
1205 fn test_is_parse_error_false_for_connection_failed() {
1206 let e = StreamError::ConnectionFailed { url: "ws://x".into(), reason: "timeout".into() };
1207 assert!(!e.is_parse_error());
1208 }
1209
1210 #[test]
1211 fn test_is_parse_error_false_for_stale_feed() {
1212 let e = StreamError::StaleFeed { feed_id: "x".into(), elapsed_ms: 1, threshold_ms: 1 };
1213 assert!(!e.is_parse_error());
1214 }
1215
1216 #[test]
1219 fn test_is_book_reconstruction_error_true() {
1220 let e = StreamError::BookReconstructionFailed {
1221 symbol: "BTC-USD".into(),
1222 reason: "checksum mismatch".into(),
1223 };
1224 assert!(e.is_book_reconstruction_error());
1225 }
1226
1227 #[test]
1228 fn test_is_book_reconstruction_error_false_for_book_crossed() {
1229 let e = StreamError::BookCrossed {
1230 symbol: "BTC-USD".into(),
1231 bid: rust_decimal_macros::dec!(101),
1232 ask: rust_decimal_macros::dec!(99),
1233 };
1234 assert!(!e.is_book_reconstruction_error());
1235 }
1236
1237 #[test]
1238 fn test_is_book_reconstruction_error_false_for_ring_buffer_empty() {
1239 assert!(!StreamError::RingBufferEmpty.is_book_reconstruction_error());
1240 }
1241
1242 #[test]
1245 fn test_affected_symbol_returns_url_for_connection_failed() {
1246 let e = StreamError::ConnectionFailed {
1247 url: "wss://feed.io".into(),
1248 reason: "refused".into(),
1249 };
1250 assert_eq!(e.affected_symbol(), Some("wss://feed.io"));
1251 }
1252
1253 #[test]
1254 fn test_affected_symbol_returns_symbol_for_book_crossed() {
1255 let e = StreamError::BookCrossed {
1256 symbol: "ETH-USD".into(),
1257 bid: rust_decimal_macros::dec!(101),
1258 ask: rust_decimal_macros::dec!(99),
1259 };
1260 assert_eq!(e.affected_symbol(), Some("ETH-USD"));
1261 }
1262
1263 #[test]
1264 fn test_affected_symbol_returns_feed_id_for_stale_feed() {
1265 let e = StreamError::StaleFeed {
1266 feed_id: "BTC-USD".into(),
1267 elapsed_ms: 5_000,
1268 threshold_ms: 2_000,
1269 };
1270 assert_eq!(e.affected_symbol(), Some("BTC-USD"));
1271 }
1272
1273 #[test]
1274 fn test_affected_symbol_none_for_ring_buffer_errors() {
1275 assert!(StreamError::RingBufferEmpty.affected_symbol().is_none());
1276 assert!(StreamError::RingBufferFull { capacity: 8 }.affected_symbol().is_none());
1277 }
1278
1279 #[test]
1282 fn test_is_sequence_gap_true_for_sequence_gap() {
1283 let e = StreamError::SequenceGap {
1284 symbol: "BTC-USD".into(),
1285 expected: 10,
1286 got: 15,
1287 };
1288 assert!(e.is_sequence_gap());
1289 }
1290
1291 #[test]
1292 fn test_is_sequence_gap_false_for_book_crossed() {
1293 let e = StreamError::BookCrossed {
1294 symbol: "BTC-USD".into(),
1295 bid: rust_decimal_macros::dec!(101),
1296 ask: rust_decimal_macros::dec!(99),
1297 };
1298 assert!(!e.is_sequence_gap());
1299 }
1300
1301 #[test]
1302 fn test_is_sequence_gap_false_for_ring_buffer_empty() {
1303 assert!(!StreamError::RingBufferEmpty.is_sequence_gap());
1304 }
1305
1306 #[test]
1309 fn test_is_symbol_error_true_for_book_crossed() {
1310 let e = StreamError::BookCrossed {
1311 symbol: "X".into(),
1312 bid: rust_decimal_macros::dec!(100),
1313 ask: rust_decimal_macros::dec!(99),
1314 };
1315 assert!(e.is_symbol_error());
1316 }
1317
1318 #[test]
1319 fn test_is_symbol_error_true_for_stale_feed() {
1320 let e = StreamError::StaleFeed {
1321 feed_id: "feed1".into(),
1322 elapsed_ms: 10_000,
1323 threshold_ms: 5_000,
1324 };
1325 assert!(e.is_symbol_error());
1326 }
1327
1328 #[test]
1329 fn test_is_symbol_error_false_for_io_error() {
1330 assert!(!StreamError::Io("disk error".into()).is_symbol_error());
1331 }
1332
1333 #[test]
1334 fn test_is_symbol_error_false_for_config_error() {
1335 assert!(!StreamError::ConfigError { reason: "bad".into() }.is_symbol_error());
1336 }
1337
1338 #[test]
1341 fn test_to_error_code_connection_failed_is_1001() {
1342 let e = StreamError::ConnectionFailed {
1343 url: "wss://example.com".into(),
1344 reason: "timeout".into(),
1345 };
1346 assert_eq!(e.to_error_code(), 1001);
1347 }
1348
1349 #[test]
1350 fn test_to_error_code_book_crossed_is_4002() {
1351 let e = StreamError::BookCrossed {
1352 symbol: "X".into(),
1353 bid: rust_decimal_macros::dec!(100),
1354 ask: rust_decimal_macros::dec!(99),
1355 };
1356 assert_eq!(e.to_error_code(), 4002);
1357 }
1358
1359 #[test]
1360 fn test_to_error_code_ring_buffer_empty_is_5003() {
1361 assert_eq!(StreamError::RingBufferEmpty.to_error_code(), 5003);
1362 }
1363
1364 #[test]
1365 fn test_to_error_code_lorentz_is_7001() {
1366 let e = StreamError::LorentzConfigError { reason: "bad beta".into() };
1367 assert_eq!(e.to_error_code(), 7001);
1368 }
1369
1370 #[test]
1372 fn test_is_data_integrity_error_true_for_book_crossed() {
1373 let e = StreamError::BookCrossed {
1374 symbol: "AAPL".into(),
1375 bid: rust_decimal_macros::dec!(101),
1376 ask: rust_decimal_macros::dec!(100),
1377 };
1378 assert!(e.is_data_integrity_error());
1379 }
1380
1381 #[test]
1382 fn test_is_data_integrity_error_true_for_sequence_gap() {
1383 let e = StreamError::SequenceGap {
1384 symbol: "BTCUSDT".into(),
1385 expected: 42,
1386 got: 50,
1387 };
1388 assert!(e.is_data_integrity_error());
1389 }
1390
1391 #[test]
1392 fn test_is_data_integrity_error_false_for_connection_failed() {
1393 let e = StreamError::ConnectionFailed {
1394 url: "wss://example.com".into(),
1395 reason: "timeout".into(),
1396 };
1397 assert!(!e.is_data_integrity_error());
1398 }
1399
1400 #[test]
1401 fn test_is_data_integrity_error_false_for_stale_feed() {
1402 let e = StreamError::StaleFeed {
1403 feed_id: "kraken".into(),
1404 elapsed_ms: 9_000,
1405 threshold_ms: 5_000,
1406 };
1407 assert!(!e.is_data_integrity_error());
1408 }
1409
1410 #[test]
1412 fn test_is_connection_error_true_for_connection_failed() {
1413 let e = StreamError::ConnectionFailed { url: "ws://x".into(), reason: "refused".into() };
1414 assert!(e.is_connection_error());
1415 }
1416
1417 #[test]
1418 fn test_is_connection_error_true_for_disconnected() {
1419 let e = StreamError::Disconnected { url: "ws://x".into() };
1420 assert!(e.is_connection_error());
1421 }
1422
1423 #[test]
1424 fn test_is_connection_error_true_for_reconnect_exhausted() {
1425 let e = StreamError::ReconnectExhausted { url: "ws://x".into(), attempts: 5 };
1426 assert!(e.is_connection_error());
1427 }
1428
1429 #[test]
1430 fn test_is_connection_error_false_for_book_crossed() {
1431 let e = StreamError::BookCrossed {
1432 symbol: "AAPL".into(),
1433 bid: rust_decimal_macros::dec!(101),
1434 ask: rust_decimal_macros::dec!(100),
1435 };
1436 assert!(!e.is_connection_error());
1437 }
1438
1439 #[test]
1441 fn test_is_computation_error_true_for_lorentz() {
1442 let e = StreamError::LorentzConfigError { reason: "beta >= 1".into() };
1443 assert!(e.is_computation_error());
1444 }
1445
1446 #[test]
1447 fn test_is_computation_error_false_for_connection() {
1448 let e = StreamError::ConnectionFailed { url: "ws://x".into(), reason: "refused".into() };
1449 assert!(!e.is_computation_error());
1450 }
1451
1452 #[test]
1453 fn test_is_ring_error_true_for_ring_buffer_full() {
1454 let e = StreamError::RingBufferFull { capacity: 10 };
1455 assert!(e.is_ring_error());
1456 }
1457
1458 #[test]
1459 fn test_is_ring_error_true_for_ring_buffer_empty() {
1460 assert!(StreamError::RingBufferEmpty.is_ring_error());
1461 }
1462
1463 #[test]
1464 fn test_is_ring_error_false_for_other_errors() {
1465 let e = StreamError::ConnectionFailed { url: "ws://x".into(), reason: "timeout".into() };
1466 assert!(!e.is_ring_error());
1467 }
1468
1469 #[test]
1472 fn test_has_url_true_for_connection_failed() {
1473 let e = StreamError::ConnectionFailed { url: "wss://x".into(), reason: "timeout".into() };
1474 assert!(e.has_url());
1475 }
1476
1477 #[test]
1478 fn test_has_url_false_for_other_errors() {
1479 assert!(!StreamError::RingBufferEmpty.has_url());
1480 assert!(!StreamError::ConfigError { reason: "bad".into() }.has_url());
1481 }
1482
1483 #[test]
1484 fn test_error_category_code_connection_is_1() {
1485 let e = StreamError::ConnectionFailed { url: "wss://x".into(), reason: "x".into() };
1486 assert_eq!(e.error_category_code(), 1);
1487 }
1488
1489 #[test]
1490 fn test_error_category_code_book_is_4() {
1491 let e = StreamError::BookCrossed {
1492 symbol: "X".into(),
1493 bid: rust_decimal_macros::dec!(100),
1494 ask: rust_decimal_macros::dec!(99),
1495 };
1496 assert_eq!(e.error_category_code(), 4);
1497 }
1498
1499 #[test]
1500 fn test_error_category_code_lorentz_is_7() {
1501 let e = StreamError::LorentzConfigError { reason: "bad beta".into() };
1502 assert_eq!(e.error_category_code(), 7);
1503 }
1504
1505 #[test]
1508 fn test_is_buffer_error_true_for_ring_buffer_empty() {
1509 assert!(StreamError::RingBufferEmpty.is_buffer_error());
1510 }
1511
1512 #[test]
1513 fn test_is_buffer_error_true_for_backpressure() {
1514 assert!(StreamError::Backpressure { channel: "x".into(), depth: 8, capacity: 8 }.is_buffer_error());
1515 }
1516
1517 #[test]
1518 fn test_is_buffer_error_true_for_ring_buffer_full() {
1519 assert!(StreamError::RingBufferFull { capacity: 8 }.is_buffer_error());
1520 }
1521
1522 #[test]
1523 fn test_is_buffer_error_false_for_connection_failed() {
1524 let e = StreamError::ConnectionFailed { url: "wss://x".into(), reason: "x".into() };
1525 assert!(!e.is_buffer_error());
1526 }
1527
1528 #[test]
1531 fn test_is_normalization_error_true() {
1532 let e = StreamError::NormalizationError { reason: "empty window".into() };
1533 assert!(e.is_normalization_error());
1534 }
1535
1536 #[test]
1537 fn test_is_normalization_error_false_for_parse_error() {
1538 let e = StreamError::ParseError { exchange: "binance".into(), reason: "bad json".into() };
1539 assert!(!e.is_normalization_error());
1540 }
1541
1542 #[test]
1543 fn test_is_agg_error_true() {
1544 let e = StreamError::AggregationError { reason: "mismatched symbol".into() };
1545 assert!(e.is_agg_error());
1546 }
1547
1548 #[test]
1549 fn test_is_agg_error_false_for_other() {
1550 assert!(!StreamError::RingBufferEmpty.is_agg_error());
1551 }
1552
1553 #[test]
1556 fn test_is_book_error_true_for_reconstruction() {
1557 let e = StreamError::BookReconstructionFailed { symbol: "BTC-USD".into(), reason: "missing snapshot".into() };
1558 assert!(e.is_book_error());
1559 }
1560
1561 #[test]
1562 fn test_is_book_error_true_for_sequence_gap() {
1563 let e = StreamError::SequenceGap { symbol: "BTC-USD".into(), expected: 5, got: 7 };
1564 assert!(e.is_book_error());
1565 }
1566
1567 #[test]
1568 fn test_is_book_error_false_for_parse_error() {
1569 let e = StreamError::ParseError { exchange: "binance".into(), reason: "bad json".into() };
1570 assert!(!e.is_book_error());
1571 }
1572
1573 #[test]
1574 fn test_is_sequence_error_true() {
1575 let e = StreamError::SequenceGap { symbol: "ETH-USD".into(), expected: 1, got: 3 };
1576 assert!(e.is_sequence_error());
1577 }
1578
1579 #[test]
1580 fn test_is_sequence_error_false_for_book_crossed() {
1581 let e = StreamError::BookCrossed { symbol: "BTC-USD".into(), bid: rust_decimal_macros::dec!(101), ask: rust_decimal_macros::dec!(100) };
1582 assert!(!e.is_sequence_error());
1583 assert!(e.is_book_error());
1584 }
1585
1586 #[test]
1589 fn test_severity_level_4_for_reconnect_exhausted() {
1590 let e = StreamError::ReconnectExhausted { url: "wss://x".into(), attempts: 5 };
1591 assert_eq!(e.severity_level(), 4);
1592 }
1593
1594 #[test]
1595 fn test_severity_level_3_for_connection_failed() {
1596 let e = StreamError::ConnectionFailed { url: "wss://x".into(), reason: "refused".into() };
1597 assert_eq!(e.severity_level(), 3);
1598 }
1599
1600 #[test]
1601 fn test_severity_level_2_for_stale_feed() {
1602 let e = StreamError::StaleFeed { feed_id: "BTC".into(), elapsed_ms: 10_000, threshold_ms: 5_000 };
1603 assert_eq!(e.severity_level(), 2);
1604 }
1605
1606 #[test]
1607 fn test_severity_level_1_for_parse_error() {
1608 let e = StreamError::ParseError { exchange: "binance".into(), reason: "bad".into() };
1609 assert_eq!(e.severity_level(), 1);
1610 }
1611
1612 #[test]
1613 fn test_severity_level_2_for_ring_buffer_empty() {
1614 assert_eq!(StreamError::RingBufferEmpty.severity_level(), 2);
1615 }
1616}