1use core::fmt;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicU64, Ordering};
36
37use crate::observability::SpanId;
38use crate::types::symbol::{ObjectId, SymbolId};
39use crate::types::{CancelReason, RegionId, TaskId};
40
41pub mod recovery;
42
/// Fine-grained classification of every failure this module can report.
///
/// Variants are grouped below by the [`ErrorCategory`] that
/// [`ErrorKind::category`] assigns them. Each variant's doc states the
/// recoverability and recovery action currently returned by
/// [`ErrorKind::recoverability`] and [`ErrorKind::recovery_action`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ErrorKind {
    // ---- ErrorCategory::Cancellation ----
    /// Permanent; action: propagate.
    Cancelled,
    /// Permanent; action: propagate.
    CancelTimeout,

    // ---- ErrorCategory::Budget ----
    /// Unknown recoverability; action: propagate.
    DeadlineExceeded,
    /// Unknown recoverability; action: propagate.
    PollQuotaExhausted,
    /// Unknown recoverability; action: propagate.
    CostQuotaExhausted,

    // ---- ErrorCategory::Channel ----
    /// Permanent; action: propagate.
    ChannelClosed,
    /// Transient; action: retry immediately.
    ChannelFull,
    /// Transient; action: retry immediately.
    ChannelEmpty,

    // ---- ErrorCategory::Obligation ----
    /// Permanent; action: escalate.
    ObligationLeak,
    /// Permanent; action: escalate.
    ObligationAlreadyResolved,

    // ---- ErrorCategory::Region ----
    /// Permanent; action: propagate.
    RegionClosed,
    /// Unknown recoverability; action: custom.
    TaskNotOwned,
    /// Transient; action: retry with the default backoff.
    AdmissionDenied,

    // ---- ErrorCategory::Encoding ----
    /// Permanent; action: propagate.
    InvalidEncodingParams,
    /// Permanent; action: propagate.
    DataTooLarge,
    /// Unknown recoverability; action: custom.
    EncodingFailed,
    /// Unknown recoverability; action: custom.
    CorruptedSymbol,

    // ---- ErrorCategory::Decoding ----
    /// Unknown recoverability; action: custom.
    InsufficientSymbols,
    /// Unknown recoverability; action: custom.
    DecodingFailed,
    /// Permanent; action: propagate.
    ObjectMismatch,
    /// Unknown recoverability; action: custom.
    DuplicateSymbol,
    /// Transient; action: retry with the default backoff.
    ThresholdTimeout,

    // ---- ErrorCategory::Transport ----
    /// Unknown recoverability; action: custom.
    RoutingFailed,
    /// Unknown recoverability; action: custom.
    DispatchFailed,
    /// Unknown recoverability; action: retry with a new connection.
    StreamEnded,
    /// Unknown recoverability; action: custom.
    SinkRejected,
    /// Transient; action: retry with a new connection.
    ConnectionLost,
    /// Permanent; action: propagate.
    ConnectionRefused,
    /// Permanent; action: propagate.
    ProtocolError,

    // ---- ErrorCategory::Distributed ----
    /// Unknown recoverability; action: custom.
    RecoveryFailed,
    /// Unknown recoverability; action: propagate.
    LeaseExpired,
    /// Transient; action: retry with the default backoff.
    LeaseRenewalFailed,
    /// Unknown recoverability; action: custom.
    CoordinationFailed,
    /// Transient; action: retry with the default backoff.
    QuorumNotReached,
    /// Transient; action: retry with the aggressive backoff.
    NodeUnavailable,
    /// Unknown recoverability; action: propagate.
    PartitionDetected,

    // ---- ErrorCategory::Internal ----
    /// Permanent; action: escalate.
    Internal,
    /// Permanent; action: escalate.
    InvalidStateTransition,

    // ---- ErrorCategory::User ----
    /// Permanent; action: propagate.
    ConfigError,
    /// Unknown recoverability; action: custom.
    User,
}
150
151impl ErrorKind {
152 #[must_use]
154 #[inline]
155 pub const fn category(&self) -> ErrorCategory {
156 match self {
157 Self::Cancelled | Self::CancelTimeout => ErrorCategory::Cancellation,
158 Self::DeadlineExceeded | Self::PollQuotaExhausted | Self::CostQuotaExhausted => {
159 ErrorCategory::Budget
160 }
161 Self::ChannelClosed | Self::ChannelFull | Self::ChannelEmpty => ErrorCategory::Channel,
162 Self::ObligationLeak | Self::ObligationAlreadyResolved => ErrorCategory::Obligation,
163 Self::RegionClosed | Self::TaskNotOwned | Self::AdmissionDenied => {
164 ErrorCategory::Region
165 }
166 Self::InvalidEncodingParams
167 | Self::DataTooLarge
168 | Self::EncodingFailed
169 | Self::CorruptedSymbol => ErrorCategory::Encoding,
170 Self::InsufficientSymbols
171 | Self::DecodingFailed
172 | Self::ObjectMismatch
173 | Self::DuplicateSymbol
174 | Self::ThresholdTimeout => ErrorCategory::Decoding,
175 Self::RoutingFailed
176 | Self::DispatchFailed
177 | Self::StreamEnded
178 | Self::SinkRejected
179 | Self::ConnectionLost
180 | Self::ConnectionRefused
181 | Self::ProtocolError => ErrorCategory::Transport,
182 Self::RecoveryFailed
183 | Self::LeaseExpired
184 | Self::LeaseRenewalFailed
185 | Self::CoordinationFailed
186 | Self::QuorumNotReached
187 | Self::NodeUnavailable
188 | Self::PartitionDetected => ErrorCategory::Distributed,
189 Self::Internal | Self::InvalidStateTransition => ErrorCategory::Internal,
190 Self::ConfigError | Self::User => ErrorCategory::User,
191 }
192 }
193
194 #[must_use]
198 #[inline]
199 pub const fn recoverability(&self) -> Recoverability {
200 match self {
201 Self::ChannelFull
203 | Self::ChannelEmpty
204 | Self::AdmissionDenied
205 | Self::ConnectionLost
206 | Self::NodeUnavailable
207 | Self::QuorumNotReached
208 | Self::ThresholdTimeout
209 | Self::LeaseRenewalFailed => Recoverability::Transient,
210
211 Self::Cancelled
213 | Self::CancelTimeout
214 | Self::ChannelClosed
215 | Self::ObligationLeak
216 | Self::ObligationAlreadyResolved
217 | Self::RegionClosed
218 | Self::InvalidEncodingParams
219 | Self::DataTooLarge
220 | Self::ObjectMismatch
221 | Self::Internal
222 | Self::InvalidStateTransition
223 | Self::ProtocolError
224 | Self::ConnectionRefused
225 | Self::ConfigError => Recoverability::Permanent,
226
227 Self::DeadlineExceeded
229 | Self::PollQuotaExhausted
230 | Self::CostQuotaExhausted
231 | Self::TaskNotOwned
232 | Self::EncodingFailed
233 | Self::CorruptedSymbol
234 | Self::InsufficientSymbols
235 | Self::DecodingFailed
236 | Self::DuplicateSymbol
237 | Self::RoutingFailed
238 | Self::DispatchFailed
239 | Self::StreamEnded
240 | Self::SinkRejected
241 | Self::RecoveryFailed
242 | Self::LeaseExpired
243 | Self::CoordinationFailed
244 | Self::PartitionDetected
245 | Self::User => Recoverability::Unknown,
246 }
247 }
248
249 #[must_use]
251 #[inline]
252 pub const fn is_retryable(&self) -> bool {
253 matches!(self.recoverability(), Recoverability::Transient)
254 }
255
256 #[must_use]
261 #[inline]
262 pub const fn recovery_action(&self) -> RecoveryAction {
263 match self {
264 Self::ChannelFull | Self::ChannelEmpty => RecoveryAction::RetryImmediately,
266
267 Self::AdmissionDenied
269 | Self::ThresholdTimeout
270 | Self::QuorumNotReached
271 | Self::LeaseRenewalFailed => RecoveryAction::RetryWithBackoff(BackoffHint::DEFAULT),
272 Self::NodeUnavailable => RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE),
273
274 Self::ConnectionLost | Self::StreamEnded => RecoveryAction::RetryWithNewConnection,
276
277 Self::Cancelled
279 | Self::CancelTimeout
280 | Self::DeadlineExceeded
281 | Self::PollQuotaExhausted
282 | Self::CostQuotaExhausted
283 | Self::ChannelClosed
284 | Self::RegionClosed
285 | Self::InvalidEncodingParams
286 | Self::DataTooLarge
287 | Self::ObjectMismatch
288 | Self::ConnectionRefused
289 | Self::ProtocolError
290 | Self::LeaseExpired
291 | Self::PartitionDetected
292 | Self::ConfigError => RecoveryAction::Propagate,
293
294 Self::ObligationLeak
296 | Self::ObligationAlreadyResolved
297 | Self::Internal
298 | Self::InvalidStateTransition => RecoveryAction::Escalate,
299
300 Self::TaskNotOwned
302 | Self::EncodingFailed
303 | Self::CorruptedSymbol
304 | Self::InsufficientSymbols
305 | Self::DecodingFailed
306 | Self::DuplicateSymbol
307 | Self::RoutingFailed
308 | Self::DispatchFailed
309 | Self::SinkRejected
310 | Self::RecoveryFailed
311 | Self::CoordinationFailed
312 | Self::User => RecoveryAction::Custom,
313 }
314 }
315}
316
/// Whether retrying after a failure is expected to help.
///
/// Queried via `Recoverability::should_retry` (true only for `Transient`) and
/// `Recoverability::is_permanent` (true only for `Permanent`).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Recoverability {
    /// A retry may succeed.
    Transient,
    /// A retry will not help.
    Permanent,
    /// Neither outcome is claimed; not treated as retryable.
    Unknown,
}
331
332#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
334pub enum RecoveryAction {
335 RetryImmediately,
337 RetryWithBackoff(BackoffHint),
339 RetryWithNewConnection,
341 Propagate,
343 Escalate,
345 Custom,
347}
348
/// Numeric parameters for a retry-with-backoff policy.
///
/// This type only carries the numbers; it does not schedule retries itself.
/// Field order is significant for the derived `Debug` and `Hash` output.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct BackoffHint {
    /// Initial delay, in milliseconds.
    pub initial_delay_ms: u32,
    /// Upper bound on the delay, in milliseconds.
    pub max_delay_ms: u32,
    /// Attempt limit. Whether it counts the first try is up to the consumer;
    /// it is not defined here.
    pub max_attempts: u8,
}
359
360impl BackoffHint {
361 pub const DEFAULT: Self = Self {
363 initial_delay_ms: 100,
364 max_delay_ms: 30_000,
365 max_attempts: 5,
366 };
367
368 pub const AGGRESSIVE: Self = Self {
370 initial_delay_ms: 1_000,
371 max_delay_ms: 60_000,
372 max_attempts: 10,
373 };
374
375 pub const QUICK: Self = Self {
377 initial_delay_ms: 10,
378 max_delay_ms: 1_000,
379 max_attempts: 3,
380 };
381}
382
383impl Default for BackoffHint {
384 #[inline]
385 fn default() -> Self {
386 Self::DEFAULT
387 }
388}
389
390impl Recoverability {
391 #[must_use]
393 #[inline]
394 pub const fn should_retry(&self) -> bool {
395 matches!(self, Self::Transient)
396 }
397
398 #[must_use]
400 #[inline]
401 pub const fn is_permanent(&self) -> bool {
402 matches!(self, Self::Permanent)
403 }
404}
405
/// Coarse grouping of error kinds, as returned by `ErrorKind::category`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ErrorCategory {
    /// `Cancelled`, `CancelTimeout`.
    Cancellation,
    /// `DeadlineExceeded`, `PollQuotaExhausted`, `CostQuotaExhausted`.
    Budget,
    /// `ChannelClosed`, `ChannelFull`, `ChannelEmpty`.
    Channel,
    /// `ObligationLeak`, `ObligationAlreadyResolved`.
    Obligation,
    /// `RegionClosed`, `TaskNotOwned`, `AdmissionDenied`.
    Region,
    /// `InvalidEncodingParams`, `DataTooLarge`, `EncodingFailed`, `CorruptedSymbol`.
    Encoding,
    /// `InsufficientSymbols`, `DecodingFailed`, `ObjectMismatch`,
    /// `DuplicateSymbol`, `ThresholdTimeout`.
    Decoding,
    /// `RoutingFailed`, `DispatchFailed`, `StreamEnded`, `SinkRejected`,
    /// `ConnectionLost`, `ConnectionRefused`, `ProtocolError`.
    Transport,
    /// `RecoveryFailed`, `LeaseExpired`, `LeaseRenewalFailed`,
    /// `CoordinationFailed`, `QuorumNotReached`, `NodeUnavailable`,
    /// `PartitionDetected`.
    Distributed,
    /// `Internal`, `InvalidStateTransition`.
    Internal,
    /// `ConfigError`, `User`.
    User,
}
432
433#[derive(Debug, Clone, Default, PartialEq, Eq)]
435pub struct ErrorContext {
436 pub task_id: Option<TaskId>,
438 pub region_id: Option<RegionId>,
440 pub object_id: Option<ObjectId>,
442 pub symbol_id: Option<SymbolId>,
444 pub correlation_id: Option<u64>,
446 pub causal_chain: Vec<u64>,
448 pub span_id: Option<crate::observability::SpanId>,
450 pub parent_span_id: Option<crate::observability::SpanId>,
452 pub async_stack: Vec<String>,
454}
455
456impl ErrorContext {
457 #[must_use]
459 pub fn new() -> Self {
460 static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
461 Self {
462 correlation_id: Some(NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed)),
463 ..Self::default()
464 }
465 }
466
467 #[must_use]
469 pub fn from_diagnostic_context(ctx: &crate::observability::DiagnosticContext) -> Self {
470 let mut error_ctx = Self::new();
471 error_ctx.task_id = ctx.task_id();
472 error_ctx.region_id = ctx.region_id();
473 error_ctx.span_id = ctx.span_id();
474 error_ctx.parent_span_id = ctx.parent_span_id();
475 error_ctx
476 }
477
478 #[must_use]
480 pub fn derive_child(&self, operation: &str) -> Self {
481 static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
482 let child_correlation_id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed);
483
484 let mut causal_chain = self.causal_chain.clone();
485 if let Some(parent_id) = self.correlation_id {
486 causal_chain.push(parent_id);
487 }
488
489 let mut async_stack = self.async_stack.clone();
490 async_stack.push(operation.to_string());
491
492 Self {
493 task_id: self.task_id,
494 region_id: self.region_id,
495 object_id: self.object_id,
496 symbol_id: self.symbol_id,
497 correlation_id: Some(child_correlation_id),
498 causal_chain,
499 span_id: Some(SpanId::new()), parent_span_id: self.span_id,
501 async_stack,
502 }
503 }
504
505 #[must_use]
507 pub fn with_operation(mut self, operation: &str) -> Self {
508 self.async_stack.push(operation.to_string());
509 self
510 }
511
512 #[must_use]
514 pub fn with_span_context(mut self, span_id: SpanId, parent_span_id: Option<SpanId>) -> Self {
515 self.span_id = Some(span_id);
516 self.parent_span_id = parent_span_id;
517 self
518 }
519
520 #[must_use]
522 pub fn root_correlation_id(&self) -> Option<u64> {
523 self.causal_chain.first().copied().or(self.correlation_id)
524 }
525
526 #[must_use]
528 pub fn full_causal_chain(&self) -> Vec<u64> {
529 let mut chain = self.causal_chain.clone();
530 if let Some(id) = self.correlation_id {
531 chain.push(id);
532 }
533 chain
534 }
535
536 #[must_use]
538 pub fn format_async_stack(&self) -> String {
539 if self.async_stack.is_empty() {
540 "<no stack trace>".to_string()
541 } else {
542 self.async_stack.join(" -> ")
543 }
544 }
545}
546
547#[derive(Debug, Clone)]
549pub struct Error {
550 kind: ErrorKind,
551 message: Option<String>,
552 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
553 context: ErrorContext,
554}
555
556impl Error {
557 #[must_use]
559 #[inline]
560 pub fn new(kind: ErrorKind) -> Self {
561 Self {
562 kind,
563 message: None,
564 source: None,
565 context: ErrorContext::new(),
566 }
567 }
568
569 #[must_use]
571 #[inline]
572 pub const fn kind(&self) -> ErrorKind {
573 self.kind
574 }
575
576 #[must_use]
578 #[inline]
579 pub const fn is_cancelled(&self) -> bool {
580 matches!(self.kind, ErrorKind::Cancelled)
581 }
582
583 #[must_use]
585 #[inline]
586 pub const fn is_timeout(&self) -> bool {
587 matches!(
588 self.kind,
589 ErrorKind::DeadlineExceeded | ErrorKind::CancelTimeout
590 )
591 }
592
593 #[must_use]
595 #[inline]
596 pub fn with_message(mut self, msg: impl Into<String>) -> Self {
597 self.message = Some(msg.into());
598 self
599 }
600
601 #[must_use]
603 #[inline]
604 pub fn with_context(mut self, ctx: ErrorContext) -> Self {
605 self.context = ctx;
606 self
607 }
608
609 #[must_use]
611 #[inline]
612 pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
613 self.source = Some(Arc::new(source));
614 self
615 }
616
617 #[must_use]
619 pub fn from_cx(kind: ErrorKind, cx: &crate::cx::Cx) -> Self {
620 let diag_ctx = cx.diagnostic_context();
621 let error_ctx = ErrorContext::from_diagnostic_context(&diag_ctx)
622 .with_operation(&format!("Error::{:?}", kind));
623
624 Self::new(kind).with_context(error_ctx)
625 }
626
627 #[must_use]
629 pub fn propagate_across_async(mut self, operation: &str) -> Self {
630 self.context = self.context.derive_child(operation);
631 self
632 }
633
634 #[must_use]
636 pub fn with_operation(mut self, operation: &str) -> Self {
637 self.context = self.context.with_operation(operation);
638 self
639 }
640
641 #[must_use]
643 #[inline]
644 pub fn correlation_id(&self) -> Option<u64> {
645 self.context.correlation_id
646 }
647
648 #[must_use]
650 #[inline]
651 pub fn root_correlation_id(&self) -> Option<u64> {
652 self.context.root_correlation_id()
653 }
654
655 #[must_use]
657 #[inline]
658 pub fn causal_chain(&self) -> Vec<u64> {
659 self.context.full_causal_chain()
660 }
661
662 #[must_use]
664 #[inline]
665 pub fn async_stack(&self) -> String {
666 self.context.format_async_stack()
667 }
668
669 #[must_use]
671 #[inline]
672 pub fn cancelled(reason: &CancelReason) -> Self {
673 Self::new(ErrorKind::Cancelled).with_message(reason.to_string())
674 }
675
676 #[must_use]
678 #[inline]
679 pub const fn category(&self) -> ErrorCategory {
680 self.kind.category()
681 }
682
683 #[must_use]
685 #[inline]
686 pub const fn recoverability(&self) -> Recoverability {
687 self.kind.recoverability()
688 }
689
690 #[must_use]
692 #[inline]
693 pub const fn is_retryable(&self) -> bool {
694 self.kind.is_retryable()
695 }
696
697 #[must_use]
699 #[inline]
700 pub const fn recovery_action(&self) -> RecoveryAction {
701 self.kind.recovery_action()
702 }
703
704 #[must_use]
706 #[inline]
707 pub fn message(&self) -> Option<&str> {
708 self.message.as_deref()
709 }
710
711 #[must_use]
713 #[inline]
714 pub fn context(&self) -> &ErrorContext {
715 &self.context
716 }
717
718 #[must_use]
720 #[inline]
721 pub const fn is_encoding_error(&self) -> bool {
722 matches!(self.kind.category(), ErrorCategory::Encoding)
723 }
724
725 #[must_use]
727 #[inline]
728 pub const fn is_decoding_error(&self) -> bool {
729 matches!(self.kind.category(), ErrorCategory::Decoding)
730 }
731
732 #[must_use]
734 #[inline]
735 pub const fn is_transport_error(&self) -> bool {
736 matches!(self.kind.category(), ErrorCategory::Transport)
737 }
738
739 #[must_use]
741 #[inline]
742 pub const fn is_distributed_error(&self) -> bool {
743 matches!(self.kind.category(), ErrorCategory::Distributed)
744 }
745
746 #[must_use]
748 #[inline]
749 pub const fn is_connection_error(&self) -> bool {
750 matches!(
751 self.kind,
752 ErrorKind::ConnectionLost | ErrorKind::ConnectionRefused
753 )
754 }
755
756 #[must_use]
758 pub fn invalid_encoding_params(detail: impl Into<String>) -> Self {
759 Self::new(ErrorKind::InvalidEncodingParams).with_message(detail)
760 }
761
762 #[must_use]
764 pub fn data_too_large(actual: u64, max: u64) -> Self {
765 Self::new(ErrorKind::DataTooLarge)
766 .with_message(format!("data size {actual} exceeds maximum {max}"))
767 }
768
769 #[must_use]
771 pub fn insufficient_symbols(received: u32, needed: u32) -> Self {
772 Self::new(ErrorKind::InsufficientSymbols).with_message(format!(
773 "received {received} symbols, need at least {needed}"
774 ))
775 }
776
777 #[must_use]
779 pub fn decoding_failed(reason: impl Into<String>) -> Self {
780 Self::new(ErrorKind::DecodingFailed).with_message(reason)
781 }
782
783 #[must_use]
785 pub fn routing_failed(destination: impl Into<String>) -> Self {
786 Self::new(ErrorKind::RoutingFailed)
787 .with_message(format!("no route to destination: {}", destination.into()))
788 }
789
790 #[must_use]
792 pub fn lease_expired(lease_id: impl Into<String>) -> Self {
793 Self::new(ErrorKind::LeaseExpired)
794 .with_message(format!("lease expired: {}", lease_id.into()))
795 }
796
797 #[must_use]
799 pub fn quorum_not_reached(achieved: u32, needed: u32) -> Self {
800 Self::new(ErrorKind::QuorumNotReached)
801 .with_message(format!("achieved {achieved} of {needed} required"))
802 }
803
804 #[must_use]
806 pub fn node_unavailable(node_id: impl Into<String>) -> Self {
807 Self::new(ErrorKind::NodeUnavailable)
808 .with_message(format!("node unavailable: {}", node_id.into()))
809 }
810
811 #[must_use]
813 pub fn internal(detail: impl Into<String>) -> Self {
814 Self::new(ErrorKind::Internal).with_message(detail)
815 }
816}
817
818impl fmt::Display for Error {
819 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
820 write!(f, "{:?}", self.kind)?;
821 if let Some(msg) = &self.message {
822 write!(f, ": {msg}")?;
823 }
824 Ok(())
825 }
826}
827
828impl std::error::Error for Error {
829 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
830 self.source.as_ref().map(|e| e.as_ref() as _)
831 }
832}
833
834#[derive(Debug, Clone, PartialEq, Eq)]
836pub struct Cancelled {
837 pub reason: CancelReason,
839}
840
841impl From<Cancelled> for Error {
842 fn from(c: Cancelled) -> Self {
843 Self::cancelled(&c.reason)
844 }
845}
846
/// Failure of a send operation.
///
/// Every variant carries a `T` payload, presumably the value that could not
/// be delivered. Converting into [`Error`] via `From` drops the payload.
#[derive(Debug)]
pub enum SendError<T> {
    /// Maps to `ErrorKind::ChannelClosed`.
    Disconnected(T),
    /// Maps to `ErrorKind::ChannelFull`.
    Full(T),
    /// Maps to `ErrorKind::Cancelled`.
    Cancelled(T),
}
857
/// Failure of a receive operation; converts into [`Error`] via `From`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// Maps to `ErrorKind::ChannelClosed`.
    Disconnected,
    /// Maps to `ErrorKind::ChannelEmpty`.
    Empty,
    /// Maps to `ErrorKind::Cancelled`.
    Cancelled,
}
868
/// Failure of an acquire operation; the only failure mode is `Closed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AcquireError {
    /// The resource being acquired from is closed.
    Closed,
}
875
876impl From<RecvError> for Error {
877 fn from(e: RecvError) -> Self {
878 match e {
879 RecvError::Disconnected => Self::new(ErrorKind::ChannelClosed),
880 RecvError::Empty => Self::new(ErrorKind::ChannelEmpty),
881 RecvError::Cancelled => Self::new(ErrorKind::Cancelled),
882 }
883 }
884}
885
886impl<T> From<SendError<T>> for Error {
887 fn from(e: SendError<T>) -> Self {
888 match e {
889 SendError::Disconnected(_) => Self::new(ErrorKind::ChannelClosed),
890 SendError::Full(_) => Self::new(ErrorKind::ChannelFull),
891 SendError::Cancelled(_) => Self::new(ErrorKind::Cancelled),
892 }
893 }
894}
895
896#[allow(clippy::result_large_err)]
898pub trait ResultExt<T> {
899 fn context(self, msg: impl Into<String>) -> Result<T>;
901 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
903}
904
905impl<T, E: Into<Error>> ResultExt<T> for core::result::Result<T, E> {
906 fn context(self, msg: impl Into<String>) -> Result<T> {
907 self.map_err(|e| e.into().with_message(msg))
908 }
909
910 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
911 self.map_err(|e| e.into().with_message(f()))
912 }
913}
914
915#[allow(clippy::result_large_err)]
917pub type Result<T> = core::result::Result<T, Error>;
918
919#[cfg(test)]
920mod tests {
921 use super::*;
922 use std::error::Error as _;
923
924 #[derive(Debug)]
925 struct Underlying;
926
927 impl fmt::Display for Underlying {
928 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
929 write!(f, "underlying")
930 }
931 }
932
933 impl std::error::Error for Underlying {}
934
935 #[test]
936 fn display_without_message() {
937 let err = Error::new(ErrorKind::Internal);
938 assert_eq!(err.to_string(), "Internal");
939 }
940
941 #[test]
942 fn display_with_message() {
943 let err = Error::new(ErrorKind::ChannelEmpty).with_message("no messages");
944 assert_eq!(err.to_string(), "ChannelEmpty: no messages");
945 }
946
947 #[test]
948 fn source_chain_is_exposed() {
949 let err = Error::new(ErrorKind::User)
950 .with_message("outer")
951 .with_source(Underlying);
952 let source = err.source().expect("source missing");
953 assert_eq!(source.to_string(), "underlying");
954 }
955
956 #[test]
957 fn from_recv_error() {
958 let disconnected: Error = RecvError::Disconnected.into();
959 assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
960
961 let empty: Error = RecvError::Empty.into();
962 assert_eq!(empty.kind(), ErrorKind::ChannelEmpty);
963 }
964
965 #[test]
966 fn from_send_error() {
967 let disconnected: Error = SendError::Disconnected(()).into();
968 assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
969
970 let full: Error = SendError::Full(()).into();
971 assert_eq!(full.kind(), ErrorKind::ChannelFull);
972 }
973
974 #[test]
975 fn result_ext_adds_message() {
976 let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
977 let err = res.context("recv failed").expect_err("expected err");
978 assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
979 assert_eq!(err.to_string(), "ChannelEmpty: recv failed");
980 }
981
982 #[test]
983 fn predicates_match_kind() {
984 let cancel = Error::new(ErrorKind::Cancelled);
985 assert!(cancel.is_cancelled());
986 assert!(!cancel.is_timeout());
987
988 let timeout = Error::new(ErrorKind::DeadlineExceeded);
989 assert!(!timeout.is_cancelled());
990 assert!(timeout.is_timeout());
991 }
992
993 #[test]
994 fn recovery_action_backoff() {
995 let action = ErrorKind::ThresholdTimeout.recovery_action();
996 assert!(matches!(action, RecoveryAction::RetryWithBackoff(_)));
997 }
998
999 #[test]
1000 fn error_context_default() {
1001 let err = Error::new(ErrorKind::Internal);
1002 assert!(err.context().task_id.is_none());
1003 }
1004
1005 #[test]
1006 fn error_with_full_context() {
1007 use crate::util::ArenaIndex;
1008
1009 let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
1010 let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
1011 let object_id = ObjectId::new_for_test(123);
1012 let symbol_id = SymbolId::new_for_test(123, 0, 1);
1013
1014 let ctx = ErrorContext {
1015 task_id: Some(task_id),
1016 region_id: Some(region_id),
1017 object_id: Some(object_id),
1018 symbol_id: Some(symbol_id),
1019 correlation_id: None,
1020 causal_chain: Vec::new(),
1021 span_id: None,
1022 parent_span_id: None,
1023 async_stack: Vec::new(),
1024 };
1025
1026 let err = Error::new(ErrorKind::Internal).with_context(ctx);
1027
1028 assert_eq!(err.context().task_id, Some(task_id));
1029 assert_eq!(err.context().region_id, Some(region_id));
1030 assert_eq!(err.context().object_id, Some(object_id));
1031 assert_eq!(err.context().symbol_id, Some(symbol_id));
1032 }
1033
1034 #[test]
1037 fn error_kind_category_coverage() {
1038 use ErrorCategory::*;
1039 let cases: &[(ErrorKind, ErrorCategory)] = &[
1040 (ErrorKind::Cancelled, Cancellation),
1041 (ErrorKind::CancelTimeout, Cancellation),
1042 (ErrorKind::DeadlineExceeded, Budget),
1043 (ErrorKind::PollQuotaExhausted, Budget),
1044 (ErrorKind::CostQuotaExhausted, Budget),
1045 (ErrorKind::ChannelClosed, Channel),
1046 (ErrorKind::ChannelFull, Channel),
1047 (ErrorKind::ChannelEmpty, Channel),
1048 (ErrorKind::ObligationLeak, Obligation),
1049 (ErrorKind::ObligationAlreadyResolved, Obligation),
1050 (ErrorKind::RegionClosed, Region),
1051 (ErrorKind::TaskNotOwned, Region),
1052 (ErrorKind::AdmissionDenied, Region),
1053 (ErrorKind::InvalidEncodingParams, Encoding),
1054 (ErrorKind::DataTooLarge, Encoding),
1055 (ErrorKind::EncodingFailed, Encoding),
1056 (ErrorKind::CorruptedSymbol, Encoding),
1057 (ErrorKind::InsufficientSymbols, Decoding),
1058 (ErrorKind::DecodingFailed, Decoding),
1059 (ErrorKind::ObjectMismatch, Decoding),
1060 (ErrorKind::DuplicateSymbol, Decoding),
1061 (ErrorKind::ThresholdTimeout, Decoding),
1062 (ErrorKind::RoutingFailed, Transport),
1063 (ErrorKind::DispatchFailed, Transport),
1064 (ErrorKind::StreamEnded, Transport),
1065 (ErrorKind::SinkRejected, Transport),
1066 (ErrorKind::ConnectionLost, Transport),
1067 (ErrorKind::ConnectionRefused, Transport),
1068 (ErrorKind::ProtocolError, Transport),
1069 (ErrorKind::RecoveryFailed, Distributed),
1070 (ErrorKind::LeaseExpired, Distributed),
1071 (ErrorKind::LeaseRenewalFailed, Distributed),
1072 (ErrorKind::CoordinationFailed, Distributed),
1073 (ErrorKind::QuorumNotReached, Distributed),
1074 (ErrorKind::NodeUnavailable, Distributed),
1075 (ErrorKind::PartitionDetected, Distributed),
1076 (ErrorKind::Internal, Internal),
1077 (ErrorKind::InvalidStateTransition, Internal),
1078 (ErrorKind::ConfigError, User),
1079 (ErrorKind::User, User),
1080 ];
1081 for (kind, expected) in cases {
1082 assert_eq!(kind.category(), *expected, "{kind:?}");
1083 }
1084 }
1085
1086 #[test]
1087 fn error_kind_recoverability_classification() {
1088 for kind in [
1090 ErrorKind::ChannelFull,
1091 ErrorKind::ChannelEmpty,
1092 ErrorKind::AdmissionDenied,
1093 ErrorKind::ConnectionLost,
1094 ErrorKind::NodeUnavailable,
1095 ErrorKind::QuorumNotReached,
1096 ErrorKind::ThresholdTimeout,
1097 ErrorKind::LeaseRenewalFailed,
1098 ] {
1099 assert_eq!(kind.recoverability(), Recoverability::Transient, "{kind:?}");
1100 assert!(kind.is_retryable(), "{kind:?} should be retryable");
1101 }
1102
1103 for kind in [
1105 ErrorKind::Cancelled,
1106 ErrorKind::ChannelClosed,
1107 ErrorKind::ObligationLeak,
1108 ErrorKind::Internal,
1109 ErrorKind::ConnectionRefused,
1110 ErrorKind::ConfigError,
1111 ] {
1112 assert_eq!(kind.recoverability(), Recoverability::Permanent, "{kind:?}");
1113 assert!(!kind.is_retryable(), "{kind:?} should not be retryable");
1114 }
1115
1116 for kind in [
1118 ErrorKind::DeadlineExceeded,
1119 ErrorKind::EncodingFailed,
1120 ErrorKind::CorruptedSymbol,
1121 ErrorKind::User,
1122 ] {
1123 assert_eq!(kind.recoverability(), Recoverability::Unknown, "{kind:?}");
1124 assert!(!kind.is_retryable(), "{kind:?} Unknown is not retryable");
1125 }
1126 }
1127
1128 #[test]
1129 fn recoverability_predicates() {
1130 assert!(Recoverability::Transient.should_retry());
1131 assert!(!Recoverability::Transient.is_permanent());
1132
1133 assert!(!Recoverability::Permanent.should_retry());
1134 assert!(Recoverability::Permanent.is_permanent());
1135
1136 assert!(!Recoverability::Unknown.should_retry());
1137 assert!(!Recoverability::Unknown.is_permanent());
1138 }
1139
1140 #[test]
1141 fn recovery_action_variants() {
1142 assert!(matches!(
1143 ErrorKind::ChannelFull.recovery_action(),
1144 RecoveryAction::RetryImmediately
1145 ));
1146 assert!(matches!(
1147 ErrorKind::AdmissionDenied.recovery_action(),
1148 RecoveryAction::RetryWithBackoff(_)
1149 ));
1150 assert!(matches!(
1151 ErrorKind::NodeUnavailable.recovery_action(),
1152 RecoveryAction::RetryWithBackoff(_)
1153 ));
1154 assert!(matches!(
1155 ErrorKind::ConnectionLost.recovery_action(),
1156 RecoveryAction::RetryWithNewConnection
1157 ));
1158 assert!(matches!(
1159 ErrorKind::Cancelled.recovery_action(),
1160 RecoveryAction::Propagate
1161 ));
1162 assert!(matches!(
1163 ErrorKind::ObligationLeak.recovery_action(),
1164 RecoveryAction::Escalate
1165 ));
1166 assert!(matches!(
1167 ErrorKind::User.recovery_action(),
1168 RecoveryAction::Custom
1169 ));
1170 }
1171
1172 #[test]
1173 fn backoff_hint_constants() {
1174 let d = BackoffHint::DEFAULT;
1175 assert_eq!(d.initial_delay_ms, 100);
1176 assert_eq!(d.max_delay_ms, 30_000);
1177 assert_eq!(d.max_attempts, 5);
1178
1179 let a = BackoffHint::AGGRESSIVE;
1180 assert!(a.initial_delay_ms > d.initial_delay_ms);
1181 assert!(a.max_attempts > d.max_attempts);
1182
1183 let q = BackoffHint::QUICK;
1184 assert!(q.initial_delay_ms < d.initial_delay_ms);
1185 assert!(q.max_attempts < d.max_attempts);
1186
1187 assert_eq!(BackoffHint::default(), BackoffHint::DEFAULT);
1188 }
1189
1190 #[test]
1193 fn error_data_too_large() {
1194 let err = Error::data_too_large(2000, 1000);
1195 assert_eq!(err.kind(), ErrorKind::DataTooLarge);
1196 let msg = err.to_string();
1197 assert!(msg.contains("2000"), "{msg}");
1198 assert!(msg.contains("1000"), "{msg}");
1199 }
1200
1201 #[test]
1202 fn error_insufficient_symbols() {
1203 let err = Error::insufficient_symbols(5, 10);
1204 assert_eq!(err.kind(), ErrorKind::InsufficientSymbols);
1205 let msg = err.to_string();
1206 assert!(msg.contains('5'), "{msg}");
1207 assert!(msg.contains("10"), "{msg}");
1208 }
1209
1210 #[test]
1211 fn error_routing_failed() {
1212 let err = Error::routing_failed("node-7");
1213 assert_eq!(err.kind(), ErrorKind::RoutingFailed);
1214 assert!(err.to_string().contains("node-7"));
1215 }
1216
1217 #[test]
1218 fn error_lease_expired() {
1219 let err = Error::lease_expired("lease-42");
1220 assert_eq!(err.kind(), ErrorKind::LeaseExpired);
1221 assert!(err.to_string().contains("lease-42"));
1222 }
1223
1224 #[test]
1225 fn error_quorum_not_reached() {
1226 let err = Error::quorum_not_reached(2, 3);
1227 assert_eq!(err.kind(), ErrorKind::QuorumNotReached);
1228 let msg = err.to_string();
1229 assert!(msg.contains('2'), "{msg}");
1230 assert!(msg.contains('3'), "{msg}");
1231 }
1232
1233 #[test]
1234 fn error_node_unavailable() {
1235 let err = Error::node_unavailable("node-1");
1236 assert_eq!(err.kind(), ErrorKind::NodeUnavailable);
1237 assert!(err.to_string().contains("node-1"));
1238 }
1239
1240 #[test]
1241 fn error_internal() {
1242 let err = Error::internal("bug found");
1243 assert_eq!(err.kind(), ErrorKind::Internal);
1244 assert!(err.to_string().contains("bug found"));
1245 }
1246
1247 #[test]
1250 fn error_is_predicates() {
1251 assert!(Error::new(ErrorKind::EncodingFailed).is_encoding_error());
1252 assert!(!Error::new(ErrorKind::DecodingFailed).is_encoding_error());
1253
1254 assert!(Error::new(ErrorKind::InsufficientSymbols).is_decoding_error());
1255 assert!(!Error::new(ErrorKind::EncodingFailed).is_decoding_error());
1256
1257 assert!(Error::new(ErrorKind::RoutingFailed).is_transport_error());
1258 assert!(!Error::new(ErrorKind::Internal).is_transport_error());
1259
1260 assert!(Error::new(ErrorKind::QuorumNotReached).is_distributed_error());
1261 assert!(!Error::new(ErrorKind::ChannelFull).is_distributed_error());
1262
1263 assert!(Error::new(ErrorKind::ConnectionLost).is_connection_error());
1264 assert!(Error::new(ErrorKind::ConnectionRefused).is_connection_error());
1265 assert!(!Error::new(ErrorKind::RoutingFailed).is_connection_error());
1266 }
1267
1268 #[test]
1269 fn error_cancel_timeout_is_timeout() {
1270 assert!(Error::new(ErrorKind::CancelTimeout).is_timeout());
1271 assert!(!Error::new(ErrorKind::CancelTimeout).is_cancelled());
1272 }
1273
1274 #[test]
1277 fn recv_error_cancelled_conversion() {
1278 let err: Error = RecvError::Cancelled.into();
1279 assert_eq!(err.kind(), ErrorKind::Cancelled);
1280 }
1281
1282 #[test]
1283 fn send_error_cancelled_conversion() {
1284 let err: Error = SendError::Cancelled(42u32).into();
1285 assert_eq!(err.kind(), ErrorKind::Cancelled);
1286 }
1287
1288 #[test]
1289 fn cancelled_struct_into_error() {
1290 let reason = CancelReason::user("test cancel");
1291 let cancelled = Cancelled { reason };
1292 let err: Error = cancelled.into();
1293 assert_eq!(err.kind(), ErrorKind::Cancelled);
1294 assert!(err.to_string().contains("Cancelled"));
1295 }
1296
1297 #[test]
1298 fn result_ext_with_context_lazy() {
1299 let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
1300 let err = res
1301 .with_context(|| format!("lazy {}", "context"))
1302 .expect_err("expected err");
1303 assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
1304 assert!(err.to_string().contains("lazy context"));
1305 }
1306
1307 #[test]
1310 fn error_category_debug() {
1311 for cat in [
1312 ErrorCategory::Cancellation,
1313 ErrorCategory::Budget,
1314 ErrorCategory::Channel,
1315 ErrorCategory::Obligation,
1316 ErrorCategory::Region,
1317 ErrorCategory::Encoding,
1318 ErrorCategory::Decoding,
1319 ErrorCategory::Transport,
1320 ErrorCategory::Distributed,
1321 ErrorCategory::Internal,
1322 ErrorCategory::User,
1323 ] {
1324 let dbg = format!("{cat:?}");
1325 assert!(!dbg.is_empty());
1326 }
1327 }
1328
1329 #[test]
1330 fn acquire_error_debug_eq() {
1331 let err = AcquireError::Closed;
1332 let dbg = format!("{err:?}");
1333 assert!(dbg.contains("Closed"), "{dbg}");
1334 assert_eq!(err, AcquireError::Closed);
1335 }
1336
1337 #[test]
1338 fn error_clone() {
1339 let err = Error::new(ErrorKind::Internal).with_message("clone me");
1340 let cloned = err.clone();
1341 assert_eq!(cloned.kind(), ErrorKind::Internal);
1342 assert_eq!(cloned.to_string(), err.to_string());
1343 }
1344
1345 #[test]
1346 fn error_no_message() {
1347 let err = Error::new(ErrorKind::User);
1348 assert!(err.message().is_none());
1349 }
1350
1351 #[test]
1352 fn error_source_none_without_with_source() {
1353 let err = Error::new(ErrorKind::User);
1354 assert!(err.source().is_none());
1355 }
1356
1357 #[test]
1360 fn error_kind_copy_hash() {
1361 use std::collections::HashSet;
1362 let kind = ErrorKind::Internal;
1363 let copied = kind;
1364 assert_eq!(copied, ErrorKind::Internal);
1365
1366 let mut set = HashSet::new();
1367 set.insert(ErrorKind::Cancelled);
1368 set.insert(ErrorKind::DeadlineExceeded);
1369 set.insert(ErrorKind::Cancelled); assert_eq!(set.len(), 2);
1371 }
1372
1373 #[test]
1374 fn recoverability_copy_hash_eq() {
1375 use std::collections::HashSet;
1376 let r = Recoverability::Transient;
1377 let copied = r;
1378 assert_eq!(copied, Recoverability::Transient);
1379 assert_ne!(r, Recoverability::Permanent);
1380
1381 let mut set = HashSet::new();
1382 set.insert(Recoverability::Transient);
1383 set.insert(Recoverability::Permanent);
1384 set.insert(Recoverability::Unknown);
1385 assert_eq!(set.len(), 3);
1386 }
1387
1388 #[test]
1389 fn recovery_action_copy_hash() {
1390 use std::collections::HashSet;
1391 let action = RecoveryAction::Propagate;
1392 let copied = action;
1393 assert_eq!(copied, RecoveryAction::Propagate);
1394
1395 let mut set = HashSet::new();
1396 set.insert(RecoveryAction::RetryImmediately);
1397 set.insert(RecoveryAction::Propagate);
1398 set.insert(RecoveryAction::Escalate);
1399 set.insert(RecoveryAction::Custom);
1400 assert_eq!(set.len(), 4);
1401 }
1402
1403 #[test]
1404 fn error_category_copy_clone_hash() {
1405 use std::collections::HashSet;
1406 let cat = ErrorCategory::Transport;
1407 let copied = cat;
1408 let cloned = cat;
1409 assert_eq!(copied, cloned);
1410
1411 let mut set = HashSet::new();
1412 set.insert(ErrorCategory::Cancellation);
1413 set.insert(ErrorCategory::Budget);
1414 set.insert(ErrorCategory::Channel);
1415 assert_eq!(set.len(), 3);
1416 }
1417
1418 #[test]
1419 fn backoff_hint_copy_hash_eq() {
1420 use std::collections::HashSet;
1421 let hint = BackoffHint::DEFAULT;
1422 let copied = hint;
1423 assert_eq!(copied, BackoffHint::DEFAULT);
1424 assert_ne!(hint, BackoffHint::AGGRESSIVE);
1425
1426 let mut set = HashSet::new();
1427 set.insert(BackoffHint::DEFAULT);
1428 set.insert(BackoffHint::AGGRESSIVE);
1429 set.insert(BackoffHint::QUICK);
1430 assert_eq!(set.len(), 3);
1431 }
1432
1433 #[test]
1434 fn recv_error_debug_clone_copy() {
1435 let err = RecvError::Disconnected;
1436 let dbg = format!("{err:?}");
1437 assert!(dbg.contains("Disconnected"));
1438
1439 let copied = err;
1440 assert_eq!(copied, RecvError::Disconnected);
1441
1442 let cloned = err;
1443 assert_eq!(cloned, err);
1444 }
1445
1446 #[test]
1447 fn cancelled_clone_eq() {
1448 let c = Cancelled {
1449 reason: CancelReason::user("test"),
1450 };
1451 let dbg = format!("{c:?}");
1452 assert!(dbg.contains("Cancelled"));
1453
1454 let cloned = c.clone();
1455 assert_eq!(cloned, c);
1456 }
1457
1458 #[test]
1459 fn error_context_auto_correlation() {
1460 let ctx = ErrorContext::new();
1461 assert!(ctx.correlation_id.is_some());
1462 assert!(ctx.causal_chain.is_empty());
1463 assert!(ctx.async_stack.is_empty());
1464 }
1465
1466 #[test]
1467 fn error_context_derive_child() {
1468 let parent = ErrorContext::new();
1469 let parent_id = parent.correlation_id.unwrap();
1470
1471 let child = parent.derive_child("async_operation");
1472
1473 assert!(child.correlation_id.is_some());
1475 assert_ne!(child.correlation_id, parent.correlation_id);
1476
1477 assert_eq!(child.causal_chain, vec![parent_id]);
1479
1480 assert_eq!(child.async_stack, vec!["async_operation"]);
1482
1483 assert!(child.span_id.is_some());
1485 assert_eq!(child.parent_span_id, parent.span_id);
1486 }
1487
1488 #[test]
1489 fn error_context_causal_chain() {
1490 let root = ErrorContext::new();
1491 let child = root.derive_child("level1");
1492 let grandchild = child.derive_child("level2");
1493
1494 let root_id = root.correlation_id.unwrap();
1495 let child_id = child.correlation_id.unwrap();
1496
1497 let chain = grandchild.full_causal_chain();
1498 assert_eq!(
1499 chain,
1500 vec![root_id, child_id, grandchild.correlation_id.unwrap()]
1501 );
1502
1503 assert_eq!(grandchild.root_correlation_id(), Some(root_id));
1504 }
1505
1506 #[test]
1507 fn error_context_async_stack_trace() {
1508 let ctx = ErrorContext::new()
1509 .with_operation("spawn_task")
1510 .with_operation("process_request");
1511
1512 let trace = ctx.format_async_stack();
1513 assert_eq!(trace, "spawn_task -> process_request");
1514 }
1515
1516 #[test]
1517 fn error_propagate_across_async() {
1518 let error = Error::new(ErrorKind::Internal).with_operation("initial_operation");
1519
1520 let propagated = error.propagate_across_async("async_boundary");
1521
1522 let chain = propagated.causal_chain();
1524 assert!(!chain.is_empty());
1525
1526 let stack = propagated.async_stack();
1528 assert!(stack.contains("async_boundary"));
1529 }
1530
1531 #[test]
1532 fn error_correlation_tracking() {
1533 let err1 = Error::new(ErrorKind::ChannelClosed);
1534 let err2 = Error::new(ErrorKind::Internal);
1535
1536 assert_ne!(err1.correlation_id(), err2.correlation_id());
1538 assert!(err1.correlation_id().is_some());
1539 assert!(err2.correlation_id().is_some());
1540 }
1541
1542 #[test]
1543 fn error_with_operations() {
1544 let error = Error::new(ErrorKind::DecodingFailed)
1545 .with_operation("read_symbol")
1546 .with_operation("decode_block");
1547
1548 let stack = error.async_stack();
1549 assert_eq!(stack, "read_symbol -> decode_block");
1550 }
1551}