1use core::fmt;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicU64, Ordering};
36
37use crate::observability::SpanId;
38use crate::types::symbol::{ObjectId, SymbolId};
39use crate::types::{CancelReason, RegionId, TaskId};
40
41pub mod recovery;
42
/// Machine-readable classification of an [`Error`].
///
/// Every variant maps to exactly one [`ErrorCategory`] (see [`ErrorKind::category`]),
/// one [`Recoverability`] (see [`ErrorKind::recoverability`]) and one
/// [`RecoveryAction`] (see [`ErrorKind::recovery_action`]). Variants are listed
/// grouped by the category they map to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    // ----- ErrorCategory::Cancellation -----
    /// The operation was cancelled. Produced by [`Error::cancelled`] and by the
    /// `Cancelled` variants of [`RecvError`] and [`SendError`].
    Cancelled,
    /// Cancellation timed out; [`Error::is_timeout`] reports `true` for it.
    CancelTimeout,

    // ----- ErrorCategory::Budget -----
    /// A deadline was exceeded; [`Error::is_timeout`] reports `true` for it.
    DeadlineExceeded,
    /// The poll quota was exhausted.
    PollQuotaExhausted,
    /// The cost quota was exhausted.
    CostQuotaExhausted,

    // ----- ErrorCategory::Channel -----
    /// The channel is closed; produced from `RecvError::Disconnected` and
    /// `SendError::Disconnected`.
    ChannelClosed,
    /// The channel is full; produced from `SendError::Full`.
    ChannelFull,
    /// The channel is empty; produced from `RecvError::Empty`.
    ChannelEmpty,

    // ----- ErrorCategory::Obligation -----
    /// An obligation leaked.
    ObligationLeak,
    /// An obligation was resolved more than once.
    ObligationAlreadyResolved,
    /// The region was already finalized (grouped under `Obligation`).
    RegionFinalized,

    // ----- ErrorCategory::Region -----
    /// The region is closed.
    RegionClosed,
    /// The task is not owned by the caller.
    TaskNotOwned,
    /// Admission was denied.
    AdmissionDenied,

    // ----- ErrorCategory::Encoding -----
    /// Invalid encoding parameters; see [`Error::invalid_encoding_params`].
    InvalidEncodingParams,
    /// Input exceeds the size limit; see [`Error::data_too_large`].
    DataTooLarge,
    /// Encoding failed.
    EncodingFailed,
    /// A symbol was corrupted.
    CorruptedSymbol,

    // ----- ErrorCategory::Decoding -----
    /// Too few symbols to decode; see [`Error::insufficient_symbols`].
    InsufficientSymbols,
    /// Decoding failed; see [`Error::decoding_failed`].
    DecodingFailed,
    /// A symbol belongs to a different object.
    ObjectMismatch,
    /// A symbol was received more than once.
    DuplicateSymbol,
    /// A threshold was not reached in time.
    ThresholdTimeout,

    // ----- ErrorCategory::Transport -----
    /// No route to the destination; see [`Error::routing_failed`].
    RoutingFailed,
    /// Dispatch failed.
    DispatchFailed,
    /// The stream ended.
    StreamEnded,
    /// The sink rejected the input.
    SinkRejected,
    /// The connection was lost; [`Error::is_connection_error`] reports `true`.
    ConnectionLost,
    /// The connection was refused; [`Error::is_connection_error`] reports `true`.
    ConnectionRefused,
    /// A protocol violation occurred.
    ProtocolError,

    // ----- ErrorCategory::Distributed -----
    /// Recovery failed.
    RecoveryFailed,
    /// A lease expired; see [`Error::lease_expired`].
    LeaseExpired,
    /// A lease could not be renewed.
    LeaseRenewalFailed,
    /// Coordination failed.
    CoordinationFailed,
    /// Quorum was not reached; see [`Error::quorum_not_reached`].
    QuorumNotReached,
    /// A node is unavailable; see [`Error::node_unavailable`].
    NodeUnavailable,
    /// A network partition was detected.
    PartitionDetected,

    // ----- ErrorCategory::Internal -----
    /// Internal error; see [`Error::internal`].
    Internal,
    /// An invalid state transition was attempted.
    InvalidStateTransition,

    // ----- ErrorCategory::User -----
    /// Configuration error.
    ConfigError,
    /// User-defined error.
    User,
}
152
153impl ErrorKind {
154 #[must_use]
156 #[inline]
157 pub const fn category(&self) -> ErrorCategory {
158 match self {
159 Self::Cancelled | Self::CancelTimeout => ErrorCategory::Cancellation,
160 Self::DeadlineExceeded | Self::PollQuotaExhausted | Self::CostQuotaExhausted => {
161 ErrorCategory::Budget
162 }
163 Self::ChannelClosed | Self::ChannelFull | Self::ChannelEmpty => ErrorCategory::Channel,
164 Self::ObligationLeak | Self::ObligationAlreadyResolved | Self::RegionFinalized => {
165 ErrorCategory::Obligation
166 }
167 Self::RegionClosed | Self::TaskNotOwned | Self::AdmissionDenied => {
168 ErrorCategory::Region
169 }
170 Self::InvalidEncodingParams
171 | Self::DataTooLarge
172 | Self::EncodingFailed
173 | Self::CorruptedSymbol => ErrorCategory::Encoding,
174 Self::InsufficientSymbols
175 | Self::DecodingFailed
176 | Self::ObjectMismatch
177 | Self::DuplicateSymbol
178 | Self::ThresholdTimeout => ErrorCategory::Decoding,
179 Self::RoutingFailed
180 | Self::DispatchFailed
181 | Self::StreamEnded
182 | Self::SinkRejected
183 | Self::ConnectionLost
184 | Self::ConnectionRefused
185 | Self::ProtocolError => ErrorCategory::Transport,
186 Self::RecoveryFailed
187 | Self::LeaseExpired
188 | Self::LeaseRenewalFailed
189 | Self::CoordinationFailed
190 | Self::QuorumNotReached
191 | Self::NodeUnavailable
192 | Self::PartitionDetected => ErrorCategory::Distributed,
193 Self::Internal | Self::InvalidStateTransition => ErrorCategory::Internal,
194 Self::ConfigError | Self::User => ErrorCategory::User,
195 }
196 }
197
198 #[must_use]
202 #[inline]
203 pub const fn recoverability(&self) -> Recoverability {
204 match self {
205 Self::ChannelFull
207 | Self::ChannelEmpty
208 | Self::AdmissionDenied
209 | Self::ConnectionLost
210 | Self::NodeUnavailable
211 | Self::QuorumNotReached
212 | Self::ThresholdTimeout
213 | Self::LeaseRenewalFailed => Recoverability::Transient,
214
215 Self::Cancelled
217 | Self::CancelTimeout
218 | Self::ChannelClosed
219 | Self::ObligationLeak
220 | Self::ObligationAlreadyResolved
221 | Self::RegionFinalized
222 | Self::RegionClosed
223 | Self::InvalidEncodingParams
224 | Self::DataTooLarge
225 | Self::ObjectMismatch
226 | Self::Internal
227 | Self::InvalidStateTransition
228 | Self::ProtocolError
229 | Self::ConnectionRefused
230 | Self::ConfigError => Recoverability::Permanent,
231
232 Self::DeadlineExceeded
234 | Self::PollQuotaExhausted
235 | Self::CostQuotaExhausted
236 | Self::TaskNotOwned
237 | Self::EncodingFailed
238 | Self::CorruptedSymbol
239 | Self::InsufficientSymbols
240 | Self::DecodingFailed
241 | Self::DuplicateSymbol
242 | Self::RoutingFailed
243 | Self::DispatchFailed
244 | Self::StreamEnded
245 | Self::SinkRejected
246 | Self::RecoveryFailed
247 | Self::LeaseExpired
248 | Self::CoordinationFailed
249 | Self::PartitionDetected
250 | Self::User => Recoverability::Unknown,
251 }
252 }
253
254 #[must_use]
256 #[inline]
257 pub const fn is_retryable(&self) -> bool {
258 matches!(self.recoverability(), Recoverability::Transient)
259 }
260
261 #[must_use]
266 #[inline]
267 pub const fn recovery_action(&self) -> RecoveryAction {
268 match self {
269 Self::ChannelFull | Self::ChannelEmpty => RecoveryAction::RetryImmediately,
271
272 Self::AdmissionDenied
274 | Self::ThresholdTimeout
275 | Self::QuorumNotReached
276 | Self::LeaseRenewalFailed => RecoveryAction::RetryWithBackoff(BackoffHint::DEFAULT),
277 Self::NodeUnavailable => RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE),
278
279 Self::ConnectionLost | Self::StreamEnded => RecoveryAction::RetryWithNewConnection,
281
282 Self::Cancelled
284 | Self::CancelTimeout
285 | Self::DeadlineExceeded
286 | Self::PollQuotaExhausted
287 | Self::CostQuotaExhausted
288 | Self::ChannelClosed
289 | Self::RegionClosed
290 | Self::InvalidEncodingParams
291 | Self::DataTooLarge
292 | Self::ObjectMismatch
293 | Self::ConnectionRefused
294 | Self::ProtocolError
295 | Self::LeaseExpired
296 | Self::PartitionDetected
297 | Self::ConfigError => RecoveryAction::Propagate,
298
299 Self::ObligationLeak
301 | Self::ObligationAlreadyResolved
302 | Self::RegionFinalized
303 | Self::Internal
304 | Self::InvalidStateTransition => RecoveryAction::Escalate,
305
306 Self::TaskNotOwned
308 | Self::EncodingFailed
309 | Self::CorruptedSymbol
310 | Self::InsufficientSymbols
311 | Self::DecodingFailed
312 | Self::DuplicateSymbol
313 | Self::RoutingFailed
314 | Self::DispatchFailed
315 | Self::SinkRejected
316 | Self::RecoveryFailed
317 | Self::CoordinationFailed
318 | Self::User => RecoveryAction::Custom,
319 }
320 }
321}
322
/// Whether retrying an error may succeed, as returned by
/// [`ErrorKind::recoverability`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Recoverability {
    /// Retrying may succeed. This is the only variant for which
    /// `should_retry` (and therefore `ErrorKind::is_retryable`) returns `true`.
    Transient,
    /// Retrying will not help; `is_permanent` returns `true`.
    Permanent,
    /// Not classified either way; `should_retry` and `is_permanent` both
    /// return `false`.
    Unknown,
}
337
/// Suggested handling strategy for an error, as returned by
/// [`ErrorKind::recovery_action`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecoveryAction {
    /// Retry right away.
    RetryImmediately,
    /// Retry with delays governed by the attached [`BackoffHint`].
    RetryWithBackoff(BackoffHint),
    /// Retry after establishing a new connection.
    RetryWithNewConnection,
    /// Do not retry; return the error to the caller.
    Propagate,
    /// Do not handle locally. Used for `ObligationLeak`,
    /// `ObligationAlreadyResolved`, `RegionFinalized`, `Internal` and
    /// `InvalidStateTransition`; the escalation mechanism is up to the handler.
    Escalate,
    /// No generic strategy; the caller decides.
    Custom,
}
354
/// Retry-with-backoff parameters carried by `RecoveryAction::RetryWithBackoff`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BackoffHint {
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u32,
    /// Upper bound on the delay between retries, in milliseconds.
    pub max_delay_ms: u32,
    /// Maximum number of retry attempts.
    pub max_attempts: u8,
}

