1use core::fmt;
34use std::sync::Arc;
35
36use crate::types::symbol::{ObjectId, SymbolId};
37use crate::types::{CancelReason, RegionId, TaskId};
38
39pub mod recovery;
40
/// Machine-readable classification of a failure.
///
/// The value is a plain `Copy` tag: message, source, and context live on [`Error`], not here.
/// Variants are grouped below by the bucket that [`ErrorKind::category`] assigns them to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    // ---- Cancellation ----
    /// Kind used by [`Error::cancelled`] and by the `Cancelled` arms of the channel conversions.
    Cancelled,
    /// Counted as a timeout by [`Error::is_timeout`].
    CancelTimeout,

    // ---- Budget ----
    /// Counted as a timeout by [`Error::is_timeout`].
    DeadlineExceeded,
    PollQuotaExhausted,
    CostQuotaExhausted,

    // ---- Channel ----
    /// Target of the `Disconnected` arms of the `SendError`/`RecvError` conversions.
    ChannelClosed,
    /// Target of `SendError::Full`.
    ChannelFull,
    /// Target of `RecvError::Empty`.
    ChannelEmpty,

    // ---- Obligation ----
    ObligationLeak,
    ObligationAlreadyResolved,

    // ---- Region ----
    RegionClosed,
    TaskNotOwned,
    AdmissionDenied,

    // ---- Encoding ----
    /// See [`Error::invalid_encoding_params`].
    InvalidEncodingParams,
    /// See [`Error::data_too_large`].
    DataTooLarge,
    EncodingFailed,
    CorruptedSymbol,

    // ---- Decoding ----
    /// See [`Error::insufficient_symbols`].
    InsufficientSymbols,
    /// See [`Error::decoding_failed`].
    DecodingFailed,
    ObjectMismatch,
    DuplicateSymbol,
    ThresholdTimeout,

    // ---- Transport ----
    /// See [`Error::routing_failed`].
    RoutingFailed,
    DispatchFailed,
    StreamEnded,
    SinkRejected,
    ConnectionLost,
    ConnectionRefused,
    ProtocolError,

    // ---- Distributed ----
    RecoveryFailed,
    /// See [`Error::lease_expired`].
    LeaseExpired,
    LeaseRenewalFailed,
    CoordinationFailed,
    /// See [`Error::quorum_not_reached`].
    QuorumNotReached,
    /// See [`Error::node_unavailable`].
    NodeUnavailable,
    PartitionDetected,

    // ---- Internal ----
    /// See [`Error::internal`].
    Internal,
    InvalidStateTransition,

    // ---- User (configuration errors are bucketed here as well) ----
    ConfigError,
    User,
}
148
149impl ErrorKind {
150 #[must_use]
152 pub const fn category(&self) -> ErrorCategory {
153 match self {
154 Self::Cancelled | Self::CancelTimeout => ErrorCategory::Cancellation,
155 Self::DeadlineExceeded | Self::PollQuotaExhausted | Self::CostQuotaExhausted => {
156 ErrorCategory::Budget
157 }
158 Self::ChannelClosed | Self::ChannelFull | Self::ChannelEmpty => ErrorCategory::Channel,
159 Self::ObligationLeak | Self::ObligationAlreadyResolved => ErrorCategory::Obligation,
160 Self::RegionClosed | Self::TaskNotOwned | Self::AdmissionDenied => {
161 ErrorCategory::Region
162 }
163 Self::InvalidEncodingParams
164 | Self::DataTooLarge
165 | Self::EncodingFailed
166 | Self::CorruptedSymbol => ErrorCategory::Encoding,
167 Self::InsufficientSymbols
168 | Self::DecodingFailed
169 | Self::ObjectMismatch
170 | Self::DuplicateSymbol
171 | Self::ThresholdTimeout => ErrorCategory::Decoding,
172 Self::RoutingFailed
173 | Self::DispatchFailed
174 | Self::StreamEnded
175 | Self::SinkRejected
176 | Self::ConnectionLost
177 | Self::ConnectionRefused
178 | Self::ProtocolError => ErrorCategory::Transport,
179 Self::RecoveryFailed
180 | Self::LeaseExpired
181 | Self::LeaseRenewalFailed
182 | Self::CoordinationFailed
183 | Self::QuorumNotReached
184 | Self::NodeUnavailable
185 | Self::PartitionDetected => ErrorCategory::Distributed,
186 Self::Internal | Self::InvalidStateTransition => ErrorCategory::Internal,
187 Self::ConfigError | Self::User => ErrorCategory::User,
188 }
189 }
190
191 #[must_use]
195 pub const fn recoverability(&self) -> Recoverability {
196 match self {
197 Self::ChannelFull
199 | Self::ChannelEmpty
200 | Self::AdmissionDenied
201 | Self::ConnectionLost
202 | Self::NodeUnavailable
203 | Self::QuorumNotReached
204 | Self::ThresholdTimeout
205 | Self::LeaseRenewalFailed => Recoverability::Transient,
206
207 Self::Cancelled
209 | Self::CancelTimeout
210 | Self::ChannelClosed
211 | Self::ObligationLeak
212 | Self::ObligationAlreadyResolved
213 | Self::RegionClosed
214 | Self::InvalidEncodingParams
215 | Self::DataTooLarge
216 | Self::ObjectMismatch
217 | Self::Internal
218 | Self::InvalidStateTransition
219 | Self::ProtocolError
220 | Self::ConnectionRefused
221 | Self::ConfigError => Recoverability::Permanent,
222
223 Self::DeadlineExceeded
225 | Self::PollQuotaExhausted
226 | Self::CostQuotaExhausted
227 | Self::TaskNotOwned
228 | Self::EncodingFailed
229 | Self::CorruptedSymbol
230 | Self::InsufficientSymbols
231 | Self::DecodingFailed
232 | Self::DuplicateSymbol
233 | Self::RoutingFailed
234 | Self::DispatchFailed
235 | Self::StreamEnded
236 | Self::SinkRejected
237 | Self::RecoveryFailed
238 | Self::LeaseExpired
239 | Self::CoordinationFailed
240 | Self::PartitionDetected
241 | Self::User => Recoverability::Unknown,
242 }
243 }
244
245 #[must_use]
247 pub const fn is_retryable(&self) -> bool {
248 matches!(self.recoverability(), Recoverability::Transient)
249 }
250
251 #[must_use]
256 pub const fn recovery_action(&self) -> RecoveryAction {
257 match self {
258 Self::ChannelFull | Self::ChannelEmpty => RecoveryAction::RetryImmediately,
260
261 Self::AdmissionDenied
263 | Self::ThresholdTimeout
264 | Self::QuorumNotReached
265 | Self::LeaseRenewalFailed => RecoveryAction::RetryWithBackoff(BackoffHint::DEFAULT),
266 Self::NodeUnavailable => RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE),
267
268 Self::ConnectionLost | Self::StreamEnded => RecoveryAction::RetryWithNewConnection,
270
271 Self::Cancelled
273 | Self::CancelTimeout
274 | Self::DeadlineExceeded
275 | Self::PollQuotaExhausted
276 | Self::CostQuotaExhausted
277 | Self::ChannelClosed
278 | Self::RegionClosed
279 | Self::InvalidEncodingParams
280 | Self::DataTooLarge
281 | Self::ObjectMismatch
282 | Self::ConnectionRefused
283 | Self::ProtocolError
284 | Self::LeaseExpired
285 | Self::PartitionDetected
286 | Self::ConfigError => RecoveryAction::Propagate,
287
288 Self::ObligationLeak
290 | Self::ObligationAlreadyResolved
291 | Self::Internal
292 | Self::InvalidStateTransition => RecoveryAction::Escalate,
293
294 Self::TaskNotOwned
296 | Self::EncodingFailed
297 | Self::CorruptedSymbol
298 | Self::InsufficientSymbols
299 | Self::DecodingFailed
300 | Self::DuplicateSymbol
301 | Self::RoutingFailed
302 | Self::DispatchFailed
303 | Self::SinkRejected
304 | Self::RecoveryFailed
305 | Self::CoordinationFailed
306 | Self::User => RecoveryAction::Custom,
307 }
308 }
309}
310
/// Three-way classification of whether an error may go away on its own.
///
/// `Transient` is the only class that [`Recoverability::should_retry`] reports as retryable;
/// `Unknown` is neither retryable nor permanent as far as the helper predicates are concerned.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Recoverability {
    /// Reported as retryable by `should_retry`.
    Transient,
    /// Reported by `is_permanent`.
    Permanent,
    /// Neither of the above.
    Unknown,
}
325
326#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
328pub enum RecoveryAction {
329 RetryImmediately,
331 RetryWithBackoff(BackoffHint),
333 RetryWithNewConnection,
335 Propagate,
337 Escalate,
339 Custom,
341}
342
/// Suggested retry parameters carried by [`RecoveryAction::RetryWithBackoff`].
///
/// This is plain data; nothing in this module sleeps or counts attempts with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BackoffHint {
    /// Delay before the first retry (milliseconds, per the field-name suffix).
    pub initial_delay_ms: u32,
    /// Upper bound on the delay between retries (milliseconds, per the field-name suffix).
    pub max_delay_ms: u32,
    /// Attempt limit. NOTE(review): whether this counts the initial try is not visible here.
    pub max_attempts: u8,
}
353
354impl BackoffHint {
355 pub const DEFAULT: Self = Self {
357 initial_delay_ms: 100,
358 max_delay_ms: 30_000,
359 max_attempts: 5,
360 };
361
362 pub const AGGRESSIVE: Self = Self {
364 initial_delay_ms: 1_000,
365 max_delay_ms: 60_000,
366 max_attempts: 10,
367 };
368
369 pub const QUICK: Self = Self {
371 initial_delay_ms: 10,
372 max_delay_ms: 1_000,
373 max_attempts: 3,
374 };
375}
376
377impl Default for BackoffHint {
378 fn default() -> Self {
379 Self::DEFAULT
380 }
381}
382
383impl Recoverability {
384 #[must_use]
386 pub const fn should_retry(&self) -> bool {
387 matches!(self, Self::Transient)
388 }
389
390 #[must_use]
392 pub const fn is_permanent(&self) -> bool {
393 matches!(self, Self::Permanent)
394 }
395}
396
/// Coarse grouping of [`ErrorKind`] values, as assigned by [`ErrorKind::category`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// `Cancelled`, `CancelTimeout`.
    Cancellation,
    /// Deadline and poll/cost quota kinds.
    Budget,
    /// Channel closed/full/empty kinds.
    Channel,
    /// Obligation leak / already-resolved kinds.
    Obligation,
    /// `RegionClosed`, `TaskNotOwned`, `AdmissionDenied`.
    Region,
    /// Kinds tested by [`Error::is_encoding_error`].
    Encoding,
    /// Kinds tested by [`Error::is_decoding_error`].
    Decoding,
    /// Kinds tested by [`Error::is_transport_error`].
    Transport,
    /// Kinds tested by [`Error::is_distributed_error`].
    Distributed,
    /// `Internal`, `InvalidStateTransition`.
    Internal,
    /// `User` and `ConfigError`.
    User,
}
423
424#[derive(Debug, Clone, Default, PartialEq, Eq)]
426pub struct ErrorContext {
427 pub task_id: Option<TaskId>,
429 pub region_id: Option<RegionId>,
431 pub object_id: Option<ObjectId>,
433 pub symbol_id: Option<SymbolId>,
435}
436
437#[derive(Debug, Clone)]
439pub struct Error {
440 kind: ErrorKind,
441 message: Option<String>,
442 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
443 context: ErrorContext,
444}
445
446impl Error {
447 #[must_use]
449 pub const fn new(kind: ErrorKind) -> Self {
450 Self {
451 kind,
452 message: None,
453 source: None,
454 context: ErrorContext {
455 task_id: None,
456 region_id: None,
457 object_id: None,
458 symbol_id: None,
459 },
460 }
461 }
462
463 #[must_use]
465 pub const fn kind(&self) -> ErrorKind {
466 self.kind
467 }
468
469 #[must_use]
471 pub const fn is_cancelled(&self) -> bool {
472 matches!(self.kind, ErrorKind::Cancelled)
473 }
474
475 #[must_use]
477 pub const fn is_timeout(&self) -> bool {
478 matches!(
479 self.kind,
480 ErrorKind::DeadlineExceeded | ErrorKind::CancelTimeout
481 )
482 }
483
484 #[must_use]
486 pub fn with_message(mut self, msg: impl Into<String>) -> Self {
487 self.message = Some(msg.into());
488 self
489 }
490
491 #[must_use]
493 pub fn with_context(mut self, ctx: ErrorContext) -> Self {
494 self.context = ctx;
495 self
496 }
497
498 #[must_use]
500 pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
501 self.source = Some(Arc::new(source));
502 self
503 }
504
505 #[must_use]
507 pub fn cancelled(reason: &CancelReason) -> Self {
508 Self::new(ErrorKind::Cancelled).with_message(format!("{reason}"))
509 }
510
511 #[must_use]
513 pub const fn category(&self) -> ErrorCategory {
514 self.kind.category()
515 }
516
517 #[must_use]
519 pub const fn recoverability(&self) -> Recoverability {
520 self.kind.recoverability()
521 }
522
523 #[must_use]
525 pub const fn is_retryable(&self) -> bool {
526 self.kind.is_retryable()
527 }
528
529 #[must_use]
531 pub const fn recovery_action(&self) -> RecoveryAction {
532 self.kind.recovery_action()
533 }
534
535 #[must_use]
537 pub fn message(&self) -> Option<&str> {
538 self.message.as_deref()
539 }
540
541 #[must_use]
543 pub fn context(&self) -> &ErrorContext {
544 &self.context
545 }
546
547 #[must_use]
549 pub const fn is_encoding_error(&self) -> bool {
550 matches!(self.kind.category(), ErrorCategory::Encoding)
551 }
552
553 #[must_use]
555 pub const fn is_decoding_error(&self) -> bool {
556 matches!(self.kind.category(), ErrorCategory::Decoding)
557 }
558
559 #[must_use]
561 pub const fn is_transport_error(&self) -> bool {
562 matches!(self.kind.category(), ErrorCategory::Transport)
563 }
564
565 #[must_use]
567 pub const fn is_distributed_error(&self) -> bool {
568 matches!(self.kind.category(), ErrorCategory::Distributed)
569 }
570
571 #[must_use]
573 pub const fn is_connection_error(&self) -> bool {
574 matches!(
575 self.kind,
576 ErrorKind::ConnectionLost | ErrorKind::ConnectionRefused
577 )
578 }
579
580 #[must_use]
582 pub fn invalid_encoding_params(detail: impl Into<String>) -> Self {
583 Self::new(ErrorKind::InvalidEncodingParams).with_message(detail)
584 }
585
586 #[must_use]
588 pub fn data_too_large(actual: u64, max: u64) -> Self {
589 Self::new(ErrorKind::DataTooLarge)
590 .with_message(format!("data size {actual} exceeds maximum {max}"))
591 }
592
593 #[must_use]
595 pub fn insufficient_symbols(received: u32, needed: u32) -> Self {
596 Self::new(ErrorKind::InsufficientSymbols).with_message(format!(
597 "received {received} symbols, need at least {needed}"
598 ))
599 }
600
601 #[must_use]
603 pub fn decoding_failed(reason: impl Into<String>) -> Self {
604 Self::new(ErrorKind::DecodingFailed).with_message(reason)
605 }
606
607 #[must_use]
609 pub fn routing_failed(destination: impl Into<String>) -> Self {
610 Self::new(ErrorKind::RoutingFailed)
611 .with_message(format!("no route to destination: {}", destination.into()))
612 }
613
614 #[must_use]
616 pub fn lease_expired(lease_id: impl Into<String>) -> Self {
617 Self::new(ErrorKind::LeaseExpired)
618 .with_message(format!("lease expired: {}", lease_id.into()))
619 }
620
621 #[must_use]
623 pub fn quorum_not_reached(achieved: u32, needed: u32) -> Self {
624 Self::new(ErrorKind::QuorumNotReached)
625 .with_message(format!("achieved {achieved} of {needed} required"))
626 }
627
628 #[must_use]
630 pub fn node_unavailable(node_id: impl Into<String>) -> Self {
631 Self::new(ErrorKind::NodeUnavailable)
632 .with_message(format!("node unavailable: {}", node_id.into()))
633 }
634
635 #[must_use]
637 pub fn internal(detail: impl Into<String>) -> Self {
638 Self::new(ErrorKind::Internal).with_message(detail)
639 }
640}
641
642impl fmt::Display for Error {
643 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644 write!(f, "{:?}", self.kind)?;
645 if let Some(msg) = &self.message {
646 write!(f, ": {msg}")?;
647 }
648 Ok(())
649 }
650}
651
652impl std::error::Error for Error {
653 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
654 self.source.as_ref().map(|e| e.as_ref() as _)
655 }
656}
657
658#[derive(Debug, Clone, PartialEq, Eq)]
660pub struct Cancelled {
661 pub reason: CancelReason,
663}
664
665impl From<Cancelled> for Error {
666 fn from(c: Cancelled) -> Self {
667 Self::cancelled(&c.reason)
668 }
669}
670
/// Failure of a channel send. Every variant carries a `T` payload.
///
/// NOTE(review): the payload is presumably the value that could not be sent, handed back to
/// the caller — confirm against the channel implementation. Converting into [`Error`]
/// discards the payload and keeps only the failure class.
///
/// `Clone`, `PartialEq`, and `Eq` are derived (each conditional on `T`) so that send errors
/// can be compared and duplicated the same way `RecvError` and `AcquireError` can.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError<T> {
    /// Maps to `ErrorKind::ChannelClosed`.
    Disconnected(T),
    /// Maps to `ErrorKind::ChannelFull`.
    Full(T),
    /// Maps to `ErrorKind::Cancelled`.
    Cancelled(T),
}
681
/// Failure of a channel receive.
///
/// Converts into [`Error`] via `From`; the mapping is noted on each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// Maps to `ErrorKind::ChannelClosed`.
    Disconnected,
    /// Maps to `ErrorKind::ChannelEmpty`.
    Empty,
    /// Maps to `ErrorKind::Cancelled`.
    Cancelled,
}
692
/// Failure of an acquire operation.
///
/// No conversion into [`Error`] is defined for this type in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireError {
    /// The only failure mode: the thing being acquired from was closed.
    Closed,
}
699
700impl From<RecvError> for Error {
701 fn from(e: RecvError) -> Self {
702 match e {
703 RecvError::Disconnected => Self::new(ErrorKind::ChannelClosed),
704 RecvError::Empty => Self::new(ErrorKind::ChannelEmpty),
705 RecvError::Cancelled => Self::new(ErrorKind::Cancelled),
706 }
707 }
708}
709
710impl<T> From<SendError<T>> for Error {
711 fn from(e: SendError<T>) -> Self {
712 match e {
713 SendError::Disconnected(_) => Self::new(ErrorKind::ChannelClosed),
714 SendError::Full(_) => Self::new(ErrorKind::ChannelFull),
715 SendError::Cancelled(_) => Self::new(ErrorKind::Cancelled),
716 }
717 }
718}
719
720#[allow(clippy::result_large_err)]
722pub trait ResultExt<T> {
723 fn context(self, msg: impl Into<String>) -> Result<T>;
725 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
727}
728
729impl<T, E: Into<Error>> ResultExt<T> for core::result::Result<T, E> {
730 fn context(self, msg: impl Into<String>) -> Result<T> {
731 self.map_err(|e| e.into().with_message(msg))
732 }
733
734 fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
735 self.map_err(|e| e.into().with_message(f()))
736 }
737}
738
739#[allow(clippy::result_large_err)]
741pub type Result<T> = core::result::Result<T, Error>;
742
#[cfg(test)]
mod tests {
    //! Unit tests for the error type, its conversions, and the `ResultExt` helpers.

