// pmetal_distributed/error.rs

//! Comprehensive error types for distributed operations.
//!
//! Modeled after Burn's `GlobalCollectiveError` for
//! error classification and handling.

use libp2p::PeerId;
use std::net::SocketAddr;
use thiserror::Error;

10/// Errors that can occur during distributed operations.
11#[derive(Error, Debug)]
12pub enum DistributedError {
13    // === IO & Network Errors ===
14    #[error("IO error: {0}")]
15    Io(#[from] std::io::Error),
16
17    #[error("Connection failed to peer at {addr}: {reason}")]
18    ConnectionFailed { addr: SocketAddr, reason: String },
19
20    #[error("Connection timeout to peer at {0} after {1:?}")]
21    ConnectionTimeout(SocketAddr, std::time::Duration),
22
23    #[error("Connection refused by peer at {0}")]
24    ConnectionRefused(SocketAddr),
25
26    #[error("Max retries ({max_retries}) exceeded connecting to {addr}")]
27    MaxRetriesExceeded { addr: SocketAddr, max_retries: u32 },
28
29    // === Peer Management Errors ===
30    #[error("Peer lost: {0}")]
31    PeerLost(PeerId),
32
33    #[error("Peer {peer} sent incoherent data: expected {expected} bytes, got {actual}")]
34    PeerIncoherentData {
35        peer: PeerId,
36        expected: usize,
37        actual: usize,
38    },
39
40    #[error("Unknown peer: {0}")]
41    UnknownPeer(PeerId),
42
43    #[error("Peer {0} is unreachable")]
44    PeerUnreachable(PeerId),
45
46    #[error("Peer {peer} timed out during {operation}")]
47    PeerTimeout { peer: PeerId, operation: String },
48
49    // === Collective Operation Errors ===
50    #[error("All-reduce called before registration")]
51    AllReduceBeforeRegister,
52
53    #[error("Node not registered when finish called")]
54    NotRegisteredOnFinish,
55
56    #[error("Double registration attempted")]
57    DoubleRegister,
58
59    #[error("Registration parameters mismatch: {0}")]
60    RegisterParamsMismatch(String),
61
62    #[error("All-reduce parameters mismatch: local has {local} elements, expected {expected}")]
63    AllReduceParamsMismatch { local: usize, expected: usize },
64
65    #[error("Ring reduce impossible: need at least 2 nodes, have {0}")]
66    RingReduceImpossible(usize),
67
68    #[error("Buffer alignment error: expected {expected}-byte alignment, got {actual}")]
69    BufferAlignment { expected: usize, actual: usize },
70
71    #[error("Buffer size error: expected multiple of {expected}, got {actual}")]
72    BufferSize { expected: usize, actual: usize },
73
74    // === Topology Errors ===
75    #[error("Cannot form ring: insufficient nodes ({have} < {need})")]
76    InsufficientNodes { have: usize, need: usize },
77
78    #[error("Ring not established")]
79    RingNotEstablished,
80
81    #[error("Topology changed during operation")]
82    TopologyChanged,
83
84    #[error("No route to peer {0}")]
85    NoRoute(PeerId),
86
87    // === Election Errors ===
88    #[error("Election timeout after {0:?}")]
89    ElectionTimeout(std::time::Duration),
90
91    #[error("Split brain detected: multiple masters ({0:?})")]
92    SplitBrain(Vec<PeerId>),
93
94    #[error("No master elected")]
95    NoMaster,
96
97    #[error("Master unreachable: {0}")]
98    MasterUnreachable(PeerId),
99
100    // === Protocol Errors ===
101    #[error("Protocol error: {0}")]
102    Protocol(String),
103
104    #[error("Invalid message: expected {expected}, got {actual}")]
105    InvalidMessage { expected: String, actual: String },
106
107    #[error("First message was not init")]
108    FirstMsgNotInit,
109
110    #[error("Version mismatch: local {local}, remote {remote}")]
111    VersionMismatch { local: String, remote: String },
112
113    #[error("Namespace mismatch: expected {expected}, got {actual}")]
114    NamespaceMismatch { expected: String, actual: String },
115
116    // === Configuration Errors ===
117    #[error("Configuration error: {0}")]
118    Config(String),
119
120    #[error("Invalid address: {0}")]
121    InvalidAddress(String),
122
123    #[error("Port {0} already in use")]
124    PortInUse(u16),
125
126    // === Serialization Errors ===
127    #[error("Serialization error: {0}")]
128    Serialization(#[from] bitcode::Error),
129
130    // === Health Check Errors ===
131    #[error("Health check failed for peer {peer}: {reason}")]
132    HealthCheckFailed { peer: PeerId, reason: String },
133
134    #[error("Heartbeat timeout for peer {0} after {1:?}")]
135    HeartbeatTimeout(PeerId, std::time::Duration),
136
137    // === Shutdown Errors ===
138    #[error("Shutdown in progress")]
139    ShuttingDown,
140
141    #[error("Operation cancelled")]
142    Cancelled,
143}
144
145impl DistributedError {
146    /// Check if this error is recoverable (can retry).
147    pub fn is_recoverable(&self) -> bool {
148        matches!(
149            self,
150            Self::ConnectionTimeout(_, _)
151                | Self::PeerTimeout { .. }
152                | Self::HeartbeatTimeout(_, _)
153                | Self::ElectionTimeout(_)
154                | Self::TopologyChanged
155        )
156    }
157
158    /// Check if this error indicates a peer failure.
159    pub fn is_peer_failure(&self) -> bool {
160        matches!(
161            self,
162            Self::PeerLost(_)
163                | Self::PeerUnreachable(_)
164                | Self::PeerTimeout { .. }
165                | Self::PeerIncoherentData { .. }
166                | Self::HealthCheckFailed { .. }
167                | Self::HeartbeatTimeout(_, _)
168        )
169    }
170
171    /// Check if this error is fatal (cannot continue).
172    pub fn is_fatal(&self) -> bool {
173        matches!(
174            self,
175            Self::SplitBrain(_)
176                | Self::ShuttingDown
177                | Self::Cancelled
178                | Self::VersionMismatch { .. }
179                | Self::NamespaceMismatch { .. }
180        )
181    }
182
183    /// Get the peer ID associated with this error, if any.
184    pub fn peer_id(&self) -> Option<&PeerId> {
185        match self {
186            Self::PeerLost(p)
187            | Self::UnknownPeer(p)
188            | Self::PeerUnreachable(p)
189            | Self::NoRoute(p)
190            | Self::MasterUnreachable(p)
191            | Self::HeartbeatTimeout(p, _) => Some(p),
192            Self::PeerTimeout { peer, .. }
193            | Self::PeerIncoherentData { peer, .. }
194            | Self::HealthCheckFailed { peer, .. } => Some(peer),
195            _ => None,
196        }
197    }
198}
199
200/// Result type alias for distributed operations.
201pub type DistributedResult<T> = Result<T, DistributedError>;