use crate::types::cancel::CancelReason;
use crate::types::outcome::Outcome;
use serde::{Deserialize, Serialize};
use std::fmt;
/// Tiny dependency-free source of jitter values.
///
/// Samples are derived by hashing a process-wide call counter together with the
/// current wall-clock time; this is not a statistically strong or secure RNG.
mod rand_simple {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Incremented on every call so two calls never hash the same counter value.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Produces a value built from an `f64` in the closed range `[0.0, 1.0]`.
    ///
    /// If the system clock reads earlier than the Unix epoch, the time
    /// component of the hash input falls back to zero.
    pub fn random<T>() -> T
    where
        T: Default + From<f64>,
    {
        let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
        let nanos_since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());

        let mut state = DefaultHasher::new();
        (sequence, nanos_since_epoch).hash(&mut state);
        let digest = state.finish();

        // Scale the 64-bit digest down to the unit interval.
        let unit = (digest as f64) / (u64::MAX as f64);
        T::from(unit)
    }
}
/// Top-level ATP error: one variant per failure domain, each wrapping that
/// domain's own error enum.
///
/// The wrapped value supplies the stable code and the retryability
/// classification; the variant determines the domain label and the prefix
/// used by `Display`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AtpError {
/// Error in the `transport` domain.
Transport(TransportError),
/// Error in the `protocol` domain.
Protocol(ProtocolError),
/// Error in the `auth` domain.
Auth(AuthError),
/// Error in the `disk` domain.
Disk(DiskError),
/// Error in the `manifest` domain.
Manifest(ManifestError),
/// Error in the `repair` domain.
Repair(RepairError),
/// Error in the `path` domain.
Path(PathError),
/// Error in the `policy` domain.
Policy(PolicyError),
/// Error in the `relay` domain.
Relay(RelayError),
/// Error in the `mailbox` domain.
Mailbox(MailboxError),
/// Error in the `daemon` domain.
Daemon(DaemonError),
/// Error in the `adapter` domain.
Adapter(AdapterError),
/// Error in the `platform` domain.
Platform(PlatformError),
}
impl AtpError {
    /// Returns the stable machine-readable code of the wrapped domain error.
    #[must_use]
    pub const fn code(&self) -> &'static str {
        match self {
            Self::Transport(inner) => inner.code(),
            Self::Protocol(inner) => inner.code(),
            Self::Auth(inner) => inner.code(),
            Self::Disk(inner) => inner.code(),
            Self::Manifest(inner) => inner.code(),
            Self::Repair(inner) => inner.code(),
            Self::Path(inner) => inner.code(),
            Self::Policy(inner) => inner.code(),
            Self::Relay(inner) => inner.code(),
            Self::Mailbox(inner) => inner.code(),
            Self::Daemon(inner) => inner.code(),
            Self::Adapter(inner) => inner.code(),
            Self::Platform(inner) => inner.code(),
        }
    }

    /// Returns the lowercase name of the failure domain this error belongs to.
    #[must_use]
    pub const fn domain(&self) -> &'static str {
        match *self {
            Self::Transport(_) => "transport",
            Self::Protocol(_) => "protocol",
            Self::Auth(_) => "auth",
            Self::Disk(_) => "disk",
            Self::Manifest(_) => "manifest",
            Self::Repair(_) => "repair",
            Self::Path(_) => "path",
            Self::Policy(_) => "policy",
            Self::Relay(_) => "relay",
            Self::Mailbox(_) => "mailbox",
            Self::Daemon(_) => "daemon",
            Self::Adapter(_) => "adapter",
            Self::Platform(_) => "platform",
        }
    }

    /// Reports whether the wrapped domain error is classified as retryable.
    #[must_use]
    pub const fn is_retryable(&self) -> bool {
        match self {
            Self::Transport(inner) => inner.is_retryable(),
            Self::Protocol(inner) => inner.is_retryable(),
            Self::Auth(inner) => inner.is_retryable(),
            Self::Disk(inner) => inner.is_retryable(),
            Self::Manifest(inner) => inner.is_retryable(),
            Self::Repair(inner) => inner.is_retryable(),
            Self::Path(inner) => inner.is_retryable(),
            Self::Policy(inner) => inner.is_retryable(),
            Self::Relay(inner) => inner.is_retryable(),
            Self::Mailbox(inner) => inner.is_retryable(),
            Self::Daemon(inner) => inner.is_retryable(),
            Self::Adapter(inner) => inner.is_retryable(),
            Self::Platform(inner) => inner.is_retryable(),
        }
    }
}
impl fmt::Display for AtpError {
    /// Formats as `<domain>: <inner error text>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant prints its domain label followed by the wrapped
        // error's own `Display` text, so select the inner value once.
        let detail: &dyn fmt::Display = match self {
            Self::Transport(inner) => inner,
            Self::Protocol(inner) => inner,
            Self::Auth(inner) => inner,
            Self::Disk(inner) => inner,
            Self::Manifest(inner) => inner,
            Self::Repair(inner) => inner,
            Self::Path(inner) => inner,
            Self::Policy(inner) => inner,
            Self::Relay(inner) => inner,
            Self::Mailbox(inner) => inner,
            Self::Daemon(inner) => inner,
            Self::Adapter(inner) => inner,
            Self::Platform(inner) => inner,
        };
        write!(f, "{}: {detail}", self.domain())
    }
}
// Marker impl relying on the trait's default methods: no `source()` override
// is provided, so no underlying cause is exposed.
impl std::error::Error for AtpError {}
/// Errors in the `transport` domain.
///
/// Classified as retryable: `ConnectionRefused`, `ConnectionReset`,
/// `ConnectionTimeout`, `QuicStreamError`, `UdpSendError`, `NetworkUnreachable`.
/// Not retryable: `QuicHandshakeFailed`, `TlsError`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransportError {
/// Connection refused.
ConnectionRefused,
/// Connection reset.
ConnectionReset,
/// Connection timeout.
ConnectionTimeout,
/// QUIC handshake failed.
QuicHandshakeFailed,
/// QUIC stream error.
QuicStreamError,
/// UDP send error.
UdpSendError,
/// TLS error.
TlsError,
/// Network unreachable.
NetworkUnreachable,
}
impl TransportError {
    /// Stable machine-readable code for this variant (prefixed `transport_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::ConnectionRefused => "transport_connection_refused",
            Self::ConnectionReset => "transport_connection_reset",
            Self::ConnectionTimeout => "transport_connection_timeout",
            Self::QuicHandshakeFailed => "transport_quic_handshake_failed",
            Self::QuicStreamError => "transport_quic_stream_error",
            Self::UdpSendError => "transport_udp_send_error",
            Self::TlsError => "transport_tls_error",
            Self::NetworkUnreachable => "transport_network_unreachable",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::ConnectionRefused
            | Self::ConnectionReset
            | Self::ConnectionTimeout
            | Self::QuicStreamError
            | Self::UdpSendError
            | Self::NetworkUnreachable => true,
            Self::QuicHandshakeFailed | Self::TlsError => false,
        }
    }
}
impl fmt::Display for TransportError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::ConnectionRefused => "connection refused",
            Self::ConnectionReset => "connection reset",
            Self::ConnectionTimeout => "connection timeout",
            Self::QuicHandshakeFailed => "QUIC handshake failed",
            Self::QuicStreamError => "QUIC stream error",
            Self::UdpSendError => "UDP send error",
            Self::TlsError => "TLS error",
            Self::NetworkUnreachable => "network unreachable",
        };
        f.write_str(text)
    }
}
/// Errors in the `protocol` domain.
///
/// Classified as retryable: `UnexpectedFrame`, `SessionStateMismatch`.
/// Not retryable: `InvalidFrameType`, `MalformedFrame`,
/// `ProtocolVersionMismatch`, `InvalidVarInt`, `FrameTooLarge`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ProtocolError {
/// Invalid frame type.
InvalidFrameType,
/// Malformed frame.
MalformedFrame,
/// Protocol version mismatch.
ProtocolVersionMismatch,
/// Invalid varint encoding.
InvalidVarInt,
/// Unexpected frame in the current state.
UnexpectedFrame,
/// Frame exceeds the size limit.
FrameTooLarge,
/// Session state mismatch.
SessionStateMismatch,
}
impl ProtocolError {
    /// Stable machine-readable code for this variant (prefixed `protocol_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::InvalidFrameType => "protocol_invalid_frame_type",
            Self::MalformedFrame => "protocol_malformed_frame",
            Self::ProtocolVersionMismatch => "protocol_version_mismatch",
            Self::InvalidVarInt => "protocol_invalid_varint",
            Self::UnexpectedFrame => "protocol_unexpected_frame",
            Self::FrameTooLarge => "protocol_frame_too_large",
            Self::SessionStateMismatch => "protocol_session_state_mismatch",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::UnexpectedFrame | Self::SessionStateMismatch => true,
            Self::InvalidFrameType
            | Self::MalformedFrame
            | Self::ProtocolVersionMismatch
            | Self::InvalidVarInt
            | Self::FrameTooLarge => false,
        }
    }
}
impl fmt::Display for ProtocolError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidFrameType => "invalid frame type",
            Self::MalformedFrame => "malformed frame",
            Self::ProtocolVersionMismatch => "protocol version mismatch",
            Self::InvalidVarInt => "invalid varint encoding",
            Self::UnexpectedFrame => "unexpected frame in current state",
            Self::FrameTooLarge => "frame exceeds size limit",
            Self::SessionStateMismatch => "session state mismatch",
        };
        f.write_str(text)
    }
}
/// Errors in the `auth` domain.
///
/// Only `GrantExpired` is classified as retryable; every other variant is not.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuthError {
/// Invalid signature.
InvalidSignature,
/// Capability grant expired.
GrantExpired,
/// Capability grant revoked.
GrantRevoked,
/// Insufficient capabilities.
InsufficientCapabilities,
/// Peer not trusted.
PeerNotTrusted,
/// Replayed nonce detected.
ReplayedNonce,
/// Invalid certificate.
InvalidCertificate,
}
impl AuthError {
    /// Stable machine-readable code for this variant (prefixed `auth_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::InvalidSignature => "auth_invalid_signature",
            Self::GrantExpired => "auth_grant_expired",
            Self::GrantRevoked => "auth_grant_revoked",
            Self::InsufficientCapabilities => "auth_insufficient_capabilities",
            Self::PeerNotTrusted => "auth_peer_not_trusted",
            Self::ReplayedNonce => "auth_replayed_nonce",
            Self::InvalidCertificate => "auth_invalid_certificate",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::GrantExpired => true,
            Self::InvalidSignature
            | Self::GrantRevoked
            | Self::InsufficientCapabilities
            | Self::PeerNotTrusted
            | Self::ReplayedNonce
            | Self::InvalidCertificate => false,
        }
    }
}
impl fmt::Display for AuthError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidSignature => "invalid signature",
            Self::GrantExpired => "capability grant expired",
            Self::GrantRevoked => "capability grant revoked",
            Self::InsufficientCapabilities => "insufficient capabilities",
            Self::PeerNotTrusted => "peer not trusted",
            Self::ReplayedNonce => "replayed nonce detected",
            Self::InvalidCertificate => "invalid certificate",
        };
        f.write_str(text)
    }
}
/// Errors in the `disk` domain.
///
/// Classified as retryable: `InsufficientSpace`, `QuotaExceeded`, `IoError`.
/// Not retryable: `PermissionDenied`, `FileNotFound`, `DirectoryNotFound`,
/// `CorruptedData`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DiskError {
/// Insufficient disk space.
InsufficientSpace,
/// Permission denied.
PermissionDenied,
/// File not found.
FileNotFound,
/// Directory not found.
DirectoryNotFound,
/// I/O error.
IoError,
/// Corrupted data detected.
CorruptedData,
/// Disk quota exceeded.
QuotaExceeded,
}
impl DiskError {
    /// Stable machine-readable code for this variant (prefixed `disk_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::InsufficientSpace => "disk_insufficient_space",
            Self::PermissionDenied => "disk_permission_denied",
            Self::FileNotFound => "disk_file_not_found",
            Self::DirectoryNotFound => "disk_directory_not_found",
            Self::IoError => "disk_io_error",
            Self::CorruptedData => "disk_corrupted_data",
            Self::QuotaExceeded => "disk_quota_exceeded",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::InsufficientSpace | Self::QuotaExceeded | Self::IoError => true,
            Self::PermissionDenied
            | Self::FileNotFound
            | Self::DirectoryNotFound
            | Self::CorruptedData => false,
        }
    }
}
impl fmt::Display for DiskError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InsufficientSpace => "insufficient disk space",
            Self::PermissionDenied => "permission denied",
            Self::FileNotFound => "file not found",
            Self::DirectoryNotFound => "directory not found",
            Self::IoError => "I/O error",
            Self::CorruptedData => "corrupted data detected",
            Self::QuotaExceeded => "disk quota exceeded",
        };
        f.write_str(text)
    }
}
/// Errors in the `manifest` domain.
///
/// Only `ObjectNotFound` is classified as retryable; every other variant is not.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ManifestError {
/// Invalid manifest format.
InvalidFormat,
/// Unsupported manifest version.
UnsupportedVersion,
/// Missing required manifest field.
MissingRequiredField,
/// Manifest hash mismatch.
HashMismatch,
/// Circular reference in the manifest.
CircularReference,
/// Object not found in the manifest.
ObjectNotFound,
}
impl ManifestError {
    /// Stable machine-readable code for this variant (prefixed `manifest_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::InvalidFormat => "manifest_invalid_format",
            Self::UnsupportedVersion => "manifest_unsupported_version",
            Self::MissingRequiredField => "manifest_missing_required_field",
            Self::HashMismatch => "manifest_hash_mismatch",
            Self::CircularReference => "manifest_circular_reference",
            Self::ObjectNotFound => "manifest_object_not_found",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::ObjectNotFound => true,
            Self::InvalidFormat
            | Self::UnsupportedVersion
            | Self::MissingRequiredField
            | Self::HashMismatch
            | Self::CircularReference => false,
        }
    }
}
impl fmt::Display for ManifestError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidFormat => "invalid manifest format",
            Self::UnsupportedVersion => "unsupported manifest version",
            Self::MissingRequiredField => "missing required manifest field",
            Self::HashMismatch => "manifest hash mismatch",
            Self::CircularReference => "circular reference in manifest",
            Self::ObjectNotFound => "object not found in manifest",
        };
        f.write_str(text)
    }
}
/// Errors in the `repair` domain.
///
/// Classified as retryable: `InsufficientSymbols`, `DecodeFailure`,
/// `MissingRepairSymbol`, `CorruptedSymbol`.
/// Not retryable: `InvalidSourceBlock`, `UnsupportedCodec`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RepairError {
/// Insufficient repair symbols.
InsufficientSymbols,
/// RaptorQ decode failure.
DecodeFailure,
/// Invalid source block.
InvalidSourceBlock,
/// Missing repair symbol.
MissingRepairSymbol,
/// Corrupted repair symbol.
CorruptedSymbol,
/// Unsupported repair codec.
UnsupportedCodec,
}
impl RepairError {
    /// Stable machine-readable code for this variant (prefixed `repair_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::InsufficientSymbols => "repair_insufficient_symbols",
            Self::DecodeFailure => "repair_decode_failure",
            Self::InvalidSourceBlock => "repair_invalid_source_block",
            Self::MissingRepairSymbol => "repair_missing_repair_symbol",
            Self::CorruptedSymbol => "repair_corrupted_symbol",
            Self::UnsupportedCodec => "repair_unsupported_codec",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::InsufficientSymbols
            | Self::DecodeFailure
            | Self::MissingRepairSymbol
            | Self::CorruptedSymbol => true,
            Self::InvalidSourceBlock | Self::UnsupportedCodec => false,
        }
    }
}
impl fmt::Display for RepairError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InsufficientSymbols => "insufficient repair symbols",
            Self::DecodeFailure => "RaptorQ decode failure",
            Self::InvalidSourceBlock => "invalid source block",
            Self::MissingRepairSymbol => "missing repair symbol",
            Self::CorruptedSymbol => "corrupted repair symbol",
            Self::UnsupportedCodec => "unsupported repair codec",
        };
        f.write_str(text)
    }
}
/// Errors in the `path` domain.
///
/// Every variant is classified as retryable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PathError {
/// No available paths.
NoAvailablePaths,
/// Path validation failed.
PathValidationFailed,
/// NAT traversal failed.
NatTraversalFailed,
/// Relay unavailable.
RelayUnavailable,
/// STUN timeout.
StunTimeout,
/// Path race timeout.
PathRaceTimeout,
}
impl PathError {
    /// Stable machine-readable code for this variant (prefixed `path_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::NoAvailablePaths => "path_no_available_paths",
            Self::PathValidationFailed => "path_validation_failed",
            Self::NatTraversalFailed => "path_nat_traversal_failed",
            Self::RelayUnavailable => "path_relay_unavailable",
            Self::StunTimeout => "path_stun_timeout",
            Self::PathRaceTimeout => "path_race_timeout",
        }
    }

    /// Whether this variant is classified as retryable (currently all are).
    const fn is_retryable(&self) -> bool {
        // Spelled out rather than returning `true` directly so that adding a
        // variant forces an explicit classification here.
        match *self {
            Self::NoAvailablePaths
            | Self::PathValidationFailed
            | Self::NatTraversalFailed
            | Self::RelayUnavailable
            | Self::StunTimeout
            | Self::PathRaceTimeout => true,
        }
    }
}
impl fmt::Display for PathError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::NoAvailablePaths => "no available paths",
            Self::PathValidationFailed => "path validation failed",
            Self::NatTraversalFailed => "NAT traversal failed",
            Self::RelayUnavailable => "relay unavailable",
            Self::StunTimeout => "STUN timeout",
            Self::PathRaceTimeout => "path race timeout",
        };
        f.write_str(text)
    }
}
/// Errors in the `policy` domain.
///
/// Classified as retryable: `ResourceQuotaExceeded`, `RateLimitExceeded`.
/// Not retryable: `CapabilityDenied`, `TransferSizeExceeded`,
/// `RegionRestriction`, `FeatureDisabled`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PolicyError {
/// Capability denied.
CapabilityDenied,
/// Resource quota exceeded.
ResourceQuotaExceeded,
/// Transfer size exceeded.
TransferSizeExceeded,
/// Rate limit exceeded.
RateLimitExceeded,
/// Region restriction.
RegionRestriction,
/// Feature disabled.
FeatureDisabled,
}
impl PolicyError {
    /// Stable machine-readable code for this variant (prefixed `policy_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::CapabilityDenied => "policy_capability_denied",
            Self::ResourceQuotaExceeded => "policy_resource_quota_exceeded",
            Self::TransferSizeExceeded => "policy_transfer_size_exceeded",
            Self::RateLimitExceeded => "policy_rate_limit_exceeded",
            Self::RegionRestriction => "policy_region_restriction",
            Self::FeatureDisabled => "policy_feature_disabled",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::ResourceQuotaExceeded | Self::RateLimitExceeded => true,
            Self::CapabilityDenied
            | Self::TransferSizeExceeded
            | Self::RegionRestriction
            | Self::FeatureDisabled => false,
        }
    }
}
impl fmt::Display for PolicyError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::CapabilityDenied => "capability denied",
            Self::ResourceQuotaExceeded => "resource quota exceeded",
            Self::TransferSizeExceeded => "transfer size exceeded",
            Self::RateLimitExceeded => "rate limit exceeded",
            Self::RegionRestriction => "region restriction",
            Self::FeatureDisabled => "feature disabled",
        };
        f.write_str(text)
    }
}
/// Errors in the `relay` domain.
///
/// Classified as retryable: `RelayOffline`, `RelayOverloaded`, `RelayTimeout`.
/// Not retryable: `InvalidRelayCredentials`, `RelayProtocolMismatch`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RelayError {
/// Relay offline.
RelayOffline,
/// Relay overloaded.
RelayOverloaded,
/// Invalid relay credentials.
InvalidRelayCredentials,
/// Relay timeout.
RelayTimeout,
/// Relay protocol mismatch.
RelayProtocolMismatch,
}
impl RelayError {
    /// Stable machine-readable code for this variant (prefixed `relay_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::RelayOffline => "relay_offline",
            Self::RelayOverloaded => "relay_overloaded",
            Self::InvalidRelayCredentials => "relay_invalid_credentials",
            Self::RelayTimeout => "relay_timeout",
            Self::RelayProtocolMismatch => "relay_protocol_mismatch",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::RelayOffline | Self::RelayOverloaded | Self::RelayTimeout => true,
            Self::InvalidRelayCredentials | Self::RelayProtocolMismatch => false,
        }
    }
}
impl fmt::Display for RelayError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::RelayOffline => "relay offline",
            Self::RelayOverloaded => "relay overloaded",
            Self::InvalidRelayCredentials => "invalid relay credentials",
            Self::RelayTimeout => "relay timeout",
            Self::RelayProtocolMismatch => "relay protocol mismatch",
        };
        f.write_str(text)
    }
}
/// Errors in the `mailbox` domain.
///
/// Classified as retryable: `MailboxFull`, `MailboxUnavailable`.
/// Not retryable: `MessageTooLarge`, `InvalidMailboxCredentials`,
/// `MessageExpired`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MailboxError {
/// Mailbox full.
MailboxFull,
/// Message too large for the mailbox.
MessageTooLarge,
/// Mailbox unavailable.
MailboxUnavailable,
/// Invalid mailbox credentials.
InvalidMailboxCredentials,
/// Message expired in the mailbox.
MessageExpired,
}
impl MailboxError {
    /// Stable machine-readable code for this variant (prefixed `mailbox_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::MailboxFull => "mailbox_full",
            Self::MessageTooLarge => "mailbox_message_too_large",
            Self::MailboxUnavailable => "mailbox_unavailable",
            Self::InvalidMailboxCredentials => "mailbox_invalid_credentials",
            Self::MessageExpired => "mailbox_message_expired",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::MailboxFull | Self::MailboxUnavailable => true,
            Self::MessageTooLarge | Self::InvalidMailboxCredentials | Self::MessageExpired => {
                false
            }
        }
    }
}
impl fmt::Display for MailboxError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::MailboxFull => "mailbox full",
            Self::MessageTooLarge => "message too large for mailbox",
            Self::MailboxUnavailable => "mailbox unavailable",
            Self::InvalidMailboxCredentials => "invalid mailbox credentials",
            Self::MessageExpired => "message expired in mailbox",
        };
        f.write_str(text)
    }
}
/// Errors in the `daemon` domain.
///
/// Every variant except `ConfigurationError` is classified as retryable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DaemonError {
/// Daemon offline.
DaemonOffline,
/// Daemon restarting.
DaemonRestarting,
/// Daemon configuration error.
ConfigurationError,
/// Daemon service unavailable.
ServiceUnavailable,
/// Daemon internal error.
InternalError,
}
impl DaemonError {
    /// Stable machine-readable code for this variant (prefixed `daemon_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::DaemonOffline => "daemon_offline",
            Self::DaemonRestarting => "daemon_restarting",
            Self::ConfigurationError => "daemon_configuration_error",
            Self::ServiceUnavailable => "daemon_service_unavailable",
            Self::InternalError => "daemon_internal_error",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::DaemonOffline
            | Self::DaemonRestarting
            | Self::ServiceUnavailable
            | Self::InternalError => true,
            Self::ConfigurationError => false,
        }
    }
}
impl fmt::Display for DaemonError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::DaemonOffline => "daemon offline",
            Self::DaemonRestarting => "daemon restarting",
            Self::ConfigurationError => "daemon configuration error",
            Self::ServiceUnavailable => "daemon service unavailable",
            Self::InternalError => "daemon internal error",
        };
        f.write_str(text)
    }
}
/// Errors in the `adapter` domain.
///
/// Classified as retryable: `AdapterOffline`, `AdapterOverloaded`.
/// Not retryable: `UnsupportedProtocol`, `ProtocolMismatch`,
/// `InvalidAdapterConfig`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AdapterError {
/// Unsupported protocol.
UnsupportedProtocol,
/// Adapter offline.
AdapterOffline,
/// Adapter protocol mismatch.
ProtocolMismatch,
/// Adapter overloaded.
AdapterOverloaded,
/// Invalid adapter configuration.
InvalidAdapterConfig,
}
impl AdapterError {
    /// Stable machine-readable code for this variant (prefixed `adapter_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::UnsupportedProtocol => "adapter_unsupported_protocol",
            Self::AdapterOffline => "adapter_offline",
            Self::ProtocolMismatch => "adapter_protocol_mismatch",
            Self::AdapterOverloaded => "adapter_overloaded",
            Self::InvalidAdapterConfig => "adapter_invalid_config",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::AdapterOffline | Self::AdapterOverloaded => true,
            Self::UnsupportedProtocol | Self::ProtocolMismatch | Self::InvalidAdapterConfig => {
                false
            }
        }
    }
}
impl fmt::Display for AdapterError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::UnsupportedProtocol => "unsupported protocol",
            Self::AdapterOffline => "adapter offline",
            Self::ProtocolMismatch => "adapter protocol mismatch",
            Self::AdapterOverloaded => "adapter overloaded",
            Self::InvalidAdapterConfig => "invalid adapter configuration",
        };
        f.write_str(text)
    }
}
/// Errors in the `platform` domain.
///
/// Every variant except `SystemShutdown` is classified as retryable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlatformError {
/// Out of memory.
OutOfMemory,
/// Process limit exceeded.
ProcessLimitExceeded,
/// System shutdown.
SystemShutdown,
/// Resource temporarily unavailable.
ResourceTemporarilyUnavailable,
/// Operating system error.
OperatingSystemError,
}
impl PlatformError {
    /// Stable machine-readable code for this variant (prefixed `platform_`).
    const fn code(&self) -> &'static str {
        match *self {
            Self::OutOfMemory => "platform_out_of_memory",
            Self::ProcessLimitExceeded => "platform_process_limit_exceeded",
            Self::SystemShutdown => "platform_system_shutdown",
            Self::ResourceTemporarilyUnavailable => "platform_resource_temporarily_unavailable",
            Self::OperatingSystemError => "platform_operating_system_error",
        }
    }

    /// Whether this variant is classified as retryable.
    const fn is_retryable(&self) -> bool {
        match *self {
            Self::OutOfMemory
            | Self::ProcessLimitExceeded
            | Self::ResourceTemporarilyUnavailable
            | Self::OperatingSystemError => true,
            Self::SystemShutdown => false,
        }
    }
}
impl fmt::Display for PlatformError {
    /// Writes the short human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::OutOfMemory => "out of memory",
            Self::ProcessLimitExceeded => "process limit exceeded",
            Self::SystemShutdown => "system shutdown",
            Self::ResourceTemporarilyUnavailable => "resource temporarily unavailable",
            Self::OperatingSystemError => "operating system error",
        };
        f.write_str(text)
    }
}
/// ATP-specific reasons for cancelling an operation.
///
/// Each variant has a stable `cancel_*` code, a numeric severity from 1
/// (`UserCancel`) to 9 (`Shutdown`), and a mapping onto the base
/// `CancelReason` type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AtpCancelReason {
/// User-initiated cancel; the string is included in the `Display` output.
UserCancel(String),
/// Timeout.
Timeout,
/// Shutdown.
Shutdown,
/// Fail-fast cancel; the string is included in the `Display` output.
FailFast(String),
/// Parent cancelled.
ParentCancel,
/// Path race lost.
PathRaceLost,
/// Repair decode abandoned.
RepairDecodeAbandoned,
/// Daemon restart.
DaemonRestart,
/// Resource budget exhausted; the string is included in the `Display` output.
ResourceBudgetExhausted(String),
}
impl AtpCancelReason {
    /// Returns the stable machine-readable code (prefixed `cancel_`).
    #[must_use]
    pub const fn code(&self) -> &'static str {
        match self {
            Self::UserCancel(_) => "cancel_user",
            Self::Timeout => "cancel_timeout",
            Self::Shutdown => "cancel_shutdown",
            Self::FailFast(_) => "cancel_fail_fast",
            Self::ParentCancel => "cancel_parent",
            Self::PathRaceLost => "cancel_path_race_lost",
            Self::RepairDecodeAbandoned => "cancel_repair_decode_abandoned",
            Self::DaemonRestart => "cancel_daemon_restart",
            Self::ResourceBudgetExhausted(_) => "cancel_resource_budget_exhausted",
        }
    }

    /// Returns the severity rank, from 1 (user cancel) up to 9 (shutdown).
    #[must_use]
    pub const fn severity(&self) -> u8 {
        // Arms follow the enum's declaration order; the ranks themselves are
        // what define the ordering.
        match self {
            Self::UserCancel(_) => 1,
            Self::Timeout => 2,
            Self::Shutdown => 9,
            Self::FailFast(_) => 3,
            Self::ParentCancel => 4,
            Self::PathRaceLost => 5,
            Self::RepairDecodeAbandoned => 6,
            Self::DaemonRestart => 8,
            Self::ResourceBudgetExhausted(_) => 7,
        }
    }

    /// Converts to the base `CancelReason`.
    ///
    /// Payload strings are not forwarded: every variant maps to a fixed base
    /// reason with, where one is attached, a fixed message.
    #[must_use]
    pub fn to_base_reason(&self) -> CancelReason {
        match self {
            // Variants with a direct base-reason counterpart.
            Self::Timeout => CancelReason::timeout(),
            Self::Shutdown => CancelReason::shutdown(),
            Self::ParentCancel => CancelReason::linked_exit(),
            Self::FailFast(_) => CancelReason::fail_fast().with_message("fail-fast"),
            // Budget exhaustion is reported as a timeout carrying a fixed message.
            Self::ResourceBudgetExhausted(_) => {
                CancelReason::timeout().with_message("resource budget exhausted")
            }
            // The remaining variants are all expressed as user cancels that
            // differ only in their message.
            Self::UserCancel(_) => CancelReason::user("user cancel"),
            Self::PathRaceLost => CancelReason::user("path race lost"),
            Self::RepairDecodeAbandoned => CancelReason::user("repair decode abandoned"),
            Self::DaemonRestart => CancelReason::user("daemon restart"),
        }
    }
}
impl fmt::Display for AtpCancelReason {
    /// Writes the reason label, followed by `: <detail>` for payload variants.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Variants carrying a detail string.
            Self::UserCancel(detail) => write!(f, "user cancel: {detail}"),
            Self::FailFast(detail) => write!(f, "fail-fast: {detail}"),
            Self::ResourceBudgetExhausted(detail) => {
                write!(f, "resource budget exhausted: {detail}")
            }
            // Fixed-text variants.
            Self::Timeout => f.write_str("timeout"),
            Self::Shutdown => f.write_str("shutdown"),
            Self::ParentCancel => f.write_str("parent cancelled"),
            Self::PathRaceLost => f.write_str("path race lost"),
            Self::RepairDecodeAbandoned => f.write_str("repair decode abandoned"),
            Self::DaemonRestart => f.write_str("daemon restart"),
        }
    }
}
/// Outcome alias for ATP operations: `Outcome` with the error type fixed to `AtpError`.
pub type AtpOutcome<T> = Outcome<T, AtpError>;
impl<T> AtpOutcome<T> {
#[must_use]
pub fn transport_error(error: TransportError) -> Self {
Outcome::err(AtpError::Transport(error))
}
#[must_use]
pub fn protocol_error(error: ProtocolError) -> Self {
Outcome::err(AtpError::Protocol(error))
}
#[must_use]
pub fn auth_error(error: AuthError) -> Self {
Outcome::err(AtpError::Auth(error))
}
#[must_use]
pub fn disk_error(error: DiskError) -> Self {
Outcome::err(AtpError::Disk(error))
}
#[must_use]
pub fn atp_cancelled(reason: AtpCancelReason) -> Self {
Outcome::cancelled(reason.to_base_reason())
}
}
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// String-backed idempotency key.
///
/// Created from a caller-supplied string or derived by hashing operation
/// parameters, in which case it has the form `atp_` followed by 16 hex digits.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct IdempotencyKey(String);
impl IdempotencyKey {
    /// Wraps a caller-supplied string as a key without modifying it.
    #[must_use]
    pub fn new(key: impl Into<String>) -> Self {
        let owned: String = key.into();
        Self(owned)
    }