    use super::*;
    use std::error::Error as _;

    /// Minimal error type used as a `source` in the chaining tests.
    #[derive(Debug)]
    struct Underlying;

    impl fmt::Display for Underlying {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "underlying")
        }
    }

    impl std::error::Error for Underlying {}

    #[test]
    fn display_without_message() {
        let err = Error::new(ErrorKind::Internal);
        assert_eq!(err.to_string(), "Internal");
    }

    #[test]
    fn display_with_message() {
        let err = Error::new(ErrorKind::ChannelEmpty).with_message("no messages");
        assert_eq!(err.to_string(), "ChannelEmpty: no messages");
    }

    #[test]
    fn source_chain_is_exposed() {
        let err = Error::new(ErrorKind::User)
            .with_message("outer")
            .with_source(Underlying);
        let source = err.source().expect("source missing");
        assert_eq!(source.to_string(), "underlying");
    }

    #[test]
    fn source_is_absent_by_default() {
        let err = Error::new(ErrorKind::User).with_message("outer");
        assert!(err.source().is_none());
    }

    #[test]
    fn from_recv_error() {
        let disconnected: Error = RecvError::Disconnected.into();
        assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);

        let empty: Error = RecvError::Empty.into();
        assert_eq!(empty.kind(), ErrorKind::ChannelEmpty);

        // Previously uncovered arm of the conversion.
        let cancelled: Error = RecvError::Cancelled.into();
        assert_eq!(cancelled.kind(), ErrorKind::Cancelled);
        assert!(cancelled.is_cancelled());
        assert!(cancelled.message().is_none());
    }

    #[test]
    fn from_send_error() {
        let disconnected: Error = SendError::Disconnected(()).into();
        assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);

        let full: Error = SendError::Full(()).into();
        assert_eq!(full.kind(), ErrorKind::ChannelFull);

        // Previously uncovered arm of the conversion.
        let cancelled: Error = SendError::Cancelled(()).into();
        assert_eq!(cancelled.kind(), ErrorKind::Cancelled);
        assert!(cancelled.is_cancelled());
    }

    #[test]
    fn result_ext_adds_message() {
        let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
        let err = res.context("recv failed").expect_err("expected err");
        assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
        assert_eq!(err.to_string(), "ChannelEmpty: recv failed");
    }

    #[test]
    fn result_ext_with_context_sets_message_on_err() {
        let res: core::result::Result<(), RecvError> = Err(RecvError::Disconnected);
        let err = res
            .with_context(|| String::from("lazy message"))
            .expect_err("expected err");
        assert_eq!(err.kind(), ErrorKind::ChannelClosed);
        assert_eq!(err.message(), Some("lazy message"));
    }

    #[test]
    fn result_ext_with_context_skips_closure_on_ok() {
        let res: core::result::Result<u8, RecvError> = Ok(7);
        let value = res
            .with_context(|| unreachable!("closure must not run on the Ok path"))
            .expect("expected ok");
        assert_eq!(value, 7);
    }

    #[test]
    fn predicates_match_kind() {
        let cancel = Error::new(ErrorKind::Cancelled);
        assert!(cancel.is_cancelled());
        assert!(!cancel.is_timeout());

        let timeout = Error::new(ErrorKind::DeadlineExceeded);
        assert!(!timeout.is_cancelled());
        assert!(timeout.is_timeout());
    }

    #[test]
    fn retryable_agrees_with_recoverability() {
        assert!(ErrorKind::ChannelFull.is_retryable());
        assert!(ErrorKind::ChannelFull.recoverability().should_retry());

        assert!(!ErrorKind::Internal.is_retryable());
        assert!(ErrorKind::Internal.recoverability().is_permanent());

        // `Unknown` is neither retryable nor permanent.
        assert!(!ErrorKind::User.is_retryable());
        assert!(!ErrorKind::User.recoverability().is_permanent());
    }

    #[test]
    fn recovery_action_backoff() {
        let action = ErrorKind::ThresholdTimeout.recovery_action();
        assert!(matches!(action, RecoveryAction::RetryWithBackoff(_)));
    }

    #[test]
    fn backoff_default_matches_const() {
        assert_eq!(BackoffHint::default(), BackoffHint::DEFAULT);
        assert_eq!(
            ErrorKind::NodeUnavailable.recovery_action(),
            RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE)
        );
    }

    #[test]
    fn error_context_default() {
        let err = Error::new(ErrorKind::Internal);
        assert!(err.context().task_id.is_none());
        // `Error::new` builds the context by hand; it must agree with `Default`.
        assert_eq!(*err.context(), ErrorContext::default());
    }

    #[test]
    fn error_with_full_context() {
        use crate::util::ArenaIndex;

        let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
        let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
        let object_id = ObjectId::new_for_test(123);
        let symbol_id = SymbolId::new_for_test(123, 0, 1);

        let ctx = ErrorContext {
            task_id: Some(task_id),
            region_id: Some(region_id),
            object_id: Some(object_id),
            symbol_id: Some(symbol_id),
        };

        let err = Error::new(ErrorKind::Internal).with_context(ctx);

        assert_eq!(err.context().task_id, Some(task_id));
        assert_eq!(err.context().region_id, Some(region_id));
        assert_eq!(err.context().object_id, Some(object_id));
        assert_eq!(err.context().symbol_id, Some(symbol_id));
    }
}