// pmetal_distributed/error.rs
use libp2p::PeerId;
use std::net::SocketAddr;
use thiserror::Error;
9
/// Error type for the distributed layer.
///
/// The `Display` text of every variant is produced by its `#[error(...)]`
/// attribute (derived through `thiserror::Error`). Variants are grouped below
/// by area: I/O and connections, peers, collective operations, buffers,
/// ring/topology, election, protocol, configuration, serialization, health
/// monitoring, and shutdown.
#[derive(Error, Debug)]
pub enum DistributedError {
    // ---- I/O and connections ----
    /// Wraps a `std::io::Error`; `#[from]` generates the `From` conversion so
    /// `?` can lift I/O errors into this type.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Connecting to the peer at `addr` failed; `reason` is included verbatim
    /// in the message.
    #[error("Connection failed to peer at {addr}: {reason}")]
    ConnectionFailed { addr: SocketAddr, reason: String },

    /// Connecting to the given address timed out after the given duration.
    #[error("Connection timeout to peer at {0} after {1:?}")]
    ConnectionTimeout(SocketAddr, std::time::Duration),

    /// The peer at the given address refused the connection.
    #[error("Connection refused by peer at {0}")]
    ConnectionRefused(SocketAddr),

    /// The retry limit `max_retries` was exceeded while connecting to `addr`.
    #[error("Max retries ({max_retries}) exceeded connecting to {addr}")]
    MaxRetriesExceeded { addr: SocketAddr, max_retries: u32 },

    // ---- Peers ----
    /// The identified peer was lost.
    #[error("Peer lost: {0}")]
    PeerLost(PeerId),

    /// `peer` sent data whose size (`actual` bytes) differs from the
    /// `expected` size.
    #[error("Peer {peer} sent incoherent data: expected {expected} bytes, got {actual}")]
    PeerIncoherentData {
        peer: PeerId,
        expected: usize,
        actual: usize,
    },

    /// The identified peer is not known.
    #[error("Unknown peer: {0}")]
    UnknownPeer(PeerId),

    /// The identified peer cannot be reached.
    #[error("Peer {0} is unreachable")]
    PeerUnreachable(PeerId),

    /// `peer` timed out while the named `operation` was in progress.
    #[error("Peer {peer} timed out during {operation}")]
    PeerTimeout { peer: PeerId, operation: String },

    // ---- Registration and all-reduce ----
    /// All-reduce was invoked before registration.
    #[error("All-reduce called before registration")]
    AllReduceBeforeRegister,

    /// Finish was invoked while the node was not registered.
    #[error("Node not registered when finish called")]
    NotRegisteredOnFinish,

    /// Registration was attempted a second time.
    #[error("Double registration attempted")]
    DoubleRegister,

    /// Registration parameters disagree; the payload is printed verbatim in
    /// the message.
    #[error("Registration parameters mismatch: {0}")]
    RegisterParamsMismatch(String),

    /// All-reduce element counts disagree: this side has `local` elements,
    /// `expected` were required.
    #[error("All-reduce parameters mismatch: local has {local} elements, expected {expected}")]
    AllReduceParamsMismatch { local: usize, expected: usize },

    /// Ring reduce requires at least 2 nodes; the payload is the number of
    /// nodes actually available.
    #[error("Ring reduce impossible: need at least 2 nodes, have {0}")]
    RingReduceImpossible(usize),

    // ---- Buffers ----
    /// A buffer has `actual`-byte alignment where `expected`-byte alignment
    /// was required.
    #[error("Buffer alignment error: expected {expected}-byte alignment, got {actual}")]
    BufferAlignment { expected: usize, actual: usize },

    /// A buffer size of `actual` is not a multiple of `expected`.
    #[error("Buffer size error: expected multiple of {expected}, got {actual}")]
    BufferSize { expected: usize, actual: usize },

    // ---- Ring and topology ----
    /// A ring cannot be formed because only `have` of the `need` required
    /// nodes are present.
    #[error("Cannot form ring: insufficient nodes ({have} < {need})")]
    InsufficientNodes { have: usize, need: usize },

    /// The ring has not been established.
    #[error("Ring not established")]
    RingNotEstablished,

    /// The topology changed while an operation was in progress.
    #[error("Topology changed during operation")]
    TopologyChanged,

    /// There is no route to the identified peer.
    #[error("No route to peer {0}")]
    NoRoute(PeerId),

