1use std::error::Error;
2
3use serde::Deserialize;
4use serde::Serialize;
5use tokio::task::JoinError;
6use tonic::Code;
7use tonic::Status;
8
9use crate::proto::error::ErrorCode;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub enum ClientApiError {
13 #[serde(rename = "network")]
15 Network {
16 code: u32,
17 kind: NetworkErrorType,
18 message: String,
19 retry_after_ms: Option<u64>,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 leader_hint: Option<LeaderInfo>,
22 },
23
24 #[serde(rename = "protocol")]
26 Protocol {
27 code: u32,
28 kind: ProtocolErrorType,
29 message: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 supported_versions: Option<Vec<String>>,
32 },
33
34 #[serde(rename = "storage")]
36 Storage {
37 code: u32,
38 kind: StorageErrorType,
39 message: String,
40 },
41
42 #[serde(rename = "business")]
44 Business {
45 code: u32,
46 kind: BusinessErrorType,
47 message: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 required_action: Option<String>,
50 },
51
52 #[serde(rename = "general")]
54 General {
55 code: u32,
56 kind: GeneralErrorType,
57 message: String,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 required_action: Option<String>,
60 },
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub enum NetworkErrorType {
65 Timeout,
66 ConnectionLost,
67 InvalidAddress,
68 TlsFailure,
69 ProtocolViolation,
70 JoinError,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub enum ProtocolErrorType {
75 InvalidResponseFormat,
76 VersionMismatch,
77 ChecksumFailure,
78 SerializationError,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub enum StorageErrorType {
83 DiskFull,
84 CorruptionDetected,
85 IoFailure,
86 PermissionDenied,
87 KeyNotExist,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub enum BusinessErrorType {
92 NotLeader,
93 StaleRead,
94 InvalidRequest,
95 RateLimited,
96 ClusterUnavailable,
97 ProposeFailed,
98 RetryRequired,
99 StaleTerm,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum GeneralErrorType {
104 General,
105}
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct LeaderInfo {
108 pub id: String,
109 pub address: String,
110 pub last_contact: u64, }
112impl From<tonic::transport::Error> for ClientApiError {
113 fn from(err: tonic::transport::Error) -> Self {
114 let (kind, message, retry) = match err {
115 e if e
116 .source()
117 .and_then(|e| e.downcast_ref::<std::io::Error>())
118 .map(|e| e.kind() == std::io::ErrorKind::TimedOut)
119 .unwrap_or(false) =>
120 {
121 (
122 NetworkErrorType::Timeout,
123 format!("Connection timeout: {e}"),
124 Some(3000), )
126 }
127 e if e.to_string().contains("invalid uri") => (
128 NetworkErrorType::InvalidAddress,
129 format!("Invalid address: {e}"),
130 None,
131 ),
132 _ => (
133 NetworkErrorType::ConnectionLost,
134 "Connection unexpectedly closed".into(),
135 Some(5000),
136 ),
137 };
138
139 let code = match kind {
141 NetworkErrorType::Timeout => ErrorCode::ConnectionTimeout,
142 NetworkErrorType::InvalidAddress => ErrorCode::InvalidAddress,
143 NetworkErrorType::ConnectionLost => ErrorCode::ConnectionTimeout,
144 _ => ErrorCode::Uncategorized,
145 };
146
147 ClientApiError::Network {
148 code: code as u32,
149 kind,
150 message,
151 retry_after_ms: retry,
152 leader_hint: None,
153 }
154 }
155}
156
157impl From<Status> for ClientApiError {
158 fn from(status: Status) -> Self {
159 let code = status.code();
160 let message = status.message().to_string();
161
162 match code {
163 Code::Unavailable => Self::Business {
164 code: ErrorCode::ClusterUnavailable as u32,
165 kind: BusinessErrorType::ClusterUnavailable,
166 message,
167 required_action: Some("Retry after cluster recovery".into()),
168 },
169
170 Code::Cancelled => Self::Network {
171 code: ErrorCode::ConnectionTimeout as u32,
172 kind: NetworkErrorType::Timeout,
173 message,
174 leader_hint: None,
175 retry_after_ms: Some(1000),
176 },
177
178 Code::FailedPrecondition => {
179 if let Some(leader) = parse_leader_from_metadata(&status) {
180 Self::Network {
181 code: ErrorCode::LeaderChanged as u32,
182 kind: NetworkErrorType::ProtocolViolation,
183 message: "Leadership changed".into(),
184 retry_after_ms: Some(1000),
185 leader_hint: Some(leader),
186 }
187 } else {
188 Self::Business {
189 code: ErrorCode::StaleOperation as u32,
190 kind: BusinessErrorType::StaleRead,
191 message,
192 required_action: Some("Refresh cluster state".into()),
193 }
194 }
195 }
196
197 Code::InvalidArgument => Self::Business {
198 code: ErrorCode::InvalidRequest as u32,
199 kind: BusinessErrorType::InvalidRequest,
200 message,
201 required_action: None,
202 },
203
204 Code::PermissionDenied => Self::Business {
205 code: ErrorCode::NotLeader as u32,
206 kind: BusinessErrorType::NotLeader,
207 message,
208 required_action: Some("Refresh cluster state".into()),
209 },
210
211 _ => Self::Business {
212 code: ErrorCode::Uncategorized as u32,
213 kind: BusinessErrorType::InvalidRequest,
214 message: format!("Unhandled status code: {code:?}"),
215 required_action: None,
216 },
217 }
218 }
219}
220
221fn parse_leader_from_metadata(status: &Status) -> Option<LeaderInfo> {
222 status
223 .metadata()
224 .get("x-raft-leader")
225 .and_then(|v| v.to_str().ok())
226 .and_then(|s| {
227 let mut id = None;
229 let mut address = None;
230 let mut last_contact = None;
231
232 let s = s.trim().trim_start_matches('{').trim_end_matches('}');
234
235 for pair in s.split(',') {
237 let pair = pair.trim();
238 if let Some((key, value)) = pair.split_once(':') {
239 let key = key.trim().trim_matches('"');
240 let value = value.trim().trim_matches('"');
241
242 match key {
243 "id" => id = Some(value.to_string()),
244 "address" => address = Some(value.to_string()),
245 "last_contact" => last_contact = value.parse().ok(),
246 _ => continue,
247 }
248 }
249 }
250
251 Some(LeaderInfo {
252 id: id?,
253 address: address?,
254 last_contact: last_contact?,
255 })
256 })
257}
258
259impl From<ErrorCode> for ClientApiError {
260 fn from(code: ErrorCode) -> Self {
261 match code {
262 ErrorCode::ConnectionTimeout => ClientApiError::Network {
264 code: code as u32,
265 kind: NetworkErrorType::Timeout,
266 message: "Connection timeout".to_string(),
267 retry_after_ms: None,
268 leader_hint: None,
269 },
270 ErrorCode::InvalidAddress => ClientApiError::Network {
271 code: code as u32,
272 kind: NetworkErrorType::InvalidAddress,
273 message: "Invalid address".to_string(),
274 retry_after_ms: None,
275 leader_hint: None,
276 },
277 ErrorCode::LeaderChanged => ClientApiError::Network {
278 code: code as u32,
279 kind: NetworkErrorType::ConnectionLost,
280 message: "Leader changed".to_string(),
281 retry_after_ms: Some(100), leader_hint: None, },
284 ErrorCode::JoinError => ClientApiError::Network {
285 code: code as u32,
286 kind: NetworkErrorType::JoinError,
287 message: "Task Join Error".to_string(),
288 retry_after_ms: Some(100), leader_hint: None, },
291
292 ErrorCode::InvalidResponse => ClientApiError::Protocol {
294 code: code as u32,
295 kind: ProtocolErrorType::InvalidResponseFormat,
296 message: "Invalid response format".to_string(),
297 supported_versions: None,
298 },
299 ErrorCode::VersionMismatch => ClientApiError::Protocol {
300 code: code as u32,
301 kind: ProtocolErrorType::VersionMismatch,
302 message: "Version mismatch".to_string(),
303 supported_versions: None, },
305
306 ErrorCode::DiskFull => ClientApiError::Storage {
308 code: code as u32,
309 kind: StorageErrorType::DiskFull,
310 message: "Disk full".to_string(),
311 },
312 ErrorCode::DataCorruption => ClientApiError::Storage {
313 code: code as u32,
314 kind: StorageErrorType::CorruptionDetected,
315 message: "Data corruption detected".to_string(),
316 },
317 ErrorCode::StorageIoError => ClientApiError::Storage {
318 code: code as u32,
319 kind: StorageErrorType::IoFailure,
320 message: "Storage I/O error".to_string(),
321 },
322 ErrorCode::StoragePermissionDenied => ClientApiError::Storage {
323 code: code as u32,
324 kind: StorageErrorType::PermissionDenied,
325 message: "Storage permission denied".to_string(),
326 },
327 ErrorCode::KeyNotExist => ClientApiError::Storage {
328 code: code as u32,
329 kind: StorageErrorType::KeyNotExist,
330 message: "Key not exist in storage".to_string(),
331 },
332
333 ErrorCode::NotLeader => ClientApiError::Business {
335 code: code as u32,
336 kind: BusinessErrorType::NotLeader,
337 message: "Not leader".to_string(),
338 required_action: Some("redirect to leader".to_string()),
339 },
340 ErrorCode::StaleOperation => ClientApiError::Business {
341 code: code as u32,
342 kind: BusinessErrorType::StaleRead,
343 message: "Stale operation".to_string(),
344 required_action: Some("refresh state and retry".to_string()),
345 },
346 ErrorCode::InvalidRequest => ClientApiError::Business {
347 code: code as u32,
348 kind: BusinessErrorType::InvalidRequest,
349 message: "Invalid request".to_string(),
350 required_action: Some("check request parameters".to_string()),
351 },
352 ErrorCode::RateLimited => ClientApiError::Business {
353 code: code as u32,
354 kind: BusinessErrorType::RateLimited,
355 message: "Rate limited".to_string(),
356 required_action: Some("wait and retry".to_string()),
357 },
358 ErrorCode::ClusterUnavailable => ClientApiError::Business {
359 code: code as u32,
360 kind: BusinessErrorType::ClusterUnavailable,
361 message: "Cluster unavailable".to_string(),
362 required_action: Some("try again later".to_string()),
363 },
364 ErrorCode::ProposeFailed => ClientApiError::Business {
365 code: code as u32,
366 kind: BusinessErrorType::ProposeFailed,
367 message: "Propose failed".to_string(),
368 required_action: Some("try again later".to_string()),
369 },
370 ErrorCode::Uncategorized => ClientApiError::Business {
371 code: code as u32,
372 kind: BusinessErrorType::InvalidRequest,
373 message: "Uncategorized error".to_string(),
374 required_action: None,
375 },
376 ErrorCode::TermOutdated => ClientApiError::Business {
377 code: code as u32,
378 kind: BusinessErrorType::StaleTerm,
379 message: "Stale term error".to_string(),
380 required_action: None,
381 },
382 ErrorCode::RetryRequired => ClientApiError::Business {
383 code: code as u32,
384 kind: BusinessErrorType::RetryRequired,
385 message: "Retry required. Please try again.".to_string(),
386 required_action: None,
387 },
388
389 ErrorCode::General => ClientApiError::General {
391 code: code as u32,
392 kind: GeneralErrorType::General,
393 message: "General Client Api error".to_string(),
394 required_action: None,
395 },
396 ErrorCode::Success => unreachable!(),
398 }
399 }
400}
401
402impl ClientApiError {
403 pub fn code(&self) -> u32 {
405 match self {
406 ClientApiError::Network { code, .. } => *code,
407 ClientApiError::Protocol { code, .. } => *code,
408 ClientApiError::Storage { code, .. } => *code,
409 ClientApiError::Business { code, .. } => *code,
410 ClientApiError::General { code, .. } => *code,
411 }
412 }
413
414 pub fn message(&self) -> &str {
416 match self {
417 ClientApiError::Network { message, .. } => message,
418 ClientApiError::Protocol { message, .. } => message,
419 ClientApiError::Storage { message, .. } => message,
420 ClientApiError::Business { message, .. } => message,
421 ClientApiError::General { message, .. } => message,
422 }
423 }
424}
425
426impl From<JoinError> for ClientApiError {
427 fn from(_err: JoinError) -> Self {
428 ErrorCode::JoinError.into()
429 }
430}
431impl From<std::io::Error> for ClientApiError {
432 fn from(_err: std::io::Error) -> Self {
433 ErrorCode::StorageIoError.into()
434 }
435}
436
437impl ClientApiError {
438 pub fn general_client_error(message: String) -> Self {
439 ClientApiError::General {
440 code: ErrorCode::General as u32,
441 kind: GeneralErrorType::General,
442 message,
443 required_action: None,
444 }
445 }
446}