kafka_rs/
error.rs

1//! Crate error types.
2
3use kafka_protocol::{messages::RequestKind, ResponseError};
4
5use crate::StrBytes;
6
7/// Client results from interaction with a Kafka cluster.
8pub type ClientResult<T> = std::result::Result<T, ClientError>;
9
10/// Client errors from interacting with a Kafka cluster.
11///
12/// TODO: probably just refactor this into an opaque Retryable and Fatal errors, which just dump info on debug.
13#[derive(Debug, thiserror::Error)]
14pub enum ClientError {
15    /// Error while interacting with a broker.
16    #[error("error while interacting with a broker: {0:?}")]
17    BrokerError(BrokerRequestError),
18    /// Error while encoding a batch of records.
19    #[error("error while encoding a batch of records: {0}")]
20    EncodingError(String),
21    /// The broker returned a malformed response.
22    #[error("broker returned a malformed response")]
23    MalformedResponse,
24    /// The specified topic has no available partitions.
25    #[error("the specified topic has no available partitions: {0}")]
26    NoPartitionsAvailable(String),
27    /// Produce requests must include at least 1 record.
28    #[error("produce requests must include at least 1 record")]
29    ProducerMessagesEmpty,
30    /// The specified topic is unknown to the cluster.
31    #[error("the specified topic is unknown to the cluster: {0}")]
32    UnknownTopic(String),
33    /// The specified topic partition is unknown to the cluster.
34    #[error("the specified topic partition is unknown to the cluster: {0}/{1}")]
35    UnknownPartition(String, i32),
36    /// The specified topic partition is does not currently have a known leader.
37    #[error("the specified topic partition is does not currently have a known leader: {0}/{1}")]
38    NoPartitionLeader(String, i32),
39    /// An error was returned in a response from a broker.
40    #[error("an error was returned in a response from a broker: {0} {1:?} {2:?}")]
41    ResponseError(i16, Option<ResponseError>, Option<StrBytes>),
42    /// Timeout while waiting for cluster metadata to bootstrap.
43    #[error("timeout while waiting for cluster metadata to bootstrap")]
44    ClusterMetadataTimeout,
45    /// Could not find a broker specified by ID, or any broker at all.
46    #[error("could not find a broker specified by ID, or any broker at all")]
47    NoBrokerFound,
48    #[error("{0}")]
49    Other(String),
50    #[error("no topics were specified in the request")]
51    NoTopicsSpecified,
52    #[error("no controller was found in the cluster metadata")]
53    NoControllerFound,
54}
55
56/// Broker connection level error.
57#[derive(Debug, thiserror::Error)]
58#[error("broker connection error: {kind:?}")]
59pub struct BrokerRequestError {
60    /// The original request payload.
61    pub(crate) payload: RequestKind,
62    /// The kind of error which has taken place.
63    pub(crate) kind: BrokerErrorKind,
64}
65
66/// Broker connection level error kind.
67#[derive(Debug, thiserror::Error)]
68pub enum BrokerErrorKind {
69    /// The connection to the broker has terminated.
70    #[error("the client is disconnected")]
71    Disconnected,
72    /// The broker returned a malformed response.
73    #[error("the broker returned a malformed response")]
74    MalformedBrokerResponse,
75}