use rdsys;
use rdsys::types::*;
use std::{error, ffi, fmt};
use util::cstr_to_owned;
/// Result alias whose error side is always `KafkaError`.
pub type KafkaResult<T> = Result<T, KafkaError>;
/// Implemented by status-code types that can report whether they stand for a
/// failure.
pub trait IsError {
/// Returns `true` when the value represents an error condition.
///
/// Takes `self` by value.
fn is_error(self) -> bool;
}
/// Error check for response codes.
impl IsError for RDKafkaRespErr {
    /// Returns `true` for every code other than `RD_KAFKA_RESP_ERR_NO_ERROR`.
    fn is_error(self) -> bool {
        // The comparison is done on the raw integer discriminants.
        let no_error = RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32;
        let code = self as i32;
        code != no_error
    }
}
/// Error check for configuration result codes.
impl IsError for RDKafkaConfRes {
    /// Returns `true` for every result other than `RD_KAFKA_CONF_OK`.
    fn is_error(self) -> bool {
        // The comparison is done on the raw integer discriminants.
        let conf_ok = RDKafkaConfRes::RD_KAFKA_CONF_OK as i32;
        let result = self as i32;
        result != conf_ok
    }
}
/// Error type used on the failure side of `KafkaResult`.
///
/// When formatted, variants holding an `RDKafkaRespErr` are rendered through
/// `resp_err_description`, while variants holding a `String` print that string
/// unchanged.
pub enum KafkaError {
/// Client configuration error. The three strings are printed as description,
/// key and value (in that order); the `RDKafkaConfRes` is stored but not printed.
ClientConfig(RDKafkaConfRes, String, String, String),
/// Client creation error, with a message string.
ClientCreation(String),
/// Consumer commit error, with the response code.
ConsumerCommit(RDKafkaRespErr),
/// Consumer creation error, with a message string.
ConsumerCreation(String),
/// Group list fetch error, with the response code.
GroupListFetch(RDKafkaRespErr),
/// Message consumption error, with the response code.
MessageConsumption(RDKafkaRespErr),
/// Message production error, with the response code.
MessageProduction(RDKafkaRespErr),
/// Metadata fetch error, with the response code.
MetadataFetch(RDKafkaRespErr),
/// Wrapped `ffi::NulError`; produced by the `From<ffi::NulError>` conversion
/// and exposed as the error's `cause`.
Nul(ffi::NulError),
/// Partition EOF; the integer is printed as the partition number.
PartitionEOF(i32),
/// Subscription error, with a message string.
Subscription(String),
/// Topic configuration error. The three strings are printed as description,
/// key and value (in that order); the `RDKafkaConfRes` is stored but not printed.
TopicConfig(RDKafkaConfRes, String, String, String),
/// Topic creation error, with a message string.
TopicCreation(String),
}
impl fmt::Debug for KafkaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Client config error: {} {} {})", desc, key, value),
KafkaError::ClientCreation(ref err) => write!(f, "KafkaError (Client creation error: {})", err),
KafkaError::ConsumerCommit(err) => write!(f, "KafkaError (Consumer commit error: {})", resp_err_description(err)),
KafkaError::ConsumerCreation(ref err) => write!(f, "KafkaError (Consumer creation error: {})", err),
KafkaError::GroupListFetch(err) => write!(f, "KafkaError (Group list fetch error: {})", resp_err_description(err)),
KafkaError::MessageConsumption(err) => write!(f, "KafkaError (Message consumption error: {})", resp_err_description(err)),
KafkaError::MessageProduction(err) => write!(f, "KafkaError (Message production error: {})", resp_err_description(err)),
KafkaError::MetadataFetch(err) => write!(f, "KafkaError (Metadata fetch error: {})", resp_err_description(err)),
KafkaError::Nul(_) => write!(f, "KafkaError (FFI nul error)"),
KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
KafkaError::Subscription(ref err) => write!(f, "KafkaError (Subscription error: {})", err),
KafkaError::TopicConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Topic config error: {} {} {})", desc, key, value),
KafkaError::TopicCreation(ref err) => write!(f, "KafkaError (Topic creation error: {})", err),
}
}
}
impl fmt::Display for KafkaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "Client config error: {} {} {}", desc, key, value),
KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", resp_err_description(err)),
KafkaError::ConsumerCreation(ref err) => write!(f, "Consumer creation error: {}", err),
KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", resp_err_description(err)),
KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", resp_err_description(err)),
KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", resp_err_description(err)),
KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", resp_err_description(err)),
KafkaError::Nul(_) => write!(f, "FFI nul error"),
KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
KafkaError::TopicConfig(_, ref desc, ref key, ref value) => write!(f, "Topic config error: {} {} {}", desc, key, value),
KafkaError::TopicCreation(ref err) => write!(f, "Topic creation error: {}", err),
}
}
}
impl error::Error for KafkaError {
fn description(&self) -> &str {
match *self {
KafkaError::ClientConfig(_, _, _, _) => "Client config error",
KafkaError::ClientCreation(_) => "Client creation error",
KafkaError::ConsumerCreation(_) => "Consumer creation error",
KafkaError::MessageConsumption(_) => "Message consumption error",
KafkaError::MessageProduction(_) => "Message production error",
KafkaError::ConsumerCommit(_) => "Consumer commit error",
KafkaError::MetadataFetch(_) => "Meta data fetch error",
KafkaError::Nul(_) => "FFI nul error",
KafkaError::Subscription(_) => "Subscription error",
KafkaError::TopicConfig(_, _, _, _) => "Topic config error",
KafkaError::TopicCreation(_) => "Topic creation error",
KafkaError::PartitionEOF(_) => "Partition EOF error",
KafkaError::GroupListFetch(_) => "Group list fetch error",
}
}
fn cause(&self) -> Option<&error::Error> {
match *self {
KafkaError::Nul(ref err) => Some(err),
_ => None
}
}
}
/// Lets an `ffi::NulError` be converted into a `KafkaError` (for example via
/// `From::from` or `Into::into`).
impl From<ffi::NulError> for KafkaError {
    /// Wraps the error in the `KafkaError::Nul` variant without altering it.
    fn from(err: ffi::NulError) -> Self {
        KafkaError::Nul(err)
    }
}
/// Returns the textual description of a response code as an owned `String`.
///
/// The text comes from `rdsys::rd_kafka_err2str` and is copied with
/// `cstr_to_owned`.
pub fn resp_err_description(err: RDKafkaRespErr) -> String {
    // NOTE(review): soundness relies on `rd_kafka_err2str` handing back a
    // pointer that `cstr_to_owned` can read as a C string; confirm against
    // the librdkafka documentation.
    unsafe {
        let raw_description = rdsys::rd_kafka_err2str(err);
        cstr_to_owned(raw_description)
    }
}