1use core::fmt;
34use std::sync::Arc;
35
36use crate::types::symbol::{ObjectId, SymbolId};
37use crate::types::{CancelReason, RegionId, TaskId};
38
39pub mod recovery;
40
/// Discriminant describing what went wrong, independent of any message,
/// source error, or context attached to an [`Error`].
///
/// Variants are listed in the groups that [`ErrorKind::category`] assigns them
/// to. Retry guidance for each variant comes from
/// [`ErrorKind::recoverability`] and [`ErrorKind::recovery_action`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    // --- Cancellation ---
    /// The operation was cancelled.
    Cancelled,
    /// A cancellation-related timeout; also reported by `Error::is_timeout`.
    CancelTimeout,

    // --- Budget ---
    /// A deadline was exceeded; reported by `Error::is_timeout`.
    DeadlineExceeded,
    /// The poll quota was exhausted.
    PollQuotaExhausted,
    /// The cost quota was exhausted.
    CostQuotaExhausted,

    // --- Channel ---
    /// The channel is closed (produced from `Disconnected` send/receive errors).
    ChannelClosed,
    /// The channel is full (produced from `SendError::Full`).
    ChannelFull,
    /// The channel is empty (produced from `RecvError::Empty`).
    ChannelEmpty,

    // --- Obligation ---
    /// An obligation was leaked.
    ObligationLeak,
    /// An obligation had already been resolved.
    ObligationAlreadyResolved,

    // --- Region ---
    /// The region is closed.
    RegionClosed,
    /// The task is not owned by the caller/region.
    /// NOTE(review): the exact ownership rule is not visible in this file.
    TaskNotOwned,
    /// Admission was denied.
    AdmissionDenied,

    // --- Encoding ---
    /// Encoding parameters were invalid.
    InvalidEncodingParams,
    /// The data exceeded a maximum size (see `Error::data_too_large`).
    DataTooLarge,
    /// Encoding failed.
    EncodingFailed,
    /// A symbol was corrupted.
    CorruptedSymbol,

    // --- Decoding ---
    /// Fewer symbols were received than needed (see `Error::insufficient_symbols`).
    InsufficientSymbols,
    /// Decoding failed.
    DecodingFailed,
    /// An object did not match what was expected.
    ObjectMismatch,
    /// A symbol was received more than once.
    DuplicateSymbol,
    /// A threshold was not reached in time.
    /// NOTE(review): which threshold is not visible in this file.
    ThresholdTimeout,

    // --- Transport ---
    /// No route could be found (see `Error::routing_failed`).
    RoutingFailed,
    /// Dispatch failed.
    DispatchFailed,
    /// The stream ended.
    StreamEnded,
    /// The sink rejected the item.
    SinkRejected,
    /// The connection was lost; reported by `Error::is_connection_error`.
    ConnectionLost,
    /// The connection was refused; reported by `Error::is_connection_error`.
    ConnectionRefused,
    /// A protocol error occurred.
    ProtocolError,

    // --- Distributed ---
    /// Recovery failed.
    RecoveryFailed,
    /// A lease expired (see `Error::lease_expired`).
    LeaseExpired,
    /// Renewing a lease failed.
    LeaseRenewalFailed,
    /// Coordination failed.
    CoordinationFailed,
    /// Quorum was not reached (see `Error::quorum_not_reached`).
    QuorumNotReached,
    /// A node is unavailable (see `Error::node_unavailable`).
    NodeUnavailable,
    /// A partition was detected.
    PartitionDetected,

    // --- Internal ---
    /// An internal error (see `Error::internal`).
    Internal,
    /// A state transition was invalid.
    InvalidStateTransition,

    // --- User (both variants below are categorized as `ErrorCategory::User`) ---
    /// A configuration error.
    ConfigError,

    /// A user-supplied error.
    User,
}
148
149impl ErrorKind {
150 #[must_use]
152 pub const fn category(&self) -> ErrorCategory {
153 match self {
154 Self::Cancelled | Self::CancelTimeout => ErrorCategory::Cancellation,
155 Self::DeadlineExceeded | Self::PollQuotaExhausted | Self::CostQuotaExhausted => {
156 ErrorCategory::Budget
157 }
158 Self::ChannelClosed | Self::ChannelFull | Self::ChannelEmpty => ErrorCategory::Channel,
159 Self::ObligationLeak | Self::ObligationAlreadyResolved => ErrorCategory::Obligation,
160 Self::RegionClosed | Self::TaskNotOwned | Self::AdmissionDenied => {
161 ErrorCategory::Region
162 }
163 Self::InvalidEncodingParams
164 | Self::DataTooLarge
165 | Self::EncodingFailed
166 | Self::CorruptedSymbol => ErrorCategory::Encoding,
167 Self::InsufficientSymbols
168 | Self::DecodingFailed
169 | Self::ObjectMismatch
170 | Self::DuplicateSymbol
171 | Self::ThresholdTimeout => ErrorCategory::Decoding,
172 Self::RoutingFailed
173 | Self::DispatchFailed
174 | Self::StreamEnded
175 | Self::SinkRejected
176 | Self::ConnectionLost
177 | Self::ConnectionRefused
178 | Self::ProtocolError => ErrorCategory::Transport,
179 Self::RecoveryFailed
180 | Self::LeaseExpired
181 | Self::LeaseRenewalFailed
182 | Self::CoordinationFailed
183 | Self::QuorumNotReached
184 | Self::NodeUnavailable
185 | Self::PartitionDetected => ErrorCategory::Distributed,
186 Self::Internal | Self::InvalidStateTransition => ErrorCategory::Internal,
187 Self::ConfigError | Self::User => ErrorCategory::User,
188 }
189 }
190
191 #[must_use]
195 pub const fn recoverability(&self) -> Recoverability {
196 match self {
197 Self::ChannelFull
199 | Self::ChannelEmpty
200 | Self::AdmissionDenied
201 | Self::ConnectionLost
202 | Self::NodeUnavailable
203 | Self::QuorumNotReached
204 | Self::ThresholdTimeout
205 | Self::LeaseRenewalFailed => Recoverability::Transient,
206
207 Self::Cancelled
209 | Self::CancelTimeout
210 | Self::ChannelClosed
211 | Self::ObligationLeak
212 | Self::ObligationAlreadyResolved
213 | Self::RegionClosed
214 | Self::InvalidEncodingParams
215 | Self::DataTooLarge
216 | Self::ObjectMismatch
217 | Self::Internal
218 | Self::InvalidStateTransition
219 | Self::ProtocolError
220 | Self::ConnectionRefused
221 | Self::ConfigError => Recoverability::Permanent,
222
223 Self::DeadlineExceeded
225 | Self::PollQuotaExhausted
226 | Self::CostQuotaExhausted
227 | Self::TaskNotOwned
228 | Self::EncodingFailed
229 | Self::CorruptedSymbol
230 | Self::InsufficientSymbols
231 | Self::DecodingFailed
232 | Self::DuplicateSymbol
233 | Self::RoutingFailed
234 | Self::DispatchFailed
235 | Self::StreamEnded
236 | Self::SinkRejected
237 | Self::RecoveryFailed
238 | Self::LeaseExpired
239 | Self::CoordinationFailed
240 | Self::PartitionDetected
241 | Self::User => Recoverability::Unknown,
242 }
243 }
244
245 #[must_use]
247 pub const fn is_retryable(&self) -> bool {
248 matches!(self.recoverability(), Recoverability::Transient)
249 }
250
251 #[must_use]
256 pub const fn recovery_action(&self) -> RecoveryAction {
257 match self {
258 Self::ChannelFull | Self::ChannelEmpty => RecoveryAction::RetryImmediately,
260
261 Self::AdmissionDenied
263 | Self::ThresholdTimeout
264 | Self::QuorumNotReached
265 | Self::LeaseRenewalFailed => RecoveryAction::RetryWithBackoff(BackoffHint::DEFAULT),
266 Self::NodeUnavailable => RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE),
267
268 Self::ConnectionLost | Self::StreamEnded => RecoveryAction::RetryWithNewConnection,
270
271 Self::Cancelled
273 | Self::CancelTimeout
274 | Self::DeadlineExceeded
275 | Self::PollQuotaExhausted
276 | Self::CostQuotaExhausted
277 | Self::ChannelClosed
278 | Self::RegionClosed
279 | Self::InvalidEncodingParams
280 | Self::DataTooLarge
281 | Self::ObjectMismatch
282 | Self::ConnectionRefused
283 | Self::ProtocolError
284 | Self::LeaseExpired
285 | Self::PartitionDetected
286 | Self::ConfigError => RecoveryAction::Propagate,
287
288 Self::ObligationLeak
290 | Self::ObligationAlreadyResolved
291 | Self::Internal
292 | Self::InvalidStateTransition => RecoveryAction::Escalate,
293
294 Self::TaskNotOwned
296 | Self::EncodingFailed
297 | Self::CorruptedSymbol
298 | Self::InsufficientSymbols
299 | Self::DecodingFailed
300 | Self::DuplicateSymbol
301 | Self::RoutingFailed
302 | Self::DispatchFailed
303 | Self::SinkRejected
304 | Self::RecoveryFailed
305 | Self::CoordinationFailed
306 | Self::User => RecoveryAction::Custom,
307 }
308 }
309}
310
/// Coarse answer to "can this error be retried?", as produced by
/// `ErrorKind::recoverability`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Recoverability {
    /// The only variant for which `should_retry` returns `true`.
    Transient,
    /// The only variant for which `is_permanent` returns `true`.
    Permanent,
    /// Neither `should_retry` nor `is_permanent` returns `true` for this variant.
    Unknown,
}
325
/// Suggested way to react to an error, as produced by
/// `ErrorKind::recovery_action`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecoveryAction {
    /// Retry without waiting.
    RetryImmediately,
    /// Retry after waiting, using the carried [`BackoffHint`] as the schedule.
    RetryWithBackoff(BackoffHint),
    /// Retry on a new connection.
    RetryWithNewConnection,
    /// Pass the error on to the caller.
    Propagate,
    /// Escalate the error.
    /// NOTE(review): the escalation target is not defined in this file.
    Escalate,
    /// No generic recommendation; handling is left to the caller.
    Custom,
}
342
/// Backoff parameters carried by `RecoveryAction::RetryWithBackoff`.
///
/// This type only holds the numbers; how a retry loop applies them is not
/// defined in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BackoffHint {
    /// Initial delay, in milliseconds.
    pub initial_delay_ms: u32,
    /// Maximum delay, in milliseconds.
    pub max_delay_ms: u32,
    /// Maximum number of attempts.
    pub max_attempts: u8,
}
353
impl BackoffHint {
    /// Default schedule: 100 ms initial delay, 30 s cap, 5 attempts.
    /// Also the value returned by `BackoffHint::default()`.
    pub const DEFAULT: Self = Self {
        initial_delay_ms: 100,
        max_delay_ms: 30_000,
        max_attempts: 5,
    };