    /// Derives a key of the form `atp_<16 lowercase hex digits>` from the
    /// 64-bit `DefaultHasher` digest of `input`.
    ///
    /// NOTE(review): the standard library does not specify `DefaultHasher`'s
    /// algorithm and allows it to change between releases, so generated keys
    /// are presumably only comparable between identically built binaries —
    /// confirm this matches how the keys are exchanged or persisted.
    #[must_use]
    pub fn generate<T: Hash>(input: T) -> Self {
        let mut state = DefaultHasher::new();
        input.hash(&mut state);
        let digest = state.finish();
        Self(format!("atp_{digest:016x}"))
    }

    /// Borrows the key as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Key for an offer, derived from the manifest hash, peer and timestamp.
    #[must_use]
    pub fn offer(manifest_hash: &[u8], peer_id: &str, timestamp_nanos: u64) -> Self {
        let material = ("offer", manifest_hash, peer_id, timestamp_nanos);
        Self::generate(material)
    }

    /// Key for an accept, derived from the offer key's text, peer and timestamp.
    #[must_use]
    pub fn accept(offer_key: &IdempotencyKey, peer_id: &str, timestamp_nanos: u64) -> Self {
        let material = ("accept", offer_key.as_str(), peer_id, timestamp_nanos);
        Self::generate(material)
    }

