1use std::net::SocketAddr;
4use thiserror::Error;
5
6pub type Result<T> = std::result::Result<T, ClusterError>;
8
9#[derive(Debug, Error)]
11pub enum ClusterError {
12 #[error("invalid configuration: {0}")]
14 InvalidConfig(String),
15
16 #[error("node ID conflict: {0} already exists")]
17 NodeIdConflict(String),
18
19 #[error("node not found: {0}")]
21 NodeNotFound(String),
22
23 #[error("node unreachable: {addr}")]
24 NodeUnreachable { addr: SocketAddr },
25
26 #[error("node failed: {node_id}")]
27 NodeFailed { node_id: String },
28
29 #[error("no seed nodes available")]
30 NoSeedNodes,
31
32 #[error("cluster join failed: {0}")]
33 JoinFailed(String),
34
35 #[error("cluster not ready: need {required} nodes, have {current}")]
36 ClusterNotReady { required: usize, current: usize },
37
38 #[error("not leader: current leader is {leader:?}")]
40 NotLeader { leader: Option<String> },
41
42 #[error("leader election in progress")]
43 LeaderElectionInProgress,
44
45 #[error("raft error: {0}")]
46 Raft(String),
47
48 #[error("raft storage error: {0}")]
49 RaftStorage(String),
50
51 #[error("proposal timeout")]
52 ProposalTimeout,
53
54 #[error("proposal rejected: {0}")]
55 ProposalRejected(String),
56
57 #[error("topic not found: {0}")]
59 TopicNotFound(String),
60
61 #[error("topic already exists: {0}")]
62 TopicAlreadyExists(String),
63
64 #[error("partition not found: {topic}/{partition}")]
65 PartitionNotFound { topic: String, partition: u32 },
66
67 #[error("partition leader not found: {topic}/{partition}")]
68 PartitionLeaderNotFound { topic: String, partition: u32 },
69
70 #[error("invalid partition count: {0}")]
71 InvalidPartitionCount(u32),
72
73 #[error("invalid replication factor: {factor} (have {nodes} nodes)")]
74 InvalidReplicationFactor { factor: u16, nodes: usize },
75
76 #[error("not enough ISR: need {required}, have {current}")]
78 NotEnoughIsr { required: u16, current: u16 },
79
80 #[error("replication failed: {0}")]
81 ReplicationFailed(String),
82
83 #[error("replica lag exceeded: {node_id} lag {lag_messages} messages")]
84 ReplicaLagExceeded { node_id: String, lag_messages: u64 },
85
86 #[error("high watermark not advanced")]
87 HighWatermarkStalled,
88
89 #[error("protocol error: {0}")]
91 Protocol(String),
92
93 #[error("invalid message: {0}")]
94 InvalidMessage(String),
95
96 #[error("message too large: {size} bytes (max {max})")]
97 MessageTooLarge { size: usize, max: usize },
98
99 #[error("serialization error: {0}")]
100 Serialization(String),
101
102 #[error("deserialization error: {0}")]
103 Deserialization(String),
104
105 #[error("connection failed: {0}")]
107 ConnectionFailed(String),
108
109 #[error("connection closed")]
110 ConnectionClosed,
111
112 #[error("request timeout")]
113 Timeout,
114
115 #[error("network error: {0}")]
116 Network(String),
117
118 #[error("storage error: {0}")]
120 Storage(String),
121
122 #[error("corrupt data: {0}")]
123 CorruptData(String),
124
125 #[error("io error: {0}")]
126 Io(#[from] std::io::Error),
127
128 #[error("crypto error: {0}")]
130 CryptoError(String),
131
132 #[error("unauthorized: {0}")]
134 Unauthorized(String),
135
136 #[error("internal error: {0}")]
138 Internal(String),
139
140 #[error("channel closed")]
141 ChannelClosed,
142
143 #[error("shutdown in progress")]
144 ShuttingDown,
145}
146
147impl ClusterError {
148 pub fn is_retriable(&self) -> bool {
150 matches!(
151 self,
152 ClusterError::NodeUnreachable { .. }
153 | ClusterError::Timeout
154 | ClusterError::LeaderElectionInProgress
155 | ClusterError::NotLeader { .. }
156 | ClusterError::ClusterNotReady { .. }
157 | ClusterError::NotEnoughIsr { .. }
158 | ClusterError::Network(_)
159 )
160 }
161
162 pub fn should_redirect(&self) -> bool {
164 matches!(self, ClusterError::NotLeader { leader: Some(_) })
165 }
166
167 pub fn leader(&self) -> Option<&str> {
169 match self {
170 ClusterError::NotLeader { leader } => leader.as_deref(),
171 _ => None,
172 }
173 }
174
175 pub fn is_fatal(&self) -> bool {
177 matches!(
178 self,
179 ClusterError::CorruptData(_) | ClusterError::RaftStorage(_)
180 )
181 }
182}
183
184impl<T> From<tokio::sync::mpsc::error::SendError<T>> for ClusterError {
186 fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
187 ClusterError::ChannelClosed
188 }
189}
190
191impl From<tokio::sync::oneshot::error::RecvError> for ClusterError {
192 fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
193 ClusterError::ChannelClosed
194 }
195}
196
197impl From<postcard::Error> for ClusterError {
199 fn from(e: postcard::Error) -> Self {
200 ClusterError::Serialization(e.to_string())
201 }
202}
203
#[cfg(test)]
mod tests {
    //! Unit tests for error classification, Display output and conversions.

