1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! Error manipulations.
use rdsys::types::*;

use std::{error, ffi, fmt};

// Re-export rdkafka error
pub use rdsys::types::RDKafkaError;

/// Kafka result: a `Result` whose error type is fixed to `KafkaError`.
pub type KafkaResult<T> = Result<T, KafkaError>;

/// Verify if the value represents an error condition.
///
/// This module implements it for the `RDKafkaRespErr` and `RDKafkaConfRes`
/// codes, where every value other than the "no error" / "OK" one counts as an
/// error.
pub trait IsError {
    /// Return true if the value represents an error.
    ///
    /// Note that the receiver is taken by value (`self`), not by reference.
    fn is_error(self) -> bool;
}

impl IsError for RDKafkaRespErr {
    /// A response code is an error unless it equals `RD_KAFKA_RESP_ERR_NO_ERROR`.
    fn is_error(self) -> bool {
        // Compare the numeric codes rather than the enum values themselves.
        let no_error_code = RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32;
        let this_code = self as i32;
        this_code != no_error_code
    }
}

impl IsError for RDKafkaConfRes {
    /// A configuration result is an error unless it equals `RD_KAFKA_CONF_OK`.
    fn is_error(self) -> bool {
        // Compare the numeric codes rather than the enum values themselves.
        let ok_code = RDKafkaConfRes::RD_KAFKA_CONF_OK as i32;
        let this_code = self as i32;
        this_code != ok_code
    }
}

// TODO: consider using macro

/// Represents all Kafka errors. Check the underlying `RDKafkaError` to get details.
///
/// Variants that carry an `RDKafkaError` expose it through `Error::cause`;
/// `String` payloads are free-form messages that are rendered by the
/// `Debug`/`Display` implementations.
#[derive(Clone, PartialEq, Eq)]
pub enum KafkaError {
    /// Client configuration error. The three strings are rendered, in order,
    /// as the description, the key and the value.
    ClientConfig(RDKafkaConfRes, String, String, String),
    /// Client creation error, with a message.
    ClientCreation(String),
    /// Consumer commit error.
    ConsumerCommit(RDKafkaError),
    /// Consumer creation error, with a message.
    ConsumerCreation(String),
    /// A future was canceled.
    FutureCanceled,
    /// Global error.
    Global(RDKafkaError),
    /// Group list fetch error.
    GroupListFetch(RDKafkaError),
    /// Message consumption error.
    MessageConsumption(RDKafkaError),
    /// Message production error.
    MessageProduction(RDKafkaError),
    /// Metadata fetch error.
    MetadataFetch(RDKafkaError),
    /// No message was received within the given poll interval.
    NoMessageReceived,
    /// Wraps an `ffi::NulError`; produced by the `From<ffi::NulError>`
    /// conversion in this module.
    Nul(ffi::NulError),
    /// Offset fetch error.
    OffsetFetch(RDKafkaError),
    /// Partition EOF. The payload is presumably the partition number
    /// (NOTE(review): confirm against the code constructing this variant).
    PartitionEOF(i32),
    /// Set partition offset error.
    SetPartitionOffset(RDKafkaError),
    /// Store offset error.
    StoreOffset(RDKafkaError),
    /// Subscription error, with a message.
    Subscription(String),
}

impl fmt::Debug for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Client config error: {} {} {})", desc, key, value),
            KafkaError::ClientCreation(ref err) => write!(f, "KafkaError (Client creation error: {})", err),
            KafkaError::ConsumerCommit(err) => write!(f, "KafkaError (Consumer commit error: {})", err),
            KafkaError::ConsumerCreation(ref err) => write!(f, "KafkaError (Consumer creation error: {})", err),
            KafkaError::FutureCanceled => write!(f, "Future canceled"),
            KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
            KafkaError::GroupListFetch(err) => write!(f, "KafkaError (Group list fetch error: {})", err),
            KafkaError::MessageConsumption(err) => write!(f, "KafkaError (Message consumption error: {})", err),
            KafkaError::MessageProduction(err) => write!(f, "KafkaError (Message production error: {})", err),
            KafkaError::MetadataFetch(err) => write!(f, "KafkaError (Metadata fetch error: {})", err),
            KafkaError::NoMessageReceived => write!(f, "No message received within the given poll interval"),
            KafkaError::Nul(_) => write!(f, "FFI null error"),
            KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
            KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
            KafkaError::SetPartitionOffset(err) => write!(f, "KafkaError (Set partition offset error: {})", err),
            KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
            KafkaError::Subscription(ref err) => write!(f, "KafkaError (Subscription error: {})", err),
        }
    }
}

impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "Client config error: {} {} {}", desc, key, value),
            KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
            KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
            KafkaError::ConsumerCreation(ref err) => write!(f, "Consumer creation error: {}", err),
            KafkaError::FutureCanceled => write!(f, "Future canceled"),
            KafkaError::Global(err) => write!(f, "Global error: {}", err),
            KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
            KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
            KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
            KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
            KafkaError::NoMessageReceived => write!(f, "No message received within the given poll interval"),
            KafkaError::Nul(_) => write!(f, "FFI nul error"),
            KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
            KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
            KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
            KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
            KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
        }
    }
}

impl error::Error for KafkaError {
    /// Returns a short, static label for the error category. Payload details
    /// are not included; use the `Display` output for those.
    fn description(&self) -> &str {
        match *self {
            KafkaError::ClientConfig(_, _, _, _) => "Client config error",
            KafkaError::ClientCreation(_) => "Client creation error",
            KafkaError::ConsumerCommit(_) => "Consumer commit error",
            KafkaError::ConsumerCreation(_) => "Consumer creation error",
            KafkaError::FutureCanceled => "Future canceled",
            KafkaError::Global(_) => "Global error",
            KafkaError::GroupListFetch(_) => "Group list fetch error",
            KafkaError::MessageConsumption(_) => "Message consumption error",
            KafkaError::MessageProduction(_) => "Message production error",
            KafkaError::MetadataFetch(_) => "Meta data fetch error",
            KafkaError::NoMessageReceived => "No message received within the given poll interval",
            KafkaError::Nul(_) => "FFI nul error",
            KafkaError::OffsetFetch(_) => "Offset fetch error",
            KafkaError::PartitionEOF(_) => "Partition EOF error",
            KafkaError::SetPartitionOffset(_) => "Set partition offset error",
            KafkaError::StoreOffset(_) => "Store offset error",
            KafkaError::Subscription(_) => "Subscription error",
        }
    }

    /// Returns the underlying error wrapped by this variant, if any.
    ///
    /// Variants wrapping an `RDKafkaError` or an `ffi::NulError` return it;
    /// variants that only carry text, a partition number or no payload return
    /// `None`.
    // The arms are intentionally listed one per variant (no wildcard) so that
    // adding a variant forces a decision here; hence the clippy allowance.
    #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
    fn cause(&self) -> Option<&error::Error> {
        match *self {
            KafkaError::ClientConfig(_, _, _, _) => None,
            KafkaError::ClientCreation(_) => None,
            KafkaError::ConsumerCommit(ref err) => Some(err),
            KafkaError::ConsumerCreation(_) => None,
            KafkaError::FutureCanceled => None,
            KafkaError::Global(ref err) => Some(err),
            KafkaError::GroupListFetch(ref err) => Some(err),
            KafkaError::MessageConsumption(ref err) => Some(err),
            KafkaError::MessageProduction(ref err) => Some(err),
            KafkaError::MetadataFetch(ref err) => Some(err),
            KafkaError::NoMessageReceived => None,
            // `ffi::NulError` implements `Error`, so expose it instead of
            // dropping the underlying cause.
            KafkaError::Nul(ref err) => Some(err),
            KafkaError::OffsetFetch(ref err) => Some(err),
            KafkaError::PartitionEOF(_) => None,
            KafkaError::SetPartitionOffset(ref err) => Some(err),
            KafkaError::StoreOffset(ref err) => Some(err),
            KafkaError::Subscription(_) => None,
        }
    }
}

impl From<ffi::NulError> for KafkaError {
    /// Wraps an `ffi::NulError` in the `KafkaError::Nul` variant, which lets
    /// `?` convert it automatically in functions returning `KafkaResult`.
    fn from(nul_error: ffi::NulError) -> Self {
        KafkaError::Nul(nul_error)
    }
}