    /// Key for a chunk, derived from the transfer id, chunk index and attempt.
    #[must_use]
    pub fn chunk(transfer_id: &str, chunk_index: u32, attempt: u32) -> Self {
        let material = ("chunk", transfer_id, chunk_index, attempt);
        Self::generate(material)
    }

    /// Key for a repair group, derived from the source block id, repair
    /// generation and peer.
    #[must_use]
    pub fn repair_group(source_block_id: &str, repair_generation: u32, peer_id: &str) -> Self {
        let material = ("repair_group", source_block_id, repair_generation, peer_id);
        Self::generate(material)
    }

    /// Key for a commit, derived from the transfer id, final hash and timestamp.
    #[must_use]
    pub fn commit(transfer_id: &str, final_hash: &[u8], timestamp_nanos: u64) -> Self {
        let material = ("commit", transfer_id, final_hash, timestamp_nanos);
        Self::generate(material)
    }

    /// Key for a mailbox store, derived from the mailbox id, message hash and
    /// sequence number.
    #[must_use]
    pub fn mailbox_store(mailbox_id: &str, message_hash: &[u8], sequence: u64) -> Self {
        let material = ("mailbox_store", mailbox_id, message_hash, sequence);
        Self::generate(material)
    }

    /// Key for a grant, derived from the issuer, subject, capability hash and expiry.
    #[must_use]
    pub fn grant(issuer_id: &str, subject_id: &str, capability_hash: &[u8], expiry: u64) -> Self {
        let material = ("grant", issuer_id, subject_id, capability_hash, expiry);
        Self::generate(material)
    }

