use std::fmt;
use std::net::SocketAddr;
#[derive(Debug)]
pub enum ClientError {
ConnectionFailed(std::io::Error),
ConnectionClosed,
IoError(std::io::Error),
ProtocolError(String),
InvalidResponse(String),
Timeout,
CrcMismatch {
expected: u32,
actual: u32,
},
ServerBackpressure,
WouldBlock,
ServerError(String),
ServerCatchingUp {
server_offset: u64,
},
NotLeader {
leader_addr: Option<SocketAddr>,
},
TlsError(String),
InvalidTopicName(String),
}
impl fmt::Display for ClientError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ConnectionFailed(e) => write!(f, "Connection failed: {}", e),
Self::ConnectionClosed => write!(f, "Connection closed by server"),
Self::IoError(e) => write!(f, "I/O error: {}", e),
Self::ProtocolError(msg) => write!(f, "Protocol error: {}", msg),
Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
Self::Timeout => write!(f, "Operation timed out"),
Self::CrcMismatch { expected, actual } => {
write!(
f,
"CRC mismatch: expected {:#x}, got {:#x}",
expected, actual
)
},
Self::ServerBackpressure => write!(f, "Server signaled backpressure"),
Self::WouldBlock => write!(f, "Operation would block (buffer full)"),
Self::ServerError(msg) => write!(f, "Server error: {}", msg),
Self::ServerCatchingUp { server_offset } => {
write!(f, "Server catching up (at offset {})", server_offset)
},
Self::NotLeader { leader_addr } => match leader_addr {
Some(addr) => write!(f, "Not leader, redirect to {}", addr),
None => write!(f, "Not leader, leader unknown"),
},
Self::TlsError(msg) => write!(f, "TLS error: {}", msg),
Self::InvalidTopicName(name) => {
write!(f, "Invalid topic name {:?}: must match [a-zA-Z0-9-]+", name)
},
}
}
}
pub fn parse_not_leader_error(msg: &str) -> Option<Option<SocketAddr>> {
if !msg.starts_with("NOT_LEADER:") {
return None;
}
if msg.contains("leader unknown") {
return Some(None);
}
if let Some(addr_str) = msg.strip_prefix("NOT_LEADER: redirect to ") {
if let Ok(addr) = addr_str.trim().parse::<SocketAddr>() {
return Some(Some(addr));
}
}
Some(None)
}
impl ClientError {
pub fn is_retryable(&self) -> bool {
match self {
Self::ConnectionClosed | Self::ConnectionFailed(_) | Self::IoError(_) => true,
Self::Timeout => true,
Self::ServerBackpressure => true,
Self::NotLeader { .. } => true,
Self::ServerCatchingUp { .. } => true,
Self::ServerError(msg) => msg.contains("FORWARD_FAILED"),
_ => false,
}
}
}
pub fn validate_topic_name(name: &str) -> Result<()> {
if name.is_empty() || !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') {
return Err(ClientError::InvalidTopicName(name.to_owned()));
}
Ok(())
}
impl std::error::Error for ClientError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::ConnectionFailed(e) | Self::IoError(e) => Some(e),
_ => None,
}
}
}
impl From<std::io::Error> for ClientError {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
impl From<lnc_core::LanceError> for ClientError {
fn from(err: lnc_core::LanceError) -> Self {
Self::ProtocolError(err.to_string())
}
}
pub type Result<T> = std::result::Result<T, ClientError>;