use std::net::SocketAddr;
use thiserror::Error;
/// Result alias whose error type is fixed to [`ClusterError`].
pub type Result<T> = std::result::Result<T, ClusterError>;
/// Error type for cluster operations.
///
/// Each variant's `#[error]` attribute supplies its `Display` text. Use
/// [`ClusterError::is_retriable`], [`ClusterError::should_redirect`] and
/// [`ClusterError::is_fatal`] to classify a value without matching on it.
#[derive(Debug, Error)]
pub enum ClusterError {
    // ---- configuration and membership ----
    /// The configuration was rejected; the string carries the detail.
    #[error("invalid configuration: {0}")]
    InvalidConfig(String),

    /// The given node ID already exists.
    #[error("node ID conflict: {0} already exists")]
    NodeIdConflict(String),

    /// No node with the given ID is known.
    #[error("node not found: {0}")]
    NodeNotFound(String),

    /// The node at `addr` could not be reached.
    #[error("node unreachable: {addr}")]
    NodeUnreachable { addr: SocketAddr },

    /// The node identified by `node_id` has failed.
    #[error("node failed: {node_id}")]
    NodeFailed { node_id: String },

    /// No seed nodes are available.
    #[error("no seed nodes available")]
    NoSeedNodes,

    /// Joining the cluster failed; the string carries the detail.
    #[error("cluster join failed: {0}")]
    JoinFailed(String),

    /// The cluster has `current` nodes but needs `required`.
    #[error("cluster not ready: need {required} nodes, have {current}")]
    ClusterNotReady { required: usize, current: usize },

    // ---- leadership and raft ----
    /// This node is not the leader; `leader` names the current one when known.
    #[error("not leader: current leader is {leader:?}")]
    NotLeader { leader: Option<String> },

    /// A leader election is currently in progress.
    #[error("leader election in progress")]
    LeaderElectionInProgress,

    /// A raft error, carried as text.
    #[error("raft error: {0}")]
    Raft(String),

    /// A raft storage error, carried as text.
    #[error("raft storage error: {0}")]
    RaftStorage(String),

    /// A proposal timed out.
    #[error("proposal timeout")]
    ProposalTimeout,

    /// A proposal was rejected; the string carries the reason.
    #[error("proposal rejected: {0}")]
    ProposalRejected(String),

    // ---- topics and partitions ----
    /// The named topic does not exist.
    #[error("topic not found: {0}")]
    TopicNotFound(String),

    /// The named topic already exists.
    #[error("topic already exists: {0}")]
    TopicAlreadyExists(String),

    /// The given partition of `topic` does not exist.
    #[error("partition not found: {topic}/{partition}")]
    PartitionNotFound { topic: String, partition: u32 },

    /// No leader was found for the given partition of `topic`.
    #[error("partition leader not found: {topic}/{partition}")]
    PartitionLeaderNotFound { topic: String, partition: u32 },

    /// The requested partition count is invalid.
    #[error("invalid partition count: {0}")]
    InvalidPartitionCount(u32),

    /// The replication `factor` is invalid given `nodes` nodes.
    #[error("invalid replication factor: {factor} (have {nodes} nodes)")]
    InvalidReplicationFactor { factor: u16, nodes: usize },

    // ---- replication ----
    /// The ISR (presumably the in-sync replica set) has `current` members
    /// but `required` are needed.
    #[error("not enough ISR: need {required}, have {current}")]
    NotEnoughIsr { required: u16, current: u16 },

    /// Replication failed; the string carries the detail.
    #[error("replication failed: {0}")]
    ReplicationFailed(String),

    /// The replica on `node_id` lags by `lag_messages` messages.
    #[error("replica lag exceeded: {node_id} lag {lag_messages} messages")]
    ReplicaLagExceeded { node_id: String, lag_messages: u64 },

    /// The high watermark has not advanced.
    #[error("high watermark not advanced")]
    HighWatermarkStalled,

    // ---- protocol and encoding ----
    /// A protocol error, carried as text.
    #[error("protocol error: {0}")]
    Protocol(String),

    /// A message was invalid; the string carries the detail.
    #[error("invalid message: {0}")]
    InvalidMessage(String),

