use std::time::Duration;
use std::{io, result, sync::Arc};
use thiserror::Error;
/// Convenience alias: `std::result::Result` specialized to this crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
/// Errors arising while establishing or using a broker connection.
#[derive(Debug, Error)]
pub enum ConnectionError {
    /// Underlying transport I/O failure.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// TLS failure (variant exists only with the `security` feature).
    #[cfg(feature = "security")]
    #[error("TLS error: {0}")]
    Tls(String),
    /// The operation exceeded the given timeout.
    #[error("Connection timeout after {0:?}")]
    Timeout(Duration),
    /// None of the known hosts could be reached.
    #[error("No host reachable")]
    NoHostReachable,
}
/// Errors in encoding or decoding the wire protocol.
#[derive(Debug, Clone, Error)]
pub enum ProtocolError {
    /// The peer uses a protocol version this client does not support.
    #[error("Unsupported protocol version")]
    UnsupportedVersion,
    /// A payload used a compression format this client cannot handle.
    #[error("Unsupported compression format")]
    UnsupportedCompression,
    /// The byte stream ended before a complete value could be decoded.
    #[error("Unexpected end of data")]
    UnexpectedEOF,
    /// Generic encoding/decoding failure.
    #[error("Encoding/decoding error")]
    Codec,
    /// A string field failed to decode.
    #[error("String decoding error")]
    StringDecode,
    /// A duration value was out of range or malformed.
    #[error("Invalid duration")]
    InvalidDuration,
}
/// Errors caused by missing or invalid consumer configuration/state.
#[derive(Debug, Clone, Error)]
pub enum ConsumerError {
    /// The consumer has no topics assigned.
    #[error("No topic assigned")]
    NoTopicsAssigned,
    /// No offset storage was configured.
    #[error("Offset storage not configured")]
    UnsetOffsetStorage,
    /// No group id was configured.
    #[error("Group ID not configured")]
    UnsetGroupId,
}
/// Top-level error type for the crate; see the [`Result`] alias.
#[derive(Debug, Error)]
pub enum Error {
    /// Transport-level failure.
    #[error("Connection error: {0}")]
    Connection(#[source] ConnectionError),
    /// Wire-protocol encode/decode failure.
    #[error("Protocol error: {0}")]
    Protocol(#[source] ProtocolError),
    /// An error code reported by a broker.
    #[error("Kafka Error ({0:?})")]
    Kafka(KafkaCode),
    /// Invalid or inconsistent client configuration.
    #[error("Configuration error: {0}")]
    Config(String),
    /// Consumer configuration/state error.
    #[error("Consumer error: {0}")]
    Consumer(#[source] ConsumerError),
    /// A broker-reported error scoped to a single topic partition.
    #[error("Topic Partition Error ({topic_name:?}, {partition_id:?}, {error_code:?})")]
    TopicPartitionError {
        // Topic the error applies to.
        topic_name: String,
        // Partition index within the topic.
        partition_id: i32,
        // Broker-reported error code for this partition.
        error_code: KafkaCode,
    },
    /// Any error, annotated with the broker address and API name of the
    /// request that failed (built via `Error::with_broker_context`).
    #[error("Broker request to {broker} failed ({api_key}): {source}")]
    BrokerRequestError {
        // Address of the broker the request was sent to.
        broker: String,
        // Static name of the API call (e.g. "Produce").
        api_key: &'static str,
        // The underlying error being wrapped.
        #[source]
        source: Box<Self>,
    },
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Self::Connection(ConnectionError::Io(e))
}
}
impl From<Arc<Self>> for Error {
fn from(e: Arc<Self>) -> Self {
match Arc::try_unwrap(e) {
Ok(err) => err,
Err(arc) => match &*arc {
Self::Connection(ConnectionError::Io(e)) => {
Self::Connection(ConnectionError::Io(io::Error::new(e.kind(), e.to_string())))
}
#[cfg(feature = "security")]
Self::Connection(ConnectionError::Tls(s)) => {
Self::Connection(ConnectionError::Tls(s.clone()))
}
Self::Connection(ConnectionError::Timeout(d)) => {
Self::Connection(ConnectionError::Timeout(*d))
}
Self::Connection(ConnectionError::NoHostReachable) => {
Self::Connection(ConnectionError::NoHostReachable)
}
Self::Protocol(p) => Self::Protocol(p.clone()),
Self::Kafka(c) => Self::Kafka(*c),
Self::Config(s) => Self::Config(s.clone()),
Self::Consumer(c) => Self::Consumer(c.clone()),
Self::TopicPartitionError {
topic_name,
partition_id,
error_code,
} => Self::TopicPartitionError {
topic_name: topic_name.clone(),
partition_id: *partition_id,
error_code: *error_code,
},
Self::BrokerRequestError {
broker,
api_key,
source: _,
} => Self::BrokerRequestError {
broker: broker.clone(),
api_key,
source: Box::new(Self::from(Arc::clone(&arc))),
},
},
}
}
}
impl Error {
    /// Shorthand constructor for a codec (encode/decode) protocol error.
    #[inline]
    pub(crate) fn codec() -> Self {
        Self::Protocol(ProtocolError::Codec)
    }
    /// Shorthand constructor for an unexpected-end-of-data protocol error.
    #[inline]
    #[allow(dead_code)]
    pub(crate) fn unexpected_eof() -> Self {
        Self::Protocol(ProtocolError::UnexpectedEOF)
    }
    /// Shorthand constructor for the no-host-reachable connection error.
    #[inline]
    pub(crate) fn no_host_reachable() -> Self {
        Self::Connection(ConnectionError::NoHostReachable)
    }
    /// Shorthand constructor for an invalid-duration protocol error.
    #[inline]
    pub(crate) fn invalid_duration() -> Self {
        Self::Protocol(ProtocolError::InvalidDuration)
    }
    /// Shorthand constructor for the consumer error raised when no topics are assigned.
    #[inline]
    pub(crate) fn no_topics_assigned() -> Self {
        Self::Consumer(ConsumerError::NoTopicsAssigned)
    }
    /// Shorthand constructor for the consumer error raised when the group id is unset.
    #[inline]
    pub(crate) fn unset_group_id() -> Self {
        Self::Consumer(ConsumerError::UnsetGroupId)
    }
    /// Shorthand constructor for a TLS connection error (only with the `security` feature).
    #[cfg(feature = "security")]
    #[inline]
    #[allow(dead_code)]
    pub(crate) fn tls(msg: impl Into<String>) -> Self {
        Self::Connection(ConnectionError::Tls(msg.into()))
    }
    /// Wraps `self` in [`Error::BrokerRequestError`], attaching the broker
    /// address and the API name of the request that failed.
    #[must_use]
    pub fn with_broker_context(self, broker: impl Into<String>, api_key: &'static str) -> Self {
        Error::BrokerRequestError {
            broker: broker.into(),
            api_key,
            source: Box::new(self),
        }
    }
    /// Returns `true` if retrying the failed operation could plausibly
    /// succeed: retriable Kafka codes, transient connection failures,
    /// retriable per-partition codes, and broker-context wrappers around any
    /// of those. Everything else is considered permanent.
    #[must_use]
    pub fn is_retriable(&self) -> bool {
        match self {
            Self::Kafka(code) => code.is_retriable(),
            // All connection failures except TLS problems count as transient.
            Self::Connection(conn_err) => matches!(
                conn_err,
                ConnectionError::Io(_)
                    | ConnectionError::Timeout(_)
                    | ConnectionError::NoHostReachable
            ),
            Self::TopicPartitionError { error_code, .. } => error_code.is_retriable(),
            // Delegate to the wrapped error so added context never hides retriability.
            Self::BrokerRequestError { source, .. } => source.is_retriable(),
            _ => false,
        }
    }
    /// Returns `true` if this is a [`ConnectionError`].
    #[must_use]
    pub fn is_connection_error(&self) -> bool {
        matches!(self, Self::Connection(_))
    }
    /// Returns `true` if this is a [`ProtocolError`].
    #[must_use]
    pub fn is_protocol_error(&self) -> bool {
        matches!(self, Self::Protocol(_))
    }
    /// Returns `true` if this is a [`ConsumerError`].
    #[must_use]
    pub fn is_consumer_error(&self) -> bool {
        matches!(self, Self::Consumer(_))
    }
    /// Decodes a raw wire error code into `Some(Error::Kafka(_))`,
    /// or `None` when `n == 0` (success).
    pub(crate) fn from_protocol(n: i16) -> Option<Error> {
        KafkaCode::from_protocol(n).map(Error::Kafka)
    }
}
/// Error codes defined by the Kafka wire protocol.
///
/// Each variant's discriminant matches the `i16` error code carried in broker
/// responses; code `0` means "no error" and has no variant (see
/// [`KafkaCode::from_protocol`]).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KafkaCode {
    Unknown = -1,
    OffsetOutOfRange = 1,
    CorruptMessage = 2,
    UnknownTopicOrPartition = 3,
    InvalidMessageSize = 4,
    LeaderNotAvailable = 5,
    NotLeaderForPartition = 6,
    RequestTimedOut = 7,
    BrokerNotAvailable = 8,
    ReplicaNotAvailable = 9,
    MessageSizeTooLarge = 10,
    StaleControllerEpoch = 11,
    OffsetMetadataTooLarge = 12,
    NetworkException = 13,
    GroupLoadInProgress = 14,
    GroupCoordinatorNotAvailable = 15,
    NotCoordinatorForGroup = 16,
    InvalidTopic = 17,
    RecordListTooLarge = 18,
    NotEnoughReplicas = 19,
    NotEnoughReplicasAfterAppend = 20,
    InvalidRequiredAcks = 21,
    IllegalGeneration = 22,
    InconsistentGroupProtocol = 23,
    InvalidGroupId = 24,
    UnknownMemberId = 25,
    InvalidSessionTimeout = 26,
    RebalanceInProgress = 27,
    InvalidCommitOffsetSize = 28,
    TopicAuthorizationFailed = 29,
    GroupAuthorizationFailed = 30,
    ClusterAuthorizationFailed = 31,
    InvalidTimestamp = 32,
    UnsupportedSaslMechanism = 33,
    IllegalSaslState = 34,
    UnsupportedVersion = 35,
}

impl KafkaCode {
    /// Returns `true` for transient broker-side conditions that a client may
    /// reasonably retry (leadership moves, coordinator churn, timeouts, and
    /// in-progress rebalances).
    #[must_use]
    pub fn is_retriable(self) -> bool {
        use KafkaCode::*;
        matches!(
            self,
            LeaderNotAvailable
                | NotLeaderForPartition
                | GroupLoadInProgress
                | GroupCoordinatorNotAvailable
                | NotCoordinatorForGroup
                | NetworkException
                | RequestTimedOut
                | RebalanceInProgress
        )
    }
    /// Maps a raw protocol error code to a `KafkaCode`.
    ///
    /// `0` (success) yields `None`; any code this client does not recognize
    /// — including `-1` — yields `Some(KafkaCode::Unknown)`.
    pub(crate) fn from_protocol(n: i16) -> Option<KafkaCode> {
        use KafkaCode::*;
        Some(match n {
            0 => return None,
            1 => OffsetOutOfRange,
            2 => CorruptMessage,
            3 => UnknownTopicOrPartition,
            4 => InvalidMessageSize,
            5 => LeaderNotAvailable,
            6 => NotLeaderForPartition,
            7 => RequestTimedOut,
            8 => BrokerNotAvailable,
            9 => ReplicaNotAvailable,
            10 => MessageSizeTooLarge,
            11 => StaleControllerEpoch,
            12 => OffsetMetadataTooLarge,
            13 => NetworkException,
            14 => GroupLoadInProgress,
            15 => GroupCoordinatorNotAvailable,
            16 => NotCoordinatorForGroup,
            17 => InvalidTopic,
            18 => RecordListTooLarge,
            19 => NotEnoughReplicas,
            20 => NotEnoughReplicasAfterAppend,
            21 => InvalidRequiredAcks,
            22 => IllegalGeneration,
            23 => InconsistentGroupProtocol,
            24 => InvalidGroupId,
            25 => UnknownMemberId,
            26 => InvalidSessionTimeout,
            27 => RebalanceInProgress,
            28 => InvalidCommitOffsetSize,
            29 => TopicAuthorizationFailed,
            30 => GroupAuthorizationFailed,
            31 => ClusterAuthorizationFailed,
            32 => InvalidTimestamp,
            33 => UnsupportedSaslMechanism,
            34 => IllegalSaslState,
            35 => UnsupportedVersion,
            // -1 and every unassigned/out-of-range code collapse to Unknown.
            _ => Unknown,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::ErrorKind;

    // Renamed from `test_kafka_code_from_i32`: protocol error codes (and
    // `from_protocol`'s parameter) are i16, not i32.
    #[test]
    fn test_kafka_code_from_i16() {
        assert!(KafkaCode::from_protocol(0).is_none());
        assert_eq!(
            KafkaCode::from_protocol(1),
            Some(KafkaCode::OffsetOutOfRange)
        );
        assert_eq!(
            KafkaCode::from_protocol(6),
            Some(KafkaCode::NotLeaderForPartition)
        );
        assert_eq!(KafkaCode::from_protocol(999), Some(KafkaCode::Unknown));
    }
    // Display strings should mention the salient detail of each error.
    #[test]
    fn test_error_display() {
        let msg = Error::Kafka(KafkaCode::LeaderNotAvailable).to_string();
        assert!(msg.contains("LeaderNotAvailable"), "got: {msg}");
        let msg = Error::no_host_reachable().to_string();
        assert!(msg.contains("host"), "got: {msg}");
        let msg = Error::unexpected_eof().to_string();
        assert!(msg.contains("end of data"), "got: {msg}");
    }
    // The broker-context wrapper should surface both broker and API name.
    #[test]
    fn test_error_with_broker_context() {
        let err = Error::Kafka(KafkaCode::NotLeaderForPartition)
            .with_broker_context("broker1:9092", "Produce");
        let msg = err.to_string();
        assert!(msg.contains("broker1:9092"), "got: {msg}");
        assert!(msg.contains("Produce"), "got: {msg}");
    }
    #[test]
    fn test_error_io_conversion() {
        let io_err = io::Error::new(ErrorKind::ConnectionRefused, "refused");
        let err: Error = io_err.into();
        assert!(matches!(err, Error::Connection(ConnectionError::Io(_))));
    }
    #[test]
    fn test_topic_partition_error() {
        let err = Error::TopicPartitionError {
            topic_name: "test-topic".into(),
            partition_id: 0,
            error_code: KafkaCode::LeaderNotAvailable,
        };
        let msg = err.to_string();
        assert!(msg.contains("test-topic"), "got: {msg}");
    }
    #[test]
    fn test_is_retriable_kafka_errors() {
        assert!(Error::Kafka(KafkaCode::LeaderNotAvailable).is_retriable());
        assert!(Error::Kafka(KafkaCode::NotLeaderForPartition).is_retriable());
        assert!(Error::Kafka(KafkaCode::GroupLoadInProgress).is_retriable());
        assert!(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable).is_retriable());
        assert!(Error::Kafka(KafkaCode::NotCoordinatorForGroup).is_retriable());
        assert!(Error::Kafka(KafkaCode::NetworkException).is_retriable());
        assert!(Error::Kafka(KafkaCode::RequestTimedOut).is_retriable());
        assert!(Error::Kafka(KafkaCode::RebalanceInProgress).is_retriable());
    }
    #[test]
    fn test_is_not_retriable_kafka_errors() {
        assert!(!Error::Kafka(KafkaCode::UnknownTopicOrPartition).is_retriable());
        assert!(!Error::Kafka(KafkaCode::MessageSizeTooLarge).is_retriable());
        assert!(!Error::Kafka(KafkaCode::Unknown).is_retriable());
    }
    // I/O, timeout, and unreachable-host failures retry; TLS failures do not.
    #[test]
    fn test_is_retriable_connection_errors() {
        assert!(
            Error::Connection(ConnectionError::Io(io::Error::new(
                ErrorKind::ConnectionReset,
                "reset"
            )))
            .is_retriable()
        );
        assert!(Error::Connection(ConnectionError::Timeout(Duration::from_secs(5))).is_retriable());
        assert!(Error::Connection(ConnectionError::NoHostReachable).is_retriable());
        #[cfg(feature = "security")]
        assert!(!Error::Connection(ConnectionError::Tls("bad cert".into())).is_retriable());
    }
    // Per-partition retriability follows the embedded KafkaCode.
    #[test]
    fn test_is_retriable_topic_partition_error() {
        let err = Error::TopicPartitionError {
            topic_name: "t".into(),
            partition_id: 0,
            error_code: KafkaCode::LeaderNotAvailable,
        };
        assert!(err.is_retriable());
        let err = Error::TopicPartitionError {
            topic_name: "t".into(),
            partition_id: 0,
            error_code: KafkaCode::UnknownTopicOrPartition,
        };
        assert!(!err.is_retriable());
    }
    // Wrapping in broker context must not hide the inner error's retriability.
    #[test]
    fn test_is_retriable_broker_request_error() {
        let inner = Error::Kafka(KafkaCode::NotLeaderForPartition);
        let err = inner.with_broker_context("broker:9092", "Produce");
        assert!(err.is_retriable());
    }
    #[test]
    fn test_is_retriable_non_retriable_errors() {
        assert!(!Error::codec().is_retriable());
        assert!(!Error::no_topics_assigned().is_retriable());
        assert!(!Error::Config("bad".into()).is_retriable());
    }
    #[test]
    fn test_kafka_code_is_retriable() {
        assert!(KafkaCode::LeaderNotAvailable.is_retriable());
        assert!(KafkaCode::NetworkException.is_retriable());
        assert!(!KafkaCode::UnknownTopicOrPartition.is_retriable());
        assert!(!KafkaCode::OffsetOutOfRange.is_retriable());
    }
    #[test]
    fn test_error_category_queries() {
        assert!(
            Error::Connection(ConnectionError::Io(io::Error::other("err"))).is_connection_error()
        );
        assert!(!Error::codec().is_connection_error());
        assert!(Error::codec().is_protocol_error());
        assert!(Error::no_topics_assigned().is_consumer_error());
    }
    #[test]
    fn test_protocol_error_variants() {
        assert!(
            Error::Protocol(ProtocolError::UnsupportedVersion)
                .to_string()
                .contains("version")
        );
        assert!(
            Error::Protocol(ProtocolError::UnsupportedCompression)
                .to_string()
                .contains("compression")
        );
        assert!(
            Error::Protocol(ProtocolError::Codec)
                .to_string()
                .contains("Encoding")
        );
        assert!(
            Error::Protocol(ProtocolError::StringDecode)
                .to_string()
                .contains("String")
        );
        assert!(
            Error::Protocol(ProtocolError::InvalidDuration)
                .to_string()
                .contains("duration")
        );
    }
    #[test]
    fn test_consumer_error_variants() {
        assert!(
            Error::Consumer(ConsumerError::NoTopicsAssigned)
                .to_string()
                .contains("topic")
        );
        assert!(
            Error::Consumer(ConsumerError::UnsetOffsetStorage)
                .to_string()
                .contains("Offset")
        );
        assert!(
            Error::Consumer(ConsumerError::UnsetGroupId)
                .to_string()
                .contains("Group")
        );
    }
}
// Property-based tests (require the `proptest` dev-dependency): feed arbitrary
// inputs through the public surface to check for panics and broken invariants.
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;
    proptest! {
        // Decoding any i16 must never panic.
        #[test]
        fn kafka_code_from_any_i16(code in proptest::num::i16::ANY) {
            let _ = KafkaCode::from_protocol(code);
        }
        // Codes beyond the last assigned value (35) all map to Unknown.
        #[test]
        fn kafka_code_unknown_for_out_of_range(code in 36i16..=1000i16) {
            assert_eq!(KafkaCode::from_protocol(code), Some(KafkaCode::Unknown));
        }
        // Broker context must preserve both the broker name and API key in Display.
        #[test]
        fn broker_context_chainable(broker in "[a-z]{1,20}") {
            let err = Error::no_host_reachable();
            let wrapped = err.with_broker_context(broker.clone(), "Produce");
            let msg = wrapped.to_string();
            assert!(msg.contains(&broker));
            assert!(msg.contains("Produce"));
        }
        // is_retriable must be total (no panic) over all decodable codes.
        #[test]
        fn is_retriable_safe(code in proptest::num::i16::ANY) {
            if let Some(kafka_code) = KafkaCode::from_protocol(code) {
                let err = Error::Kafka(kafka_code);
                let _ = err.is_retriable();
            }
        }
        // Display must be total and non-empty over all decodable codes.
        #[test]
        fn error_display_safe(code in proptest::num::i16::ANY) {
            if let Some(kafka_code) = KafkaCode::from_protocol(code) {
                let err = Error::Kafka(kafka_code);
                let msg = err.to_string();
                assert!(!msg.is_empty());
            }
        }
    }
}