    /// Longer schedule: 1 s initial delay, 60 s cap, 10 attempts.
    pub const AGGRESSIVE: Self = Self {
        initial_delay_ms: 1_000,
        max_delay_ms: 60_000,
        max_attempts: 10,
    };

    /// Short schedule: 10 ms initial delay, 1 s cap, 3 attempts.
    pub const QUICK: Self = Self {
        initial_delay_ms: 10,
        max_delay_ms: 1_000,
        max_attempts: 3,
    };
}
376
impl Default for BackoffHint {
    /// Returns [`BackoffHint::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
382
383impl Recoverability {
384 #[must_use]
386 pub const fn should_retry(&self) -> bool {
387 matches!(self, Self::Transient)
388 }
389
390 #[must_use]
392 pub const fn is_permanent(&self) -> bool {
393 matches!(self, Self::Permanent)
394 }
395}
396
/// Coarse grouping of [`ErrorKind`] variants, as returned by
/// `ErrorKind::category`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// `Cancelled`, `CancelTimeout`.
    Cancellation,
    /// Deadline and quota kinds.
    Budget,
    /// Channel closed/full/empty kinds.
    Channel,
    /// Obligation kinds.
    Obligation,
    /// `RegionClosed`, `TaskNotOwned`, `AdmissionDenied`.
    Region,
    /// Encoding-side kinds.
    Encoding,
    /// Decoding-side kinds.
    Decoding,
    /// Routing, dispatch, stream, sink, connection and protocol kinds.
    Transport,
    /// Recovery, lease, coordination, quorum, node and partition kinds.
    Distributed,
    /// `Internal`, `InvalidStateTransition`.
    Internal,
    /// `ConfigError`, `User`.
    User,
}
423
/// Optional identifiers attached to an [`Error`].
///
/// Every field defaults to `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ErrorContext {
    /// Task associated with the error, if any.
    pub task_id: Option<TaskId>,
    /// Region associated with the error, if any.
    pub region_id: Option<RegionId>,
    /// Object associated with the error, if any.
    pub object_id: Option<ObjectId>,
    /// Symbol associated with the error, if any.
    pub symbol_id: Option<SymbolId>,
}
436
/// The crate's error value: an [`ErrorKind`] plus an optional message,
/// optional source error, and an [`ErrorContext`].
///
/// Cloning is cheap for the source because it is held behind an `Arc`.
#[derive(Debug, Clone)]
pub struct Error {
    // What went wrong; drives category/recoverability/recovery_action.
    kind: ErrorKind,
    // Optional human-readable detail; appended after the kind by `Display`.
    message: Option<String>,
    // Optional underlying cause, exposed through `std::error::Error::source`.
    source: Option<Arc<dyn std::error::Error + Send + Sync>>,
    // Identifiers describing where the error happened.
    context: ErrorContext,
}
445
446impl Error {
447 #[must_use]
449 pub const fn new(kind: ErrorKind) -> Self {
450 Self {
451 kind,
452 message: None,
453 source: None,
454 context: ErrorContext {
455 task_id: None,
456 region_id: None,
457 object_id: None,
458 symbol_id: None,
459 },
460 }
461 }
462
463 #[must_use]
465 pub const fn kind(&self) -> ErrorKind {
466 self.kind
467 }
468
469 #[must_use]
471 pub const fn is_cancelled(&self) -> bool {
472 matches!(self.kind, ErrorKind::Cancelled)
473 }
474
475 #[must_use]
477 pub const fn is_timeout(&self) -> bool {
478 matches!(
479 self.kind,
480 ErrorKind::DeadlineExceeded | ErrorKind::CancelTimeout
481 )
482 }
483
484 #[must_use]
486 pub fn with_message(mut self, msg: impl Into<String>) -> Self {
487 self.message = Some(msg.into());
488 self
489 }
490
491 #[must_use]
493 pub fn with_context(mut self, ctx: ErrorContext) -> Self {
494 self.context = ctx;
495 self
496 }
497
498 #[must_use]
500 pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
501 self.source = Some(Arc::new(source));
502 self
503 }
504
505 #[must_use]
507 pub fn cancelled(reason: &CancelReason) -> Self {
508 Self::new(ErrorKind::Cancelled).with_message(reason.to_string())
509 }
510
511 #[must_use]
513 pub const fn category(&self) -> ErrorCategory {
514 self.kind.category()
515 }
516
517 #[must_use]
519 pub const fn recoverability(&self) -> Recoverability {
520 self.kind.recoverability()
521 }
522
523 #[must_use]
525 pub const fn is_retryable(&self) -> bool {
526 self.kind.is_retryable()
527 }
528
529 #[must_use]
531 pub const fn recovery_action(&self) -> RecoveryAction {
532 self.kind.recovery_action()
533 }
534
535 #[must_use]
537 pub fn message(&self) -> Option<&str> {
538 self.message.as_deref()
539 }
540
541 #[must_use]
543 pub fn context(&self) -> &ErrorContext {
544 &self.context
545 }
546
547 #[must_use]
549 pub const fn is_encoding_error(&self) -> bool {
550 matches!(self.kind.category(), ErrorCategory::Encoding)
551 }
552
553 #[must_use]
555 pub const fn is_decoding_error(&self) -> bool {
556 matches!(self.kind.category(), ErrorCategory::Decoding)
557 }
558
559 #[must_use]
561 pub const fn is_transport_error(&self) -> bool {
562 matches!(self.kind.category(), ErrorCategory::Transport)
563 }
564
565 #[must_use]
567 pub const fn is_distributed_error(&self) -> bool {
568 matches!(self.kind.category(), ErrorCategory::Distributed)
569 }
570
571 #[must_use]
573 pub const fn is_connection_error(&self) -> bool {
574 matches!(
575 self.kind,
576 ErrorKind::ConnectionLost | ErrorKind::ConnectionRefused
577 )
578 }
579
580 #[must_use]
582 pub fn invalid_encoding_params(detail: impl Into<String>) -> Self {
583 Self::new(ErrorKind::InvalidEncodingParams).with_message(detail)
584 }
585
586 #[must_use]
588 pub fn data_too_large(actual: u64, max: u64) -> Self {
589 Self::new(ErrorKind::DataTooLarge)
590 .with_message(format!("data size {actual} exceeds maximum {max}"))
591 }
592
593 #[must_use]
595 pub fn insufficient_symbols(received: u32, needed: u32) -> Self {
596 Self::new(ErrorKind::InsufficientSymbols).with_message(format!(
597 "received {received} symbols, need at least {needed}"
598 ))
599 }
600
601 #[must_use]
603 pub fn decoding_failed(reason: impl Into<String>) -> Self {
604 Self::new(ErrorKind::DecodingFailed).with_message(reason)
605 }
606
607 #[must_use]
609 pub fn routing_failed(destination: impl Into<String>) -> Self {
610 Self::new(ErrorKind::RoutingFailed)
611 .with_message(format!("no route to destination: {}", destination.into()))
612 }
613
614 #[must_use]
616 pub fn lease_expired(lease_id: impl Into<String>) -> Self {
617 Self::new(ErrorKind::LeaseExpired)
618 .with_message(format!("lease expired: {}", lease_id.into()))
619 }
620
621 #[must_use]
623 pub fn quorum_not_reached(achieved: u32, needed: u32) -> Self {
624 Self::new(ErrorKind::QuorumNotReached)
625 .with_message(format!("achieved {achieved} of {needed} required"))
626 }
627
628 #[must_use]
630 pub fn node_unavailable(node_id: impl Into<String>) -> Self {
631 Self::new(ErrorKind::NodeUnavailable)
632 .with_message(format!("node unavailable: {}", node_id.into()))
633 }
634
635 #[must_use]
637 pub fn internal(detail: impl Into<String>) -> Self {
638 Self::new(ErrorKind::Internal).with_message(detail)
639 }
640}
641
642impl fmt::Display for Error {
643 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644 write!(f, "{:?}", self.kind)?;
645 if let Some(msg) = &self.message {
646 write!(f, ": {msg}")?;
647 }
648 Ok(())
649 }
650}
651
652impl std::error::Error for Error {
653 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
654 self.source.as_ref().map(|e| e.as_ref() as _)
655 }
656}
657
/// Marker value for a cancelled operation; converts into an [`Error`] of kind
/// `ErrorKind::Cancelled` via `From`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cancelled {
    /// Why the operation was cancelled.
    pub reason: CancelReason,
}
664
665impl From<Cancelled> for Error {
666 fn from(c: Cancelled) -> Self {
667 Self::cancelled(&c.reason)
668 }
669}
670
/// Error returned when sending a value fails; each variant hands the unsent
/// value back to the caller.
///
/// Implements [`fmt::Display`] and [`std::error::Error`] so it can be boxed as
/// `dyn Error`, used as an error source, and printed with `{}`.
#[derive(Debug)]
pub enum SendError<T> {
    /// The other side is gone (converts to `ErrorKind::ChannelClosed`).
    Disconnected(T),
    /// No capacity was available (converts to `ErrorKind::ChannelFull`).
    Full(T),
    /// The send was cancelled (converts to `ErrorKind::Cancelled`).
    Cancelled(T),
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a short description; the payload is deliberately not printed so
    /// no `Display` bound is needed on `T`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Disconnected(_) => "channel disconnected",
            Self::Full(_) => "channel full",
            Self::Cancelled(_) => "send cancelled",
        })
    }
}