    // ---- Election ----
    /// Election did not complete within the given duration.
    #[error("Election timeout after {0:?}")]
    ElectionTimeout(std::time::Duration),

    /// More than one master was detected; the payload lists them and is
    /// rendered with `Debug` formatting.
    #[error("Split brain detected: multiple masters ({0:?})")]
    SplitBrain(Vec<PeerId>),

    /// No master has been elected.
    #[error("No master elected")]
    NoMaster,

    /// The identified master cannot be reached.
    #[error("Master unreachable: {0}")]
    MasterUnreachable(PeerId),

    // ---- Protocol ----
    /// Generic protocol error; the payload is printed verbatim in the message.
    #[error("Protocol error: {0}")]
    Protocol(String),

    /// A message of kind `actual` arrived where `expected` was required.
    #[error("Invalid message: expected {expected}, got {actual}")]
    InvalidMessage { expected: String, actual: String },

    /// The first message received was not an init message.
    #[error("First message was not init")]
    FirstMsgNotInit,

    /// The `local` and `remote` versions differ.
    #[error("Version mismatch: local {local}, remote {remote}")]
    VersionMismatch { local: String, remote: String },

    /// The namespace `actual` differs from the `expected` one.
    #[error("Namespace mismatch: expected {expected}, got {actual}")]
    NamespaceMismatch { expected: String, actual: String },

    // ---- Configuration ----
    /// Configuration error; the payload is printed verbatim in the message.
    #[error("Configuration error: {0}")]
    Config(String),

    /// The given address string is invalid.
    #[error("Invalid address: {0}")]
    InvalidAddress(String),

    /// The given port is already in use.
    #[error("Port {0} already in use")]
    PortInUse(u16),

    // ---- Serialization ----
    /// Wraps a `bitcode::Error`; `#[from]` generates the `From` conversion so
    /// `?` can lift serialization errors into this type.
    #[error("Serialization error: {0}")]
    Serialization(#[from] bitcode::Error),

    // ---- Health monitoring ----
    /// A health check of `peer` failed; `reason` is included verbatim in the
    /// message.
    #[error("Health check failed for peer {peer}: {reason}")]
    HealthCheckFailed { peer: PeerId, reason: String },

    /// No heartbeat was seen from the identified peer within the given
    /// duration.
    #[error("Heartbeat timeout for peer {0} after {1:?}")]
    HeartbeatTimeout(PeerId, std::time::Duration),

    // ---- Shutdown and cancellation ----
    /// A shutdown is in progress.
    #[error("Shutdown in progress")]
    ShuttingDown,

    /// The operation was cancelled.
    #[error("Operation cancelled")]
    Cancelled,
}
144
145impl DistributedError {
146 pub fn is_recoverable(&self) -> bool {
148 matches!(
149 self,
150 Self::ConnectionTimeout(_, _)
151 | Self::PeerTimeout { .. }
152 | Self::HeartbeatTimeout(_, _)
153 | Self::ElectionTimeout(_)
154 | Self::TopologyChanged
155 )
156 }
157
158 pub fn is_peer_failure(&self) -> bool {
160 matches!(
161 self,
162 Self::PeerLost(_)
163 | Self::PeerUnreachable(_)
164 | Self::PeerTimeout { .. }
165 | Self::PeerIncoherentData { .. }
166 | Self::HealthCheckFailed { .. }
167 | Self::HeartbeatTimeout(_, _)
168 )
169 }
170
171 pub fn is_fatal(&self) -> bool {
173 matches!(
174 self,
175 Self::SplitBrain(_)
176 | Self::ShuttingDown
177 | Self::Cancelled
178 | Self::VersionMismatch { .. }
179 | Self::NamespaceMismatch { .. }
180 )
181 }
182
183 pub fn peer_id(&self) -> Option<&PeerId> {
185 match self {
186 Self::PeerLost(p)
187 | Self::UnknownPeer(p)
188 | Self::PeerUnreachable(p)
189 | Self::NoRoute(p)
190 | Self::MasterUnreachable(p)
191 | Self::HeartbeatTimeout(p, _) => Some(p),
192 Self::PeerTimeout { peer, .. }
193 | Self::PeerIncoherentData { peer, .. }
194 | Self::HealthCheckFailed { peer, .. } => Some(peer),
195 _ => None,
196 }
197 }
198}
199
/// Shorthand for a `Result` whose error type is [`DistributedError`].
pub type DistributedResult<T> = Result<T, DistributedError>;