//! Error and result types of this crate, along with their trait implementations
use std::result;
use std::error;
use std::io;
use std::fmt;
use byteorder;
#[cfg(feature = "security")]
use openssl::ssl;
/// A specialized result type for this crate's functions where the `Err`
/// type is hard-wired to `Error`.
///
/// This alias is generally used to avoid writing out `Error` directly in
/// every signature and is otherwise a direct mapping to
/// `std::result::Result`.
pub type Result<T> = result::Result<T, Error>;
/// The various errors this library can produce.
///
/// Some variants wrap an error value from an underlying layer (`Io`,
/// `Kafka`, and `Ssl` when the "security" feature is enabled); all other
/// variants carry no payload.
#[derive(Debug)]
pub enum Error {
/// Input/Output error while communicating with Kafka; wraps the
/// originating `std::io::Error`
Io(io::Error),
/// An error as reported by a remote Kafka server; the wrapped
/// `KafkaCode` identifies which one
Kafka(KafkaCode),
/// An error as reported by OpenSsl (only available with the
/// "security" feature)
#[cfg(feature = "security")]
Ssl(ssl::error::SslError),
/// Failure to correctly parse the server response due to the
/// server speaking a newer protocol version (than the one this
/// library supports)
UnsupportedProtocol,
/// Failure to correctly parse the server response by this library
/// due to an unsupported compression format of the data
UnsupportedCompression,
/// Failure to decode a snappy compressed response from Kafka
/// (only available with the "snappy" feature)
#[cfg(feature = "snappy")]
InvalidInputSnappy,
/// Failure to decode a response due to an insufficient number of
/// bytes available; also the result of converting
/// `byteorder::Error::UnexpectedEOF`
UnexpectedEOF,
/// Failure to decode a response or to encode a request
CodecError,
/// Failure to decode a byte sequence into a valid UTF-8 string
StringDecodeError,
/// Unable to reach any host
NoHostReachable,
}
/// Various errors reported by a remote Kafka server.
///
/// See also [Kafka Errors](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
#[derive(Debug, Copy, Clone)]
pub enum KafkaCode {
/// An unexpected server error
Unknown,
/// The requested offset is outside the range of offsets
/// maintained by the server for the given topic/partition
OffsetOutOfRange,
/// This indicates that a message's contents do not match its CRC
CorruptMessage,
/// This request is for a topic or partition that does not exist
/// on this broker.
UnknownTopicOrPartition,
/// The message has a negative size
InvalidMessageSize,
/// This error is thrown if we are in the middle of a leadership
/// election and there is currently no leader for this partition
/// and hence it is unavailable for writes.
LeaderNotAvailable,
/// This error is thrown if the client attempts to send messages
/// to a replica that is not the leader for some partition. It
/// indicates that the client's metadata is out of date.
NotLeaderForPartition,
/// This error is thrown if the request exceeds the user-specified
/// time limit in the request.
RequestTimedOut,
/// This is not a client facing error and is used mostly by tools
/// when a broker is not alive.
BrokerNotAvailable,
/// If a replica is expected on a broker, but is not (this can be
/// safely ignored).
ReplicaNotAvailable,
/// The server has a configurable maximum message size to avoid
/// unbounded memory allocation. This error is thrown if the
/// client attempts to produce a message larger than this maximum.
MessageSizeTooLarge,
/// Internal error code for broker-to-broker communication.
StaleControllerEpochCode,
/// If you specify a string larger than the configured maximum for
/// offset metadata
OffsetMetadataTooLargeCode,
/// The broker returns this error code for an offset fetch request
/// if it is still loading offsets (after a leader change for that
/// offsets topic partition), or in response to group membership
/// requests (such as heartbeats) when group metadata is being
/// loaded by the coordinator.
OffsetsLoadInProgressCode,
/// The broker returns this error code for group coordinator
/// requests, offset commits, and most group management requests
/// if the offsets topic has not yet been created, or if the group
/// coordinator is not active.
ConsumerCoordinatorNotAvailableCode,
/// The broker returns this error code if it receives an offset
/// fetch or commit request for a group that it is not a
/// coordinator for.
NotCoordinatorForConsumerCode,
/// For a request which attempts to access an invalid topic
/// (e.g. one which has an illegal name), or if an attempt is made
/// to write to an internal topic (such as the consumer offsets
/// topic).
InvalidTopicCode,
/// If a message batch in a produce request exceeds the maximum
/// configured segment size.
RecordListTooLargeCode,
/// Returned from a produce request when the number of in-sync
/// replicas is lower than the configured minimum and requiredAcks is
/// -1.
NotEnoughReplicasCode,
/// Returned from a produce request when the message was written
/// to the log, but with fewer in-sync replicas than required.
NotEnoughReplicasAfterAppendCode,
/// Returned from a produce request if the requested requiredAcks is
/// invalid (anything other than -1, 1, or 0).
InvalidRequiredAcksCode,
/// Returned from group membership requests (such as heartbeats) when
/// the generation id provided in the request is not the current
/// generation.
IllegalGenerationCode,
/// Returned in join group when the member provides a protocol type or
/// set of protocols which is not compatible with the current group.
InconsistentGroupProtocolCode,
/// Returned in join group when the groupId is empty or null.
InvalidGroupIdCode,
/// Returned from group requests (offset commits/fetches, heartbeats,
/// etc) when the memberId is not in the current generation.
UnknownMemberIdCode,
/// Returned in join group when the requested session timeout is
/// outside of the allowed range on the broker
InvalidSessionTimeoutCode,
/// Returned in heartbeat requests when the coordinator has begun
/// rebalancing the group. This indicates to the client that it
/// should rejoin the group.
RebalanceInProgressCode,
/// This error indicates that an offset commit was rejected because of
/// oversize metadata.
InvalidCommitOffsetSizeCode,
/// Returned by the broker when the client is not authorized to access
/// the requested topic.
TopicAuthorizationFailedCode,
/// Returned by the broker when the client is not authorized to access
/// a particular groupId.
GroupAuthorizationFailedCode,
/// Returned by the broker when the client is not authorized to use an
/// inter-broker or administrative API.
ClusterAuthorizationFailedCode,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::Io(err) }
}
impl From<byteorder::Error> for Error {
fn from(err: byteorder::Error) -> Error {
match err {
byteorder::Error::UnexpectedEOF => Error::UnexpectedEOF,
byteorder::Error::Io(err) => Error::Io(err)
}
}
}
/// Lifts an OpenSSL error into this crate's error type by wrapping it,
/// untouched, in `Error::Ssl`. Only compiled with the "security" feature.
#[cfg(feature = "security")]
impl From<ssl::error::SslError> for Error {
    fn from(source: ssl::error::SslError) -> Self {
        Error::Ssl(source)
    }
}
impl Clone for Error {
fn clone(&self) -> Error {
match self {
&Error::Io(ref err) => Error::Io(io::Error::new(err.kind(), "Io Error")),
&Error::Kafka(x) => Error::Kafka(x),
#[cfg(feature = "security")]
&Error::Ssl(ref x) => match x {
&ssl::error::SslError::StreamError(ref e) =>
Error::Ssl(ssl::error::SslError::StreamError(
io::Error::new(e.kind(), "Stream Error"))
),
&ssl::error::SslError::SslSessionClosed =>
Error::Ssl(ssl::error::SslError::SslSessionClosed),
&ssl::error::SslError::OpenSslErrors(ref v) =>
Error::Ssl(ssl::error::SslError::OpenSslErrors(v.clone())),
},
&Error::UnsupportedProtocol => Error::UnsupportedProtocol,
&Error::UnsupportedCompression => Error::UnsupportedCompression,
#[cfg(feature = "snappy")]
&Error::InvalidInputSnappy => Error::InvalidInputSnappy,
&Error::UnexpectedEOF => Error::UnexpectedEOF,
&Error::CodecError => Error::CodecError,
&Error::StringDecodeError => Error::StringDecodeError,
&Error::NoHostReachable => Error::NoHostReachable,
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::Io(ref err) => error::Error::description(err),
Error::Kafka(_) => "Kafka Error",
#[cfg(feature = "security")]
Error::Ssl(ref err) => error::Error::description(err),
Error::UnsupportedProtocol => "Unsupported protocol version",
Error::UnsupportedCompression => "Unsupported compression format",
#[cfg(feature = "snappy")]
Error::InvalidInputSnappy => "Snappy decode error",
Error::UnexpectedEOF => "Unexpected EOF",
Error::CodecError => "Encoding/Decoding error",
Error::StringDecodeError => "String decoding error",
Error::NoHostReachable => "No host reachable",
}
}
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Io(ref err) => err.cause(),
_ => None
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Io(ref err) => err.fmt(f),
Error::Kafka(ref c) => write!(f, "Kafka Error ({:?})", c),
#[cfg(feature = "security")]
Error::Ssl(ref err) => err.fmt(f),
Error::UnsupportedProtocol => write!(f, "Unsupported protocol version"),
Error::UnsupportedCompression => write!(f, "Unsupported compression format"),
#[cfg(feature = "snappy")]
Error::InvalidInputSnappy => write!(f, "Snappy decode error"),
Error::UnexpectedEOF => write!(f, "Unexpected EOF"),
Error::CodecError => write!(f, "Encoding/Decoding Error"),
Error::StringDecodeError => write!(f, "String decoding error"),
Error::NoHostReachable => write!(f, "No Host Reachable"),
}
}
}