// `Debug` is derived with a `T: Debug` bound, so the same bound is required here.
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
681
/// Error returned when receiving a value fails.
///
/// Implements [`fmt::Display`] and [`std::error::Error`] so it can be boxed as
/// `dyn Error`, used as an error source, and printed with `{}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// The other side is gone (converts to `ErrorKind::ChannelClosed`).
    Disconnected,
    /// Nothing was available to receive (converts to `ErrorKind::ChannelEmpty`).
    Empty,
    /// The receive was cancelled (converts to `ErrorKind::Cancelled`).
    Cancelled,
}

impl fmt::Display for RecvError {
    /// Writes a short description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Disconnected => "channel disconnected",
            Self::Empty => "channel empty",
            Self::Cancelled => "receive cancelled",
        })
    }
}

impl std::error::Error for RecvError {}
692
/// Error returned when an acquire operation fails.
///
/// Implements [`fmt::Display`] and [`std::error::Error`] so it can be boxed as
/// `dyn Error`, used as an error source, and printed with `{}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireError {
    /// The thing being acquired from has been closed.
    /// NOTE(review): which primitive produces this is not visible in this file.
    Closed,
}

impl fmt::Display for AcquireError {
    /// Writes a short description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Closed => f.write_str("acquire failed: closed"),
        }
    }
}

