use thiserror::Error;
use tokio::task::JoinError;
/// Shorthand for `std::result::Result` with the error type fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Top-level error type of this module.
///
/// Apart from [`Error::Cancelled`], every variant wraps another error and is
/// marked `#[error(transparent)]`, so its `Display` output and `source()` are
/// forwarded to the wrapped value. Each wrapped field carries `#[from]`, which
/// generates the matching `From` conversion so `?` can lift the inner error
/// into this type.
#[derive(Debug, Error)]
pub enum Error {
    /// The operation was cancelled.
    #[error("operation cancelled")]
    Cancelled,

    /// Wraps an [`AdminError`].
    #[error(transparent)]
    Admin(#[from] AdminError),

    /// Wraps a [`ConsumerError`].
    #[error(transparent)]
    Consumer(#[from] ConsumerError),

    /// Wraps a [`ProducerError`].
    #[error(transparent)]
    Producer(#[from] ProducerError),

    /// Wraps a [`ConsumerGroupMetadataError`].
    #[error(transparent)]
    ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),

    /// Wraps a [`TransactionStateError`].
    #[error(transparent)]
    TransactionState(#[from] TransactionStateError),

    /// Wraps any `anyhow::Error` that has no dedicated variant.
    #[error(transparent)]
    Internal(#[from] anyhow::Error),
}
/// Errors describing invalid topic parameters (name, partition count,
/// replication factor).
#[derive(Debug, Error)]
pub enum AdminError {
    /// A topic name was empty.
    #[error("topic names must be non-empty")]
    EmptyTopicName,

    /// The partition count was not positive; `partitions` is the rejected value.
    #[error("topic partition count must be positive: {partitions}")]
    InvalidPartitionCount { partitions: i32 },

    /// The replication factor was not positive; `replication_factor` is the
    /// rejected value.
    #[error("topic replication factor must be positive: {replication_factor}")]
    InvalidReplicationFactor { replication_factor: i16 },
}
/// Errors reported for consumer operations.
///
/// The variants cover a stopped consumer runtime, invalid subscription or seek
/// arguments, misuse of `poll`, and rejections attributed to the broker.
#[derive(Debug, Error)]
pub enum ConsumerError {
    /// The consumer runtime stopped before `operation`; `operation` is
    /// interpolated into the message.
    #[error("consumer runtime stopped before {operation}")]
    ThreadStoppedBefore { operation: &'static str },

    /// The consumer runtime stopped during `operation`; `operation` is
    /// interpolated into the message.
    #[error("consumer runtime stopped during {operation}")]
    ThreadStoppedDuring { operation: &'static str },

    /// Joining the consumer runtime failed with the wrapped [`JoinError`].
    ///
    // NOTE(review): the message interpolates `{0}` and the same value is also
    // exposed through `source()`, so reporters that print the whole error
    // chain may show the join error twice. Confirm whether that is intended.
    #[error("failed to join consumer runtime: {0}")]
    Join(#[source] JoinError),

    /// A subscription did not contain at least one non-empty topic name.
    #[error("subscribe requires at least one non-empty topic name")]
    EmptySubscription,

    /// A pattern subscription was given an empty pattern.
    #[error("subscribe_pattern requires a non-empty pattern")]
    EmptySubscriptionPattern,

    /// A regex subscription was given an invalid regular expression;
    /// `message` carries the detail appended to the error text.
    #[error("subscribe_regex requires a valid regular expression: {message}")]
    InvalidSubscriptionRegex { message: String },

    /// `poll` was called concurrently, which this consumer does not support.
    #[error("concurrent poll calls are not supported by this simple consumer")]
    ConcurrentPoll,

    /// A `poll` was interrupted by `wakeup()`.
    #[error("poll was interrupted by wakeup()")]
    Wakeup,

    /// A seek offset was negative; `offset` is the rejected value.
    #[error("seek offset must be non-negative: {offset}")]
    InvalidSeekOffset { offset: i64 },

    /// A topic partition had an empty topic name.
    #[error("topic partition names must be non-empty")]
    EmptyTopicPartition,

    /// `operation` needs an assigned partition, but `topic`:`partition` is not
    /// assigned.
    #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
    PartitionNotAssigned {
        operation: &'static str,
        topic: String,
        partition: i32,
    },

    /// The broker rejected the subscription regex; `message` carries the detail.
    #[error("broker rejected the subscription regex: {message}")]
    InvalidRegularExpression { message: String },

    /// The broker rejected the configured server assignor named `assignor`;
    /// `message` carries the detail.
    #[error("broker rejected the configured server assignor '{assignor}': {message}")]
    UnsupportedAssignor {
        assignor: String,
        message: String,
    },

    /// The static member `instance_id` is still owned by another consumer;
    /// `message` carries the detail.
    #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
    UnreleasedInstanceId {
        instance_id: String,
        message: String,
    },

    /// The static member `instance_id` was fenced; `message` carries the detail.
    #[error("static member '{instance_id}' was fenced: {message}")]
    FencedInstanceId {
        instance_id: String,
        message: String,
    },
}
/// Errors reported for producer configuration, broker capability checks and
/// the producer sender thread.
#[derive(Debug, Error)]
pub enum ProducerError {
    /// Idempotent producers require `acks=-1`.
    #[error("idempotent producers require acks=-1")]
    IdempotenceRequiresAcksAll,

    /// Idempotent producers require `max_retries > 0`.
    #[error("idempotent producers require max_retries > 0")]
    IdempotenceRequiresRetries,

    /// Transactional producers require `acks=-1`.
    #[error(
        "transactional producers require acks=-1 so the broker can commit the full transaction"
    )]
    TransactionalRequiresAcksAll,

    /// The broker did not advertise a finalized feature level for
    /// `transaction.version`.
    #[error("broker did not advertise finalized feature level for transaction.version")]
    MissingTransactionVersionFeature,

    /// The broker's finalized `transaction.version` is `level`, which is below
    /// the `>= 2` that transaction v2 requires.
    #[error(
        "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
    )]
    UnsupportedTransactionVersion { level: i16 },

    /// The API named `api` is needed at `min_version` or newer, but the broker
    /// only supports `broker_version`.
    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
    UnsupportedApiVersion {
        api: &'static str,
        min_version: i16,
        broker_version: i16,
    },

    /// The producer is not configured with a `transactional_id`.
    #[error("producer is not configured with a transactional_id")]
    NotTransactional,

    /// The producer sender thread stopped before `operation`; `operation` is
    /// interpolated into the message.
    #[error("producer sender thread stopped before {operation}")]
    ThreadStoppedBefore { operation: &'static str },

    /// The producer sender thread stopped during `operation`; `operation` is
    /// interpolated into the message.
    #[error("producer sender thread stopped during {operation}")]
    ThreadStoppedDuring { operation: &'static str },

    /// Joining the producer sender thread failed with the wrapped [`JoinError`].
    ///
    // NOTE(review): the message interpolates `{0}` and the same value is also
    // exposed through `source()`, so reporters that print the whole error
    // chain may show the join error twice. Confirm whether that is intended.
    #[error("failed to join producer sender thread: {0}")]
    Join(#[source] JoinError),
}
/// Errors describing invalid consumer group metadata.
#[derive(Debug, Error)]
pub enum ConsumerGroupMetadataError {
    /// The metadata has an empty `group_id`.
    #[error("consumer group metadata requires a non-empty group_id")]
    EmptyGroupId,

    /// The metadata has `generation_id > 0` but no `member_id`.
    #[error("consumer group metadata has generation_id > 0 but no member_id")]
    MissingMemberId,
}
/// Errors describing an invalid or failed transaction state.
///
/// `String` payloads are appended to the message as detail. The `&'static str`
/// payloads are interpolated after "completing with".
// NOTE(review): the `&'static str` payloads presumably name the completion
// kind (commit or abort); confirm against the code that constructs them.
#[derive(Debug, Error)]
pub enum TransactionStateError {
    /// A transaction is already in progress.
    #[error("transaction already in progress")]
    AlreadyInProgress,

    /// The transaction is already completing.
    #[error("transaction is already completing with {0}")]
    Completing(&'static str),

    /// The transaction must be aborted before it can be reused.
    #[error("transaction must be aborted before reuse: {0}")]
    MustAbortBeforeReuse(String),

    /// The transaction is unusable.
    #[error("transaction is unusable: {0}")]
    Fatal(String),

    /// A transactional send was attempted without `begin_transaction()` first.
    #[error("transactional send requires begin_transaction() before send()")]
    AppendWithoutBegin,

    /// `send_offsets_to_transaction` was attempted without
    /// `begin_transaction()` first.
    #[error("send_offsets_to_transaction requires begin_transaction() first")]
    SendOffsetsWithoutBegin,

    /// The transaction has failed and must be aborted.
    #[error("transaction has failed and must be aborted: {0}")]
    AbortRequired(String),

    /// The transaction cannot be committed and must be aborted.
    #[error("transaction cannot be committed and must be aborted: {0}")]
    CommitRequiresAbort(String),

    /// There is no active transaction to complete.
    #[error("no active transaction to complete")]
    NoActiveTransaction,

    /// Shutdown stopped while a transaction was still in progress.
    #[error("shutdown stopped with an active transaction still in progress")]
    ShutdownWithActiveTransaction,

    /// Shutdown stopped while a transaction was still completing.
    #[error("shutdown stopped while transaction was still completing with {0}")]
    ShutdownWhileCompleting(&'static str),

    /// Shutdown stopped with a failed transaction that had not been aborted.
    #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
    ShutdownAbortRequired(String),

    /// The transaction failed before shutdown.
    #[error("transaction failed before shutdown: {0}")]
    ShutdownFatal(String),
}