    /// Key for a relay reservation, derived from the relay, client, bandwidth
    /// and duration.
    #[must_use]
    pub fn relay_reservation(
        relay_id: &str,
        client_id: &str,
        bandwidth: u64,
        duration: u64,
    ) -> Self {
        let material = ("relay_reservation", relay_id, client_id, bandwidth, duration);
        Self::generate(material)
    }

    /// Key for a resume-journal entry, derived from the transfer id,
    /// checkpoint hash and sequence number.
    #[must_use]
    pub fn resume_journal(transfer_id: &str, checkpoint_hash: &[u8], sequence: u64) -> Self {
        let material = ("resume_journal", transfer_id, checkpoint_hash, sequence);
        Self::generate(material)
    }

    /// Key for a final proof, derived from the transfer id, proof hash and verifier.
    #[must_use]
    pub fn final_proof(transfer_id: &str, proof_hash: &[u8], verifier_id: &str) -> Self {
        let material = ("final_proof", transfer_id, proof_hash, verifier_id);
        Self::generate(material)
    }
}
impl fmt::Display for IdempotencyKey {
    /// Writes the key text verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}
impl From<String> for IdempotencyKey {
fn from(value: String) -> Self {
Self::new(value)
}
}
impl From<&str> for IdempotencyKey {
fn from(value: &str) -> Self {
Self::new(value)
}
}
/// Record of a single transfer: identity, timing, progress and final outcome.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferTranscript {
/// Identifier of the transfer.
pub transfer_id: String,
/// Idempotency key associated with the transfer.
pub idempotency_key: IdempotencyKey,
/// Manifest hash bytes supplied at construction.
pub manifest_hash: Vec<u8>,
/// Identifier of the peer.
pub peer_id: String,
/// Start timestamp in nanoseconds (epoch not specified here).
pub start_time_nanos: u64,
/// End timestamp in nanoseconds; `None` until `complete` is called.
pub end_time_nanos: Option<u64>,
/// Outcome classification; starts as `Pending` and is set by `complete`.
pub outcome_class: OutcomeClass,
/// Retry attempt number; starts at 0 and is set via `set_retry_attempt`.
pub retry_attempt: u32,
/// Cancel reason recovered by `complete` from a cancelled outcome's
/// message, when one could be recognised.
pub cancellation_source: Option<AtpCancelReason>,
/// Optional user message id, set via `set_user_message_id`.
pub user_message_id: Option<String>,
/// Optional replay pointer, set via `set_replay_pointer`.
pub replay_pointer: Option<String>,
/// Transfer size in bytes supplied at construction.
pub transfer_size_bytes: u64,
/// Number of chunks completed so far; starts at 0.
pub chunks_completed: u32,
/// Total number of chunks; the denominator for `progress_percent`.
pub total_chunks: u32,
/// Number of repair groups used; starts at 0.
pub repair_groups_used: u32,
/// Paths attempted, in the order they were added.
pub paths_attempted: Vec<String>,
/// Error code recorded by `complete` when the outcome is an error.
pub error_code: Option<String>,
}
impl TransferTranscript {
#[must_use]
pub fn new(
transfer_id: String,
idempotency_key: IdempotencyKey,
manifest_hash: Vec<u8>,
peer_id: String,
start_time_nanos: u64,
transfer_size_bytes: u64,
total_chunks: u32,
) -> Self {
Self {
transfer_id,
idempotency_key,
manifest_hash,
peer_id,
start_time_nanos,
end_time_nanos: None,
outcome_class: OutcomeClass::Pending,
retry_attempt: 0,
cancellation_source: None,
user_message_id: None,
replay_pointer: None,
transfer_size_bytes,
chunks_completed: 0,
total_chunks,
repair_groups_used: 0,
paths_attempted: Vec::new(),
error_code: None,
}
}
pub fn complete(&mut self, outcome: &AtpOutcome<()>, end_time_nanos: u64) {
self.end_time_nanos = Some(end_time_nanos);
self.outcome_class = OutcomeClass::from_outcome(outcome);
if let Outcome::Err(error) = outcome {
self.error_code = Some(error.code().to_string());
}
if let Outcome::Cancelled(reason) = outcome {
if let Some(message) = reason.message() {
if let Some(atp_reason) = Self::parse_atp_cancel_reason(message) {
self.cancellation_source = Some(atp_reason);
}
}
}
}
pub fn add_path_attempt(&mut self, path: String) {
self.paths_attempted.push(path);
}
pub fn update_chunks_completed(&mut self, completed: u32) {
self.chunks_completed = completed;
}
pub fn increment_repair_groups(&mut self) {
self.repair_groups_used += 1;
}
pub fn set_retry_attempt(&mut self, attempt: u32) {
self.retry_attempt = attempt;
}
pub fn set_user_message_id(&mut self, message_id: String) {
self.user_message_id = Some(message_id);
}
pub fn set_replay_pointer(&mut self, pointer: String) {
self.replay_pointer = Some(pointer);
}
#[must_use]
pub fn progress_percent(&self) -> f64 {
if self.total_chunks == 0 {
return 0.0;
}
(f64::from(self.chunks_completed) / f64::from(self.total_chunks)) * 100.0
}
#[must_use]
pub fn duration_nanos(&self) -> Option<u64> {
self.end_time_nanos
.map(|end| end.saturating_sub(self.start_time_nanos))
}
#[must_use]
pub const fn is_complete(&self) -> bool {
matches!(
self.outcome_class,
OutcomeClass::Success | OutcomeClass::Error | OutcomeClass::Cancelled
)
}
fn parse_atp_cancel_reason(message: &str) -> Option<AtpCancelReason> {
if message.contains("timeout") {
Some(AtpCancelReason::Timeout)
} else if message.contains("shutdown") {
Some(AtpCancelReason::Shutdown)
} else if message.contains("user") {
Some(AtpCancelReason::UserCancel(message.to_string()))
} else if message.contains("path race") {
Some(AtpCancelReason::PathRaceLost)
} else if message.contains("repair decode") {
Some(AtpCancelReason::RepairDecodeAbandoned)
} else if message.contains("daemon restart") {
Some(AtpCancelReason::DaemonRestart)
} else if message.contains("fail-fast") {
Some(AtpCancelReason::FailFast(message.to_string()))
} else if message.contains("parent") {
Some(AtpCancelReason::ParentCancel)
} else if message.contains("resource budget") {
Some(AtpCancelReason::ResourceBudgetExhausted(
message.to_string(),
))
} else {
None
}
}
}
/// Coarse classification of a transfer's outcome.
///
/// `Pending` is the state before any outcome is recorded; the other variants
/// mirror the four `Outcome` cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OutcomeClass {
/// No outcome recorded yet.
Pending,
/// The outcome was `Ok`.
Success,
/// The outcome was `Err`.
Error,
/// The outcome was `Cancelled`.
Cancelled,
/// The outcome was `Panicked`.
Panicked,
}
impl OutcomeClass {
    /// Classifies an outcome by its variant, ignoring the payload.
    #[must_use]
    pub fn from_outcome<T>(outcome: &AtpOutcome<T>) -> Self {
        match *outcome {
            Outcome::Ok(_) => Self::Success,
            Outcome::Err(_) => Self::Error,
            Outcome::Cancelled(_) => Self::Cancelled,
            Outcome::Panicked(_) => Self::Panicked,
        }
    }

