d_engine/client/
error.rs

1use std::error::Error;
2
3use serde::Deserialize;
4use serde::Serialize;
5use tokio::task::JoinError;
6use tonic::Code;
7use tonic::Status;
8
9use crate::proto::error::ErrorCode;
10
/// Error type surfaced by the client API.
///
/// Each variant represents one layer at which a request can fail and carries
/// a numeric `code`, a layer-specific `kind` and a human-readable `message`.
/// The shared `code` and `message` fields are also reachable through
/// [`ClientApiError::code`] and [`ClientApiError::message`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientApiError {
    /// Network layer error (retryable)
    #[serde(rename = "network")]
    Network {
        /// Numeric error code; the conversions in this file store an
        /// `ErrorCode` cast to `u32` here.
        code: u32,
        /// Fine-grained network failure category.
        kind: NetworkErrorType,
        /// Human-readable description of the failure.
        message: String,
        /// Suggested delay before retrying, in milliseconds, if any.
        retry_after_ms: Option<u64>,
        /// Leader details when known; left out of the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        leader_hint: Option<LeaderInfo>,
    },

    /// Protocol layer error (client needs to check protocol compatibility)
    #[serde(rename = "protocol")]
    Protocol {
        /// Numeric error code; the conversions in this file store an
        /// `ErrorCode` cast to `u32` here.
        code: u32,
        /// Fine-grained protocol failure category.
        kind: ProtocolErrorType,
        /// Human-readable description of the failure.
        message: String,
        /// Left out of the serialized form when `None`.
        /// NOTE(review): presumably the protocol versions the peer accepts;
        /// no conversion in this file populates it — confirm with callers.
        #[serde(skip_serializing_if = "Option::is_none")]
        supported_versions: Option<Vec<String>>,
    },

    /// Storage layer error (internal problem on the server)
    #[serde(rename = "storage")]
    Storage {
        /// Numeric error code; the conversions in this file store an
        /// `ErrorCode` cast to `u32` here.
        code: u32,
        /// Fine-grained storage failure category.
        kind: StorageErrorType,
        /// Human-readable description of the failure.
        message: String,
    },

    /// Business logic error (client needs to adjust behavior)
    #[serde(rename = "business")]
    Business {
        /// Numeric error code; the conversions in this file store an
        /// `ErrorCode` cast to `u32` here.
        code: u32,
        /// Fine-grained business failure category.
        kind: BusinessErrorType,
        /// Human-readable description of the failure.
        message: String,
        /// Optional hint telling the caller what to do next (for example
        /// "Refresh cluster state"); left out of the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },

    /// General Client API error
    #[serde(rename = "general")]
    General {
        /// Numeric error code; the conversions in this file store an
        /// `ErrorCode` cast to `u32` here.
        code: u32,
        /// Failure category (currently a single value).
        kind: GeneralErrorType,
        /// Human-readable description of the failure.
        message: String,
        /// Optional hint telling the caller what to do next; left out of the
        /// serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },
}
62
/// Failure categories carried by [`ClientApiError::Network`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkErrorType {
    /// The operation timed out.
    Timeout,
    /// The connection was lost or closed unexpectedly.
    ConnectionLost,
    /// The target address could not be used (for example an invalid URI).
    InvalidAddress,
    /// TLS failure. NOTE(review): no conversion visible in this file produces it.
    TlsFailure,
    /// Used by the `Status` conversion when a leadership change is reported.
    ProtocolViolation,
    /// A spawned task could not be joined (see `From<JoinError>`).
    JoinError,
}
72
/// Failure categories carried by [`ClientApiError::Protocol`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProtocolErrorType {
    /// The response could not be interpreted (mapped from `ErrorCode::InvalidResponse`).
    InvalidResponseFormat,
    /// Client and server disagree on the version (mapped from `ErrorCode::VersionMismatch`).
    VersionMismatch,
    /// Checksum failure. NOTE(review): no conversion visible in this file produces it.
    ChecksumFailure,
    /// Serialization failure. NOTE(review): no conversion visible in this file produces it.
    SerializationError,
}
80
/// Failure categories carried by [`ClientApiError::Storage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageErrorType {
    /// Mapped from `ErrorCode::DiskFull`.
    DiskFull,
    /// Mapped from `ErrorCode::DataCorruption`.
    CorruptionDetected,
    /// Mapped from `ErrorCode::StorageIoError`; also the target of `From<std::io::Error>`.
    IoFailure,
    /// Mapped from `ErrorCode::StoragePermissionDenied`.
    PermissionDenied,
    /// Mapped from `ErrorCode::KeyNotExist`.
    KeyNotExist,
}
89
/// Failure categories carried by [`ClientApiError::Business`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BusinessErrorType {
    /// The contacted node is not the leader.
    NotLeader,
    /// The operation was stale (mapped from `ErrorCode::StaleOperation`).
    StaleRead,
    /// The request was rejected as invalid; also used as the fallback kind
    /// for uncategorized errors and unhandled gRPC status codes.
    InvalidRequest,
    /// Mapped from `ErrorCode::RateLimited`.
    RateLimited,
    /// The cluster is currently unavailable.
    ClusterUnavailable,
    /// Mapped from `ErrorCode::ProposeFailed`.
    ProposeFailed,
    /// Mapped from `ErrorCode::RetryRequired`.
    RetryRequired,
    /// Mapped from `ErrorCode::TermOutdated`.
    StaleTerm,
}
101
/// Failure category carried by [`ClientApiError::General`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GeneralErrorType {
    /// The only category: an unclassified client API error.
    General,
}
/// Leader details attached to a [`ClientApiError::Network`] error.
///
/// In this file it is only built by `parse_leader_from_metadata`, from the
/// `x-raft-leader` entry of a gRPC status' metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaderInfo {
    /// Leader identifier, kept as the string found in the metadata.
    pub id: String,
    /// Leader address, kept as the string found in the metadata.
    pub address: String,
    pub last_contact: u64, // Unix timestamp in ms
}
112impl From<tonic::transport::Error> for ClientApiError {
113    fn from(err: tonic::transport::Error) -> Self {
114        let (kind, message, retry) = match err {
115            e if e
116                .source()
117                .and_then(|e| e.downcast_ref::<std::io::Error>())
118                .map(|e| e.kind() == std::io::ErrorKind::TimedOut)
119                .unwrap_or(false) =>
120            {
121                (
122                    NetworkErrorType::Timeout,
123                    format!("Connection timeout: {e}"),
124                    Some(3000), // Retry after 3 seconds
125                )
126            }
127            e if e.to_string().contains("invalid uri") => (
128                NetworkErrorType::InvalidAddress,
129                format!("Invalid address: {e}"),
130                None,
131            ),
132            _ => (
133                NetworkErrorType::ConnectionLost,
134                "Connection unexpectedly closed".into(),
135                Some(5000),
136            ),
137        };
138
139        // Convert NetworkErrorType to an appropriate ErrorCode
140        let code = match kind {
141            NetworkErrorType::Timeout => ErrorCode::ConnectionTimeout,
142            NetworkErrorType::InvalidAddress => ErrorCode::InvalidAddress,
143            NetworkErrorType::ConnectionLost => ErrorCode::ConnectionTimeout,
144            _ => ErrorCode::Uncategorized,
145        };
146
147        ClientApiError::Network {
148            code: code as u32,
149            kind,
150            message,
151            retry_after_ms: retry,
152            leader_hint: None,
153        }
154    }
155}
156
157impl From<Status> for ClientApiError {
158    fn from(status: Status) -> Self {
159        let code = status.code();
160        let message = status.message().to_string();
161
162        match code {
163            Code::Unavailable => Self::Business {
164                code: ErrorCode::ClusterUnavailable as u32,
165                kind: BusinessErrorType::ClusterUnavailable,
166                message,
167                required_action: Some("Retry after cluster recovery".into()),
168            },
169
170            Code::Cancelled => Self::Network {
171                code: ErrorCode::ConnectionTimeout as u32,
172                kind: NetworkErrorType::Timeout,
173                message,
174                leader_hint: None,
175                retry_after_ms: Some(1000),
176            },
177
178            Code::FailedPrecondition => {
179                if let Some(leader) = parse_leader_from_metadata(&status) {
180                    Self::Network {
181                        code: ErrorCode::LeaderChanged as u32,
182                        kind: NetworkErrorType::ProtocolViolation,
183                        message: "Leadership changed".into(),
184                        retry_after_ms: Some(1000),
185                        leader_hint: Some(leader),
186                    }
187                } else {
188                    Self::Business {
189                        code: ErrorCode::StaleOperation as u32,
190                        kind: BusinessErrorType::StaleRead,
191                        message,
192                        required_action: Some("Refresh cluster state".into()),
193                    }
194                }
195            }
196
197            Code::InvalidArgument => Self::Business {
198                code: ErrorCode::InvalidRequest as u32,
199                kind: BusinessErrorType::InvalidRequest,
200                message,
201                required_action: None,
202            },
203
204            Code::PermissionDenied => Self::Business {
205                code: ErrorCode::NotLeader as u32,
206                kind: BusinessErrorType::NotLeader,
207                message,
208                required_action: Some("Refresh cluster state".into()),
209            },
210
211            _ => Self::Business {
212                code: ErrorCode::Uncategorized as u32,
213                kind: BusinessErrorType::InvalidRequest,
214                message: format!("Unhandled status code: {code:?}"),
215                required_action: None,
216            },
217        }
218    }
219}
220
221fn parse_leader_from_metadata(status: &Status) -> Option<LeaderInfo> {
222    status
223        .metadata()
224        .get("x-raft-leader")
225        .and_then(|v| v.to_str().ok())
226        .and_then(|s| {
227            // Manually parse JSON-like string
228            let mut id = None;
229            let mut address = None;
230            let mut last_contact = None;
231
232            // Remove whitespace and outer braces
233            let s = s.trim().trim_start_matches('{').trim_end_matches('}');
234
235            // Split into key-value pairs
236            for pair in s.split(',') {
237                let pair = pair.trim();
238                if let Some((key, value)) = pair.split_once(':') {
239                    let key = key.trim().trim_matches('"');
240                    let value = value.trim().trim_matches('"');
241
242                    match key {
243                        "id" => id = Some(value.to_string()),
244                        "address" => address = Some(value.to_string()),
245                        "last_contact" => last_contact = value.parse().ok(),
246                        _ => continue,
247                    }
248                }
249            }
250
251            Some(LeaderInfo {
252                id: id?,
253                address: address?,
254                last_contact: last_contact?,
255            })
256        })
257}
258
259impl From<ErrorCode> for ClientApiError {
260    fn from(code: ErrorCode) -> Self {
261        match code {
262            // Network layer errors
263            ErrorCode::ConnectionTimeout => ClientApiError::Network {
264                code: code as u32,
265                kind: NetworkErrorType::Timeout,
266                message: "Connection timeout".to_string(),
267                retry_after_ms: None,
268                leader_hint: None,
269            },
270            ErrorCode::InvalidAddress => ClientApiError::Network {
271                code: code as u32,
272                kind: NetworkErrorType::InvalidAddress,
273                message: "Invalid address".to_string(),
274                retry_after_ms: None,
275                leader_hint: None,
276            },
277            ErrorCode::LeaderChanged => ClientApiError::Network {
278                code: code as u32,
279                kind: NetworkErrorType::ConnectionLost,
280                message: "Leader changed".to_string(),
281                retry_after_ms: Some(100), // suggest immediate retry
282                leader_hint: None,         // Note: This would ideally be populated from context
283            },
284            ErrorCode::JoinError => ClientApiError::Network {
285                code: code as u32,
286                kind: NetworkErrorType::JoinError,
287                message: "Task Join Error".to_string(),
288                retry_after_ms: Some(100), // suggest immediate retry
289                leader_hint: None,         // Note: This would ideally be populated from context
290            },
291
292            // Protocol layer errors
293            ErrorCode::InvalidResponse => ClientApiError::Protocol {
294                code: code as u32,
295                kind: ProtocolErrorType::InvalidResponseFormat,
296                message: "Invalid response format".to_string(),
297                supported_versions: None,
298            },
299            ErrorCode::VersionMismatch => ClientApiError::Protocol {
300                code: code as u32,
301                kind: ProtocolErrorType::VersionMismatch,
302                message: "Version mismatch".to_string(),
303                supported_versions: None, // Note: This would ideally be populated from context
304            },
305
306            // Storage layer errors
307            ErrorCode::DiskFull => ClientApiError::Storage {
308                code: code as u32,
309                kind: StorageErrorType::DiskFull,
310                message: "Disk full".to_string(),
311            },
312            ErrorCode::DataCorruption => ClientApiError::Storage {
313                code: code as u32,
314                kind: StorageErrorType::CorruptionDetected,
315                message: "Data corruption detected".to_string(),
316            },
317            ErrorCode::StorageIoError => ClientApiError::Storage {
318                code: code as u32,
319                kind: StorageErrorType::IoFailure,
320                message: "Storage I/O error".to_string(),
321            },
322            ErrorCode::StoragePermissionDenied => ClientApiError::Storage {
323                code: code as u32,
324                kind: StorageErrorType::PermissionDenied,
325                message: "Storage permission denied".to_string(),
326            },
327            ErrorCode::KeyNotExist => ClientApiError::Storage {
328                code: code as u32,
329                kind: StorageErrorType::KeyNotExist,
330                message: "Key not exist in storage".to_string(),
331            },
332
333            // Business logic errors
334            ErrorCode::NotLeader => ClientApiError::Business {
335                code: code as u32,
336                kind: BusinessErrorType::NotLeader,
337                message: "Not leader".to_string(),
338                required_action: Some("redirect to leader".to_string()),
339            },
340            ErrorCode::StaleOperation => ClientApiError::Business {
341                code: code as u32,
342                kind: BusinessErrorType::StaleRead,
343                message: "Stale operation".to_string(),
344                required_action: Some("refresh state and retry".to_string()),
345            },
346            ErrorCode::InvalidRequest => ClientApiError::Business {
347                code: code as u32,
348                kind: BusinessErrorType::InvalidRequest,
349                message: "Invalid request".to_string(),
350                required_action: Some("check request parameters".to_string()),
351            },
352            ErrorCode::RateLimited => ClientApiError::Business {
353                code: code as u32,
354                kind: BusinessErrorType::RateLimited,
355                message: "Rate limited".to_string(),
356                required_action: Some("wait and retry".to_string()),
357            },
358            ErrorCode::ClusterUnavailable => ClientApiError::Business {
359                code: code as u32,
360                kind: BusinessErrorType::ClusterUnavailable,
361                message: "Cluster unavailable".to_string(),
362                required_action: Some("try again later".to_string()),
363            },
364            ErrorCode::ProposeFailed => ClientApiError::Business {
365                code: code as u32,
366                kind: BusinessErrorType::ProposeFailed,
367                message: "Propose failed".to_string(),
368                required_action: Some("try again later".to_string()),
369            },
370            ErrorCode::Uncategorized => ClientApiError::Business {
371                code: code as u32,
372                kind: BusinessErrorType::InvalidRequest,
373                message: "Uncategorized error".to_string(),
374                required_action: None,
375            },
376            ErrorCode::TermOutdated => ClientApiError::Business {
377                code: code as u32,
378                kind: BusinessErrorType::StaleTerm,
379                message: "Stale term error".to_string(),
380                required_action: None,
381            },
382            ErrorCode::RetryRequired => ClientApiError::Business {
383                code: code as u32,
384                kind: BusinessErrorType::RetryRequired,
385                message: "Retry required. Please try again.".to_string(),
386                required_action: None,
387            },
388
389            // Unclassified error
390            ErrorCode::General => ClientApiError::General {
391                code: code as u32,
392                kind: GeneralErrorType::General,
393                message: "General Client Api error".to_string(),
394                required_action: None,
395            },
396            // Success case - should normally not be converted to error
397            ErrorCode::Success => unreachable!(),
398        }
399    }
400}
401
402impl ClientApiError {
403    /// Returns the error code associated with this error
404    pub fn code(&self) -> u32 {
405        match self {
406            ClientApiError::Network { code, .. } => *code,
407            ClientApiError::Protocol { code, .. } => *code,
408            ClientApiError::Storage { code, .. } => *code,
409            ClientApiError::Business { code, .. } => *code,
410            ClientApiError::General { code, .. } => *code,
411        }
412    }
413
414    /// Returns the error message
415    pub fn message(&self) -> &str {
416        match self {
417            ClientApiError::Network { message, .. } => message,
418            ClientApiError::Protocol { message, .. } => message,
419            ClientApiError::Storage { message, .. } => message,
420            ClientApiError::Business { message, .. } => message,
421            ClientApiError::General { message, .. } => message,
422        }
423    }
424}
425
426impl From<JoinError> for ClientApiError {
427    fn from(_err: JoinError) -> Self {
428        ErrorCode::JoinError.into()
429    }
430}
431impl From<std::io::Error> for ClientApiError {
432    fn from(_err: std::io::Error) -> Self {
433        ErrorCode::StorageIoError.into()
434    }
435}
436
437impl ClientApiError {
438    pub fn general_client_error(message: String) -> Self {
439        ClientApiError::General {
440            code: ErrorCode::General as u32,
441            kind: GeneralErrorType::General,
442            message,
443            required_action: None,
444        }
445    }
446}