impl std::error::Error for AcquireError {}
699
700impl From<RecvError> for Error {
701 fn from(e: RecvError) -> Self {
702 match e {
703 RecvError::Disconnected => Self::new(ErrorKind::ChannelClosed),
704 RecvError::Empty => Self::new(ErrorKind::ChannelEmpty),
705 RecvError::Cancelled => Self::new(ErrorKind::Cancelled),
706 }
707 }
708}
709
710impl<T> From<SendError<T>> for Error {
711 fn from(e: SendError<T>) -> Self {
712 match e {
713 SendError::Disconnected(_) => Self::new(ErrorKind::ChannelClosed),
714 SendError::Full(_) => Self::new(ErrorKind::ChannelFull),
715 SendError::Cancelled(_) => Self::new(ErrorKind::Cancelled),
716 }
717 }
718}
719
/// Extension methods for attaching a message to the error of a `Result`
/// while converting it into this module's [`Result`].
#[allow(clippy::result_large_err)]
pub trait ResultExt<T> {
    /// Converts the error into [`Error`] and sets `msg` as its message.
    fn context(self, msg: impl Into<String>) -> Result<T>;
    /// Like `context`, but takes a closure that produces the message.
    fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
}
728
729impl<T, E: Into<Error>> ResultExt<T> for core::result::Result<T, E> {
730 fn context(self, msg: impl Into<String>) -> Result<T> {
731 self.map_err(|e| e.into().with_message(msg))
732 }
733
734 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
735 self.map_err(|e| e.into().with_message(f()))
736 }
737}
738
/// `Result` alias whose error type is this module's [`Error`].
// The lint is silenced because `Error` is used by value as the error type.
#[allow(clippy::result_large_err)]
pub type Result<T> = core::result::Result<T, Error>;
742
// Unit tests pinning the Display format, conversions, classification tables,
// constructors, and derived traits of the error types in this module.
#[cfg(test)]
mod tests {
    use super::*;
    // Brings `source()` into scope without shadowing the local `Error` type.
    use std::error::Error as _;

