kafkit-client 0.1.2

Kafka 4.0+ pure Rust client.
Documentation
//! Error types returned by Kafkit.
//!
//! Most applications can use [`Result`] directly and match on [`crate::Error`] only
//! when they need to handle a specific category.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::{Error, KafkaClient};
//!
//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
//! if let Err(Error::Admin(error)) = admin.create_topics(Vec::<kafkit_client::NewTopic>::new()).await {
//!     eprintln!("admin request failed: {error}");
//! }
//! # Ok(())
//! # }
//! ```
//!
use thiserror::Error;
use tokio::task::JoinError;

/// Result type used by the client APIs.
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, Error)]
/// Top-level error returned by the crate.
pub enum Error {
    /// Admin operation failed.
    #[error(transparent)]
    Admin(#[from] AdminError),
    /// Consumer operation failed.
    #[error(transparent)]
    Consumer(#[from] ConsumerError),
    /// Producer operation failed.
    #[error(transparent)]
    Producer(#[from] ProducerError),
    /// Consumer group metadata was invalid.
    #[error(transparent)]
    ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
    /// Transaction state prevented the operation.
    #[error(transparent)]
    TransactionState(#[from] TransactionStateError),
    /// Internal error from protocol handling, IO, or validation.
    #[error(transparent)]
    Internal(#[from] anyhow::Error),
}

#[derive(Debug, Error)]
/// Errors raised before or during admin operations.
pub enum AdminError {
    /// A topic name was empty.
    #[error("topic names must be non-empty")]
    EmptyTopicName,
    /// A topic was requested with an invalid partition count.
    #[error("topic partition count must be positive: {partitions}")]
    InvalidPartitionCount {
        /// Requested partition count.
        partitions: i32,
    },
    /// A topic was requested with an invalid replication factor.
    #[error("topic replication factor must be positive: {replication_factor}")]
    InvalidReplicationFactor {
        /// Requested replication factor.
        replication_factor: i16,
    },
}

#[derive(Debug, Error)]
/// Errors raised by the consumer API.
pub enum ConsumerError {
    /// The runtime stopped before the operation was sent.
    #[error("consumer runtime stopped before {operation}")]
    ThreadStoppedBefore {
        /// Operation that was waiting to be sent.
        operation: &'static str,
    },
    /// The runtime stopped while the operation was waiting for a reply.
    #[error("consumer runtime stopped during {operation}")]
    ThreadStoppedDuring {
        /// Operation that was in flight.
        operation: &'static str,
    },
    /// The background consumer task could not be joined.
    #[error("failed to join consumer runtime: {0}")]
    Join(#[source] JoinError),
    /// A subscription was requested without any topic names.
    #[error("subscribe requires at least one non-empty topic name")]
    EmptySubscription,
    /// A regex subscription was requested with an empty pattern.
    #[error("subscribe_pattern requires a non-empty pattern")]
    EmptySubscriptionPattern,
    /// A regex subscription could not be compiled locally.
    #[error("subscribe_regex requires a valid regular expression: {message}")]
    InvalidSubscriptionRegex {
        /// Regex parser error message.
        message: String,
    },
    /// Another poll call was already active.
    #[error("concurrent poll calls are not supported by this simple consumer")]
    ConcurrentPoll,
    /// A blocking poll was interrupted with `wakeup`.
    #[error("poll was interrupted by wakeup()")]
    Wakeup,
    /// A seek offset was negative.
    #[error("seek offset must be non-negative: {offset}")]
    InvalidSeekOffset {
        /// Requested offset.
        offset: i64,
    },
    /// A topic partition had an empty topic name.
    #[error("topic partition names must be non-empty")]
    EmptyTopicPartition,
    /// The operation needs a partition currently assigned to this consumer.
    #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
    PartitionNotAssigned {
        /// Operation that needed the assignment.
        operation: &'static str,
        /// Topic name.
        topic: String,
        /// Partition number.
        partition: i32,
    },
    /// The broker rejected the subscription regex.
    #[error("broker rejected the subscription regex: {message}")]
    InvalidRegularExpression {
        /// Broker error message.
        message: String,
    },
    /// The broker rejected the configured server-side assignor.
    #[error("broker rejected the configured server assignor '{assignor}': {message}")]
    UnsupportedAssignor {
        /// Requested assignor name.
        assignor: String,
        /// Broker error message.
        message: String,
    },
    /// A static member id is still owned by another consumer instance.
    #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
    UnreleasedInstanceId {
        /// Static member instance id.
        instance_id: String,
        /// Broker error message.
        message: String,
    },
    /// The broker fenced this static member instance.
    #[error("static member '{instance_id}' was fenced: {message}")]
    FencedInstanceId {
        /// Static member instance id.
        instance_id: String,
        /// Broker error message.
        message: String,
    },
}

#[derive(Debug, Error)]
/// Errors raised by the producer API.
pub enum ProducerError {
    /// Idempotence was enabled without `acks=-1`.
    #[error("idempotent producers require acks=-1")]
    IdempotenceRequiresAcksAll,
    /// Idempotence was enabled without retries.
    #[error("idempotent producers require max_retries > 0")]
    IdempotenceRequiresRetries,
    /// Transactions require all replicas to acknowledge records.
    #[error(
        "transactional producers require acks=-1 so the broker can commit the full transaction"
    )]
    TransactionalRequiresAcksAll,
    /// The broker did not report the transaction feature level.
    #[error("broker did not advertise finalized feature level for transaction.version")]
    MissingTransactionVersionFeature,
    /// The broker transaction feature level is too old.
    #[error(
        "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
    )]
    UnsupportedTransactionVersion {
        /// Broker feature level.
        level: i16,
    },
    /// A broker API version is older than this client needs.
    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
    UnsupportedApiVersion {
        /// API name.
        api: &'static str,
        /// Minimum required API version.
        min_version: i16,
        /// Version advertised by the broker.
        broker_version: i16,
    },
    /// The operation requires a configured transactional id.
    #[error("producer is not configured with a transactional_id")]
    NotTransactional,
    /// The sender task stopped before the operation was sent.
    #[error("producer sender thread stopped before {operation}")]
    ThreadStoppedBefore {
        /// Operation that was waiting to be sent.
        operation: &'static str,
    },
    /// The sender task stopped while the operation was waiting for a reply.
    #[error("producer sender thread stopped during {operation}")]
    ThreadStoppedDuring {
        /// Operation that was in flight.
        operation: &'static str,
    },
    /// The sender task could not be joined.
    #[error("failed to join producer sender thread: {0}")]
    Join(#[source] JoinError),
}

#[derive(Debug, Error)]
/// Errors found while validating consumer group metadata.
pub enum ConsumerGroupMetadataError {
    /// The group id was empty.
    #[error("consumer group metadata requires a non-empty group_id")]
    EmptyGroupId,
    /// Active group metadata needs a member id.
    #[error("consumer group metadata has generation_id > 0 but no member_id")]
    MissingMemberId,
}

#[derive(Debug, Error)]
/// Errors raised by the producer transaction state machine.
pub enum TransactionStateError {
    /// A transaction is already open.
    #[error("transaction already in progress")]
    AlreadyInProgress,
    /// The transaction is already completing.
    #[error("transaction is already completing with {0}")]
    Completing(&'static str),
    /// The transaction must be aborted before it can be used again.
    #[error("transaction must be aborted before reuse: {0}")]
    MustAbortBeforeReuse(String),
    /// The transaction is permanently failed.
    #[error("transaction is unusable: {0}")]
    Fatal(String),
    /// Records were sent before a transaction was started.
    #[error("transactional send requires begin_transaction() before send()")]
    AppendWithoutBegin,
    /// Offsets were sent before a transaction was started.
    #[error("send_offsets_to_transaction requires begin_transaction() first")]
    SendOffsetsWithoutBegin,
    /// The transaction needs an abort before more work can happen.
    #[error("transaction has failed and must be aborted: {0}")]
    AbortRequired(String),
    /// The transaction cannot be committed and must be aborted.
    #[error("transaction cannot be committed and must be aborted: {0}")]
    CommitRequiresAbort(String),
    /// There is no active transaction to finish.
    #[error("no active transaction to complete")]
    NoActiveTransaction,
    /// Shutdown found an active transaction.
    #[error("shutdown stopped with an active transaction still in progress")]
    ShutdownWithActiveTransaction,
    /// Shutdown found a transaction completion in progress.
    #[error("shutdown stopped while transaction was still completing with {0}")]
    ShutdownWhileCompleting(&'static str),
    /// Shutdown found a failed transaction that still needs aborting.
    #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
    ShutdownAbortRequired(String),
    /// Shutdown found a fatal transaction error.
    #[error("transaction failed before shutdown: {0}")]
    ShutdownFatal(String),
}