    use super::*;

    #[test]
    fn test_retriable_errors() {
        assert!(ClusterError::Timeout.is_retriable());
        assert!(ClusterError::LeaderElectionInProgress.is_retriable());
        assert!(!ClusterError::TopicNotFound("test".into()).is_retriable());
        assert!(!ClusterError::CorruptData("bad".into()).is_retriable());
    }

    #[test]
    fn test_retriable_transient_conditions() {
        let addr = "127.0.0.1:9092".parse().unwrap();
        assert!(ClusterError::NodeUnreachable { addr }.is_retriable());
        assert!(ClusterError::Network("reset".into()).is_retriable());
        assert!(ClusterError::NotLeader { leader: None }.is_retriable());
        assert!(ClusterError::ClusterNotReady { required: 3, current: 1 }.is_retriable());
        assert!(ClusterError::NotEnoughIsr { required: 2, current: 1 }.is_retriable());
    }

    #[test]
    fn test_terminal_errors_not_retriable() {
        assert!(!ClusterError::ShuttingDown.is_retriable());
        assert!(!ClusterError::ChannelClosed.is_retriable());
        assert!(!ClusterError::TopicAlreadyExists("orders".into()).is_retriable());
        assert!(!ClusterError::ProposalRejected("stale term".into()).is_retriable());
    }

    #[test]
    fn test_redirect_to_leader() {
        let err = ClusterError::NotLeader {
            leader: Some("node-1".into()),
        };
        assert!(err.should_redirect());
        assert_eq!(err.leader(), Some("node-1"));

        let err = ClusterError::NotLeader { leader: None };
        assert!(!err.should_redirect());
        assert_eq!(err.leader(), None);

        // Non-leader errors never carry a redirect target.
        assert!(!ClusterError::Timeout.should_redirect());
        assert_eq!(ClusterError::Timeout.leader(), None);
    }

    #[test]
    fn test_fatal_errors() {
        assert!(ClusterError::CorruptData("bad crc".into()).is_fatal());
        assert!(ClusterError::RaftStorage("disk full".into()).is_fatal());
        assert!(!ClusterError::Timeout.is_fatal());
        assert!(!ClusterError::Storage("slow disk".into()).is_fatal());
    }

    #[test]
    fn test_display_struct_variants() {
        let err = ClusterError::PartitionNotFound {
            topic: "orders".into(),
            partition: 3,
        };
        assert_eq!(err.to_string(), "partition not found: orders/3");

        let err = ClusterError::ClusterNotReady {
            required: 3,
            current: 1,
        };
        assert_eq!(err.to_string(), "cluster not ready: need 3 nodes, have 1");

        let err = ClusterError::MessageTooLarge {
            size: 2048,
            max: 1024,
        };
        assert_eq!(err.to_string(), "message too large: 2048 bytes (max 1024)");

        let addr = "127.0.0.1:9092".parse().unwrap();
        assert_eq!(
            ClusterError::NodeUnreachable { addr }.to_string(),
            "node unreachable: 127.0.0.1:9092"
        );
    }

    #[test]
    fn test_io_error_conversion() {
        let io = std::io::Error::new(std::io::ErrorKind::NotFound, "missing segment");
        let err = ClusterError::from(io);
        assert_eq!(err.to_string(), "io error: missing segment");
        assert!(matches!(err, ClusterError::Io(_)));
    }

    #[test]
    fn test_channel_errors_map_to_channel_closed() {
        let send = tokio::sync::mpsc::error::SendError(7_u32);
        assert!(matches!(ClusterError::from(send), ClusterError::ChannelClosed));

        // Dropping the sender makes the receive fail with `RecvError`.
        let (tx, rx) = tokio::sync::oneshot::channel::<u32>();
        drop(tx);
        let recv = rx.blocking_recv().unwrap_err();
        assert!(matches!(ClusterError::from(recv), ClusterError::ChannelClosed));
    }

    #[test]
    fn test_postcard_serialize_error_conversion() {
        // A zero-length buffer cannot hold even a one-byte varint.
        let mut buf = [0_u8; 0];
        let err = postcard::to_slice(&7_u32, &mut buf).unwrap_err();
        assert!(matches!(
            ClusterError::from(err),
            ClusterError::Serialization(_)
        ));
    }
}