1use std::error::Error;
2
3use d_engine_proto::error::ErrorCode;
4use serde::Deserialize;
5use serde::Serialize;
6use tokio::task::JoinError;
7use tonic::Code;
8use tonic::Status;
9
/// Error returned by the client API, grouped into broad failure categories.
///
/// Every variant carries an [`ErrorCode`] plus a human-readable `message`;
/// some variants add category-specific hints. When (de)serialized with serde,
/// each variant uses the lowercase name given in its `rename` attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientApiError {
    /// Connectivity-related failure. In this file it is produced for
    /// connection timeouts, invalid addresses, leader changes, task join
    /// failures and otherwise unclassified transport errors.
    #[serde(rename = "network")]
    Network {
        /// Machine-readable error code.
        code: ErrorCode,
        /// Human-readable description of the failure.
        message: String,
        /// Suggested delay before retrying, if any.
        /// NOTE(review): presumably milliseconds, going by the field name — confirm.
        retry_after_ms: Option<u64>,
        /// Leader information, when known; left out of the serialized form
        /// when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        leader_hint: Option<LeaderHint>,
    },

    /// Protocol-level failure. In this file it is produced for invalid
    /// response formats and version mismatches.
    #[serde(rename = "protocol")]
    Protocol {
        /// Machine-readable error code.
        code: ErrorCode,
        /// Human-readable description of the failure.
        message: String,
        /// Optional list of versions; left out of the serialized form when
        /// `None`. No code in this file populates it.
        #[serde(skip_serializing_if = "Option::is_none")]
        supported_versions: Option<Vec<String>>,
    },

    /// Storage-related failure. In this file it is produced for disk-full,
    /// data-corruption, I/O, permission and missing-key codes.
    #[serde(rename = "storage")]
    Storage { code: ErrorCode, message: String },

    /// Request- or cluster-state-related failure (for example not-leader,
    /// stale operation, invalid request, rate limiting, cluster unavailable).
    #[serde(rename = "business")]
    Business {
        /// Machine-readable error code.
        code: ErrorCode,
        /// Human-readable description of the failure.
        message: String,
        /// Optional free-text hint describing what the caller should do next;
        /// left out of the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },

    /// Catch-all category for errors that fit none of the variants above.
    #[serde(rename = "general")]
    General {
        /// Machine-readable error code.
        code: ErrorCode,
        /// Human-readable description of the failure.
        message: String,
        /// Optional free-text hint describing what the caller should do next;
        /// left out of the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },
}
53
54pub use d_engine_proto::common::LeaderHint;
99impl From<tonic::transport::Error> for ClientApiError {
100 fn from(err: tonic::transport::Error) -> Self {
113 if let Some(io_err) = err.source().and_then(|e| e.downcast_ref::<std::io::Error>()) {
115 if io_err.kind() == std::io::ErrorKind::TimedOut {
116 return Self::Network {
117 code: ErrorCode::ConnectionTimeout,
118 message: format!("Connection timeout: {err}"),
119 retry_after_ms: Some(3000), leader_hint: None,
121 };
122 }
123 }
124
125 if err.to_string().contains("invalid uri") {
127 return Self::Network {
128 code: ErrorCode::InvalidAddress,
129 message: format!("Invalid address: {err}"),
130 retry_after_ms: None, leader_hint: None,
132 };
133 }
134
135 Self::Network {
137 code: ErrorCode::Uncategorized,
138 message: format!("Transport error: {err}"),
139 retry_after_ms: Some(5000),
140 leader_hint: None,
141 }
142 }
143}
144
145impl From<Status> for ClientApiError {
146 fn from(status: Status) -> Self {
147 let code = status.code();
148 let message = status.message().to_string();
149
150 match code {
151 Code::Unavailable => Self::Business {
152 code: ErrorCode::ClusterUnavailable,
153 message,
154 required_action: Some("Retry after cluster recovery".into()),
155 },
156
157 Code::Cancelled => Self::Network {
158 code: ErrorCode::ConnectionTimeout,
159 message,
160 leader_hint: None,
161 retry_after_ms: Some(1000),
162 },
163
164 Code::FailedPrecondition => {
165 if let Some(leader) = parse_leader_from_metadata(&status) {
166 Self::Network {
167 code: ErrorCode::LeaderChanged,
168 message: "Leadership changed".into(),
169 retry_after_ms: Some(1000),
170 leader_hint: Some(leader),
171 }
172 } else {
173 Self::Business {
174 code: ErrorCode::StaleOperation,
175 message,
176 required_action: Some("Refresh cluster state".into()),
177 }
178 }
179 }
180
181 Code::InvalidArgument => Self::Business {
182 code: ErrorCode::InvalidRequest,
183 message,
184 required_action: None,
185 },
186
187 Code::PermissionDenied => Self::Business {
188 code: ErrorCode::NotLeader,
189 message,
190 required_action: Some("Refresh cluster state".into()),
191 },
192
193 _ => Self::Business {
194 code: ErrorCode::Uncategorized,
195 message: format!("Unhandled status code: {code:?}"),
196 required_action: None,
197 },
198 }
199 }
200}
201
202fn parse_leader_from_metadata(status: &Status) -> Option<LeaderHint> {
203 status
204 .metadata()
205 .get("x-raft-leader")
206 .and_then(|v| v.to_str().ok())
207 .and_then(|s| {
208 let mut leader_id = None;
210 let mut address = None;
211
212 let s = s.trim().trim_start_matches('{').trim_end_matches('}');
214
215 for pair in s.split(',') {
217 let pair = pair.trim();
218 if let Some((key, value)) = pair.split_once(':') {
219 let key = key.trim().trim_matches('"');
220 let value = value.trim().trim_matches('"');
221
222 match key {
223 "leader_id" => leader_id = value.parse().ok(),
224 "address" => address = Some(value.to_string()),
225 _ => continue,
226 }
227 }
228 }
229
230 Some(LeaderHint {
231 leader_id: leader_id?,
232 address: address?,
233 })
234 })
235}
236
237impl From<ErrorCode> for ClientApiError {
238 fn from(code: ErrorCode) -> Self {
239 match code {
240 ErrorCode::ConnectionTimeout => ClientApiError::Network {
242 code,
243 message: "Connection timeout".to_string(),
244 retry_after_ms: None,
245 leader_hint: None,
246 },
247 ErrorCode::InvalidAddress => ClientApiError::Network {
248 code,
249 message: "Invalid address".to_string(),
250 retry_after_ms: None,
251 leader_hint: None,
252 },
253 ErrorCode::LeaderChanged => ClientApiError::Network {
254 code,
255 message: "Leader changed".to_string(),
256 retry_after_ms: Some(100), leader_hint: None, },
259 ErrorCode::JoinError => ClientApiError::Network {
260 code,
261 message: "Task Join Error".to_string(),
262 retry_after_ms: Some(100), leader_hint: None, },
265
266 ErrorCode::InvalidResponse => ClientApiError::Protocol {
268 code,
269 message: "Invalid response format".to_string(),
270 supported_versions: None,
271 },
272 ErrorCode::VersionMismatch => ClientApiError::Protocol {
273 code,
274 message: "Version mismatch".to_string(),
275 supported_versions: None, },
277
278 ErrorCode::DiskFull => ClientApiError::Storage {
280 code,
281 message: "Disk full".to_string(),
282 },
283 ErrorCode::DataCorruption => ClientApiError::Storage {
284 code,
285 message: "Data corruption detected".to_string(),
286 },
287 ErrorCode::StorageIoError => ClientApiError::Storage {
288 code,
289 message: "Storage I/O error".to_string(),
290 },
291 ErrorCode::StoragePermissionDenied => ClientApiError::Storage {
292 code,
293 message: "Storage permission denied".to_string(),
294 },
295 ErrorCode::KeyNotExist => ClientApiError::Storage {
296 code,
297 message: "Key not exist in storage".to_string(),
298 },
299
300 ErrorCode::NotLeader => ClientApiError::Business {
302 code,
303 message: "Not leader".to_string(),
304 required_action: Some("redirect to leader".to_string()),
305 },
306 ErrorCode::StaleOperation => ClientApiError::Business {
307 code,
308 message: "Stale operation".to_string(),
309 required_action: Some("refresh state and retry".to_string()),
310 },
311 ErrorCode::InvalidRequest => ClientApiError::Business {
312 code,
313 message: "Invalid request".to_string(),
314 required_action: Some("check request parameters".to_string()),
315 },
316 ErrorCode::RateLimited => ClientApiError::Business {
317 code,
318 message: "Rate limited".to_string(),
319 required_action: Some("wait and retry".to_string()),
320 },
321 ErrorCode::ClusterUnavailable => ClientApiError::Business {
322 code,
323 message: "Cluster unavailable".to_string(),
324 required_action: Some("try again later".to_string()),
325 },
326 ErrorCode::ProposeFailed => ClientApiError::Business {
327 code,
328 message: "Propose failed".to_string(),
329 required_action: Some("try again later".to_string()),
330 },
331 ErrorCode::Uncategorized => ClientApiError::Business {
332 code,
333 message: "Uncategorized error".to_string(),
334 required_action: None,
335 },
336 ErrorCode::TermOutdated => ClientApiError::Business {
337 code,
338 message: "Stale term error".to_string(),
339 required_action: None,
340 },
341 ErrorCode::RetryRequired => ClientApiError::Business {
342 code,
343 message: "Retry required. Please try again.".to_string(),
344 required_action: None,
345 },
346
347 ErrorCode::General => ClientApiError::General {
349 code,
350 message: "General Client Api error".to_string(),
351 required_action: None,
352 },
353 ErrorCode::Success => unreachable!(),
355 }
356 }
357}
358
359impl ClientApiError {
360 pub fn code(&self) -> ErrorCode {
362 match self {
363 ClientApiError::Network { code, .. } => *code,
364 ClientApiError::Protocol { code, .. } => *code,
365 ClientApiError::Storage { code, .. } => *code,
366 ClientApiError::Business { code, .. } => *code,
367 ClientApiError::General { code, .. } => *code,
368 }
369 }
370
371 pub fn message(&self) -> &str {
373 match self {
374 ClientApiError::Network { message, .. } => message,
375 ClientApiError::Protocol { message, .. } => message,
376 ClientApiError::Storage { message, .. } => message,
377 ClientApiError::Business { message, .. } => message,
378 ClientApiError::General { message, .. } => message,
379 }
380 }
381}
382
383impl From<JoinError> for ClientApiError {
384 fn from(_err: JoinError) -> Self {
385 ErrorCode::JoinError.into()
386 }
387}
388impl From<std::io::Error> for ClientApiError {
389 fn from(_err: std::io::Error) -> Self {
390 ErrorCode::StorageIoError.into()
391 }
392}
393
394impl ClientApiError {
395 pub fn general_client_error(message: String) -> Self {
396 ClientApiError::General {
397 code: ErrorCode::General,
398 message,
399 required_action: None,
400 }
401 }
402}
403
404impl std::fmt::Display for ClientApiError {
405 fn fmt(
406 &self,
407 f: &mut std::fmt::Formatter<'_>,
408 ) -> std::fmt::Result {
409 write!(f, "{:?}: {}", self.code(), self.message())
410 }
411}
412
413impl std::error::Error for ClientApiError {}