    /// Returns `true` for the classes that are candidates for a retry:
    /// `Error` and `Cancelled`.
    #[must_use]
    pub const fn is_retryable(self) -> bool {
        match self {
            Self::Error | Self::Cancelled => true,
            Self::Pending | Self::Success | Self::Panicked => false,
        }
    }

    /// Returns the severity rank, from 0 (`Success`) to 4 (`Panicked`).
    #[must_use]
    pub const fn severity(self) -> u8 {
        match self {
            Self::Pending => 1,
            Self::Success => 0,
            Self::Error => 3,
            Self::Cancelled => 2,
            Self::Panicked => 4,
        }
    }
}
impl fmt::Display for OutcomeClass {
    /// Writes the lowercase name of the class.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Success => "success",
            Self::Error => "error",
            Self::Cancelled => "cancelled",
            Self::Panicked => "panicked",
        };
        f.write_str(label)
    }
}
/// Retry configuration: attempt limit, exponential backoff with jitter, and
/// the conditions under which an error is retried.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RetryPolicy {
/// Attempt limit; `should_retry` refuses once `attempt >= max_attempts`.
pub max_attempts: u32,
/// Backoff delay for the first attempt, in milliseconds.
pub base_delay_ms: u64,
/// Cap applied to the backoff delay before jitter, in milliseconds.
pub max_delay_ms: u64,
/// Factor the delay is multiplied by for each further attempt.
pub backoff_multiplier: f64,
/// Jitter as a percentage of the capped delay, applied in either
/// direction; `0` disables jitter.
pub jitter_percent: u8,
/// Whether cancelled outcomes are retried.
pub retry_on_cancel: bool,
/// Conditions checked against a retryable error; one match is enough.
pub retry_conditions: Vec<RetryCondition>,
}
impl RetryPolicy {
    /// Default policy for transfers: 3 attempts, 1 s base delay doubling up
    /// to 30 s, 10% jitter, no retry on cancel.
    ///
    /// Retries errors in the `transport` and `path` domains plus the
    /// `repair_insufficient_symbols` code.
    #[must_use]
    pub fn default_transfer() -> Self {
        Self {
            max_attempts: 3,
            base_delay_ms: 1000,
            max_delay_ms: 30000,
            backoff_multiplier: 2.0,
            jitter_percent: 10,
            retry_on_cancel: false,
            retry_conditions: vec![
                RetryCondition::ErrorClass(vec!["transport".to_string(), "path".to_string()]),
                RetryCondition::ErrorCode(vec!["repair_insufficient_symbols".to_string()]),
            ],
        }
    }

