use std::error::Error;
use d_engine_proto::error::ErrorCode;
use serde::Deserialize;
use serde::Serialize;
use tokio::task::JoinError;
use tonic::Code;
use tonic::Status;
pub type ClientApiResult<T> = std::result::Result<T, ClientApiError>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientApiError {
#[serde(rename = "network")]
Network {
code: ErrorCode,
message: String,
retry_after_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
leader_hint: Option<LeaderHint>,
},
#[serde(rename = "protocol")]
Protocol {
code: ErrorCode,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
supported_versions: Option<Vec<String>>,
},
#[serde(rename = "storage")]
Storage { code: ErrorCode, message: String },
#[serde(rename = "business")]
Business {
code: ErrorCode,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
required_action: Option<String>,
},
#[serde(rename = "general")]
General {
code: ErrorCode,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
required_action: Option<String>,
},
}
pub use d_engine_proto::common::LeaderHint;
impl From<tonic::transport::Error> for ClientApiError {
fn from(err: tonic::transport::Error) -> Self {
if let Some(io_err) = err.source().and_then(|e| e.downcast_ref::<std::io::Error>()) {
if io_err.kind() == std::io::ErrorKind::TimedOut {
return Self::Network {
code: ErrorCode::ConnectionTimeout,
message: format!("Connection timeout: {err}"),
retry_after_ms: Some(3000), leader_hint: None,
};
}
}
if err.to_string().contains("invalid uri") {
return Self::Network {
code: ErrorCode::InvalidAddress,
message: format!("Invalid address: {err}"),
retry_after_ms: None, leader_hint: None,
};
}
Self::Network {
code: ErrorCode::Uncategorized,
message: format!("Transport error: {err}"),
retry_after_ms: Some(5000),
leader_hint: None,
}
}
}
impl From<Status> for ClientApiError {
fn from(status: Status) -> Self {
let code = status.code();
let message = status.message().to_string();
match code {
Code::Unavailable => Self::Business {
code: ErrorCode::ClusterUnavailable,
message,
required_action: Some("Retry after cluster recovery".into()),
},
Code::Cancelled => Self::Network {
code: ErrorCode::ConnectionTimeout,
message,
leader_hint: None,
retry_after_ms: Some(1000),
},
Code::FailedPrecondition => {
if let Some(leader) = parse_leader_from_metadata(&status) {
Self::Network {
code: ErrorCode::LeaderChanged,
message: "Leadership changed".into(),
retry_after_ms: Some(1000),
leader_hint: Some(leader),
}
} else {
Self::Business {
code: ErrorCode::StaleOperation,
message,
required_action: Some("Refresh cluster state".into()),
}
}
}
Code::InvalidArgument => Self::Business {
code: ErrorCode::InvalidRequest,
message,
required_action: None,
},
Code::PermissionDenied => Self::Business {
code: ErrorCode::NotLeader,
message,
required_action: Some("Refresh cluster state".into()),
},
_ => Self::Business {
code: ErrorCode::Uncategorized,
message: format!("Unhandled status code: {code:?}"),
required_action: None,
},
}
}
}
fn parse_leader_from_metadata(status: &Status) -> Option<LeaderHint> {
status
.metadata()
.get("x-raft-leader")
.and_then(|v| v.to_str().ok())
.and_then(|s| {
let mut leader_id = None;
let mut address = None;
let s = s.trim().trim_start_matches('{').trim_end_matches('}');
for pair in s.split(',') {
let pair = pair.trim();
if let Some((key, value)) = pair.split_once(':') {
let key = key.trim().trim_matches('"');
let value = value.trim().trim_matches('"');
match key {
"leader_id" => leader_id = value.parse().ok(),
"address" => address = Some(value.to_string()),
_ => continue,
}
}
}
Some(LeaderHint {
leader_id: leader_id?,
address: address?,
})
})
}
impl From<ErrorCode> for ClientApiError {
fn from(code: ErrorCode) -> Self {
match code {
ErrorCode::ConnectionTimeout => ClientApiError::Network {
code,
message: "Connection timeout".to_string(),
retry_after_ms: None,
leader_hint: None,
},
ErrorCode::InvalidAddress => ClientApiError::Network {
code,
message: "Invalid address".to_string(),
retry_after_ms: None,
leader_hint: None,
},
ErrorCode::LeaderChanged => ClientApiError::Network {
code,
message: "Leader changed".to_string(),
retry_after_ms: Some(100), leader_hint: None, },
ErrorCode::JoinError => ClientApiError::Network {
code,
message: "Task Join Error".to_string(),
retry_after_ms: Some(100), leader_hint: None, },
ErrorCode::InvalidResponse => ClientApiError::Protocol {
code,
message: "Invalid response format".to_string(),
supported_versions: None,
},
ErrorCode::VersionMismatch => ClientApiError::Protocol {
code,
message: "Version mismatch".to_string(),
supported_versions: None, },
ErrorCode::DiskFull => ClientApiError::Storage {
code,
message: "Disk full".to_string(),
},
ErrorCode::DataCorruption => ClientApiError::Storage {
code,
message: "Data corruption detected".to_string(),
},
ErrorCode::StorageIoError => ClientApiError::Storage {
code,
message: "Storage I/O error".to_string(),
},
ErrorCode::StoragePermissionDenied => ClientApiError::Storage {
code,
message: "Storage permission denied".to_string(),
},
ErrorCode::KeyNotExist => ClientApiError::Storage {
code,
message: "Key not exist in storage".to_string(),
},
ErrorCode::NotLeader => ClientApiError::Business {
code,
message: "Not leader".to_string(),
required_action: Some("redirect to leader".to_string()),
},
ErrorCode::StaleOperation => ClientApiError::Business {
code,
message: "Stale operation".to_string(),
required_action: Some("refresh state and retry".to_string()),
},
ErrorCode::InvalidRequest => ClientApiError::Business {
code,
message: "Invalid request".to_string(),
required_action: Some("check request parameters".to_string()),
},
ErrorCode::RateLimited => ClientApiError::Business {
code,
message: "Rate limited".to_string(),
required_action: Some("wait and retry".to_string()),
},
ErrorCode::ClusterUnavailable => ClientApiError::Business {
code,
message: "Cluster unavailable".to_string(),
required_action: Some("try again later".to_string()),
},
ErrorCode::ProposeFailed => ClientApiError::Business {
code,
message: "Propose failed".to_string(),
required_action: Some("try again later".to_string()),
},
ErrorCode::Uncategorized => ClientApiError::Business {
code,
message: "Uncategorized error".to_string(),
required_action: None,
},
ErrorCode::TermOutdated => ClientApiError::Business {
code,
message: "Stale term error".to_string(),
required_action: None,
},
ErrorCode::RetryRequired => ClientApiError::Business {
code,
message: "Retry required. Please try again.".to_string(),
required_action: None,
},
ErrorCode::General => ClientApiError::General {
code,
message: "General Client Api error".to_string(),
required_action: None,
},
ErrorCode::Success => unreachable!(),
}
}
}
impl ClientApiError {
pub fn code(&self) -> ErrorCode {
match self {
ClientApiError::Network { code, .. } => *code,
ClientApiError::Protocol { code, .. } => *code,
ClientApiError::Storage { code, .. } => *code,
ClientApiError::Business { code, .. } => *code,
ClientApiError::General { code, .. } => *code,
}
}
pub fn message(&self) -> &str {
match self {
ClientApiError::Network { message, .. } => message,
ClientApiError::Protocol { message, .. } => message,
ClientApiError::Storage { message, .. } => message,
ClientApiError::Business { message, .. } => message,
ClientApiError::General { message, .. } => message,
}
}
}
impl From<JoinError> for ClientApiError {
fn from(_err: JoinError) -> Self {
ErrorCode::JoinError.into()
}
}
impl From<std::io::Error> for ClientApiError {
fn from(_err: std::io::Error) -> Self {
ErrorCode::StorageIoError.into()
}
}
impl ClientApiError {
pub fn general_client_error(message: String) -> Self {
ClientApiError::General {
code: ErrorCode::General,
message,
required_action: None,
}
}
}
impl std::fmt::Display for ClientApiError {
fn fmt(
&self,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
write!(f, "{:?}: {}", self.code(), self.message())
}
}
impl std::error::Error for ClientApiError {}