    // Minimal error type used as a `with_source` payload.
    #[derive(Debug)]
    struct Underlying;

    impl fmt::Display for Underlying {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "underlying")
        }
    }

    impl std::error::Error for Underlying {}

    // Without a message, Display is just the kind's Debug name.
    #[test]
    fn display_without_message() {
        let err = Error::new(ErrorKind::Internal);
        assert_eq!(err.to_string(), "Internal");
    }

    // With a message, Display is "<Kind>: <message>".
    #[test]
    fn display_with_message() {
        let err = Error::new(ErrorKind::ChannelEmpty).with_message("no messages");
        assert_eq!(err.to_string(), "ChannelEmpty: no messages");
    }

    // `with_source` is surfaced through `std::error::Error::source`.
    #[test]
    fn source_chain_is_exposed() {
        let err = Error::new(ErrorKind::User)
            .with_message("outer")
            .with_source(Underlying);
        let source = err.source().expect("source missing");
        assert_eq!(source.to_string(), "underlying");
    }

    // RecvError -> Error kind mapping.
    #[test]
    fn from_recv_error() {
        let disconnected: Error = RecvError::Disconnected.into();
        assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);

        let empty: Error = RecvError::Empty.into();
        assert_eq!(empty.kind(), ErrorKind::ChannelEmpty);
    }

    // SendError -> Error kind mapping.
    #[test]
    fn from_send_error() {
        let disconnected: Error = SendError::Disconnected(()).into();
        assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);

        let full: Error = SendError::Full(()).into();
        assert_eq!(full.kind(), ErrorKind::ChannelFull);
    }

    // `ResultExt::context` converts the error and attaches the message.
    #[test]
    fn result_ext_adds_message() {
        let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
        let err = res.context("recv failed").expect_err("expected err");
        assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
        assert_eq!(err.to_string(), "ChannelEmpty: recv failed");
    }

    // `is_cancelled` and `is_timeout` are mutually exclusive for these kinds.
    #[test]
    fn predicates_match_kind() {
        let cancel = Error::new(ErrorKind::Cancelled);
        assert!(cancel.is_cancelled());
        assert!(!cancel.is_timeout());

        let timeout = Error::new(ErrorKind::DeadlineExceeded);
        assert!(!timeout.is_cancelled());
        assert!(timeout.is_timeout());
    }

    // ThresholdTimeout recommends a backoff retry.
    #[test]
    fn recovery_action_backoff() {
        let action = ErrorKind::ThresholdTimeout.recovery_action();
        assert!(matches!(action, RecoveryAction::RetryWithBackoff(_)));
    }

    // A fresh error has an empty context.
    #[test]
    fn error_context_default() {
        let err = Error::new(ErrorKind::Internal);
        assert!(err.context().task_id.is_none());
    }

    // All four context identifiers round-trip through `with_context`.
    #[test]
    fn error_with_full_context() {
        use crate::util::ArenaIndex;

        let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
        let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
        let object_id = ObjectId::new_for_test(123);
        let symbol_id = SymbolId::new_for_test(123, 0, 1);

        let ctx = ErrorContext {
            task_id: Some(task_id),
            region_id: Some(region_id),
            object_id: Some(object_id),
            symbol_id: Some(symbol_id),
        };

        let err = Error::new(ErrorKind::Internal).with_context(ctx);

        assert_eq!(err.context().task_id, Some(task_id));
        assert_eq!(err.context().region_id, Some(region_id));
        assert_eq!(err.context().object_id, Some(object_id));
        assert_eq!(err.context().symbol_id, Some(symbol_id));
    }

    // Every ErrorKind variant is listed here with its expected category.
    #[test]
    fn error_kind_category_coverage() {
        use ErrorCategory::*;
        let cases: &[(ErrorKind, ErrorCategory)] = &[
            (ErrorKind::Cancelled, Cancellation),
            (ErrorKind::CancelTimeout, Cancellation),
            (ErrorKind::DeadlineExceeded, Budget),
            (ErrorKind::PollQuotaExhausted, Budget),
            (ErrorKind::CostQuotaExhausted, Budget),
            (ErrorKind::ChannelClosed, Channel),
            (ErrorKind::ChannelFull, Channel),
            (ErrorKind::ChannelEmpty, Channel),
            (ErrorKind::ObligationLeak, Obligation),
            (ErrorKind::ObligationAlreadyResolved, Obligation),
            (ErrorKind::RegionClosed, Region),
            (ErrorKind::TaskNotOwned, Region),
            (ErrorKind::AdmissionDenied, Region),
            (ErrorKind::InvalidEncodingParams, Encoding),
            (ErrorKind::DataTooLarge, Encoding),
            (ErrorKind::EncodingFailed, Encoding),
            (ErrorKind::CorruptedSymbol, Encoding),
            (ErrorKind::InsufficientSymbols, Decoding),
            (ErrorKind::DecodingFailed, Decoding),
            (ErrorKind::ObjectMismatch, Decoding),
            (ErrorKind::DuplicateSymbol, Decoding),
            (ErrorKind::ThresholdTimeout, Decoding),
            (ErrorKind::RoutingFailed, Transport),
            (ErrorKind::DispatchFailed, Transport),
            (ErrorKind::StreamEnded, Transport),
            (ErrorKind::SinkRejected, Transport),
            (ErrorKind::ConnectionLost, Transport),
            (ErrorKind::ConnectionRefused, Transport),
            (ErrorKind::ProtocolError, Transport),
            (ErrorKind::RecoveryFailed, Distributed),
            (ErrorKind::LeaseExpired, Distributed),
            (ErrorKind::LeaseRenewalFailed, Distributed),
            (ErrorKind::CoordinationFailed, Distributed),
            (ErrorKind::QuorumNotReached, Distributed),
            (ErrorKind::NodeUnavailable, Distributed),
            (ErrorKind::PartitionDetected, Distributed),
            (ErrorKind::Internal, Internal),
            (ErrorKind::InvalidStateTransition, Internal),
            (ErrorKind::ConfigError, User),
            (ErrorKind::User, User),
        ];
        for (kind, expected) in cases {
            assert_eq!(kind.category(), *expected, "{kind:?}");
        }
    }

    // Checks recoverability for all transient kinds and a sample of the
    // permanent and unknown kinds.
    #[test]
    fn error_kind_recoverability_classification() {
        // Transient: retryable.
        for kind in [
            ErrorKind::ChannelFull,
            ErrorKind::ChannelEmpty,
            ErrorKind::AdmissionDenied,
            ErrorKind::ConnectionLost,
            ErrorKind::NodeUnavailable,
            ErrorKind::QuorumNotReached,
            ErrorKind::ThresholdTimeout,
            ErrorKind::LeaseRenewalFailed,
        ] {
            assert_eq!(kind.recoverability(), Recoverability::Transient, "{kind:?}");
            assert!(kind.is_retryable(), "{kind:?} should be retryable");
        }

        // Permanent: not retryable.
        for kind in [
            ErrorKind::Cancelled,
            ErrorKind::ChannelClosed,
            ErrorKind::ObligationLeak,
            ErrorKind::Internal,
            ErrorKind::ConnectionRefused,
            ErrorKind::ConfigError,
        ] {
            assert_eq!(kind.recoverability(), Recoverability::Permanent, "{kind:?}");
            assert!(!kind.is_retryable(), "{kind:?} should not be retryable");
        }

        // Unknown: also not reported as retryable.
        for kind in [
            ErrorKind::DeadlineExceeded,
            ErrorKind::EncodingFailed,
            ErrorKind::CorruptedSymbol,
            ErrorKind::User,
        ] {
            assert_eq!(kind.recoverability(), Recoverability::Unknown, "{kind:?}");
            assert!(!kind.is_retryable(), "{kind:?} Unknown is not retryable");
        }
    }

    // Truth table for the two Recoverability predicates.
    #[test]
    fn recoverability_predicates() {
        assert!(Recoverability::Transient.should_retry());
        assert!(!Recoverability::Transient.is_permanent());

        assert!(!Recoverability::Permanent.should_retry());
        assert!(Recoverability::Permanent.is_permanent());

        assert!(!Recoverability::Unknown.should_retry());
        assert!(!Recoverability::Unknown.is_permanent());
    }

    // One representative kind per RecoveryAction variant.
    #[test]
    fn recovery_action_variants() {
        assert!(matches!(
            ErrorKind::ChannelFull.recovery_action(),
            RecoveryAction::RetryImmediately
        ));
        assert!(matches!(
            ErrorKind::AdmissionDenied.recovery_action(),
            RecoveryAction::RetryWithBackoff(_)
        ));
        assert!(matches!(
            ErrorKind::NodeUnavailable.recovery_action(),
            RecoveryAction::RetryWithBackoff(_)
        ));
        assert!(matches!(
            ErrorKind::ConnectionLost.recovery_action(),
            RecoveryAction::RetryWithNewConnection
        ));
        assert!(matches!(
            ErrorKind::Cancelled.recovery_action(),
            RecoveryAction::Propagate
        ));
        assert!(matches!(
            ErrorKind::ObligationLeak.recovery_action(),
            RecoveryAction::Escalate
        ));
        assert!(matches!(
            ErrorKind::User.recovery_action(),
            RecoveryAction::Custom
        ));
    }

    // Pins the DEFAULT values and the relative ordering of the presets.
    #[test]
    fn backoff_hint_constants() {
        let d = BackoffHint::DEFAULT;
        assert_eq!(d.initial_delay_ms, 100);
        assert_eq!(d.max_delay_ms, 30_000);
        assert_eq!(d.max_attempts, 5);

        let a = BackoffHint::AGGRESSIVE;
        assert!(a.initial_delay_ms > d.initial_delay_ms);
        assert!(a.max_attempts > d.max_attempts);

        let q = BackoffHint::QUICK;
        assert!(q.initial_delay_ms < d.initial_delay_ms);
        assert!(q.max_attempts < d.max_attempts);

        assert_eq!(BackoffHint::default(), BackoffHint::DEFAULT);
    }

    // Convenience constructors: kind plus message contents.
    #[test]
    fn error_data_too_large() {
        let err = Error::data_too_large(2000, 1000);
        assert_eq!(err.kind(), ErrorKind::DataTooLarge);
        let msg = err.to_string();
        assert!(msg.contains("2000"), "{msg}");
        assert!(msg.contains("1000"), "{msg}");
    }

    #[test]
    fn error_insufficient_symbols() {
        let err = Error::insufficient_symbols(5, 10);
        assert_eq!(err.kind(), ErrorKind::InsufficientSymbols);
        let msg = err.to_string();
        assert!(msg.contains('5'), "{msg}");
        assert!(msg.contains("10"), "{msg}");
    }

    #[test]
    fn error_routing_failed() {
        let err = Error::routing_failed("node-7");
        assert_eq!(err.kind(), ErrorKind::RoutingFailed);
        assert!(err.to_string().contains("node-7"));
    }

    #[test]
    fn error_lease_expired() {
        let err = Error::lease_expired("lease-42");
        assert_eq!(err.kind(), ErrorKind::LeaseExpired);
        assert!(err.to_string().contains("lease-42"));
    }

    #[test]
    fn error_quorum_not_reached() {
        let err = Error::quorum_not_reached(2, 3);
        assert_eq!(err.kind(), ErrorKind::QuorumNotReached);
        let msg = err.to_string();
        assert!(msg.contains('2'), "{msg}");
        assert!(msg.contains('3'), "{msg}");
    }

    #[test]
    fn error_node_unavailable() {
        let err = Error::node_unavailable("node-1");
        assert_eq!(err.kind(), ErrorKind::NodeUnavailable);
        assert!(err.to_string().contains("node-1"));
    }

    #[test]
    fn error_internal() {
        let err = Error::internal("bug found");
        assert_eq!(err.kind(), ErrorKind::Internal);
        assert!(err.to_string().contains("bug found"));
    }

    // Category- and kind-based predicates on Error.
    #[test]
    fn error_is_predicates() {
        assert!(Error::new(ErrorKind::EncodingFailed).is_encoding_error());
        assert!(!Error::new(ErrorKind::DecodingFailed).is_encoding_error());

        assert!(Error::new(ErrorKind::InsufficientSymbols).is_decoding_error());
        assert!(!Error::new(ErrorKind::EncodingFailed).is_decoding_error());

        assert!(Error::new(ErrorKind::RoutingFailed).is_transport_error());
        assert!(!Error::new(ErrorKind::Internal).is_transport_error());

        assert!(Error::new(ErrorKind::QuorumNotReached).is_distributed_error());
        assert!(!Error::new(ErrorKind::ChannelFull).is_distributed_error());

        assert!(Error::new(ErrorKind::ConnectionLost).is_connection_error());
        assert!(Error::new(ErrorKind::ConnectionRefused).is_connection_error());
        assert!(!Error::new(ErrorKind::RoutingFailed).is_connection_error());
    }

    // CancelTimeout counts as a timeout, not as a cancellation.
    #[test]
    fn error_cancel_timeout_is_timeout() {
        assert!(Error::new(ErrorKind::CancelTimeout).is_timeout());
        assert!(!Error::new(ErrorKind::CancelTimeout).is_cancelled());
    }

    // Cancelled variants of the channel errors map to ErrorKind::Cancelled.
    #[test]
    fn recv_error_cancelled_conversion() {
        let err: Error = RecvError::Cancelled.into();
        assert_eq!(err.kind(), ErrorKind::Cancelled);
    }

    #[test]
    fn send_error_cancelled_conversion() {
        let err: Error = SendError::Cancelled(42u32).into();
        assert_eq!(err.kind(), ErrorKind::Cancelled);
    }

    // The Cancelled struct converts into a Cancelled-kind Error.
    #[test]
    fn cancelled_struct_into_error() {
        let reason = CancelReason::user("test cancel");
        let cancelled = Cancelled { reason };
        let err: Error = cancelled.into();
        assert_eq!(err.kind(), ErrorKind::Cancelled);
        assert!(err.to_string().contains("Cancelled"));
    }

    // `with_context` builds its message from the closure.
    #[test]
    fn result_ext_with_context_lazy() {
        let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
        let err = res
            .with_context(|| format!("lazy {}", "context"))
            .expect_err("expected err");
        assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
        assert!(err.to_string().contains("lazy context"));
    }

    // Derived-trait smoke tests.
    #[test]
    fn error_category_debug() {
        for cat in [
            ErrorCategory::Cancellation,
            ErrorCategory::Budget,
            ErrorCategory::Channel,
            ErrorCategory::Obligation,
            ErrorCategory::Region,
            ErrorCategory::Encoding,
            ErrorCategory::Decoding,
            ErrorCategory::Transport,
            ErrorCategory::Distributed,
            ErrorCategory::Internal,
            ErrorCategory::User,
        ] {
            let dbg = format!("{cat:?}");
            assert!(!dbg.is_empty());
        }
    }

    #[test]
    fn acquire_error_debug_eq() {
        let err = AcquireError::Closed;
        let dbg = format!("{err:?}");
        assert!(dbg.contains("Closed"), "{dbg}");
        assert_eq!(err, AcquireError::Closed);
    }

    // A clone keeps the kind and renders identically.
    #[test]
    fn error_clone() {
        let err = Error::new(ErrorKind::Internal).with_message("clone me");
        let cloned = err.clone();
        assert_eq!(cloned.kind(), ErrorKind::Internal);
        assert_eq!(cloned.to_string(), err.to_string());
    }

    #[test]
    fn error_no_message() {
        let err = Error::new(ErrorKind::User);
        assert!(err.message().is_none());
    }

    #[test]
    fn error_source_none_without_with_source() {
        let err = Error::new(ErrorKind::User);
        assert!(err.source().is_none());
    }

    // Copy/Hash/Eq smoke tests for the small value types.
    #[test]
    fn error_kind_copy_hash() {
        use std::collections::HashSet;
        let kind = ErrorKind::Internal;
        let copied = kind;
        assert_eq!(copied, ErrorKind::Internal);

        let mut set = HashSet::new();
        set.insert(ErrorKind::Cancelled);
        set.insert(ErrorKind::DeadlineExceeded);
        // Inserting a duplicate must not grow the set.
        set.insert(ErrorKind::Cancelled);
        assert_eq!(set.len(), 2);
    }

    #[test]
    fn recoverability_copy_hash_eq() {
        use std::collections::HashSet;
        let r = Recoverability::Transient;
        let copied = r;
        assert_eq!(copied, Recoverability::Transient);
        assert_ne!(r, Recoverability::Permanent);

        let mut set = HashSet::new();
        set.insert(Recoverability::Transient);
        set.insert(Recoverability::Permanent);
        set.insert(Recoverability::Unknown);
        assert_eq!(set.len(), 3);
    }

    #[test]
    fn recovery_action_copy_hash() {
        use std::collections::HashSet;
        let action = RecoveryAction::Propagate;
        let copied = action;
        assert_eq!(copied, RecoveryAction::Propagate);

        let mut set = HashSet::new();
        set.insert(RecoveryAction::RetryImmediately);
        set.insert(RecoveryAction::Propagate);
        set.insert(RecoveryAction::Escalate);
        set.insert(RecoveryAction::Custom);
        assert_eq!(set.len(), 4);
    }

    #[test]
    fn error_category_copy_clone_hash() {
        use std::collections::HashSet;
        let cat = ErrorCategory::Transport;
        let copied = cat;
        let cloned = cat;
        assert_eq!(copied, cloned);

        let mut set = HashSet::new();
        set.insert(ErrorCategory::Cancellation);
        set.insert(ErrorCategory::Budget);
        set.insert(ErrorCategory::Channel);
        assert_eq!(set.len(), 3);
    }

    #[test]
    fn backoff_hint_copy_hash_eq() {
        use std::collections::HashSet;
        let hint = BackoffHint::DEFAULT;
        let copied = hint;
        assert_eq!(copied, BackoffHint::DEFAULT);
        assert_ne!(hint, BackoffHint::AGGRESSIVE);

        let mut set = HashSet::new();
        set.insert(BackoffHint::DEFAULT);
        set.insert(BackoffHint::AGGRESSIVE);
        set.insert(BackoffHint::QUICK);
        assert_eq!(set.len(), 3);
    }

    #[test]
    fn recv_error_debug_clone_copy() {
        let err = RecvError::Disconnected;
        let dbg = format!("{err:?}");
        assert!(dbg.contains("Disconnected"));

        let copied = err;
        assert_eq!(copied, RecvError::Disconnected);

        let cloned = err;
        assert_eq!(cloned, err);
    }

    #[test]
    fn cancelled_clone_eq() {
        let c = Cancelled {
            reason: CancelReason::user("test"),
        };
        let dbg = format!("{c:?}");
        assert!(dbg.contains("Cancelled"));

        let cloned = c.clone();
        assert_eq!(cloned, c);
    }
}