    /// A message of `size` bytes exceeds the `max` byte limit.
    #[error("message too large: {size} bytes (max {max})")]
    MessageTooLarge { size: usize, max: usize },

    /// A serialization error, carried as text.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// A deserialization error, carried as text.
    #[error("deserialization error: {0}")]
    Deserialization(String),

    // ---- network ----
    /// A connection attempt failed; the string carries the detail.
    #[error("connection failed: {0}")]
    ConnectionFailed(String),

    /// The connection was closed.
    #[error("connection closed")]
    ConnectionClosed,

    /// A request timed out.
    #[error("request timeout")]
    Timeout,

    /// A network error, carried as text.
    #[error("network error: {0}")]
    Network(String),

    // ---- storage and I/O ----
    /// A storage error, carried as text.
    #[error("storage error: {0}")]
    Storage(String),

    /// Corrupt data was encountered; the string carries the detail.
    #[error("corrupt data: {0}")]
    CorruptData(String),

    /// An I/O error; `#[from]` lets `?` convert `std::io::Error` into this variant.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),

    // ---- security ----
    /// A crypto error, carried as text.
    #[error("crypto error: {0}")]
    CryptoError(String),

    /// The operation was not authorized; the string carries the detail.
    #[error("unauthorized: {0}")]
    Unauthorized(String),

    // ---- internal and lifecycle ----
    /// An internal error, carried as text.
    #[error("internal error: {0}")]
    Internal(String),

    /// A channel was closed.
    #[error("channel closed")]
    ChannelClosed,

    /// Shutdown is in progress.
    #[error("shutdown in progress")]
    ShuttingDown,
}
impl ClusterError {
pub fn is_retriable(&self) -> bool {
matches!(
self,
ClusterError::NodeUnreachable { .. }
| ClusterError::Timeout
| ClusterError::LeaderElectionInProgress
| ClusterError::NotLeader { .. }
| ClusterError::ClusterNotReady { .. }
| ClusterError::NotEnoughIsr { .. }
| ClusterError::Network(_)
)
}
pub fn should_redirect(&self) -> bool {
matches!(self, ClusterError::NotLeader { leader: Some(_) })
}
pub fn leader(&self) -> Option<&str> {
match self {
ClusterError::NotLeader { leader } => leader.as_deref(),
_ => None,
}
}
pub fn is_fatal(&self) -> bool {
matches!(
self,
ClusterError::CorruptData(_) | ClusterError::RaftStorage(_)
)
}
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for ClusterError {
fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
ClusterError::ChannelClosed
}
}
impl From<tokio::sync::oneshot::error::RecvError> for ClusterError {
fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
ClusterError::ChannelClosed
}
}
impl From<postcard::Error> for ClusterError {
fn from(e: postcard::Error) -> Self {
ClusterError::Serialization(e.to_string())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks a sample of variants on both sides of `is_retriable`.
    #[test]
    fn test_retriable_errors() {
        let retriable = [
            ClusterError::Timeout,
            ClusterError::LeaderElectionInProgress,
        ];
        for err in &retriable {
            assert!(err.is_retriable());
        }

        let not_retriable = [
            ClusterError::TopicNotFound("test".into()),
            ClusterError::CorruptData("bad".into()),
        ];
        for err in &not_retriable {
            assert!(!err.is_retriable());
        }
    }

    /// A `NotLeader` error redirects only when it names a leader.
    #[test]
    fn test_redirect_to_leader() {
        let with_leader = ClusterError::NotLeader {
            leader: Some("node-1".into()),
        };
        assert!(with_leader.should_redirect());
        assert_eq!(with_leader.leader(), Some("node-1"));

        let without_leader = ClusterError::NotLeader { leader: None };
        assert!(!without_leader.should_redirect());
    }

    /// Corrupt data and raft storage errors are fatal; a timeout is not.
    #[test]
    fn test_fatal_errors() {
        let fatal = [
            ClusterError::CorruptData("bad crc".into()),
            ClusterError::RaftStorage("disk full".into()),
        ];
        for err in &fatal {
            assert!(err.is_fatal());
        }

        assert!(!ClusterError::Timeout.is_fatal());
    }
}