    /// Aggressive policy: 7 attempts, 0.5 s base delay growing by 1.5x up to
    /// 60 s, 20% jitter, retries cancelled outcomes.
    ///
    /// Contains `AlwaysRetry`, so every error classified as retryable matches.
    #[must_use]
    pub fn aggressive() -> Self {
        Self {
            max_attempts: 7,
            base_delay_ms: 500,
            max_delay_ms: 60000,
            backoff_multiplier: 1.5,
            jitter_percent: 20,
            retry_on_cancel: true,
            retry_conditions: vec![
                RetryCondition::ErrorClass(vec![
                    "transport".to_string(),
                    "path".to_string(),
                    "repair".to_string(),
                ]),
                RetryCondition::AlwaysRetry,
            ],
        }
    }

    /// Conservative policy: 2 attempts, 2 s base delay tripling up to 10 s,
    /// 5% jitter, no retry on cancel, `transport` domain errors only.
    #[must_use]
    pub fn conservative() -> Self {
        Self {
            max_attempts: 2,
            base_delay_ms: 2000,
            max_delay_ms: 10000,
            backoff_multiplier: 3.0,
            jitter_percent: 5,
            retry_on_cancel: false,
            retry_conditions: vec![RetryCondition::ErrorClass(vec!["transport".to_string()])],
        }
    }