impl BackoffHint {
    /// Standard profile: 100 ms initial delay, 30 s cap, 5 attempts.
    pub const DEFAULT: Self = Self::with_limits(100, 30_000, 5);

    /// Slower, more persistent profile: 1 s initial delay, 60 s cap, 10 attempts.
    pub const AGGRESSIVE: Self = Self::with_limits(1_000, 60_000, 10);

    /// Short profile: 10 ms initial delay, 1 s cap, 3 attempts.
    pub const QUICK: Self = Self::with_limits(10, 1_000, 3);

    /// Const helper so each profile reads as a single line.
    const fn with_limits(initial_delay_ms: u32, max_delay_ms: u32, max_attempts: u8) -> Self {
        Self {
            initial_delay_ms,
            max_delay_ms,
            max_attempts,
        }
    }
}

impl Default for BackoffHint {
    /// Same as [`BackoffHint::DEFAULT`].
    #[inline]
    fn default() -> Self {
        BackoffHint::DEFAULT
    }
}
395
396impl Recoverability {
397 #[must_use]
399 #[inline]
400 pub const fn should_retry(&self) -> bool {
401 matches!(self, Self::Transient)
402 }
403
404 #[must_use]
406 #[inline]
407 pub const fn is_permanent(&self) -> bool {
408 matches!(self, Self::Permanent)
409 }
410}
411
/// Coarse grouping of [`ErrorKind`]s, as returned by [`ErrorKind::category`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// `Cancelled`, `CancelTimeout`.
    Cancellation,
    /// `DeadlineExceeded`, `PollQuotaExhausted`, `CostQuotaExhausted`.
    Budget,
    /// `ChannelClosed`, `ChannelFull`, `ChannelEmpty`.
    Channel,
    /// `ObligationLeak`, `ObligationAlreadyResolved`, `RegionFinalized`.
    Obligation,
    /// `RegionClosed`, `TaskNotOwned`, `AdmissionDenied`.
    Region,
    /// `InvalidEncodingParams`, `DataTooLarge`, `EncodingFailed`,
    /// `CorruptedSymbol`.
    Encoding,
    /// `InsufficientSymbols`, `DecodingFailed`, `ObjectMismatch`,
    /// `DuplicateSymbol`, `ThresholdTimeout`.
    Decoding,
    /// `RoutingFailed`, `DispatchFailed`, `StreamEnded`, `SinkRejected`,
    /// `ConnectionLost`, `ConnectionRefused`, `ProtocolError`.
    Transport,
    /// `RecoveryFailed`, `LeaseExpired`, `LeaseRenewalFailed`,
    /// `CoordinationFailed`, `QuorumNotReached`, `NodeUnavailable`,
    /// `PartitionDetected`.
    Distributed,
    /// `Internal`, `InvalidStateTransition`.
    Internal,
    /// `ConfigError`, `User`.
    User,
}
438
439#[derive(Debug, Clone, Default, PartialEq, Eq)]
441pub struct ErrorContext {
442 pub task_id: Option<TaskId>,
444 pub region_id: Option<RegionId>,
446 pub object_id: Option<ObjectId>,
448 pub symbol_id: Option<SymbolId>,
450 pub correlation_id: Option<u64>,
452 pub causal_chain: Vec<u64>,
454 pub span_id: Option<crate::observability::SpanId>,
456 pub parent_span_id: Option<crate::observability::SpanId>,
458 pub async_stack: Vec<String>,
460}
461
462impl ErrorContext {
463 #[must_use]
465 pub fn new() -> Self {
466 static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
467 Self {
468 correlation_id: Some(NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed)),
469 ..Self::default()
470 }
471 }
472
473 #[must_use]
475 pub fn from_diagnostic_context(ctx: &crate::observability::DiagnosticContext) -> Self {
476 let mut error_ctx = Self::new();
477 error_ctx.task_id = ctx.task_id();
478 error_ctx.region_id = ctx.region_id();
479 error_ctx.span_id = ctx.span_id();
480 error_ctx.parent_span_id = ctx.parent_span_id();
481 error_ctx
482 }
483
484 #[must_use]
486 pub fn derive_child(&self, operation: &str) -> Self {
487 static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
488 let child_correlation_id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed);
489
490 let mut causal_chain = self.causal_chain.clone();
491 if let Some(parent_id) = self.correlation_id {
492 causal_chain.push(parent_id);
493 }
494
495 let mut async_stack = self.async_stack.clone();
496 async_stack.push(operation.to_string());
497
498 Self {
499 task_id: self.task_id,
500 region_id: self.region_id,
501 object_id: self.object_id,
502 symbol_id: self.symbol_id,
503 correlation_id: Some(child_correlation_id),
504 causal_chain,
505 span_id: Some(SpanId::new()), parent_span_id: self.span_id,
507 async_stack,
508 }
509 }
510
511 #[must_use]
513 pub fn with_operation(mut self, operation: &str) -> Self {
514 self.async_stack.push(operation.to_string());
515 self
516 }
517
518 #[must_use]
520 pub fn with_span_context(mut self, span_id: SpanId, parent_span_id: Option<SpanId>) -> Self {
521 self.span_id = Some(span_id);
522 self.parent_span_id = parent_span_id;
523 self
524 }
525
526 #[must_use]
528 pub fn root_correlation_id(&self) -> Option<u64> {
529 self.causal_chain.first().copied().or(self.correlation_id)
530 }
531
532 #[must_use]
534 pub fn full_causal_chain(&self) -> Vec<u64> {
535 let mut chain = self.causal_chain.clone();
536 if let Some(id) = self.correlation_id {
537 chain.push(id);
538 }
539 chain
540 }
541
542 #[must_use]
544 pub fn format_async_stack(&self) -> String {
545 if self.async_stack.is_empty() {
546 "<no stack trace>".to_string()
547 } else {
548 self.async_stack.join(" -> ")
549 }
550 }
551}
552
/// The module's error type.
///
/// An error bundles an [`ErrorKind`], an optional human-readable message, an
/// optional underlying source error and an [`ErrorContext`].
#[derive(Debug, Clone)]
pub struct Error {
    /// Classification; drives category, recoverability and recovery action.
    kind: ErrorKind,
    /// Optional detail that `Display` prints after the kind.
    message: Option<String>,
    /// Underlying cause returned by `std::error::Error::source`. An `Arc`
    /// keeps the field cloneable, which `#[derive(Clone)]` requires.
    source: Option<Arc<dyn std::error::Error + Send + Sync>>,
    /// Task/region/correlation/span metadata.
    context: ErrorContext,
}
561
562impl Error {
563 #[must_use]
565 #[inline]
566 pub fn new(kind: ErrorKind) -> Self {
567 Self {
568 kind,
569 message: None,
570 source: None,
571 context: ErrorContext::new(),
572 }
573 }
574
575 #[must_use]
577 #[inline]
578 pub const fn kind(&self) -> ErrorKind {
579 self.kind
580 }
581
582 #[must_use]
584 #[inline]
585 pub const fn is_cancelled(&self) -> bool {
586 matches!(self.kind, ErrorKind::Cancelled)
587 }
588
589 #[must_use]
591 #[inline]
592 pub const fn is_timeout(&self) -> bool {
593 matches!(
594 self.kind,
595 ErrorKind::DeadlineExceeded | ErrorKind::CancelTimeout
596 )
597 }
598
599 #[must_use]
601 #[inline]
602 pub fn with_message(mut self, msg: impl Into<String>) -> Self {
603 self.message = Some(msg.into());
604 self
605 }
606
607 #[must_use]
609 #[inline]
610 pub fn with_context(mut self, ctx: ErrorContext) -> Self {
611 self.context = ctx;
612 self
613 }
614
615 #[must_use]
617 #[inline]
618 pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
619 self.source = Some(Arc::new(source));
620 self
621 }
622
623 #[must_use]
625 pub fn from_cx(kind: ErrorKind, cx: &crate::cx::Cx) -> Self {
626 let diag_ctx = cx.diagnostic_context();
627 let error_ctx = ErrorContext::from_diagnostic_context(&diag_ctx)
628 .with_operation(&format!("Error::{:?}", kind));
629
630 Self::new(kind).with_context(error_ctx)
631 }
632
633 #[must_use]
635 pub fn propagate_across_async(mut self, operation: &str) -> Self {
636 self.context = self.context.derive_child(operation);
637 self
638 }
639
640 #[must_use]
642 pub fn with_operation(mut self, operation: &str) -> Self {
643 self.context = self.context.with_operation(operation);
644 self
645 }
646
647 #[must_use]
649 #[inline]
650 pub fn correlation_id(&self) -> Option<u64> {
651 self.context.correlation_id
652 }
653
654 #[must_use]
656 #[inline]
657 pub fn root_correlation_id(&self) -> Option<u64> {
658 self.context.root_correlation_id()
659 }
660
661 #[must_use]
663 #[inline]
664 pub fn causal_chain(&self) -> Vec<u64> {
665 self.context.full_causal_chain()
666 }
667
668 #[must_use]
670 #[inline]
671 pub fn async_stack(&self) -> String {
672 self.context.format_async_stack()
673 }
674
675 #[must_use]
677 #[inline]
678 pub fn cancelled(reason: &CancelReason) -> Self {
679 Self::new(ErrorKind::Cancelled).with_message(reason.to_string())
680 }
681
682 #[must_use]
684 #[inline]
685 pub const fn category(&self) -> ErrorCategory {
686 self.kind.category()
687 }
688
689 #[must_use]
691 #[inline]
692 pub const fn recoverability(&self) -> Recoverability {
693 self.kind.recoverability()
694 }
695
696 #[must_use]
698 #[inline]
699 pub const fn is_retryable(&self) -> bool {
700 self.kind.is_retryable()
701 }
702
703 #[must_use]
705 #[inline]
706 pub const fn recovery_action(&self) -> RecoveryAction {
707 self.kind.recovery_action()
708 }
709
710 #[must_use]
712 #[inline]
713 pub fn message(&self) -> Option<&str> {
714 self.message.as_deref()
715 }
716
717 #[must_use]
719 #[inline]
720 pub fn context(&self) -> &ErrorContext {
721 &self.context
722 }
723
724 #[must_use]
726 #[inline]
727 pub const fn is_encoding_error(&self) -> bool {
728 matches!(self.kind.category(), ErrorCategory::Encoding)
729 }
730
731 #[must_use]
733 #[inline]
734 pub const fn is_decoding_error(&self) -> bool {
735 matches!(self.kind.category(), ErrorCategory::Decoding)
736 }
737
738 #[must_use]
740 #[inline]
741 pub const fn is_transport_error(&self) -> bool {
742 matches!(self.kind.category(), ErrorCategory::Transport)
743 }
744
745 #[must_use]
747 #[inline]
748 pub const fn is_distributed_error(&self) -> bool {
749 matches!(self.kind.category(), ErrorCategory::Distributed)
750 }
751
752 #[must_use]
754 #[inline]
755 pub const fn is_connection_error(&self) -> bool {
756 matches!(
757 self.kind,
758 ErrorKind::ConnectionLost | ErrorKind::ConnectionRefused
759 )
760 }
761
762 #[must_use]
764 pub fn invalid_encoding_params(detail: impl Into<String>) -> Self {
765 Self::new(ErrorKind::InvalidEncodingParams).with_message(detail)
766 }
767
768 #[must_use]
770 pub fn data_too_large(actual: u64, max: u64) -> Self {
771 Self::new(ErrorKind::DataTooLarge)
772 .with_message(format!("data size {actual} exceeds maximum {max}"))
773 }
774
775 #[must_use]
777 pub fn insufficient_symbols(received: u32, needed: u32) -> Self {
778 Self::new(ErrorKind::InsufficientSymbols).with_message(format!(
779 "received {received} symbols, need at least {needed}"
780 ))
781 }
782
783 #[must_use]
785 pub fn decoding_failed(reason: impl Into<String>) -> Self {
786 Self::new(ErrorKind::DecodingFailed).with_message(reason)
787 }
788
789 #[must_use]
791 pub fn routing_failed(destination: impl Into<String>) -> Self {
792 Self::new(ErrorKind::RoutingFailed)
793 .with_message(format!("no route to destination: {}", destination.into()))
794 }
795
796 #[must_use]
798 pub fn lease_expired(lease_id: impl Into<String>) -> Self {
799 Self::new(ErrorKind::LeaseExpired)
800 .with_message(format!("lease expired: {}", lease_id.into()))
801 }
802
803 #[must_use]
805 pub fn quorum_not_reached(achieved: u32, needed: u32) -> Self {
806 Self::new(ErrorKind::QuorumNotReached)
807 .with_message(format!("achieved {achieved} of {needed} required"))
808 }
809
810 #[must_use]
812 pub fn node_unavailable(node_id: impl Into<String>) -> Self {
813 Self::new(ErrorKind::NodeUnavailable)
814 .with_message(format!("node unavailable: {}", node_id.into()))
815 }
816
817 #[must_use]
819 pub fn internal(detail: impl Into<String>) -> Self {
820 Self::new(ErrorKind::Internal).with_message(detail)
821 }
822}
823
824impl fmt::Display for Error {
825 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
826 write!(f, "{:?}", self.kind)?;
827 if let Some(msg) = &self.message {
828 write!(f, ": {msg}")?;
829 }
830 Ok(())
831 }
832}
833
834impl std::error::Error for Error {
835 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
836 self.source.as_ref().map(|e| e.as_ref() as _)
837 }
838}
839
/// Cancellation marker carrying a [`CancelReason`].
///
/// Converting it into [`Error`] yields an [`ErrorKind::Cancelled`] error whose
/// message is the reason's `Display` text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cancelled {
    /// Why the operation was cancelled.
    pub reason: CancelReason,
}
846
847impl From<Cancelled> for Error {
848 fn from(c: Cancelled) -> Self {
849 Self::cancelled(&c.reason)
850 }
851}
852
/// Error returned when a channel send fails.
///
/// Every variant carries a `T` (presumably the value that could not be sent;
/// confirm against the channel implementation).
#[derive(Debug)]
pub enum SendError<T> {
    /// The receiving side is gone; converts to [`ErrorKind::ChannelClosed`].
    Disconnected(T),
    /// The channel has no room; converts to [`ErrorKind::ChannelFull`].
    Full(T),
    /// The send was cancelled; converts to [`ErrorKind::Cancelled`].
    Cancelled(T),
}
863
/// Error returned when a channel receive fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// The sending side is gone; converts to [`ErrorKind::ChannelClosed`].
    Disconnected,
    /// Nothing is available; converts to [`ErrorKind::ChannelEmpty`].
    Empty,
    /// The receive was cancelled; converts to [`ErrorKind::Cancelled`].
    Cancelled,
}
874
/// Error returned when an acquire operation fails.
///
/// NOTE(review): what is being acquired (permit, slot, …) is not visible
/// here; confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireError {
    /// The source being acquired from has been closed.
    Closed,
}
881
882impl From<RecvError> for Error {
883 fn from(e: RecvError) -> Self {
884 match e {
885 RecvError::Disconnected => Self::new(ErrorKind::ChannelClosed),
886 RecvError::Empty => Self::new(ErrorKind::ChannelEmpty),
887 RecvError::Cancelled => Self::new(ErrorKind::Cancelled),
888 }
889 }
890}
891
892impl<T> From<SendError<T>> for Error {
893 fn from(e: SendError<T>) -> Self {
894 match e {
895 SendError::Disconnected(_) => Self::new(ErrorKind::ChannelClosed),
896 SendError::Full(_) => Self::new(ErrorKind::ChannelFull),
897 SendError::Cancelled(_) => Self::new(ErrorKind::Cancelled),
898 }
899 }
900}
901
/// Extension methods for attaching a context message to any `Result` whose
/// error type converts into [`Error`].
// `Error` is returned by value inside `Result<T>`. The lint is allowed on
// purpose (presumably because `Error` exceeds clippy's size threshold).
#[allow(clippy::result_large_err)]
pub trait ResultExt<T> {
    /// Converts the error into [`Error`] and attaches `msg` as context.
    fn context(self, msg: impl Into<String>) -> Result<T>;
    /// Like [`ResultExt::context`], but builds the message with `f`.
    fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
}
910
911impl<T, E: Into<Error>> ResultExt<T> for core::result::Result<T, E> {
912 fn context(self, msg: impl Into<String>) -> Result<T> {
913 self.map_err(|e| e.into().with_message(msg))
914 }
915
916 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
917 self.map_err(|e| e.into().with_message(f()))
918 }
919}
920
/// Shorthand for `core::result::Result<T, Error>`, used by this module's APIs.
#[allow(clippy::result_large_err)]
pub type Result<T> = core::result::Result<T, Error>;
924
925#[cfg(test)]
926mod tests {
927 #![allow(
928 clippy::pedantic,
929 clippy::nursery,
930 clippy::expect_fun_call,
931 clippy::map_unwrap_or,
932 clippy::cast_possible_wrap,
933 clippy::future_not_send
934 )]
935 use super::*;
936 use std::error::Error as _;
937
938 #[derive(Debug)]
939 struct Underlying;
940
941 impl fmt::Display for Underlying {
942 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943 write!(f, "underlying")
944 }
945 }
946
947 impl std::error::Error for Underlying {}
948
949 #[test]
950 fn display_without_message() {
951 let err = Error::new(ErrorKind::Internal);
952 assert_eq!(err.to_string(), "Internal");
953 }
954
955 #[test]
956 fn display_with_message() {
957 let err = Error::new(ErrorKind::ChannelEmpty).with_message("no messages");
958 assert_eq!(err.to_string(), "ChannelEmpty: no messages");
959 }
960
961 #[test]
962 fn source_chain_is_exposed() {
963 let err = Error::new(ErrorKind::User)
964 .with_message("outer")
965 .with_source(Underlying);
966 let source = err.source().expect("source missing");
967 assert_eq!(source.to_string(), "underlying");
968 }
969
970 #[test]
971 fn from_recv_error() {
972 let disconnected: Error = RecvError::Disconnected.into();
973 assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
974
975 let empty: Error = RecvError::Empty.into();
976 assert_eq!(empty.kind(), ErrorKind::ChannelEmpty);
977 }
978
979 #[test]
980 fn from_send_error() {
981 let disconnected: Error = SendError::Disconnected(()).into();
982 assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
983
984 let full: Error = SendError::Full(()).into();
985 assert_eq!(full.kind(), ErrorKind::ChannelFull);
986 }
987
988 #[test]
989 fn result_ext_adds_message() {
990 let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
991 let err = res.context("recv failed").expect_err("expected err");
992 assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
993 assert_eq!(err.to_string(), "ChannelEmpty: recv failed");
994 }
995
996 #[test]
997 fn predicates_match_kind() {
998 let cancel = Error::new(ErrorKind::Cancelled);
999 assert!(cancel.is_cancelled());
1000 assert!(!cancel.is_timeout());
1001
1002 let timeout = Error::new(ErrorKind::DeadlineExceeded);
1003 assert!(!timeout.is_cancelled());
1004 assert!(timeout.is_timeout());
1005 }
1006
1007 #[test]
1008 fn recovery_action_backoff() {
1009 let action = ErrorKind::ThresholdTimeout.recovery_action();
1010 assert!(matches!(action, RecoveryAction::RetryWithBackoff(_)));
1011 }
1012
1013 #[test]
1014 fn error_context_default() {
1015 let err = Error::new(ErrorKind::Internal);
1016 assert!(err.context().task_id.is_none());
1017 }
1018
1019 #[test]
1020 fn error_with_full_context() {
1021 use crate::util::ArenaIndex;
1022
1023 let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
1024 let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
1025 let object_id = ObjectId::new_for_test(123);
1026 let symbol_id = SymbolId::new_for_test(123, 0, 1);
1027
1028 let ctx = ErrorContext {
1029 task_id: Some(task_id),
1030 region_id: Some(region_id),
1031 object_id: Some(object_id),
1032 symbol_id: Some(symbol_id),
1033 correlation_id: None,
1034 causal_chain: Vec::new(),
1035 span_id: None,
1036 parent_span_id: None,
1037 async_stack: Vec::new(),
1038 };
1039
1040 let err = Error::new(ErrorKind::Internal).with_context(ctx);
1041
1042 assert_eq!(err.context().task_id, Some(task_id));
1043 assert_eq!(err.context().region_id, Some(region_id));
1044 assert_eq!(err.context().object_id, Some(object_id));
1045 assert_eq!(err.context().symbol_id, Some(symbol_id));
1046 }
1047
1048 #[test]
1051 fn error_kind_category_coverage() {
1052 use ErrorCategory::*;
1053 let cases: &[(ErrorKind, ErrorCategory)] = &[
1054 (ErrorKind::Cancelled, Cancellation),
1055 (ErrorKind::CancelTimeout, Cancellation),
1056 (ErrorKind::DeadlineExceeded, Budget),
1057 (ErrorKind::PollQuotaExhausted, Budget),
1058 (ErrorKind::CostQuotaExhausted, Budget),
1059 (ErrorKind::ChannelClosed, Channel),
1060 (ErrorKind::ChannelFull, Channel),
1061 (ErrorKind::ChannelEmpty, Channel),
1062 (ErrorKind::ObligationLeak, Obligation),
1063 (ErrorKind::ObligationAlreadyResolved, Obligation),
1064 (ErrorKind::RegionClosed, Region),
1065 (ErrorKind::TaskNotOwned, Region),
1066 (ErrorKind::AdmissionDenied, Region),
1067 (ErrorKind::InvalidEncodingParams, Encoding),
1068 (ErrorKind::DataTooLarge, Encoding),
1069 (ErrorKind::EncodingFailed, Encoding),
1070 (ErrorKind::CorruptedSymbol, Encoding),
1071 (ErrorKind::InsufficientSymbols, Decoding),
1072 (ErrorKind::DecodingFailed, Decoding),
1073 (ErrorKind::ObjectMismatch, Decoding),
1074 (ErrorKind::DuplicateSymbol, Decoding),
1075 (ErrorKind::ThresholdTimeout, Decoding),
1076 (ErrorKind::RoutingFailed, Transport),
1077 (ErrorKind::DispatchFailed, Transport),
1078 (ErrorKind::StreamEnded, Transport),
1079 (ErrorKind::SinkRejected, Transport),
1080 (ErrorKind::ConnectionLost, Transport),
1081 (ErrorKind::ConnectionRefused, Transport),
1082 (ErrorKind::ProtocolError, Transport),
1083 (ErrorKind::RecoveryFailed, Distributed),
1084 (ErrorKind::LeaseExpired, Distributed),
1085 (ErrorKind::LeaseRenewalFailed, Distributed),
1086 (ErrorKind::CoordinationFailed, Distributed),
1087 (ErrorKind::QuorumNotReached, Distributed),
1088 (ErrorKind::NodeUnavailable, Distributed),
1089 (ErrorKind::PartitionDetected, Distributed),
1090 (ErrorKind::Internal, Internal),
1091 (ErrorKind::InvalidStateTransition, Internal),
1092 (ErrorKind::ConfigError, User),
1093 (ErrorKind::User, User),
1094 ];
1095 for (kind, expected) in cases {
1096 assert_eq!(kind.category(), *expected, "{kind:?}");
1097 }
1098 }
1099
1100 #[test]
1101 fn error_kind_recoverability_classification() {
1102 for kind in [
1104 ErrorKind::ChannelFull,
1105 ErrorKind::ChannelEmpty,
1106 ErrorKind::AdmissionDenied,
1107 ErrorKind::ConnectionLost,
1108 ErrorKind::NodeUnavailable,
1109 ErrorKind::QuorumNotReached,
1110 ErrorKind::ThresholdTimeout,
1111 ErrorKind::LeaseRenewalFailed,
1112 ] {
1113 assert_eq!(kind.recoverability(), Recoverability::Transient, "{kind:?}");
1114 assert!(kind.is_retryable(), "{kind:?} should be retryable");
1115 }
1116
1117 for kind in [
1119 ErrorKind::Cancelled,
1120 ErrorKind::ChannelClosed,
1121 ErrorKind::ObligationLeak,
1122 ErrorKind::Internal,
1123 ErrorKind::ConnectionRefused,
1124 ErrorKind::ConfigError,
1125 ] {
1126 assert_eq!(kind.recoverability(), Recoverability::Permanent, "{kind:?}");
1127 assert!(!kind.is_retryable(), "{kind:?} should not be retryable");
1128 }
1129
1130 for kind in [
1132 ErrorKind::DeadlineExceeded,
1133 ErrorKind::EncodingFailed,
1134 ErrorKind::CorruptedSymbol,
1135 ErrorKind::User,
1136 ] {
1137 assert_eq!(kind.recoverability(), Recoverability::Unknown, "{kind:?}");
1138 assert!(!kind.is_retryable(), "{kind:?} Unknown is not retryable");
1139 }
1140 }
1141
1142 #[test]
1143 fn recoverability_predicates() {
1144 assert!(Recoverability::Transient.should_retry());
1145 assert!(!Recoverability::Transient.is_permanent());
1146
1147 assert!(!Recoverability::Permanent.should_retry());
1148 assert!(Recoverability::Permanent.is_permanent());
1149
1150 assert!(!Recoverability::Unknown.should_retry());
1151 assert!(!Recoverability::Unknown.is_permanent());
1152 }
1153
1154 #[test]
1155 fn recovery_action_variants() {
1156 assert!(matches!(
1157 ErrorKind::ChannelFull.recovery_action(),
1158 RecoveryAction::RetryImmediately
1159 ));
1160 assert!(matches!(
1161 ErrorKind::AdmissionDenied.recovery_action(),
1162 RecoveryAction::RetryWithBackoff(_)
1163 ));
1164 assert!(matches!(
1165 ErrorKind::NodeUnavailable.recovery_action(),
1166 RecoveryAction::RetryWithBackoff(_)
1167 ));
1168 assert!(matches!(
1169 ErrorKind::ConnectionLost.recovery_action(),
1170 RecoveryAction::RetryWithNewConnection
1171 ));
1172 assert!(matches!(
1173 ErrorKind::Cancelled.recovery_action(),
1174 RecoveryAction::Propagate
1175 ));
1176 assert!(matches!(
1177 ErrorKind::ObligationLeak.recovery_action(),
1178 RecoveryAction::Escalate
1179 ));
1180 assert!(matches!(
1181 ErrorKind::User.recovery_action(),
1182 RecoveryAction::Custom
1183 ));
1184 }
1185
1186 #[test]
1187 fn backoff_hint_constants() {
1188 let d = BackoffHint::DEFAULT;
1189 assert_eq!(d.initial_delay_ms, 100);
1190 assert_eq!(d.max_delay_ms, 30_000);
1191 assert_eq!(d.max_attempts, 5);
1192
1193 let a = BackoffHint::AGGRESSIVE;
1194 assert!(a.initial_delay_ms > d.initial_delay_ms);
1195 assert!(a.max_attempts > d.max_attempts);
1196
1197 let q = BackoffHint::QUICK;
1198 assert!(q.initial_delay_ms < d.initial_delay_ms);
1199 assert!(q.max_attempts < d.max_attempts);
1200
1201 assert_eq!(BackoffHint::default(), BackoffHint::DEFAULT);
1202 }
1203
1204 #[test]
1207 fn error_data_too_large() {
1208 let err = Error::data_too_large(2000, 1000);
1209 assert_eq!(err.kind(), ErrorKind::DataTooLarge);
1210 let msg = err.to_string();
1211 assert!(msg.contains("2000"), "{msg}");
1212 assert!(msg.contains("1000"), "{msg}");
1213 }
1214
1215 #[test]
1216 fn error_insufficient_symbols() {
1217 let err = Error::insufficient_symbols(5, 10);
1218 assert_eq!(err.kind(), ErrorKind::InsufficientSymbols);
1219 let msg = err.to_string();
1220 assert!(msg.contains('5'), "{msg}");
1221 assert!(msg.contains("10"), "{msg}");
1222 }
1223
1224 #[test]
1225 fn error_routing_failed() {
1226 let err = Error::routing_failed("node-7");
1227 assert_eq!(err.kind(), ErrorKind::RoutingFailed);
1228 assert!(err.to_string().contains("node-7"));
1229 }
1230
1231 #[test]
1232 fn error_lease_expired() {
1233 let err = Error::lease_expired("lease-42");
1234 assert_eq!(err.kind(), ErrorKind::LeaseExpired);
1235 assert!(err.to_string().contains("lease-42"));
1236 }
1237
1238 #[test]
1239 fn error_quorum_not_reached() {
1240 let err = Error::quorum_not_reached(2, 3);
1241 assert_eq!(err.kind(), ErrorKind::QuorumNotReached);
1242 let msg = err.to_string();
1243 assert!(msg.contains('2'), "{msg}");
1244 assert!(msg.contains('3'), "{msg}");
1245 }
1246
1247 #[test]
1248 fn error_node_unavailable() {
1249 let err = Error::node_unavailable("node-1");
1250 assert_eq!(err.kind(), ErrorKind::NodeUnavailable);
1251 assert!(err.to_string().contains("node-1"));
1252 }
1253
1254 #[test]
1255 fn error_internal() {
1256 let err = Error::internal("bug found");
1257 assert_eq!(err.kind(), ErrorKind::Internal);
1258 assert!(err.to_string().contains("bug found"));
1259 }
1260
1261 #[test]
1264 fn error_is_predicates() {
1265 assert!(Error::new(ErrorKind::EncodingFailed).is_encoding_error());
1266 assert!(!Error::new(ErrorKind::DecodingFailed).is_encoding_error());
1267
1268 assert!(Error::new(ErrorKind::InsufficientSymbols).is_decoding_error());
1269 assert!(!Error::new(ErrorKind::EncodingFailed).is_decoding_error());
1270
1271 assert!(Error::new(ErrorKind::RoutingFailed).is_transport_error());
1272 assert!(!Error::new(ErrorKind::Internal).is_transport_error());
1273
1274 assert!(Error::new(ErrorKind::QuorumNotReached).is_distributed_error());
1275 assert!(!Error::new(ErrorKind::ChannelFull).is_distributed_error());
1276
1277 assert!(Error::new(ErrorKind::ConnectionLost).is_connection_error());
1278 assert!(Error::new(ErrorKind::ConnectionRefused).is_connection_error());
1279 assert!(!Error::new(ErrorKind::RoutingFailed).is_connection_error());
1280 }
1281
1282 #[test]
1283 fn error_cancel_timeout_is_timeout() {
1284 assert!(Error::new(ErrorKind::CancelTimeout).is_timeout());
1285 assert!(!Error::new(ErrorKind::CancelTimeout).is_cancelled());
1286 }
1287
1288 #[test]
1291 fn recv_error_cancelled_conversion() {
1292 let err: Error = RecvError::Cancelled.into();
1293 assert_eq!(err.kind(), ErrorKind::Cancelled);
1294 }
1295
1296 #[test]
1297 fn send_error_cancelled_conversion() {
1298 let err: Error = SendError::Cancelled(42u32).into();
1299 assert_eq!(err.kind(), ErrorKind::Cancelled);
1300 }
1301
1302 #[test]
1303 fn cancelled_struct_into_error() {
1304 let reason = CancelReason::user("test cancel");
1305 let cancelled = Cancelled { reason };
1306 let err: Error = cancelled.into();
1307 assert_eq!(err.kind(), ErrorKind::Cancelled);
1308 assert!(err.to_string().contains("Cancelled"));
1309 }
1310
1311 #[test]
1312 fn result_ext_with_context_lazy() {
1313 let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
1314 let err = res
1315 .with_context(|| format!("lazy {}", "context"))
1316 .expect_err("expected err");
1317 assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
1318 assert!(err.to_string().contains("lazy context"));
1319 }
1320
1321 #[test]
1324 fn error_category_debug() {
1325 for cat in [
1326 ErrorCategory::Cancellation,
1327 ErrorCategory::Budget,
1328 ErrorCategory::Channel,
1329 ErrorCategory::Obligation,
1330 ErrorCategory::Region,
1331 ErrorCategory::Encoding,
1332 ErrorCategory::Decoding,
1333 ErrorCategory::Transport,
1334 ErrorCategory::Distributed,
1335 ErrorCategory::Internal,
1336 ErrorCategory::User,
1337 ] {
1338 let dbg = format!("{cat:?}");
1339 assert!(!dbg.is_empty());
1340 }
1341 }
1342
1343 #[test]
1344 fn acquire_error_debug_eq() {
1345 let err = AcquireError::Closed;
1346 let dbg = format!("{err:?}");
1347 assert!(dbg.contains("Closed"), "{dbg}");
1348 assert_eq!(err, AcquireError::Closed);
1349 }
1350
1351 #[test]
1352 fn error_clone() {
1353 let err = Error::new(ErrorKind::Internal).with_message("clone me");
1354 let cloned = err.clone();
1355 assert_eq!(cloned.kind(), ErrorKind::Internal);
1356 assert_eq!(cloned.to_string(), err.to_string());
1357 }
1358
1359 #[test]
1360 fn error_no_message() {
1361 let err = Error::new(ErrorKind::User);
1362 assert!(err.message().is_none());
1363 }
1364
1365 #[test]
1366 fn error_source_none_without_with_source() {
1367 let err = Error::new(ErrorKind::User);
1368 assert!(err.source().is_none());
1369 }
1370
/// `ErrorKind` is `Copy` and hashes consistently: equal kinds collapse in a `HashSet`.
#[test]
fn error_kind_copy_hash() {
    use std::collections::HashSet;
    let kind = ErrorKind::Internal;
    let copied = kind;
    assert_eq!(copied, ErrorKind::Internal);
    // Reading `kind` after assigning it to `copied` only compiles because `ErrorKind` is `Copy`.
    assert_eq!(kind, copied);

    let mut set = HashSet::new();
    set.insert(ErrorKind::Cancelled);
    set.insert(ErrorKind::DeadlineExceeded);
    // Re-inserting an equal kind must be rejected rather than growing the set.
    assert!(!set.insert(ErrorKind::Cancelled));
    assert_eq!(set.len(), 2);
}
1386
/// `Recoverability` is `Copy`, distinguishes its variants, and hashes each one distinctly.
#[test]
fn recoverability_copy_hash_eq() {
    use std::collections::HashSet;
    let transient = Recoverability::Transient;
    let copy_of_transient = transient;
    assert_eq!(copy_of_transient, Recoverability::Transient);
    assert_ne!(transient, Recoverability::Permanent);

    let distinct = HashSet::from([
        Recoverability::Transient,
        Recoverability::Permanent,
        Recoverability::Unknown,
    ]);
    assert_eq!(distinct.len(), 3);
}
1401
/// `RecoveryAction` is `Copy`, and the four variants listed here hash to distinct set entries.
#[test]
fn recovery_action_copy_hash() {
    use std::collections::HashSet;
    let action = RecoveryAction::Propagate;
    let copied = action;
    assert_eq!(copied, RecoveryAction::Propagate);
    // Reading `action` after assigning it to `copied` only compiles if `RecoveryAction` is
    // `Copy`; without this line the test would not exercise the copy at all.
    assert_eq!(action, copied);

    let mut set = HashSet::new();
    set.insert(RecoveryAction::RetryImmediately);
    set.insert(RecoveryAction::Propagate);
    set.insert(RecoveryAction::Escalate);
    set.insert(RecoveryAction::Custom);
    assert_eq!(set.len(), 4);
}
1416
/// `ErrorCategory` supports both `Copy` and an explicit `Clone`, and hashes its variants
/// distinctly.
#[test]
fn error_category_copy_clone_hash() {
    use std::collections::HashSet;
    let cat = ErrorCategory::Transport;
    let copied = cat;
    // Call `Clone` explicitly; a plain assignment would only exercise `Copy` a second time.
    #[allow(clippy::clone_on_copy)]
    let cloned = cat.clone();
    assert_eq!(copied, cloned);
    assert_eq!(cloned, ErrorCategory::Transport);

    let mut set = HashSet::new();
    set.insert(ErrorCategory::Cancellation);
    set.insert(ErrorCategory::Budget);
    set.insert(ErrorCategory::Channel);
    assert_eq!(set.len(), 3);
}
1431
/// The `BackoffHint` presets are `Copy`, compare unequal to each other, and hash distinctly.
#[test]
fn backoff_hint_copy_hash_eq() {
    use std::collections::HashSet;
    let preset = BackoffHint::DEFAULT;
    let duplicate = preset;
    assert_eq!(duplicate, BackoffHint::DEFAULT);
    assert_ne!(preset, BackoffHint::AGGRESSIVE);

    let presets = HashSet::from([
        BackoffHint::DEFAULT,
        BackoffHint::AGGRESSIVE,
        BackoffHint::QUICK,
    ]);
    assert_eq!(presets.len(), 3);
}
1446
/// `RecvError` renders its variant in `Debug`, and supports both `Copy` and an explicit `Clone`.
#[test]
fn recv_error_debug_clone_copy() {
    let err = RecvError::Disconnected;
    let dbg = format!("{err:?}");
    assert!(dbg.contains("Disconnected"));

    // Copy: `err` stays usable after being assigned to `copied`.
    let copied = err;
    assert_eq!(copied, RecvError::Disconnected);

    // Clone: call the impl explicitly; a plain assignment would only exercise `Copy` again.
    #[allow(clippy::clone_on_copy)]
    let cloned = err.clone();
    assert_eq!(cloned, err);
}
1459
/// `Cancelled` names itself in `Debug`, and a clone compares equal to the original.
#[test]
fn cancelled_clone_eq() {
    let original = Cancelled {
        reason: CancelReason::user("test"),
    };
    assert!(format!("{original:?}").contains("Cancelled"));

    let duplicate = original.clone();
    assert_eq!(duplicate, original);
}
1471
/// A new `ErrorContext` gets a correlation id and starts with an empty causal chain and
/// async stack.
#[test]
fn error_context_auto_correlation() {
    let fresh = ErrorContext::new();
    assert!(fresh.correlation_id.is_some());
    assert!(fresh.causal_chain.is_empty());
    assert!(fresh.async_stack.is_empty());
}
1479
/// `derive_child` produces a context that:
/// - has its own correlation id,
/// - records the parent's id in its causal chain,
/// - pushes the operation name onto its async stack,
/// - links its span to the parent's span.
#[test]
fn error_context_derive_child() {
    let parent = ErrorContext::new();
    let parent_id = parent.correlation_id.unwrap();
    let child = parent.derive_child("async_operation");

    // Identity: the child has a correlation id distinct from its parent's.
    assert!(child.correlation_id.is_some());
    assert_ne!(child.correlation_id, parent.correlation_id);

    // Lineage: the causal chain points back at exactly the parent.
    assert_eq!(child.causal_chain, vec![parent_id]);

    // Operation tracking: the derived operation is the only stack entry.
    assert_eq!(child.async_stack, vec!["async_operation"]);

    // Tracing: the child opens its own span, parented to the parent's span.
    assert!(child.span_id.is_some());
    assert_eq!(child.parent_span_id, parent.span_id);
}
1501
/// Over two levels of derivation, the full causal chain runs root → child → grandchild,
/// and the root id is recoverable from the grandchild.
#[test]
fn error_context_causal_chain() {
    let root = ErrorContext::new();
    let child = root.derive_child("level1");
    let grandchild = child.derive_child("level2");

    let root_id = root.correlation_id.unwrap();
    let expected = vec![
        root_id,
        child.correlation_id.unwrap(),
        grandchild.correlation_id.unwrap(),
    ];

    assert_eq!(grandchild.full_causal_chain(), expected);
    assert_eq!(grandchild.root_correlation_id(), Some(root_id));
}
1519
/// Operations recorded with `with_operation` are formatted in insertion order, joined by
/// " -> ".
#[test]
fn error_context_async_stack_trace() {
    let context = ErrorContext::new()
        .with_operation("spawn_task")
        .with_operation("process_request");

    assert_eq!(context.format_async_stack(), "spawn_task -> process_request");
}
1529
/// Propagating an error across an async boundary leaves a non-empty causal chain and
/// records the boundary name on the async stack.
#[test]
fn error_propagate_across_async() {
    let propagated = Error::new(ErrorKind::Internal)
        .with_operation("initial_operation")
        .propagate_across_async("async_boundary");

    assert!(!propagated.causal_chain().is_empty());
    assert!(propagated.async_stack().contains("async_boundary"));
}
1544
/// Each new `Error` is assigned its own correlation id.
#[test]
fn error_correlation_tracking() {
    let closed = Error::new(ErrorKind::ChannelClosed);
    let internal = Error::new(ErrorKind::Internal);

    assert!(closed.correlation_id().is_some());
    assert!(internal.correlation_id().is_some());
    assert_ne!(closed.correlation_id(), internal.correlation_id());
}
1555
/// Operations chained onto an `Error` appear in order in its async stack.
#[test]
fn error_with_operations() {
    let decoding = Error::new(ErrorKind::DecodingFailed)
        .with_operation("read_symbol")
        .with_operation("decode_block");

    assert_eq!(decoding.async_stack(), "read_symbol -> decode_block");
}
1565}