d_engine_client/
error.rs

1use std::error::Error;
2
3use d_engine_proto::error::ErrorCode;
4use serde::Deserialize;
5use serde::Serialize;
6use tokio::task::JoinError;
7use tonic::Code;
8use tonic::Status;
9
/// Error type surfaced by the client API.
///
/// Every variant carries an [`ErrorCode`] and a human-readable `message`;
/// the variant itself identifies the layer the failure is attributed to.
/// Serde renames each variant to its lower-case layer name (`"network"`,
/// `"protocol"`, `"storage"`, `"business"`, `"general"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientApiError {
    /// Network layer error (usually retryable; see `retry_after_ms`)
    #[serde(rename = "network")]
    Network {
        /// Specific error code for this failure
        code: ErrorCode,
        /// Human-readable description of the failure
        message: String,
        /// Suggested delay, in milliseconds, before retrying; `None` when no
        /// retry delay is suggested (e.g. an invalid address that must be
        /// corrected first)
        retry_after_ms: Option<u64>,
        /// Leader to redirect to, when one is known; omitted from the
        /// serialized form when `None`
        #[serde(skip_serializing_if = "Option::is_none")]
        leader_hint: Option<LeaderHint>,
    },

    /// Protocol layer error (client needs to check protocol compatibility)
    #[serde(rename = "protocol")]
    Protocol {
        /// Specific error code for this failure
        code: ErrorCode,
        /// Human-readable description of the failure
        message: String,
        /// Protocol versions, when known; omitted from the serialized form
        /// when `None`
        #[serde(skip_serializing_if = "Option::is_none")]
        supported_versions: Option<Vec<String>>,
    },

    /// Storage layer error (internal problem on the server)
    #[serde(rename = "storage")]
    Storage { code: ErrorCode, message: String },

    /// Business logic error (client needs to adjust behavior)
    #[serde(rename = "business")]
    Business {
        /// Specific error code for this failure
        code: ErrorCode,
        /// Human-readable description of the failure
        message: String,
        /// Free-form hint describing what the caller should do next;
        /// omitted from the serialized form when `None`
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },

    /// General Client API error
    #[serde(rename = "general")]
    General {
        /// Specific error code for this failure
        code: ErrorCode,
        /// Human-readable description of the failure
        message: String,
        /// Free-form hint describing what the caller should do next;
        /// omitted from the serialized form when `None`
        #[serde(skip_serializing_if = "Option::is_none")]
        required_action: Option<String>,
    },
}
53
54// #[derive(Debug, Clone, Serialize, Deserialize)]
55// pub enum NetworkErrorType {
56//     Timeout,
57//     ConnectionLost,
58//     InvalidAddress,
59//     TlsFailure,
60//     ProtocolViolation,
61//     JoinError,
62// }
63
64// #[derive(Debug, Clone, Serialize, Deserialize)]
65// pub enum ProtocolErrorType {
66//     InvalidResponseFormat,
67//     VersionMismatch,
68//     ChecksumFailure,
69//     SerializationError,
70// }
71
72// #[derive(Debug, Clone, Serialize, Deserialize)]
73// pub enum StorageErrorType {
74//     DiskFull,
75//     CorruptionDetected,
76//     IoFailure,
77//     PermissionDenied,
78//     KeyNotExist,
79// }
80
81// #[derive(Debug, Clone, Serialize, Deserialize)]
82// pub enum BusinessErrorType {
83//     NotLeader,
84//     StaleRead,
85//     InvalidRequest,
86//     RateLimited,
87//     ClusterUnavailable,
88//     ProposeFailed,
89//     RetryRequired,
90//     StaleTerm,
91// }
92
93// #[derive(Debug, Clone, Serialize, Deserialize)]
94// pub enum GeneralErrorType {
95//     General,
96// }
97// Re-export LeaderHint from proto (network layer use)
98pub use d_engine_proto::common::LeaderHint;
99impl From<tonic::transport::Error> for ClientApiError {
100    /// Converts a tonic transport error into a ClientApiError
101    ///
102    /// This implementation handles different transport error scenarios:
103    /// - Connection timeouts
104    /// - Invalid URI/address formats
105    /// - Unexpected connection loss
106    ///
107    /// # Parameters
108    /// - `err`: The tonic transport error to convert
109    ///
110    /// # Returns
111    /// A ClientApiError with appropriate error code and retry information
112    fn from(err: tonic::transport::Error) -> Self {
113        // Determine the error details based on the underlying error
114        if let Some(io_err) = err.source().and_then(|e| e.downcast_ref::<std::io::Error>()) {
115            if io_err.kind() == std::io::ErrorKind::TimedOut {
116                return Self::Network {
117                    code: ErrorCode::ConnectionTimeout,
118                    message: format!("Connection timeout: {err}"),
119                    retry_after_ms: Some(3000), // Retry after 3 seconds
120                    leader_hint: None,
121                };
122            }
123        }
124
125        // Check for invalid address errors
126        if err.to_string().contains("invalid uri") {
127            return Self::Network {
128                code: ErrorCode::InvalidAddress,
129                message: format!("Invalid address: {err}"),
130                retry_after_ms: None, // Not retryable - needs address correction
131                leader_hint: None,
132            };
133        }
134
135        // Default case: unexpected transport failure
136        Self::Network {
137            code: ErrorCode::Uncategorized,
138            message: format!("Transport error: {err}"),
139            retry_after_ms: Some(5000),
140            leader_hint: None,
141        }
142    }
143}
144
145impl From<Status> for ClientApiError {
146    fn from(status: Status) -> Self {
147        let code = status.code();
148        let message = status.message().to_string();
149
150        match code {
151            Code::Unavailable => Self::Business {
152                code: ErrorCode::ClusterUnavailable,
153                message,
154                required_action: Some("Retry after cluster recovery".into()),
155            },
156
157            Code::Cancelled => Self::Network {
158                code: ErrorCode::ConnectionTimeout,
159                message,
160                leader_hint: None,
161                retry_after_ms: Some(1000),
162            },
163
164            Code::FailedPrecondition => {
165                if let Some(leader) = parse_leader_from_metadata(&status) {
166                    Self::Network {
167                        code: ErrorCode::LeaderChanged,
168                        message: "Leadership changed".into(),
169                        retry_after_ms: Some(1000),
170                        leader_hint: Some(leader),
171                    }
172                } else {
173                    Self::Business {
174                        code: ErrorCode::StaleOperation,
175                        message,
176                        required_action: Some("Refresh cluster state".into()),
177                    }
178                }
179            }
180
181            Code::InvalidArgument => Self::Business {
182                code: ErrorCode::InvalidRequest,
183                message,
184                required_action: None,
185            },
186
187            Code::PermissionDenied => Self::Business {
188                code: ErrorCode::NotLeader,
189                message,
190                required_action: Some("Refresh cluster state".into()),
191            },
192
193            _ => Self::Business {
194                code: ErrorCode::Uncategorized,
195                message: format!("Unhandled status code: {code:?}"),
196                required_action: None,
197            },
198        }
199    }
200}
201
202fn parse_leader_from_metadata(status: &Status) -> Option<LeaderHint> {
203    status
204        .metadata()
205        .get("x-raft-leader")
206        .and_then(|v| v.to_str().ok())
207        .and_then(|s| {
208            // Manually parse JSON-like string
209            let mut leader_id = None;
210            let mut address = None;
211
212            // Remove whitespace and outer braces
213            let s = s.trim().trim_start_matches('{').trim_end_matches('}');
214
215            // Split into key-value pairs
216            for pair in s.split(',') {
217                let pair = pair.trim();
218                if let Some((key, value)) = pair.split_once(':') {
219                    let key = key.trim().trim_matches('"');
220                    let value = value.trim().trim_matches('"');
221
222                    match key {
223                        "leader_id" => leader_id = value.parse().ok(),
224                        "address" => address = Some(value.to_string()),
225                        _ => continue,
226                    }
227                }
228            }
229
230            Some(LeaderHint {
231                leader_id: leader_id?,
232                address: address?,
233            })
234        })
235}
236
237impl From<ErrorCode> for ClientApiError {
238    fn from(code: ErrorCode) -> Self {
239        match code {
240            // Network layer errors
241            ErrorCode::ConnectionTimeout => ClientApiError::Network {
242                code,
243                message: "Connection timeout".to_string(),
244                retry_after_ms: None,
245                leader_hint: None,
246            },
247            ErrorCode::InvalidAddress => ClientApiError::Network {
248                code,
249                message: "Invalid address".to_string(),
250                retry_after_ms: None,
251                leader_hint: None,
252            },
253            ErrorCode::LeaderChanged => ClientApiError::Network {
254                code,
255                message: "Leader changed".to_string(),
256                retry_after_ms: Some(100), // suggest immediate retry
257                leader_hint: None,         // Note: This would ideally be populated from context
258            },
259            ErrorCode::JoinError => ClientApiError::Network {
260                code,
261                message: "Task Join Error".to_string(),
262                retry_after_ms: Some(100), // suggest immediate retry
263                leader_hint: None,         // Note: This would ideally be populated from context
264            },
265
266            // Protocol layer errors
267            ErrorCode::InvalidResponse => ClientApiError::Protocol {
268                code,
269                message: "Invalid response format".to_string(),
270                supported_versions: None,
271            },
272            ErrorCode::VersionMismatch => ClientApiError::Protocol {
273                code,
274                message: "Version mismatch".to_string(),
275                supported_versions: None, // Note: This would ideally be populated from context
276            },
277
278            // Storage layer errors
279            ErrorCode::DiskFull => ClientApiError::Storage {
280                code,
281                message: "Disk full".to_string(),
282            },
283            ErrorCode::DataCorruption => ClientApiError::Storage {
284                code,
285                message: "Data corruption detected".to_string(),
286            },
287            ErrorCode::StorageIoError => ClientApiError::Storage {
288                code,
289                message: "Storage I/O error".to_string(),
290            },
291            ErrorCode::StoragePermissionDenied => ClientApiError::Storage {
292                code,
293                message: "Storage permission denied".to_string(),
294            },
295            ErrorCode::KeyNotExist => ClientApiError::Storage {
296                code,
297                message: "Key not exist in storage".to_string(),
298            },
299
300            // Business logic errors
301            ErrorCode::NotLeader => ClientApiError::Business {
302                code,
303                message: "Not leader".to_string(),
304                required_action: Some("redirect to leader".to_string()),
305            },
306            ErrorCode::StaleOperation => ClientApiError::Business {
307                code,
308                message: "Stale operation".to_string(),
309                required_action: Some("refresh state and retry".to_string()),
310            },
311            ErrorCode::InvalidRequest => ClientApiError::Business {
312                code,
313                message: "Invalid request".to_string(),
314                required_action: Some("check request parameters".to_string()),
315            },
316            ErrorCode::RateLimited => ClientApiError::Business {
317                code,
318                message: "Rate limited".to_string(),
319                required_action: Some("wait and retry".to_string()),
320            },
321            ErrorCode::ClusterUnavailable => ClientApiError::Business {
322                code,
323                message: "Cluster unavailable".to_string(),
324                required_action: Some("try again later".to_string()),
325            },
326            ErrorCode::ProposeFailed => ClientApiError::Business {
327                code,
328                message: "Propose failed".to_string(),
329                required_action: Some("try again later".to_string()),
330            },
331            ErrorCode::Uncategorized => ClientApiError::Business {
332                code,
333                message: "Uncategorized error".to_string(),
334                required_action: None,
335            },
336            ErrorCode::TermOutdated => ClientApiError::Business {
337                code,
338                message: "Stale term error".to_string(),
339                required_action: None,
340            },
341            ErrorCode::RetryRequired => ClientApiError::Business {
342                code,
343                message: "Retry required. Please try again.".to_string(),
344                required_action: None,
345            },
346
347            // Unclassified error
348            ErrorCode::General => ClientApiError::General {
349                code,
350                message: "General Client Api error".to_string(),
351                required_action: None,
352            },
353            // Success case - should normally not be converted to error
354            ErrorCode::Success => unreachable!(),
355        }
356    }
357}
358
359impl ClientApiError {
360    /// Returns the error code associated with this error
361    pub fn code(&self) -> ErrorCode {
362        match self {
363            ClientApiError::Network { code, .. } => *code,
364            ClientApiError::Protocol { code, .. } => *code,
365            ClientApiError::Storage { code, .. } => *code,
366            ClientApiError::Business { code, .. } => *code,
367            ClientApiError::General { code, .. } => *code,
368        }
369    }
370
371    /// Returns the error message
372    pub fn message(&self) -> &str {
373        match self {
374            ClientApiError::Network { message, .. } => message,
375            ClientApiError::Protocol { message, .. } => message,
376            ClientApiError::Storage { message, .. } => message,
377            ClientApiError::Business { message, .. } => message,
378            ClientApiError::General { message, .. } => message,
379        }
380    }
381}
382
383impl From<JoinError> for ClientApiError {
384    fn from(_err: JoinError) -> Self {
385        ErrorCode::JoinError.into()
386    }
387}
388impl From<std::io::Error> for ClientApiError {
389    fn from(_err: std::io::Error) -> Self {
390        ErrorCode::StorageIoError.into()
391    }
392}
393
394impl ClientApiError {
395    pub fn general_client_error(message: String) -> Self {
396        ClientApiError::General {
397            code: ErrorCode::General,
398            message,
399            required_action: None,
400        }
401    }
402}
403
404impl std::fmt::Display for ClientApiError {
405    fn fmt(
406        &self,
407        f: &mut std::fmt::Formatter<'_>,
408    ) -> std::fmt::Result {
409        write!(f, "{:?}: {}", self.code(), self.message())
410    }
411}
412
// Marks `ClientApiError` as a standard error type. The trait's default
// methods are used, so no underlying `source()` is reported.
impl std::error::Error for ClientApiError {}