    /// Decides whether `outcome` should be retried after `attempt` attempts.
    ///
    /// Never retries once `attempt >= max_attempts`, nor for `Ok` or
    /// `Panicked` outcomes. Cancelled outcomes follow `retry_on_cancel`. An
    /// error is retried only when it is classified as retryable and at least
    /// one retry condition matches it.
    #[must_use]
    pub fn should_retry<T>(&self, outcome: &AtpOutcome<T>, attempt: u32) -> bool {
        if attempt >= self.max_attempts {
            return false;
        }
        match outcome {
            Outcome::Ok(_) | Outcome::Panicked(_) => false,
            Outcome::Cancelled(_) => self.retry_on_cancel,
            Outcome::Err(error) => {
                error.is_retryable()
                    && self
                        .retry_conditions
                        .iter()
                        .any(|condition| condition.matches(error))
            }
        }
    }

    /// Returns the delay in milliseconds to wait before the given attempt.
    ///
    /// The delay is `base_delay_ms * backoff_multiplier^(attempt - 1)`, capped
    /// at `max_delay_ms`; attempts 0 and 1 both use the base delay. When
    /// `jitter_percent` is non-zero, a random offset of up to that percentage
    /// of the capped delay is added or subtracted, so the result can exceed
    /// `max_delay_ms` by the jitter amount.
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
        // `powi` takes an `i32`. Saturate instead of reinterpreting the bits:
        // a wrapping cast turns very large attempt numbers into negative
        // exponents, which would shrink the delay instead of capping it.
        let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
        let scaled = self.base_delay_ms as f64 * self.backoff_multiplier.powi(exponent);
        let capped = scaled.min(self.max_delay_ms as f64) as u64;
        if self.jitter_percent == 0 {
            return capped;
        }
        let spread = capped as f64 * f64::from(self.jitter_percent) / 100.0;
        // Map the unit-interval sample onto [-1, 1] to get a signed offset.
        let offset = (spread * (rand_simple::random::<f64>() * 2.0 - 1.0)) as i64;
        capped.saturating_add_signed(offset)
    }
}
/// Predicate over an `AtpError`, evaluated by `RetryCondition::matches`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RetryCondition {
/// Matches when the error's `code()` equals one of the listed codes.
ErrorCode(Vec<String>),
/// Matches when the error's `domain()` equals one of the listed names.
ErrorClass(Vec<String>),
/// Matches when the error reports itself retryable via `is_retryable()`.
RetryableErrors,
/// Matches every error.
AlwaysRetry,
/// Matches no error.
NeverRetry,
}
impl RetryCondition {
    /// Reports whether `error` satisfies this condition.
    ///
    /// * `ErrorCode` – `error.code()` equals one of the listed codes.
    /// * `ErrorClass` – `error.domain()` equals one of the listed names.
    /// * `RetryableErrors` – `error.is_retryable()` is `true`.
    /// * `AlwaysRetry` – always `true`.
    /// * `NeverRetry` – always `false`.
    #[must_use]
    pub fn matches(&self, error: &AtpError) -> bool {
        match self {
            // Compare as `&str` so no temporary `String` is allocated per lookup.
            Self::ErrorCode(codes) => {
                let code = error.code();
                codes.iter().any(|candidate| candidate == code)
            }
            Self::ErrorClass(classes) => {
                let domain = error.domain();
                classes.iter().any(|candidate| candidate == domain)
            }
            Self::RetryableErrors => error.is_retryable(),
            Self::AlwaysRetry => true,
            Self::NeverRetry => false,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Pins the exact code string returned by a sample of error variants.
#[test]
fn error_codes_are_stable() {
    // Each entry pairs the code a variant returns with the literal it must equal.
    let cases = [
        (
            TransportError::ConnectionTimeout.code(),
            "transport_connection_timeout",
        ),
        (AuthError::GrantExpired.code(), "auth_grant_expired"),
        (
            RepairError::InsufficientSymbols.code(),
            "repair_insufficient_symbols",
        ),
    ];
    for (actual, expected) in cases.iter() {
        assert_eq!(actual, expected);
    }
}
/// Checks which sample errors report themselves as retryable.
#[test]
fn retryable_classification() {
    let expected_retryable = [
        TransportError::ConnectionTimeout.is_retryable(),
        RepairError::MissingRepairSymbol.is_retryable(),
    ];
    let expected_permanent = [
        AuthError::InvalidSignature.is_retryable(),
        ManifestError::InvalidFormat.is_retryable(),
    ];
    assert!(expected_retryable.iter().all(|&flag| flag));
    assert!(expected_permanent.iter().all(|&flag| !flag));
}
/// Asserts the severity ordering: user cancel < timeout < shutdown.
#[test]
fn cancel_reason_severity_ordering() {
    let user = AtpCancelReason::UserCancel("test".to_string());
    let timeout = AtpCancelReason::Timeout;
    let shutdown = AtpCancelReason::Shutdown;

    let (lowest, middle, highest) = (user.severity(), timeout.severity(), shutdown.severity());
    assert!(lowest < middle);
    assert!(middle < highest);
}
/// Checks that the error and cancellation constructors yield the matching outcome kind.
#[test]
fn atp_outcome_constructors() {
    let failed: AtpOutcome<()> = AtpOutcome::transport_error(TransportError::ConnectionTimeout);
    let cancelled: AtpOutcome<()> = AtpOutcome::atp_cancelled(AtpCancelReason::Timeout);

    assert!(failed.is_err());
    assert!(cancelled.is_cancelled());
}
/// Checks that `generate` is deterministic per input and emits the `atp_` prefix.
#[test]
fn idempotency_key_generation() {
    let first = IdempotencyKey::generate("test_input");
    let repeat = IdempotencyKey::generate("test_input");
    let other = IdempotencyKey::generate("different_input");

    // Same input gives the same key; a different input gives a different key.
    assert_eq!(first, repeat);
    assert_ne!(first, other);
    assert!(first.as_str().starts_with("atp_"));
}
/// Checks the typed constructors: every key carries the `atp_` prefix and
/// `offer` is reproducible for identical arguments.
#[test]
fn idempotency_key_typed_constructors() {
    let manifest_hash = b"test_manifest_hash";
    let peer_id = "peer123";
    let timestamp = 1_000_000_000;

    let offer_key = IdempotencyKey::offer(manifest_hash, peer_id, timestamp);
    let accept_key = IdempotencyKey::accept(&offer_key, peer_id, timestamp);
    let chunk_key = IdempotencyKey::chunk("transfer123", 42, 1);

    for key in [&offer_key, &accept_key, &chunk_key].iter() {
        assert!(key.as_str().starts_with("atp_"));
    }

    let offer_again = IdempotencyKey::offer(manifest_hash, peer_id, timestamp);
    assert_eq!(offer_key, offer_again);
}
/// Walks a transcript from creation through progress updates to successful completion.
#[test]
fn transfer_transcript_lifecycle() {
    let key = IdempotencyKey::new("test_key");
    let transfer_id = "transfer123".to_string();
    let peer = "peer456".to_string();
    let mut transcript = TransferTranscript::new(
        transfer_id,
        key.clone(),
        vec![1, 2, 3, 4],
        peer,
        1_000_000_000,
        1024,
        10,
    );

    // Freshly created: identifiers stored, still pending.
    assert_eq!(transcript.transfer_id, "transfer123");
    assert_eq!(transcript.idempotency_key, key);
    assert_eq!(transcript.outcome_class, OutcomeClass::Pending);
    assert!(!transcript.is_complete());

    // Five chunks reported against the `10` passed to `new`.
    transcript.update_chunks_completed(5);
    assert_eq!(transcript.progress_percent(), 50.0);

    // Completing with a success outcome closes the transcript.
    let success_outcome = AtpOutcome::ok(());
    transcript.complete(&success_outcome, 2_000_000_000);
    assert_eq!(transcript.outcome_class, OutcomeClass::Success);
    assert!(transcript.is_complete());
    assert_eq!(transcript.duration_nanos(), Some(1_000_000_000));
}
/// Completing a transcript with an error outcome records the class and the error code.
#[test]
fn transfer_transcript_error_handling() {
    let mut transcript = TransferTranscript::new(
        "transfer_error".to_string(),
        IdempotencyKey::new("error_test"),
        vec![],
        "peer".to_string(),
        0,
        100,
        1,
    );

    let error_outcome = AtpOutcome::transport_error(TransportError::ConnectionTimeout);
    transcript.complete(&error_outcome, 1000);

    let expected_code = Some("transport_connection_timeout".to_string());
    assert_eq!(transcript.outcome_class, OutcomeClass::Error);
    assert_eq!(transcript.error_code, expected_code);
}
/// Exercises `should_retry` on the default transfer policy for success,
/// retryable and non-retryable outcomes.
#[test]
fn retry_policy_should_retry_logic() {
    let policy = RetryPolicy::default_transfer();

    let success = AtpOutcome::ok(());
    assert!(!policy.should_retry(&success, 1));

    // The default policy allows three attempts, so attempt 3 is refused.
    let retryable_error: AtpOutcome<()> =
        AtpOutcome::transport_error(TransportError::ConnectionTimeout);
    let expectations: [(u32, bool); 3] = [(1, true), (2, true), (3, false)];
    for &(attempt, expected) in expectations.iter() {
        assert_eq!(policy.should_retry(&retryable_error, attempt), expected);
    }

    let non_retryable: AtpOutcome<()> = AtpOutcome::auth_error(AuthError::InvalidSignature);
    assert!(!policy.should_retry(&non_retryable, 1));
}
/// Checks exponential growth of the delay and that it respects the configured cap.
#[test]
fn retry_policy_delay_calculation() {
    let policy = RetryPolicy {
        max_attempts: 5,
        base_delay_ms: 1000,
        max_delay_ms: 10000,
        backoff_multiplier: 2.0,
        // Jitter disabled so the expected delays are exact.
        jitter_percent: 0,
        retry_on_cancel: false,
        retry_conditions: vec![RetryCondition::RetryableErrors],
    };

    let expected: [(u32, u64); 3] = [(1, 1000), (2, 2000), (3, 4000)];
    for &(attempt, delay) in expected.iter() {
        assert_eq!(policy.delay_for_attempt(attempt), delay);
    }

    // A late attempt must not exceed `max_delay_ms`.
    assert!(policy.delay_for_attempt(10) <= 10000);
}
/// Checks every `RetryCondition` variant against a transport error and an auth error.
#[test]
fn retry_condition_matching() {
    let transport_error = AtpError::Transport(TransportError::ConnectionTimeout);
    let auth_error = AtpError::Auth(AuthError::InvalidSignature);

    // Asserts the match result for the transport error first, then the auth error.
    let check = |condition: RetryCondition, on_transport: bool, on_auth: bool| {
        assert_eq!(condition.matches(&transport_error), on_transport);
        assert_eq!(condition.matches(&auth_error), on_auth);
    };

    check(
        RetryCondition::ErrorCode(vec!["transport_connection_timeout".to_string()]),
        true,
        false,
    );
    check(
        RetryCondition::ErrorClass(vec!["transport".to_string()]),
        true,
        false,
    );
    check(RetryCondition::RetryableErrors, true, false);
    check(RetryCondition::AlwaysRetry, true, true);
    check(RetryCondition::NeverRetry, false, false);
}
/// Checks the mapping from outcomes to `OutcomeClass` and the retryability of each class.
#[test]
fn outcome_class_from_outcome() {
    let success: AtpOutcome<()> = AtpOutcome::ok(());
    let error: AtpOutcome<()> = AtpOutcome::transport_error(TransportError::ConnectionTimeout);
    let cancelled: AtpOutcome<()> = AtpOutcome::atp_cancelled(AtpCancelReason::Timeout);

    // Classification of each outcome kind.
    let success_class = OutcomeClass::from_outcome(&success);
    assert_eq!(success_class, OutcomeClass::Success);
    let error_class = OutcomeClass::from_outcome(&error);
    assert_eq!(error_class, OutcomeClass::Error);
    let cancelled_class = OutcomeClass::from_outcome(&cancelled);
    assert_eq!(cancelled_class, OutcomeClass::Cancelled);

    // Retryability per class.
    assert!(OutcomeClass::Error.is_retryable());
    assert!(OutcomeClass::Cancelled.is_retryable());
    assert!(!OutcomeClass::Success.is_retryable());
    assert!(!OutcomeClass::Panicked.is_retryable());
}
/// Builds one key per operation-specific constructor and checks that no two are equal.
#[test]
fn idempotency_key_all_operation_types() {
    let manifest_hash = b"manifest";
    let peer_id = "peer";
    let timestamp = 123_456_789;

    // `accept` needs the offer key, so that one is built up front.
    let offer_key = IdempotencyKey::offer(manifest_hash, peer_id, timestamp);
    let accept_key = IdempotencyKey::accept(&offer_key, peer_id, timestamp);

    let keys = [
        offer_key,
        accept_key,
        IdempotencyKey::chunk("transfer", 1, 1),
        IdempotencyKey::repair_group("block", 1, peer_id),
        IdempotencyKey::commit("transfer", b"hash", timestamp),
        IdempotencyKey::mailbox_store("mailbox", b"msg", 1),
        IdempotencyKey::grant("issuer", "subject", b"cap", timestamp),
        IdempotencyKey::relay_reservation("relay", "client", 1000, 3600),
        IdempotencyKey::resume_journal("transfer", b"checkpoint", 1),
        IdempotencyKey::final_proof("transfer", b"proof", "verifier"),
    ];

    // Compare every ordered pair of distinct positions.
    for (i, left) in keys.iter().enumerate() {
        for (j, right) in keys.iter().enumerate() {
            if i == j {
                continue;
            }
            assert_ne!(left, right, "Keys should be unique");
        }
    }
}
}