use core::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::observability::SpanId;
use crate::types::symbol::{ObjectId, SymbolId};
use crate::types::{CancelReason, RegionId, TaskId};
pub mod recovery;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
Cancelled,
CancelTimeout,
DeadlineExceeded,
PollQuotaExhausted,
CostQuotaExhausted,
ChannelClosed,
ChannelFull,
ChannelEmpty,
ObligationLeak,
ObligationAlreadyResolved,
RegionClosed,
TaskNotOwned,
AdmissionDenied,
InvalidEncodingParams,
DataTooLarge,
EncodingFailed,
CorruptedSymbol,
InsufficientSymbols,
DecodingFailed,
ObjectMismatch,
DuplicateSymbol,
ThresholdTimeout,
RoutingFailed,
DispatchFailed,
StreamEnded,
SinkRejected,
ConnectionLost,
ConnectionRefused,
ProtocolError,
RecoveryFailed,
LeaseExpired,
LeaseRenewalFailed,
CoordinationFailed,
QuorumNotReached,
NodeUnavailable,
PartitionDetected,
Internal,
InvalidStateTransition,
ConfigError,
User,
}
impl ErrorKind {
#[must_use]
#[inline]
pub const fn category(&self) -> ErrorCategory {
match self {
Self::Cancelled | Self::CancelTimeout => ErrorCategory::Cancellation,
Self::DeadlineExceeded | Self::PollQuotaExhausted | Self::CostQuotaExhausted => {
ErrorCategory::Budget
}
Self::ChannelClosed | Self::ChannelFull | Self::ChannelEmpty => ErrorCategory::Channel,
Self::ObligationLeak | Self::ObligationAlreadyResolved => ErrorCategory::Obligation,
Self::RegionClosed | Self::TaskNotOwned | Self::AdmissionDenied => {
ErrorCategory::Region
}
Self::InvalidEncodingParams
| Self::DataTooLarge
| Self::EncodingFailed
| Self::CorruptedSymbol => ErrorCategory::Encoding,
Self::InsufficientSymbols
| Self::DecodingFailed
| Self::ObjectMismatch
| Self::DuplicateSymbol
| Self::ThresholdTimeout => ErrorCategory::Decoding,
Self::RoutingFailed
| Self::DispatchFailed
| Self::StreamEnded
| Self::SinkRejected
| Self::ConnectionLost
| Self::ConnectionRefused
| Self::ProtocolError => ErrorCategory::Transport,
Self::RecoveryFailed
| Self::LeaseExpired
| Self::LeaseRenewalFailed
| Self::CoordinationFailed
| Self::QuorumNotReached
| Self::NodeUnavailable
| Self::PartitionDetected => ErrorCategory::Distributed,
Self::Internal | Self::InvalidStateTransition => ErrorCategory::Internal,
Self::ConfigError | Self::User => ErrorCategory::User,
}
}
#[must_use]
#[inline]
pub const fn recoverability(&self) -> Recoverability {
match self {
Self::ChannelFull
| Self::ChannelEmpty
| Self::AdmissionDenied
| Self::ConnectionLost
| Self::NodeUnavailable
| Self::QuorumNotReached
| Self::ThresholdTimeout
| Self::LeaseRenewalFailed => Recoverability::Transient,
Self::Cancelled
| Self::CancelTimeout
| Self::ChannelClosed
| Self::ObligationLeak
| Self::ObligationAlreadyResolved
| Self::RegionClosed
| Self::InvalidEncodingParams
| Self::DataTooLarge
| Self::ObjectMismatch
| Self::Internal
| Self::InvalidStateTransition
| Self::ProtocolError
| Self::ConnectionRefused
| Self::ConfigError => Recoverability::Permanent,
Self::DeadlineExceeded
| Self::PollQuotaExhausted
| Self::CostQuotaExhausted
| Self::TaskNotOwned
| Self::EncodingFailed
| Self::CorruptedSymbol
| Self::InsufficientSymbols
| Self::DecodingFailed
| Self::DuplicateSymbol
| Self::RoutingFailed
| Self::DispatchFailed
| Self::StreamEnded
| Self::SinkRejected
| Self::RecoveryFailed
| Self::LeaseExpired
| Self::CoordinationFailed
| Self::PartitionDetected
| Self::User => Recoverability::Unknown,
}
}
#[must_use]
#[inline]
pub const fn is_retryable(&self) -> bool {
matches!(self.recoverability(), Recoverability::Transient)
}
#[must_use]
#[inline]
pub const fn recovery_action(&self) -> RecoveryAction {
match self {
Self::ChannelFull | Self::ChannelEmpty => RecoveryAction::RetryImmediately,
Self::AdmissionDenied
| Self::ThresholdTimeout
| Self::QuorumNotReached
| Self::LeaseRenewalFailed => RecoveryAction::RetryWithBackoff(BackoffHint::DEFAULT),
Self::NodeUnavailable => RecoveryAction::RetryWithBackoff(BackoffHint::AGGRESSIVE),
Self::ConnectionLost | Self::StreamEnded => RecoveryAction::RetryWithNewConnection,
Self::Cancelled
| Self::CancelTimeout
| Self::DeadlineExceeded
| Self::PollQuotaExhausted
| Self::CostQuotaExhausted
| Self::ChannelClosed
| Self::RegionClosed
| Self::InvalidEncodingParams
| Self::DataTooLarge
| Self::ObjectMismatch
| Self::ConnectionRefused
| Self::ProtocolError
| Self::LeaseExpired
| Self::PartitionDetected
| Self::ConfigError => RecoveryAction::Propagate,
Self::ObligationLeak
| Self::ObligationAlreadyResolved
| Self::Internal
| Self::InvalidStateTransition => RecoveryAction::Escalate,
Self::TaskNotOwned
| Self::EncodingFailed
| Self::CorruptedSymbol
| Self::InsufficientSymbols
| Self::DecodingFailed
| Self::DuplicateSymbol
| Self::RoutingFailed
| Self::DispatchFailed
| Self::SinkRejected
| Self::RecoveryFailed
| Self::CoordinationFailed
| Self::User => RecoveryAction::Custom,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Recoverability {
Transient,
Permanent,
Unknown,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecoveryAction {
RetryImmediately,
RetryWithBackoff(BackoffHint),
RetryWithNewConnection,
Propagate,
Escalate,
Custom,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BackoffHint {
pub initial_delay_ms: u32,
pub max_delay_ms: u32,
pub max_attempts: u8,
}
impl BackoffHint {
pub const DEFAULT: Self = Self {
initial_delay_ms: 100,
max_delay_ms: 30_000,
max_attempts: 5,
};
pub const AGGRESSIVE: Self = Self {
initial_delay_ms: 1_000,
max_delay_ms: 60_000,
max_attempts: 10,
};
pub const QUICK: Self = Self {
initial_delay_ms: 10,
max_delay_ms: 1_000,
max_attempts: 3,
};
}
impl Default for BackoffHint {
#[inline]
fn default() -> Self {
Self::DEFAULT
}
}
impl Recoverability {
#[must_use]
#[inline]
pub const fn should_retry(&self) -> bool {
matches!(self, Self::Transient)
}
#[must_use]
#[inline]
pub const fn is_permanent(&self) -> bool {
matches!(self, Self::Permanent)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
Cancellation,
Budget,
Channel,
Obligation,
Region,
Encoding,
Decoding,
Transport,
Distributed,
Internal,
User,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ErrorContext {
pub task_id: Option<TaskId>,
pub region_id: Option<RegionId>,
pub object_id: Option<ObjectId>,
pub symbol_id: Option<SymbolId>,
pub correlation_id: Option<u64>,
pub causal_chain: Vec<u64>,
pub span_id: Option<crate::observability::SpanId>,
pub parent_span_id: Option<crate::observability::SpanId>,
pub async_stack: Vec<String>,
}
impl ErrorContext {
#[must_use]
pub fn new() -> Self {
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
Self {
correlation_id: Some(NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed)),
..Self::default()
}
}
#[must_use]
pub fn from_diagnostic_context(ctx: &crate::observability::DiagnosticContext) -> Self {
let mut error_ctx = Self::new();
error_ctx.task_id = ctx.task_id();
error_ctx.region_id = ctx.region_id();
error_ctx.span_id = ctx.span_id();
error_ctx.parent_span_id = ctx.parent_span_id();
error_ctx
}
#[must_use]
pub fn derive_child(&self, operation: &str) -> Self {
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
let child_correlation_id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed);
let mut causal_chain = self.causal_chain.clone();
if let Some(parent_id) = self.correlation_id {
causal_chain.push(parent_id);
}
let mut async_stack = self.async_stack.clone();
async_stack.push(operation.to_string());
Self {
task_id: self.task_id,
region_id: self.region_id,
object_id: self.object_id,
symbol_id: self.symbol_id,
correlation_id: Some(child_correlation_id),
causal_chain,
span_id: Some(SpanId::new()), parent_span_id: self.span_id,
async_stack,
}
}
#[must_use]
pub fn with_operation(mut self, operation: &str) -> Self {
self.async_stack.push(operation.to_string());
self
}
#[must_use]
pub fn with_span_context(mut self, span_id: SpanId, parent_span_id: Option<SpanId>) -> Self {
self.span_id = Some(span_id);
self.parent_span_id = parent_span_id;
self
}
#[must_use]
pub fn root_correlation_id(&self) -> Option<u64> {
self.causal_chain.first().copied().or(self.correlation_id)
}
#[must_use]
pub fn full_causal_chain(&self) -> Vec<u64> {
let mut chain = self.causal_chain.clone();
if let Some(id) = self.correlation_id {
chain.push(id);
}
chain
}
#[must_use]
pub fn format_async_stack(&self) -> String {
if self.async_stack.is_empty() {
"<no stack trace>".to_string()
} else {
self.async_stack.join(" -> ")
}
}
}
#[derive(Debug, Clone)]
pub struct Error {
kind: ErrorKind,
message: Option<String>,
source: Option<Arc<dyn std::error::Error + Send + Sync>>,
context: ErrorContext,
}
impl Error {
#[must_use]
#[inline]
pub fn new(kind: ErrorKind) -> Self {
Self {
kind,
message: None,
source: None,
context: ErrorContext::new(),
}
}
#[must_use]
#[inline]
pub const fn kind(&self) -> ErrorKind {
self.kind
}
#[must_use]
#[inline]
pub const fn is_cancelled(&self) -> bool {
matches!(self.kind, ErrorKind::Cancelled)
}
#[must_use]
#[inline]
pub const fn is_timeout(&self) -> bool {
matches!(
self.kind,
ErrorKind::DeadlineExceeded | ErrorKind::CancelTimeout
)
}
#[must_use]
#[inline]
pub fn with_message(mut self, msg: impl Into<String>) -> Self {
self.message = Some(msg.into());
self
}
#[must_use]
#[inline]
pub fn with_context(mut self, ctx: ErrorContext) -> Self {
self.context = ctx;
self
}
#[must_use]
#[inline]
pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
self.source = Some(Arc::new(source));
self
}
#[must_use]
pub fn from_cx(kind: ErrorKind, cx: &crate::cx::Cx) -> Self {
let diag_ctx = cx.diagnostic_context();
let error_ctx = ErrorContext::from_diagnostic_context(&diag_ctx)
.with_operation(&format!("Error::{:?}", kind));
Self::new(kind).with_context(error_ctx)
}
#[must_use]
pub fn propagate_across_async(mut self, operation: &str) -> Self {
self.context = self.context.derive_child(operation);
self
}
#[must_use]
pub fn with_operation(mut self, operation: &str) -> Self {
self.context = self.context.with_operation(operation);
self
}
#[must_use]
#[inline]
pub fn correlation_id(&self) -> Option<u64> {
self.context.correlation_id
}
#[must_use]
#[inline]
pub fn root_correlation_id(&self) -> Option<u64> {
self.context.root_correlation_id()
}
#[must_use]
#[inline]
pub fn causal_chain(&self) -> Vec<u64> {
self.context.full_causal_chain()
}
#[must_use]
#[inline]
pub fn async_stack(&self) -> String {
self.context.format_async_stack()
}
#[must_use]
#[inline]
pub fn cancelled(reason: &CancelReason) -> Self {
Self::new(ErrorKind::Cancelled).with_message(reason.to_string())
}
#[must_use]
#[inline]
pub const fn category(&self) -> ErrorCategory {
self.kind.category()
}
#[must_use]
#[inline]
pub const fn recoverability(&self) -> Recoverability {
self.kind.recoverability()
}
#[must_use]
#[inline]
pub const fn is_retryable(&self) -> bool {
self.kind.is_retryable()
}
#[must_use]
#[inline]
pub const fn recovery_action(&self) -> RecoveryAction {
self.kind.recovery_action()
}
#[must_use]
#[inline]
pub fn message(&self) -> Option<&str> {
self.message.as_deref()
}
#[must_use]
#[inline]
pub fn context(&self) -> &ErrorContext {
&self.context
}
#[must_use]
#[inline]
pub const fn is_encoding_error(&self) -> bool {
matches!(self.kind.category(), ErrorCategory::Encoding)
}
#[must_use]
#[inline]
pub const fn is_decoding_error(&self) -> bool {
matches!(self.kind.category(), ErrorCategory::Decoding)
}
#[must_use]
#[inline]
pub const fn is_transport_error(&self) -> bool {
matches!(self.kind.category(), ErrorCategory::Transport)
}
#[must_use]
#[inline]
pub const fn is_distributed_error(&self) -> bool {
matches!(self.kind.category(), ErrorCategory::Distributed)
}
#[must_use]
#[inline]
pub const fn is_connection_error(&self) -> bool {
matches!(
self.kind,
ErrorKind::ConnectionLost | ErrorKind::ConnectionRefused
)
}
#[must_use]
pub fn invalid_encoding_params(detail: impl Into<String>) -> Self {
Self::new(ErrorKind::InvalidEncodingParams).with_message(detail)
}
#[must_use]
pub fn data_too_large(actual: u64, max: u64) -> Self {
Self::new(ErrorKind::DataTooLarge)
.with_message(format!("data size {actual} exceeds maximum {max}"))
}
#[must_use]
pub fn insufficient_symbols(received: u32, needed: u32) -> Self {
Self::new(ErrorKind::InsufficientSymbols).with_message(format!(
"received {received} symbols, need at least {needed}"
))
}
#[must_use]
pub fn decoding_failed(reason: impl Into<String>) -> Self {
Self::new(ErrorKind::DecodingFailed).with_message(reason)
}
#[must_use]
pub fn routing_failed(destination: impl Into<String>) -> Self {
Self::new(ErrorKind::RoutingFailed)
.with_message(format!("no route to destination: {}", destination.into()))
}
#[must_use]
pub fn lease_expired(lease_id: impl Into<String>) -> Self {
Self::new(ErrorKind::LeaseExpired)
.with_message(format!("lease expired: {}", lease_id.into()))
}
#[must_use]
pub fn quorum_not_reached(achieved: u32, needed: u32) -> Self {
Self::new(ErrorKind::QuorumNotReached)
.with_message(format!("achieved {achieved} of {needed} required"))
}
#[must_use]
pub fn node_unavailable(node_id: impl Into<String>) -> Self {
Self::new(ErrorKind::NodeUnavailable)
.with_message(format!("node unavailable: {}", node_id.into()))
}
#[must_use]
pub fn internal(detail: impl Into<String>) -> Self {
Self::new(ErrorKind::Internal).with_message(detail)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.kind)?;
if let Some(msg) = &self.message {
write!(f, ": {msg}")?;
}
Ok(())
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|e| e.as_ref() as _)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cancelled {
pub reason: CancelReason,
}
impl From<Cancelled> for Error {
fn from(c: Cancelled) -> Self {
Self::cancelled(&c.reason)
}
}
#[derive(Debug)]
pub enum SendError<T> {
Disconnected(T),
Full(T),
Cancelled(T),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
Disconnected,
Empty,
Cancelled,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireError {
Closed,
}
impl From<RecvError> for Error {
fn from(e: RecvError) -> Self {
match e {
RecvError::Disconnected => Self::new(ErrorKind::ChannelClosed),
RecvError::Empty => Self::new(ErrorKind::ChannelEmpty),
RecvError::Cancelled => Self::new(ErrorKind::Cancelled),
}
}
}
impl<T> From<SendError<T>> for Error {
fn from(e: SendError<T>) -> Self {
match e {
SendError::Disconnected(_) => Self::new(ErrorKind::ChannelClosed),
SendError::Full(_) => Self::new(ErrorKind::ChannelFull),
SendError::Cancelled(_) => Self::new(ErrorKind::Cancelled),
}
}
}
#[allow(clippy::result_large_err)]
pub trait ResultExt<T> {
fn context(self, msg: impl Into<String>) -> Result<T>;
fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
}
impl<T, E: Into<Error>> ResultExt<T> for core::result::Result<T, E> {
fn context(self, msg: impl Into<String>) -> Result<T> {
self.map_err(|e| e.into().with_message(msg))
}
fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
self.map_err(|e| e.into().with_message(f()))
}
}
#[allow(clippy::result_large_err)]
pub type Result<T> = core::result::Result<T, Error>;
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error as _;
#[derive(Debug)]
struct Underlying;
impl fmt::Display for Underlying {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "underlying")
}
}
impl std::error::Error for Underlying {}
#[test]
fn display_without_message() {
let err = Error::new(ErrorKind::Internal);
assert_eq!(err.to_string(), "Internal");
}
#[test]
fn display_with_message() {
let err = Error::new(ErrorKind::ChannelEmpty).with_message("no messages");
assert_eq!(err.to_string(), "ChannelEmpty: no messages");
}
#[test]
fn source_chain_is_exposed() {
let err = Error::new(ErrorKind::User)
.with_message("outer")
.with_source(Underlying);
let source = err.source().expect("source missing");
assert_eq!(source.to_string(), "underlying");
}
#[test]
fn from_recv_error() {
let disconnected: Error = RecvError::Disconnected.into();
assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
let empty: Error = RecvError::Empty.into();
assert_eq!(empty.kind(), ErrorKind::ChannelEmpty);
}
#[test]
fn from_send_error() {
let disconnected: Error = SendError::Disconnected(()).into();
assert_eq!(disconnected.kind(), ErrorKind::ChannelClosed);
let full: Error = SendError::Full(()).into();
assert_eq!(full.kind(), ErrorKind::ChannelFull);
}
#[test]
fn result_ext_adds_message() {
let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
let err = res.context("recv failed").expect_err("expected err");
assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
assert_eq!(err.to_string(), "ChannelEmpty: recv failed");
}
#[test]
fn predicates_match_kind() {
let cancel = Error::new(ErrorKind::Cancelled);
assert!(cancel.is_cancelled());
assert!(!cancel.is_timeout());
let timeout = Error::new(ErrorKind::DeadlineExceeded);
assert!(!timeout.is_cancelled());
assert!(timeout.is_timeout());
}
#[test]
fn recovery_action_backoff() {
let action = ErrorKind::ThresholdTimeout.recovery_action();
assert!(matches!(action, RecoveryAction::RetryWithBackoff(_)));
}
#[test]
fn error_context_default() {
let err = Error::new(ErrorKind::Internal);
assert!(err.context().task_id.is_none());
}
#[test]
fn error_with_full_context() {
use crate::util::ArenaIndex;
let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
let object_id = ObjectId::new_for_test(123);
let symbol_id = SymbolId::new_for_test(123, 0, 1);
let ctx = ErrorContext {
task_id: Some(task_id),
region_id: Some(region_id),
object_id: Some(object_id),
symbol_id: Some(symbol_id),
correlation_id: None,
causal_chain: Vec::new(),
span_id: None,
parent_span_id: None,
async_stack: Vec::new(),
};
let err = Error::new(ErrorKind::Internal).with_context(ctx);
assert_eq!(err.context().task_id, Some(task_id));
assert_eq!(err.context().region_id, Some(region_id));
assert_eq!(err.context().object_id, Some(object_id));
assert_eq!(err.context().symbol_id, Some(symbol_id));
}
#[test]
fn error_kind_category_coverage() {
use ErrorCategory::*;
let cases: &[(ErrorKind, ErrorCategory)] = &[
(ErrorKind::Cancelled, Cancellation),
(ErrorKind::CancelTimeout, Cancellation),
(ErrorKind::DeadlineExceeded, Budget),
(ErrorKind::PollQuotaExhausted, Budget),
(ErrorKind::CostQuotaExhausted, Budget),
(ErrorKind::ChannelClosed, Channel),
(ErrorKind::ChannelFull, Channel),
(ErrorKind::ChannelEmpty, Channel),
(ErrorKind::ObligationLeak, Obligation),
(ErrorKind::ObligationAlreadyResolved, Obligation),
(ErrorKind::RegionClosed, Region),
(ErrorKind::TaskNotOwned, Region),
(ErrorKind::AdmissionDenied, Region),
(ErrorKind::InvalidEncodingParams, Encoding),
(ErrorKind::DataTooLarge, Encoding),
(ErrorKind::EncodingFailed, Encoding),
(ErrorKind::CorruptedSymbol, Encoding),
(ErrorKind::InsufficientSymbols, Decoding),
(ErrorKind::DecodingFailed, Decoding),
(ErrorKind::ObjectMismatch, Decoding),
(ErrorKind::DuplicateSymbol, Decoding),
(ErrorKind::ThresholdTimeout, Decoding),
(ErrorKind::RoutingFailed, Transport),
(ErrorKind::DispatchFailed, Transport),
(ErrorKind::StreamEnded, Transport),
(ErrorKind::SinkRejected, Transport),
(ErrorKind::ConnectionLost, Transport),
(ErrorKind::ConnectionRefused, Transport),
(ErrorKind::ProtocolError, Transport),
(ErrorKind::RecoveryFailed, Distributed),
(ErrorKind::LeaseExpired, Distributed),
(ErrorKind::LeaseRenewalFailed, Distributed),
(ErrorKind::CoordinationFailed, Distributed),
(ErrorKind::QuorumNotReached, Distributed),
(ErrorKind::NodeUnavailable, Distributed),
(ErrorKind::PartitionDetected, Distributed),
(ErrorKind::Internal, Internal),
(ErrorKind::InvalidStateTransition, Internal),
(ErrorKind::ConfigError, User),
(ErrorKind::User, User),
];
for (kind, expected) in cases {
assert_eq!(kind.category(), *expected, "{kind:?}");
}
}
#[test]
fn error_kind_recoverability_classification() {
for kind in [
ErrorKind::ChannelFull,
ErrorKind::ChannelEmpty,
ErrorKind::AdmissionDenied,
ErrorKind::ConnectionLost,
ErrorKind::NodeUnavailable,
ErrorKind::QuorumNotReached,
ErrorKind::ThresholdTimeout,
ErrorKind::LeaseRenewalFailed,
] {
assert_eq!(kind.recoverability(), Recoverability::Transient, "{kind:?}");
assert!(kind.is_retryable(), "{kind:?} should be retryable");
}
for kind in [
ErrorKind::Cancelled,
ErrorKind::ChannelClosed,
ErrorKind::ObligationLeak,
ErrorKind::Internal,
ErrorKind::ConnectionRefused,
ErrorKind::ConfigError,
] {
assert_eq!(kind.recoverability(), Recoverability::Permanent, "{kind:?}");
assert!(!kind.is_retryable(), "{kind:?} should not be retryable");
}
for kind in [
ErrorKind::DeadlineExceeded,
ErrorKind::EncodingFailed,
ErrorKind::CorruptedSymbol,
ErrorKind::User,
] {
assert_eq!(kind.recoverability(), Recoverability::Unknown, "{kind:?}");
assert!(!kind.is_retryable(), "{kind:?} Unknown is not retryable");
}
}
#[test]
fn recoverability_predicates() {
assert!(Recoverability::Transient.should_retry());
assert!(!Recoverability::Transient.is_permanent());
assert!(!Recoverability::Permanent.should_retry());
assert!(Recoverability::Permanent.is_permanent());
assert!(!Recoverability::Unknown.should_retry());
assert!(!Recoverability::Unknown.is_permanent());
}
#[test]
fn recovery_action_variants() {
assert!(matches!(
ErrorKind::ChannelFull.recovery_action(),
RecoveryAction::RetryImmediately
));
assert!(matches!(
ErrorKind::AdmissionDenied.recovery_action(),
RecoveryAction::RetryWithBackoff(_)
));
assert!(matches!(
ErrorKind::NodeUnavailable.recovery_action(),
RecoveryAction::RetryWithBackoff(_)
));
assert!(matches!(
ErrorKind::ConnectionLost.recovery_action(),
RecoveryAction::RetryWithNewConnection
));
assert!(matches!(
ErrorKind::Cancelled.recovery_action(),
RecoveryAction::Propagate
));
assert!(matches!(
ErrorKind::ObligationLeak.recovery_action(),
RecoveryAction::Escalate
));
assert!(matches!(
ErrorKind::User.recovery_action(),
RecoveryAction::Custom
));
}
#[test]
fn backoff_hint_constants() {
let d = BackoffHint::DEFAULT;
assert_eq!(d.initial_delay_ms, 100);
assert_eq!(d.max_delay_ms, 30_000);
assert_eq!(d.max_attempts, 5);
let a = BackoffHint::AGGRESSIVE;
assert!(a.initial_delay_ms > d.initial_delay_ms);
assert!(a.max_attempts > d.max_attempts);
let q = BackoffHint::QUICK;
assert!(q.initial_delay_ms < d.initial_delay_ms);
assert!(q.max_attempts < d.max_attempts);
assert_eq!(BackoffHint::default(), BackoffHint::DEFAULT);
}
#[test]
fn error_data_too_large() {
let err = Error::data_too_large(2000, 1000);
assert_eq!(err.kind(), ErrorKind::DataTooLarge);
let msg = err.to_string();
assert!(msg.contains("2000"), "{msg}");
assert!(msg.contains("1000"), "{msg}");
}
#[test]
fn error_insufficient_symbols() {
let err = Error::insufficient_symbols(5, 10);
assert_eq!(err.kind(), ErrorKind::InsufficientSymbols);
let msg = err.to_string();
assert!(msg.contains('5'), "{msg}");
assert!(msg.contains("10"), "{msg}");
}
#[test]
fn error_routing_failed() {
let err = Error::routing_failed("node-7");
assert_eq!(err.kind(), ErrorKind::RoutingFailed);
assert!(err.to_string().contains("node-7"));
}
#[test]
fn error_lease_expired() {
let err = Error::lease_expired("lease-42");
assert_eq!(err.kind(), ErrorKind::LeaseExpired);
assert!(err.to_string().contains("lease-42"));
}
#[test]
fn error_quorum_not_reached() {
let err = Error::quorum_not_reached(2, 3);
assert_eq!(err.kind(), ErrorKind::QuorumNotReached);
let msg = err.to_string();
assert!(msg.contains('2'), "{msg}");
assert!(msg.contains('3'), "{msg}");
}
#[test]
fn error_node_unavailable() {
let err = Error::node_unavailable("node-1");
assert_eq!(err.kind(), ErrorKind::NodeUnavailable);
assert!(err.to_string().contains("node-1"));
}
#[test]
fn error_internal() {
let err = Error::internal("bug found");
assert_eq!(err.kind(), ErrorKind::Internal);
assert!(err.to_string().contains("bug found"));
}
#[test]
fn error_is_predicates() {
assert!(Error::new(ErrorKind::EncodingFailed).is_encoding_error());
assert!(!Error::new(ErrorKind::DecodingFailed).is_encoding_error());
assert!(Error::new(ErrorKind::InsufficientSymbols).is_decoding_error());
assert!(!Error::new(ErrorKind::EncodingFailed).is_decoding_error());
assert!(Error::new(ErrorKind::RoutingFailed).is_transport_error());
assert!(!Error::new(ErrorKind::Internal).is_transport_error());
assert!(Error::new(ErrorKind::QuorumNotReached).is_distributed_error());
assert!(!Error::new(ErrorKind::ChannelFull).is_distributed_error());
assert!(Error::new(ErrorKind::ConnectionLost).is_connection_error());
assert!(Error::new(ErrorKind::ConnectionRefused).is_connection_error());
assert!(!Error::new(ErrorKind::RoutingFailed).is_connection_error());
}
#[test]
fn error_cancel_timeout_is_timeout() {
assert!(Error::new(ErrorKind::CancelTimeout).is_timeout());
assert!(!Error::new(ErrorKind::CancelTimeout).is_cancelled());
}
#[test]
fn recv_error_cancelled_conversion() {
let err: Error = RecvError::Cancelled.into();
assert_eq!(err.kind(), ErrorKind::Cancelled);
}
#[test]
fn send_error_cancelled_conversion() {
let err: Error = SendError::Cancelled(42u32).into();
assert_eq!(err.kind(), ErrorKind::Cancelled);
}
#[test]
fn cancelled_struct_into_error() {
let reason = CancelReason::user("test cancel");
let cancelled = Cancelled { reason };
let err: Error = cancelled.into();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert!(err.to_string().contains("Cancelled"));
}
#[test]
fn result_ext_with_context_lazy() {
let res: core::result::Result<(), RecvError> = Err(RecvError::Empty);
let err = res
.with_context(|| format!("lazy {}", "context"))
.expect_err("expected err");
assert_eq!(err.kind(), ErrorKind::ChannelEmpty);
assert!(err.to_string().contains("lazy context"));
}
#[test]
fn error_category_debug() {
for cat in [
ErrorCategory::Cancellation,
ErrorCategory::Budget,
ErrorCategory::Channel,
ErrorCategory::Obligation,
ErrorCategory::Region,
ErrorCategory::Encoding,
ErrorCategory::Decoding,
ErrorCategory::Transport,
ErrorCategory::Distributed,
ErrorCategory::Internal,
ErrorCategory::User,
] {
let dbg = format!("{cat:?}");
assert!(!dbg.is_empty());
}
}
#[test]
fn acquire_error_debug_eq() {
let err = AcquireError::Closed;
let dbg = format!("{err:?}");
assert!(dbg.contains("Closed"), "{dbg}");
assert_eq!(err, AcquireError::Closed);
}
#[test]
fn error_clone() {
let err = Error::new(ErrorKind::Internal).with_message("clone me");
let cloned = err.clone();
assert_eq!(cloned.kind(), ErrorKind::Internal);
assert_eq!(cloned.to_string(), err.to_string());
}
#[test]
fn error_no_message() {
let err = Error::new(ErrorKind::User);
assert!(err.message().is_none());
}
#[test]
fn error_source_none_without_with_source() {
let err = Error::new(ErrorKind::User);
assert!(err.source().is_none());
}
#[test]
fn error_kind_copy_hash() {
use std::collections::HashSet;
let kind = ErrorKind::Internal;
let copied = kind;
assert_eq!(copied, ErrorKind::Internal);
let mut set = HashSet::new();
set.insert(ErrorKind::Cancelled);
set.insert(ErrorKind::DeadlineExceeded);
set.insert(ErrorKind::Cancelled); assert_eq!(set.len(), 2);
}
#[test]
fn recoverability_copy_hash_eq() {
use std::collections::HashSet;
let r = Recoverability::Transient;
let copied = r;
assert_eq!(copied, Recoverability::Transient);
assert_ne!(r, Recoverability::Permanent);
let mut set = HashSet::new();
set.insert(Recoverability::Transient);
set.insert(Recoverability::Permanent);
set.insert(Recoverability::Unknown);
assert_eq!(set.len(), 3);
}
#[test]
fn recovery_action_copy_hash() {
use std::collections::HashSet;
let action = RecoveryAction::Propagate;
let copied = action;
assert_eq!(copied, RecoveryAction::Propagate);
let mut set = HashSet::new();
set.insert(RecoveryAction::RetryImmediately);
set.insert(RecoveryAction::Propagate);
set.insert(RecoveryAction::Escalate);
set.insert(RecoveryAction::Custom);
assert_eq!(set.len(), 4);
}
#[test]
fn error_category_copy_clone_hash() {
use std::collections::HashSet;
let cat = ErrorCategory::Transport;
let copied = cat;
let cloned = cat;
assert_eq!(copied, cloned);
let mut set = HashSet::new();
set.insert(ErrorCategory::Cancellation);
set.insert(ErrorCategory::Budget);
set.insert(ErrorCategory::Channel);
assert_eq!(set.len(), 3);
}
#[test]
fn backoff_hint_copy_hash_eq() {
use std::collections::HashSet;
let hint = BackoffHint::DEFAULT;
let copied = hint;
assert_eq!(copied, BackoffHint::DEFAULT);
assert_ne!(hint, BackoffHint::AGGRESSIVE);
let mut set = HashSet::new();
set.insert(BackoffHint::DEFAULT);
set.insert(BackoffHint::AGGRESSIVE);
set.insert(BackoffHint::QUICK);
assert_eq!(set.len(), 3);
}
#[test]
fn recv_error_debug_clone_copy() {
let err = RecvError::Disconnected;
let dbg = format!("{err:?}");
assert!(dbg.contains("Disconnected"));
let copied = err;
assert_eq!(copied, RecvError::Disconnected);
let cloned = err;
assert_eq!(cloned, err);
}
#[test]
fn cancelled_clone_eq() {
let c = Cancelled {
reason: CancelReason::user("test"),
};
let dbg = format!("{c:?}");
assert!(dbg.contains("Cancelled"));
let cloned = c.clone();
assert_eq!(cloned, c);
}
#[test]
fn error_context_auto_correlation() {
let ctx = ErrorContext::new();
assert!(ctx.correlation_id.is_some());
assert!(ctx.causal_chain.is_empty());
assert!(ctx.async_stack.is_empty());
}
#[test]
fn error_context_derive_child() {
let parent = ErrorContext::new();
let parent_id = parent.correlation_id.unwrap();
let child = parent.derive_child("async_operation");
assert!(child.correlation_id.is_some());
assert_ne!(child.correlation_id, parent.correlation_id);
assert_eq!(child.causal_chain, vec![parent_id]);
assert_eq!(child.async_stack, vec!["async_operation"]);
assert!(child.span_id.is_some());
assert_eq!(child.parent_span_id, parent.span_id);
}
#[test]
fn error_context_causal_chain() {
let root = ErrorContext::new();
let child = root.derive_child("level1");
let grandchild = child.derive_child("level2");
let root_id = root.correlation_id.unwrap();
let child_id = child.correlation_id.unwrap();
let chain = grandchild.full_causal_chain();
assert_eq!(
chain,
vec![root_id, child_id, grandchild.correlation_id.unwrap()]
);
assert_eq!(grandchild.root_correlation_id(), Some(root_id));
}
#[test]
fn error_context_async_stack_trace() {
let ctx = ErrorContext::new()
.with_operation("spawn_task")
.with_operation("process_request");
let trace = ctx.format_async_stack();
assert_eq!(trace, "spawn_task -> process_request");
}
#[test]
fn error_propagate_across_async() {
let error = Error::new(ErrorKind::Internal).with_operation("initial_operation");
let propagated = error.propagate_across_async("async_boundary");
let chain = propagated.causal_chain();
assert!(!chain.is_empty());
let stack = propagated.async_stack();
assert!(stack.contains("async_boundary"));
}
#[test]
fn error_correlation_tracking() {
let err1 = Error::new(ErrorKind::ChannelClosed);
let err2 = Error::new(ErrorKind::Internal);
assert_ne!(err1.correlation_id(), err2.correlation_id());
assert!(err1.correlation_id().is_some());
assert!(err2.correlation_id().is_some());
}
#[test]
fn error_with_operations() {
let error = Error::new(ErrorKind::DecodingFailed)
.with_operation("read_symbol")
.with_operation("decode_block");
let stack = error.async_stack();
assert_eq!(stack, "read_symbol -> decode_block");
}
}