rskafka 0.6.0

A minimal Rust client for Apache Kafka
Documentation
use thiserror::Error;

pub use crate::messenger::RequestError;
pub use crate::protocol::error::Error as ProtocolError;

/// Request context for [`Error::ServerError`].
///
/// Describes the part of the request that led to the error. It is stored in the `request` field
/// of [`Error::ServerError`] and rendered through its `Debug` representation in that variant's
/// error message.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a wildcard arm when
/// matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum RequestContext {
    /// Error is specific to a topic.
    Topic(String),

    /// Error is specific to a partition (indexed via topic name and partition ID).
    ///
    /// The first field is the topic name, the second one the partition ID.
    Partition(String, i32),

    /// Error is specific to a fetch request.
    ///
    /// The variant itself is `#[non_exhaustive]`: code outside this crate cannot construct it
    /// and must use `..` when destructuring it.
    #[non_exhaustive]
    Fetch {
        /// Topic name.
        topic_name: String,

        /// Partition ID.
        partition_id: i32,

        /// Offset used during the request.
        offset: i64,
    },
}

/// Usable broker data for [`Error::ServerError`].
///
/// This is data that the broker sent and that is still usable despite the error. It is carried
/// in the optional `response` field of [`Error::ServerError`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a wildcard arm when
/// matching on it.
#[derive(Debug)]
#[allow(missing_copy_implementations)] // intentionally not `Copy` so the enum can be extended later
#[non_exhaustive]
pub enum ServerErrorResponse {
    /// A broker that we thought was the partition leader forwarded us to another leader.
    LeaderForward {
        /// Original broker.
        broker: i32,

        /// New broker.
        new_leader: i32,
    },
    /// Usable response data after a fetch request.
    PartitionFetchState {
        /// High watermark.
        high_watermark: i64,

        /// Last stable offset, if any.
        last_stable_offset: Option<i64>,
    },
}

/// Error type of this module.
///
/// `Display` and `std::error::Error` are derived via [`thiserror::Error`]; the `#[error(...)]`
/// attribute on each variant defines its message. Variants whose payload is marked `#[from]`
/// can be created from that payload with `?` / `From`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a wildcard arm when
/// matching on it.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
    /// Error reported by the connection layer (`crate::connection`).
    #[error("Connection error: {0}")]
    Connection(#[from] crate::connection::Error),

    /// Error reported while performing a request (see [`RequestError`]).
    #[error("Request error: {0}")]
    Request(#[from] RequestError),

    /// A response was considered invalid; the string describes what was wrong.
    #[error("Invalid response: {0}")]
    InvalidResponse(String),

    /// Error that is reported in terms of a protocol-level error code.
    ///
    /// The message includes the protocol error, the broker-provided message (or `"n/a"` when
    /// there is none), the `Debug` output of the request context and response data, and the
    /// `is_virtual` flag.
    #[error(
        "Server error {} with message \"{}\", request: {:?}, response: {:?}, virtual: {}",
        protocol_error,
        string_or_na(error_message),
        request,
        response,
        is_virtual
    )]
    ServerError {
        /// Protocol-level error message.
        protocol_error: ProtocolError,

        /// Server message provided by the broker, if any.
        error_message: Option<String>,

        /// The relevant part of the request that led to the error.
        request: RequestContext,

        /// Additional response data that the broker provided and that can be used despite the error state.
        response: Option<ServerErrorResponse>,

        /// Flags if the error was generated by the client to simulate some server behavior or workaround a bug.
        ///
        /// This is mostly for debugging and bug reporting.
        is_virtual: bool,
    },

    /// All retries failed; wraps the error produced by the backoff logic (`crate::backoff`).
    #[error("All retries failed: {0}")]
    RetryFailed(#[from] crate::backoff::BackoffError),

    /// An operation timed out.
    #[error("Timeout")]
    Timeout,
}

impl Error {
    /// Creates an [`Error::InvalidResponse`] stating that a single topic was expected.
    ///
    /// `len` is the number of topics that was found instead; it is embedded in the message.
    pub(crate) fn exactly_one_topic(len: usize) -> Self {
        let message = format!("Expected a single topic in response, got {len}");
        Self::InvalidResponse(message)
    }

    /// Creates an [`Error::InvalidResponse`] stating that a single partition was expected.
    ///
    /// `len` is the number of partitions that was found instead; it is embedded in the message.
    pub(crate) fn exactly_one_partition(len: usize) -> Self {
        let message = format!("Expected a single partition in response, got {len}");
        Self::InvalidResponse(message)
    }
}

/// Alias for [`std::result::Result`] whose error type defaults to [`Error`].
///
/// The error parameter can still be overridden, e.g. `Result<T, OtherError>`.
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Simple formatting function that replaces `None` with `"n/a"`.
///
/// Returns the contained string as a `&str` when `s` is `Some`, and the literal `"n/a"`
/// otherwise. No allocation takes place in either case.
///
/// The parameter is `&Option<String>` (rather than `Option<&str>`) because the function is
/// called from the `#[error(...)]` format arguments of `Error::ServerError` with a reference to
/// the `error_message` field.
fn string_or_na(s: &Option<String>) -> &str {
    s.as_deref().unwrap_or("n/a")
}