use thiserror::Error;
use tokio::task::JoinError;
use kafka_protocol::error::ResponseError;
/// Result alias that fixes the error type to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Set of flags attached to an error by the `classification()` methods in this module.
///
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ErrorClassification {
/// Value returned by [`Error::is_retriable`].
pub retriable: bool,
/// Value returned by [`Error::is_fatal`].
pub fatal: bool,
/// Value returned by [`Error::transaction_abort_required`].
pub transaction_abort_required: bool,
}
/// Top-level error type that aggregates the error families defined in this module.
///
/// Every wrapping variant is `#[error(transparent)]`, so its message is the wrapped
/// error's message, and `#[from]` generates the matching `From` conversion.
#[derive(Debug, Error)]
pub enum Error {
/// The operation was cancelled; classified with the default (all-false) flags.
#[error("operation cancelled")]
Cancelled,
/// Wraps an [`AdminError`].
#[error(transparent)]
Admin(#[from] AdminError),
/// Wraps a [`ConsumerError`].
#[error(transparent)]
Consumer(#[from] ConsumerError),
/// Wraps a [`ProducerError`].
#[error(transparent)]
Producer(#[from] ProducerError),
/// Wraps a [`ConsumerGroupMetadataError`]; classified with the default (all-false) flags.
#[error(transparent)]
ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
/// Wraps a [`TransactionStateError`].
#[error(transparent)]
TransactionState(#[from] TransactionStateError),
/// Wraps a [`BrokerError`].
#[error(transparent)]
Broker(#[from] BrokerError),
/// Wraps a [`ValidationError`].
#[error(transparent)]
Validation(#[from] ValidationError),
/// Wraps a [`ProtocolError`].
#[error(transparent)]
Protocol(#[from] ProtocolError),
/// Wraps an arbitrary `anyhow::Error`; classified with the default (all-false) flags.
#[error(transparent)]
Internal(#[from] anyhow::Error),
}
impl Error {
pub fn classification(&self) -> ErrorClassification {
match self {
Self::Cancelled => ErrorClassification::default(),
Self::Admin(error) => error.classification(),
Self::Consumer(error) => error.classification(),
Self::Producer(error) => error.classification(),
Self::ConsumerGroupMetadata(_) => ErrorClassification::default(),
Self::TransactionState(error) => error.classification(),
Self::Broker(error) => error.classification(),
Self::Validation(error) => error.classification(),
Self::Protocol(error) => error.classification(),
Self::Internal(_) => ErrorClassification::default(),
}
}
pub fn is_retriable(&self) -> bool {
self.classification().retriable
}
pub fn is_fatal(&self) -> bool {
self.classification().fatal
}
pub fn transaction_abort_required(&self) -> bool {
self.classification().transaction_abort_required
}
}
/// Failure built from a protocol-level `ResponseError` (see [`BrokerError::response`]).
#[derive(Debug, Clone, Error)]
pub enum BrokerError {
/// A response for `operation` carried an error.
#[error("{operation} failed with broker error {name} ({code}){resource}")]
Response {
/// Name of the operation that failed.
operation: &'static str,
/// Pre-formatted message suffix: [`BrokerError::response`] stores either an empty
/// string or `" for <resource>"`, which is why the message appends it without a separator.
resource: String,
/// Numeric code taken from the response error.
code: i16,
/// Display text of the response error.
name: String,
/// Retriable flag copied from the response error.
retriable: bool,
/// Raised by [`BrokerError::fatal`]; `false` right after construction.
fatal: bool,
/// Raised by [`BrokerError::transaction_abort_required`]; `false` right after construction.
transaction_abort_required: bool,
},
}
impl BrokerError {
    /// Builds a broker error for `operation` from a protocol-level `error`.
    ///
    /// `resource` optionally names what the operation targeted. When it is present
    /// and non-empty it is rendered as a `" for <resource>"` suffix of the message;
    /// otherwise the suffix is empty. The `retriable` flag, the numeric code and the
    /// display name are taken from `error`. `fatal` and `transaction_abort_required`
    /// start out `false` and can be raised with [`BrokerError::fatal`] and
    /// [`BrokerError::transaction_abort_required`].
    pub fn response(
        operation: &'static str,
        resource: impl Into<Option<String>>,
        error: ResponseError,
    ) -> Self {
        // An empty resource would otherwise leave a dangling " for " at the end of
        // the message, so it is treated the same as no resource at all.
        let resource = resource
            .into()
            .filter(|value| !value.is_empty())
            .map(|value| format!(" for {value}"))
            .unwrap_or_default();
        Self::Response {
            operation,
            resource,
            code: error.code(),
            name: error.to_string(),
            retriable: error.is_retriable(),
            fatal: false,
            transaction_abort_required: false,
        }
    }

    /// Returns this error with its `fatal` flag set.
    ///
    /// The method consumes `self`, so the returned value must be used; discarding
    /// it drops the error altogether.
    #[must_use]
    pub fn fatal(mut self) -> Self {
        let Self::Response { fatal, .. } = &mut self;
        *fatal = true;
        self
    }

    /// Returns this error with its `transaction_abort_required` flag set.
    ///
    /// The method consumes `self`, so the returned value must be used; discarding
    /// it drops the error altogether.
    #[must_use]
    pub fn transaction_abort_required(mut self) -> Self {
        let Self::Response {
            transaction_abort_required,
            ..
        } = &mut self;
        *transaction_abort_required = true;
        self
    }

    /// Copies the three flags stored in the error into an [`ErrorClassification`].
    fn classification(&self) -> ErrorClassification {
        match self {
            Self::Response {
                retriable,
                fatal,
                transaction_abort_required,
                ..
            } => ErrorClassification {
                retriable: *retriable,
                fatal: *fatal,
                transaction_abort_required: *transaction_abort_required,
            },
        }
    }
}
/// Validation failures; each message states the requirement that was not met.
///
/// Every variant is classified with the default (all-false) flags.
#[derive(Debug, Clone, Error)]
pub enum ValidationError {
/// `operation` requires a non-empty topic name.
#[error("{operation} requires a non-empty topic name")]
EmptyTopicName {
operation: &'static str,
},
/// `operation` requires a topic.
#[error("{operation} requires a topic")]
MissingTopic {
operation: &'static str,
},
/// `operation` requires a non-negative partition; `partition` is the offending value.
#[error("{operation} requires a non-negative partition: {partition}")]
NegativePartition {
operation: &'static str,
partition: i32,
},
/// A name of the given `resource` kind was empty.
#[error("{resource} names must be non-empty")]
EmptyResourceName {
resource: &'static str,
},
/// The ACL resource type had the disallowed value `resource_type`.
#[error("ACL resource_type must not be {resource_type}")]
InvalidAclResourceType {
resource_type: String,
},
/// The ACL pattern type had the disallowed value `pattern_type`.
#[error("ACL pattern_type must not be {pattern_type}")]
InvalidAclPatternType {
pattern_type: String,
},
/// The ACL principal was empty.
#[error("ACL principal must be non-empty")]
EmptyAclPrincipal,
/// The ACL host was empty.
#[error("ACL host must be non-empty")]
EmptyAclHost,
/// The ACL operation had the disallowed value `operation`.
#[error("ACL operation must not be {operation}")]
InvalidAclOperation {
operation: String,
},
/// The ACL permission type had the disallowed value `permission_type`.
#[error("ACL permission_type must not be {permission_type}")]
InvalidAclPermissionType {
permission_type: String,
},
/// The consumer group id was empty.
#[error("consumer group id must be non-empty")]
EmptyConsumerGroupId,
/// A feature name was empty.
#[error("feature names must be non-empty")]
EmptyFeatureName,
}
impl ValidationError {
fn classification(&self) -> ErrorClassification {
ErrorClassification::default()
}
}
/// Protocol-level failures; every variant is classified as fatal.
#[derive(Debug, Clone, Error)]
pub enum ProtocolError {
/// The response to `operation` lacked required data; `detail` describes what was missing.
#[error("{operation} response was missing required data: {detail}")]
MissingResponseData {
operation: &'static str,
detail: String,
},
/// `api` is required at `min_version` or newer, but the broker only supports `broker_version`.
#[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
UnsupportedApiVersion {
api: &'static str,
min_version: i16,
broker_version: i16,
},
}
impl ProtocolError {
    /// Classifies every protocol error as fatal, with the other flags left `false`.
    fn classification(&self) -> ErrorClassification {
        let fatal_only = ErrorClassification {
            fatal: true,
            ..ErrorClassification::default()
        };
        // The match stays exhaustive so that a new variant has to be classified explicitly.
        match self {
            Self::MissingResponseData { .. } => fatal_only,
            Self::UnsupportedApiVersion { .. } => fatal_only,
        }
    }
}
/// Admin errors about topic names, partition counts and replication factors.
///
/// Every variant is classified with the default (all-false) flags.
#[derive(Debug, Error)]
pub enum AdminError {
/// A topic name was empty.
#[error("topic names must be non-empty")]
EmptyTopicName,
/// The partition count was not positive; `partitions` is the offending value.
#[error("topic partition count must be positive: {partitions}")]
InvalidPartitionCount {
partitions: i32,
},
/// The replication factor was not positive; `replication_factor` is the offending value.
#[error("topic replication factor must be positive: {replication_factor}")]
InvalidReplicationFactor {
replication_factor: i16,
},
}
impl AdminError {
fn classification(&self) -> ErrorClassification {
ErrorClassification::default()
}
}
/// Errors raised by the consumer.
#[derive(Debug, Error)]
pub enum ConsumerError {
/// The consumer runtime had stopped before `operation`; classified as fatal.
#[error("consumer runtime stopped before {operation}")]
ThreadStoppedBefore {
operation: &'static str,
},
/// The consumer runtime stopped during `operation`; classified as fatal.
#[error("consumer runtime stopped during {operation}")]
ThreadStoppedDuring {
operation: &'static str,
},
/// Joining the consumer runtime failed; the `JoinError` is both shown in the
/// message and exposed as the error source. Classified as fatal.
#[error("failed to join consumer runtime: {0}")]
Join(#[source] JoinError),
/// `subscribe` was not given at least one non-empty topic name.
#[error("subscribe requires at least one non-empty topic name")]
EmptySubscription,
/// `subscribe_pattern` was given an empty pattern.
#[error("subscribe_pattern requires a non-empty pattern")]
EmptySubscriptionPattern,
/// `subscribe_regex` was given an invalid regular expression; `message` has the details.
#[error("subscribe_regex requires a valid regular expression: {message}")]
InvalidSubscriptionRegex {
message: String,
},
/// A poll was attempted while another poll call was in progress.
#[error("concurrent poll calls are not supported by this simple consumer")]
ConcurrentPoll,
/// A poll was interrupted by `wakeup()`.
#[error("poll was interrupted by wakeup()")]
Wakeup,
/// A seek was given a negative `offset`.
#[error("seek offset must be non-negative: {offset}")]
InvalidSeekOffset {
offset: i64,
},
/// A topic partition had an empty name.
#[error("topic partition names must be non-empty")]
EmptyTopicPartition,
/// `operation` needs `topic`:`partition` to be assigned, but it is not.
#[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
PartitionNotAssigned {
operation: &'static str,
topic: String,
partition: i32,
},
/// The broker rejected the subscription regex; `message` has the details.
#[error("broker rejected the subscription regex: {message}")]
InvalidRegularExpression {
message: String,
},
/// The broker rejected the configured server assignor `assignor`.
#[error("broker rejected the configured server assignor '{assignor}': {message}")]
UnsupportedAssignor {
assignor: String,
message: String,
},
/// Static member `instance_id` is still owned by another consumer; classified as fatal.
#[error("static member '{instance_id}' is still owned by another consumer: {message}")]
UnreleasedInstanceId {
instance_id: String,
message: String,
},
/// Static member `instance_id` was fenced; classified as fatal.
#[error("static member '{instance_id}' was fenced: {message}")]
FencedInstanceId {
instance_id: String,
message: String,
},
/// The consumer is shutting down.
#[error("consumer is shutting down")]
ShuttingDown,
/// The consumer runtime is in a fatal state described by `message`; classified as fatal.
#[error("consumer runtime is fatal: {message}")]
Fatal {
message: String,
},
}
impl ConsumerError {
    /// Maps each consumer error onto its flags: a fixed set of variants is fatal,
    /// every other variant gets the default (all-false) classification.
    fn classification(&self) -> ErrorClassification {
        let unclassified = ErrorClassification::default();
        match self {
            // Runtime loss, static-membership conflicts and explicit fatal state.
            Self::ThreadStoppedBefore { .. }
            | Self::ThreadStoppedDuring { .. }
            | Self::Join(_)
            | Self::UnreleasedInstanceId { .. }
            | Self::FencedInstanceId { .. }
            | Self::Fatal { .. } => ErrorClassification {
                fatal: true,
                ..unclassified
            },
            // Listed one by one (no `_` arm) so a new variant must be classified explicitly.
            Self::EmptySubscription
            | Self::EmptySubscriptionPattern
            | Self::InvalidSubscriptionRegex { .. }
            | Self::ConcurrentPoll
            | Self::Wakeup
            | Self::InvalidSeekOffset { .. }
            | Self::EmptyTopicPartition
            | Self::PartitionNotAssigned { .. }
            | Self::InvalidRegularExpression { .. }
            | Self::UnsupportedAssignor { .. }
            | Self::ShuttingDown => unclassified,
        }
    }
}
/// Errors raised by the producer.
#[derive(Debug, Error)]
pub enum ProducerError {
/// An idempotent producer was configured without `acks=-1`.
#[error("idempotent producers require acks=-1")]
IdempotenceRequiresAcksAll,
/// An idempotent producer was configured without `max_retries > 0`.
#[error("idempotent producers require max_retries > 0")]
IdempotenceRequiresRetries,
/// A transactional producer was configured without `acks=-1`.
#[error(
"transactional producers require acks=-1 so the broker can commit the full transaction"
)]
TransactionalRequiresAcksAll,
/// The broker did not advertise a finalized `transaction.version` feature level;
/// classified as fatal.
#[error("broker did not advertise finalized feature level for transaction.version")]
MissingTransactionVersionFeature,
/// The broker's finalized `transaction.version` (`level`) is below 2; classified as fatal.
#[error(
"broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
)]
UnsupportedTransactionVersion {
level: i16,
},
/// `api` is required at `min_version` or newer, but the broker only supports
/// `broker_version`; classified as fatal.
#[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
UnsupportedApiVersion {
api: &'static str,
min_version: i16,
broker_version: i16,
},
/// The producer has no `transactional_id` configured.
#[error("producer is not configured with a transactional_id")]
NotTransactional,
/// Metadata lists no partitions for `topic`.
#[error("topic '{topic}' has no partitions in metadata")]
TopicHasNoPartitions {
topic: String,
},
/// Partition metadata for `topic` is missing.
#[error("missing partition metadata for topic '{topic}'")]
MissingPartitionMetadata {
topic: String,
},
/// Metadata for `topic` did not become available within `max_block_ms` milliseconds.
#[error("metadata for topic '{topic}' was not available within max_block={max_block_ms}ms")]
MetadataTimeout {
topic: String,
max_block_ms: u128,
},
/// The transaction manager was unavailable during `operation`; classified as fatal.
#[error("transaction manager is unavailable during {operation}")]
TransactionManagerUnavailable {
operation: &'static str,
},
/// The transactional producer id was not initialized during `operation`;
/// classified as fatal.
#[error("transactional producer id is not initialized during {operation}")]
TransactionalProducerNotInitialized {
operation: &'static str,
},
/// The connection to the transaction coordinator is missing; classified as fatal.
#[error("transaction coordinator connection is missing")]
TransactionCoordinatorConnectionMissing,
/// `operation` used up all `attempts`; classified as retriable. `resource` is
/// appended to the message without a separator, so it carries its own leading text.
#[error("{operation} exhausted {attempts} attempts{resource}")]
AttemptsExhausted {
operation: &'static str,
resource: String,
attempts: usize,
},
/// A produce batch for `topic`:`partition` failed; `message` has the details.
#[error("produce batch failed for {topic}:{partition}: {message}")]
BatchFailed {
topic: String,
partition: i32,
message: String,
},
/// A record of `size` bytes exceeds the limit named `limit_name`, whose value is `limit`.
#[error("producer record is {size} bytes, larger than {limit_name}={limit}")]
RecordTooLarge {
size: usize,
limit_name: &'static str,
limit: usize,
},
/// A produce request of `size` bytes exceeds `max_request_size` (`limit`).
#[error("produce request is {size} bytes, larger than max_request_size={limit}")]
RequestTooLarge {
size: usize,
limit: usize,
},
/// The buffer holds `buffered` of `limit` bytes and could not accept `required`
/// more bytes within `max_block_ms` milliseconds.
#[error(
"producer buffer has {buffered} of {limit} bytes queued and cannot accept {required} more bytes within max_block={max_block_ms}ms"
)]
BufferExhausted {
buffered: usize,
required: usize,
limit: usize,
max_block_ms: u128,
},
/// `operation` failed and the transaction must be aborted; sets the
/// `transaction_abort_required` flag.
#[error("{operation} failed and the transaction must be aborted: {message}")]
TransactionAbortRequired {
operation: &'static str,
message: String,
},
/// `operation` failed and the transactional producer is no longer usable;
/// classified as fatal.
#[error("{operation} failed and the transactional producer is no longer usable: {message}")]
TransactionFatal {
operation: &'static str,
message: String,
},
/// The producer runtime had stopped before `operation`; classified as fatal.
#[error("producer runtime stopped before {operation}")]
RuntimeStoppedBefore {
operation: &'static str,
},
/// The producer runtime stopped during `operation`; classified as fatal.
#[error("producer runtime stopped during {operation}")]
RuntimeStoppedDuring {
operation: &'static str,
},
/// Joining the producer runtime failed; the `JoinError` is both shown in the
/// message and exposed as the error source. Classified as fatal.
#[error("failed to join producer runtime: {0}")]
Join(#[source] JoinError),
}
impl ProducerError {
    /// Maps each producer error onto exactly one of four outcomes: retriable,
    /// transaction-abort-required, fatal, or the default (all-false) classification.
    fn classification(&self) -> ErrorClassification {
        let unclassified = ErrorClassification::default();
        match self {
            Self::AttemptsExhausted { .. } => ErrorClassification {
                retriable: true,
                ..unclassified
            },
            Self::TransactionAbortRequired { .. } => ErrorClassification {
                transaction_abort_required: true,
                ..unclassified
            },
            Self::MissingTransactionVersionFeature
            | Self::UnsupportedTransactionVersion { .. }
            | Self::UnsupportedApiVersion { .. }
            | Self::TransactionFatal { .. }
            | Self::TransactionManagerUnavailable { .. }
            | Self::TransactionalProducerNotInitialized { .. }
            | Self::TransactionCoordinatorConnectionMissing
            | Self::RuntimeStoppedBefore { .. }
            | Self::RuntimeStoppedDuring { .. }
            | Self::Join(_) => ErrorClassification {
                fatal: true,
                ..unclassified
            },
            // Listed one by one (no `_` arm) so a new variant must be classified explicitly.
            Self::IdempotenceRequiresAcksAll
            | Self::IdempotenceRequiresRetries
            | Self::TransactionalRequiresAcksAll
            | Self::NotTransactional
            | Self::TopicHasNoPartitions { .. }
            | Self::MissingPartitionMetadata { .. }
            | Self::MetadataTimeout { .. }
            | Self::BatchFailed { .. }
            | Self::RecordTooLarge { .. }
            | Self::RequestTooLarge { .. }
            | Self::BufferExhausted { .. } => unclassified,
        }
    }
}
/// Problems with consumer group metadata.
///
/// [`Error::classification`] assigns these the default (all-false) flags.
#[derive(Debug, Error)]
pub enum ConsumerGroupMetadataError {
/// The metadata had an empty `group_id`.
#[error("consumer group metadata requires a non-empty group_id")]
EmptyGroupId,
/// The metadata had `generation_id > 0` but no `member_id`.
#[error("consumer group metadata has generation_id > 0 but no member_id")]
MissingMemberId,
}
/// Errors describing why a request does not fit the current transaction state.
#[derive(Debug, Error)]
pub enum TransactionStateError {
/// A transaction is already in progress.
#[error("transaction already in progress")]
AlreadyInProgress,
/// The transaction is already completing; the payload names the completion in progress.
#[error("transaction is already completing with {0}")]
Completing(&'static str),
/// The transaction must be aborted before reuse; sets `transaction_abort_required`.
#[error("transaction must be aborted before reuse: {0}")]
MustAbortBeforeReuse(String),
/// The transaction is unusable; classified as fatal.
#[error("transaction is unusable: {0}")]
Fatal(String),
/// A transactional send was attempted without `begin_transaction()`.
#[error("transactional send requires begin_transaction() before send()")]
AppendWithoutBegin,
/// `send_offsets_to_transaction` was called without `begin_transaction()`.
#[error("send_offsets_to_transaction requires begin_transaction() first")]
SendOffsetsWithoutBegin,
/// The transaction has failed and must be aborted; sets `transaction_abort_required`.
#[error("transaction has failed and must be aborted: {0}")]
AbortRequired(String),
/// The transaction cannot be committed and must be aborted; sets
/// `transaction_abort_required`.
#[error("transaction cannot be committed and must be aborted: {0}")]
CommitRequiresAbort(String),
/// There is no active transaction to complete.
#[error("no active transaction to complete")]
NoActiveTransaction,
/// Shutdown stopped while a transaction was still in progress.
#[error("shutdown stopped with an active transaction still in progress")]
ShutdownWithActiveTransaction,
/// Shutdown stopped while the transaction was still completing; the payload
/// names the completion in progress.
#[error("shutdown stopped while transaction was still completing with {0}")]
ShutdownWhileCompleting(&'static str),
/// Shutdown stopped with a failed transaction that was never aborted; sets
/// `transaction_abort_required`.
#[error("shutdown stopped with an un-aborted failed transaction: {0}")]
ShutdownAbortRequired(String),
/// The transaction had already failed before shutdown; classified as fatal.
#[error("transaction failed before shutdown: {0}")]
ShutdownFatal(String),
}
impl TransactionStateError {
    /// Maps each transaction-state error onto one of three outcomes: fatal,
    /// transaction-abort-required, or the default (all-false) classification.
    fn classification(&self) -> ErrorClassification {
        let unclassified = ErrorClassification::default();
        match self {
            Self::MustAbortBeforeReuse(_)
            | Self::AbortRequired(_)
            | Self::CommitRequiresAbort(_)
            | Self::ShutdownAbortRequired(_) => ErrorClassification {
                transaction_abort_required: true,
                ..unclassified
            },
            Self::Fatal(_) | Self::ShutdownFatal(_) => ErrorClassification {
                fatal: true,
                ..unclassified
            },
            // Listed one by one (no `_` arm) so a new variant must be classified explicitly.
            Self::AlreadyInProgress
            | Self::Completing(_)
            | Self::AppendWithoutBegin
            | Self::SendOffsetsWithoutBegin
            | Self::NoActiveTransaction
            | Self::ShutdownWithActiveTransaction
            | Self::ShutdownWhileCompleting(_) => unclassified,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{
        BrokerError, ConsumerError, Error, ProducerError, TransactionStateError, ValidationError,
    };
    use kafka_protocol::error::ResponseError;

    /// The retriable flag of the response error survives wrapping; no other flag is set.
    #[test]
    fn broker_errors_preserve_retriable_flag() {
        let broker = BrokerError::response(
            "produce",
            Some(String::from("orders:0")),
            ResponseError::NotLeaderOrFollower,
        );
        let wrapped = Error::from(broker);

        assert!(wrapped.is_retriable());
        assert!(!wrapped.is_fatal());
        assert!(!wrapped.transaction_abort_required());
    }

    /// `transaction_abort_required()` raises only the abort flag, not the fatal one.
    #[test]
    fn broker_errors_can_mark_transaction_abort_required() {
        let broker = BrokerError::response(
            "send_offsets_to_transaction",
            Some(String::from("orders-reader")),
            ResponseError::UnknownServerError,
        )
        .transaction_abort_required();
        let wrapped = Error::from(broker);

        assert!(!wrapped.is_fatal());
        assert!(wrapped.transaction_abort_required());
    }

    /// `fatal()` raises only the fatal flag, not the abort one.
    #[test]
    fn broker_errors_can_mark_fatal() {
        let broker = BrokerError::response(
            "end_transaction",
            Option::<String>::None,
            ResponseError::ProducerFenced,
        )
        .fatal();
        let wrapped = Error::from(broker);

        assert!(wrapped.is_fatal());
        assert!(!wrapped.transaction_abort_required());
    }

    /// Both static-membership conflicts are fatal and not retriable.
    #[test]
    fn static_member_ownership_errors_are_fatal() {
        let still_owned = Error::from(ConsumerError::UnreleasedInstanceId {
            instance_id: String::from("instance-a"),
            message: String::from("still owned"),
        });
        let was_fenced = Error::from(ConsumerError::FencedInstanceId {
            instance_id: String::from("instance-a"),
            message: String::from("fenced"),
        });

        assert!(still_owned.is_fatal());
        assert!(!still_owned.is_retriable());
        assert!(was_fenced.is_fatal());
        assert!(!was_fenced.is_retriable());
    }

    /// Abort-required and fatal transaction states map onto their respective flags.
    #[test]
    fn transaction_state_errors_are_classified() {
        let needs_abort = Error::from(TransactionStateError::AbortRequired(String::from(
            "send failed",
        )));
        let unusable = Error::from(TransactionStateError::Fatal(String::from("fenced")));

        assert!(needs_abort.transaction_abort_required());
        assert!(!needs_abort.is_fatal());
        assert!(unusable.is_fatal());
    }

    /// Producer transaction failures are either abort-required or fatal, never both.
    #[test]
    fn producer_transaction_runtime_errors_are_classified() {
        let needs_abort = Error::from(ProducerError::TransactionAbortRequired {
            operation: "send_offsets_to_transaction",
            message: String::from("connection reset"),
        });
        let unusable = Error::from(ProducerError::TransactionFatal {
            operation: "commit_transaction",
            message: String::from("connection reset"),
        });

        assert!(needs_abort.transaction_abort_required());
        assert!(!needs_abort.is_fatal());
        assert!(unusable.is_fatal());
        assert!(!unusable.transaction_abort_required());
    }

    /// Validation errors set none of the three flags.
    #[test]
    fn validation_errors_are_not_retriable_or_fatal() {
        let wrapped = Error::from(ValidationError::MissingTopic {
            operation: "message conversion",
        });

        assert!(!wrapped.is_retriable());
        assert!(!wrapped.is_fatal());
        assert!(!wrapped.transaction_abort_required());
    }

    /// Attempt exhaustion is retriable, not fatal, and renders its pre-formatted
    /// resource suffix directly after the attempt count.
    #[test]
    fn producer_attempt_exhaustion_is_retriable_and_typed() {
        let wrapped = Error::from(ProducerError::AttemptsExhausted {
            operation: "find_transaction_coordinator",
            resource: String::from(" for transactional_id 'tx-a'"),
            attempts: 11,
        });

        assert!(wrapped.is_retriable());
        assert!(!wrapped.is_fatal());
        let rendered = wrapped.to_string();
        assert_eq!(
            rendered,
            "find_transaction_coordinator exhausted 11 attempts for transactional_id 'tx-a'"
        );
    }
}