kafka 0.3.2

Rust client for Apache Kafka
Documentation
//! Error struct and methods

use std::result;
use std::error;
use std::io;
use std::fmt;

use byteorder;

#[cfg(feature = "security")]
use openssl::ssl;


/// A type for results generated by this crate's functions where the `Err` type
/// is hard-wired to `enums::Error`.
///
/// This typedef is generally used to avoid writing out `enums::Error` directly and
/// is otherwise a direct mapping to `std::result::Result`.
pub type Result<T> = result::Result<T, Error>;

/// The various errors this library can produce.
#[derive(Debug)]
pub enum Error {
    /// Input/Output error while communicating with Kafka
    Io(io::Error),
    /// An error as reported by a remote Kafka server
    Kafka(KafkaCode),
    /// An error as reported by OpenSsl
    #[cfg(feature = "security")]
    Ssl(ssl::error::SslError),

    /// Failure to correctly parse the server response due to the
    /// server speaking a newer protocol version (than the one this
    /// library supports)
    UnsupportedProtocol,
    /// Failure to correctly parse the server response by this library
    /// due to an unsupported compression format of the data
    UnsupportedCompression,
    /// Failure to decode a snappy compressed response from Kafka
    #[cfg(feature = "snappy")]
    InvalidInputSnappy,
    /// Failure to decode a response due to an insufficient number of bytes available
    UnexpectedEOF,
    /// Failure to decode or encode a response or request respectively
    CodecError,
    /// Failure to decode a string into a valid utf8 byte sequence
    StringDecodeError,
    /// Unable to reach any host
    NoHostReachable,
}

/// Various errors reported by a remote Kafka server.
/// See also [Kafka Errors](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
#[derive(Debug, Copy, Clone)]
pub enum KafkaCode {
    /// An unexpected server error
    Unknown,
    /// The requested offset is outside the range of offsets
    /// maintained by the server for the given topic/partition
    OffsetOutOfRange,
    /// This indicates that a message contents does not match its CRC
    CorruptMessage,
    /// This request is for a topic or partition that does not exist
    /// on this broker.
    UnknownTopicOrPartition,
    /// The message has a negative size
    InvalidMessageSize,
    /// This error is thrown if we are in the middle of a leadership
    /// election and there is currently no leader for this partition
    /// and hence it is unavailable for writes.
    LeaderNotAvailable,
    /// This error is thrown if the client attempts to send messages
    /// to a replica that is not the leader for some partition. It
    /// indicates that the clients metadata is out of date.
    NotLeaderForPartition,
    /// This error is thrown if the request exceeds the user-specified
    /// time limit in the request.
    RequestTimedOut,
    /// This is not a client facing error and is used mostly by tools
    /// when a broker is not alive.
    BrokerNotAvailable,
    /// If replica is expected on a broker, but is not (this can be
    /// safely ignored).
    ReplicaNotAvailable,
    /// The server has a configurable maximum message size to avoid
    /// unbounded memory allocation. This error is thrown if the
    /// client attempt to produce a message larger than this maximum.
    MessageSizeTooLarge,
    /// Internal error code for broker-to-broker communication.
    StaleControllerEpochCode,
    /// If you specify a string larger than configured maximum for
    /// offset metadata
    OffsetMetadataTooLargeCode,
    /// The broker returns this error code for an offset fetch request
    /// if it is still loading offsets (after a leader change for that
    /// offsets topic partition), or in response to group membership
    /// requests (such as heartbeats) when group metadata is being
    /// loaded by the coordinator.
    OffsetsLoadInProgressCode,
    /// The broker returns this error code for group coordinator
    /// requests, offset commits, and most group management requests
    /// if the offsets topic has not yet been created, or if the group
    /// coordinator is not active.
    ConsumerCoordinatorNotAvailableCode,
    /// The broker returns this error code if it receives an offset
    /// fetch or commit request for a group that it is not a
    /// coordinator for.
    NotCoordinatorForConsumerCode,
    /// For a request which attempts to access an invalid topic
    /// (e.g. one which has an illegal name), or if an attempt is made
    /// to write to an internal topic (such as the consumer offsets
    /// topic).
    InvalidTopicCode,
    /// If a message batch in a produce request exceeds the maximum
    /// configured segment size.
    RecordListTooLargeCode,
    /// Returned from a produce request when the number of in-sync
    /// replicas is lower than the configured minimum and requiredAcks is
    /// -1.
    NotEnoughReplicasCode,
    /// Returned from a produce request when the message was written
    /// to the log, but with fewer in-sync replicas than required.
    NotEnoughReplicasAfterAppendCode,
    /// Returned from a produce request if the requested requiredAcks is
    /// invalid (anything other than -1, 1, or 0).
    InvalidRequiredAcksCode,
    /// Returned from group membership requests (such as heartbeats) when
    /// the generation id provided in the request is not the current
    /// generation.
    IllegalGenerationCode,
    /// Returned in join group when the member provides a protocol type or
    /// set of protocols which is not compatible with the current group.
    InconsistentGroupProtocolCode,
    /// Returned in join group when the groupId is empty or null.
    InvalidGroupIdCode,
    /// Returned from group requests (offset commits/fetches, heartbeats,
    /// etc) when the memberId is not in the current generation.
    UnknownMemberIdCode,
    /// Return in join group when the requested session timeout is outside
    /// of the allowed range on the broker
    InvalidSessionTimeoutCode,
    /// Returned in heartbeat requests when the coordinator has begun
    /// rebalancing the group. This indicates to the client that it
    /// should rejoin the group.
    RebalanceInProgressCode,
    /// This error indicates that an offset commit was rejected because of
    /// oversize metadata.
    InvalidCommitOffsetSizeCode,
    /// Returned by the broker when the client is not authorized to access
    /// the requested topic.
    TopicAuthorizationFailedCode,
    /// Returned by the broker when the client is not authorized to access
    /// a particular groupId.
    GroupAuthorizationFailedCode,
    /// Returned by the broker when the client is not authorized to use an
    /// inter-broker or administrative API.
    ClusterAuthorizationFailedCode,
}

impl From<io::Error> for Error {
    fn from(err: io::Error) -> Error { Error::Io(err) }
}

impl From<byteorder::Error> for Error {
    fn from(err: byteorder::Error) -> Error {
        match err {
            byteorder::Error::UnexpectedEOF => Error::UnexpectedEOF,
            byteorder::Error::Io(err) => Error::Io(err)
        }
    }
}

#[cfg(feature = "security")]
impl From<ssl::error::SslError> for Error {
    fn from(err: ssl::error::SslError) -> Error { Error::Ssl(err) }
}

impl Clone for Error {
    fn clone(&self) -> Error {
        match self {
            &Error::Io(ref err) => Error::Io(io::Error::new(err.kind(), "Io Error")),
            &Error::Kafka(x) => Error::Kafka(x),
            #[cfg(feature = "security")]
            &Error::Ssl(ref x) => match x {
                &ssl::error::SslError::StreamError(ref e) => 
                    Error::Ssl(ssl::error::SslError::StreamError(
                            io::Error::new(e.kind(), "Stream Error"))
                    ),
                &ssl::error::SslError::SslSessionClosed => 
                    Error::Ssl(ssl::error::SslError::SslSessionClosed),
                &ssl::error::SslError::OpenSslErrors(ref v) => 
                    Error::Ssl(ssl::error::SslError::OpenSslErrors(v.clone())),
            },
            &Error::UnsupportedProtocol => Error::UnsupportedProtocol,
            &Error::UnsupportedCompression => Error::UnsupportedCompression,
            #[cfg(feature = "snappy")]
            &Error::InvalidInputSnappy => Error::InvalidInputSnappy,
            &Error::UnexpectedEOF => Error::UnexpectedEOF,
            &Error::CodecError => Error::CodecError,
            &Error::StringDecodeError => Error::StringDecodeError,
            &Error::NoHostReachable => Error::NoHostReachable,
        }
    }
}

impl error::Error for Error {
    fn description(&self) -> &str {
        match *self {
            Error::Io(ref err) => error::Error::description(err),
            Error::Kafka(_) => "Kafka Error",
            #[cfg(feature = "security")]
            Error::Ssl(ref err) => error::Error::description(err),
            Error::UnsupportedProtocol => "Unsupported protocol version",
            Error::UnsupportedCompression => "Unsupported compression format",
            #[cfg(feature = "snappy")]
            Error::InvalidInputSnappy => "Snappy decode error",
            Error::UnexpectedEOF => "Unexpected EOF",
            Error::CodecError => "Encoding/Decoding error",
            Error::StringDecodeError => "String decoding error",
            Error::NoHostReachable => "No host reachable",
        }
    }

    fn cause(&self) -> Option<&error::Error> {
        match *self {
            Error::Io(ref err) => err.cause(),
            _ => None
        }
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Error::Io(ref err) => err.fmt(f),
            Error::Kafka(ref c) => write!(f, "Kafka Error ({:?})", c),
            #[cfg(feature = "security")]
            Error::Ssl(ref err) => err.fmt(f),
            Error::UnsupportedProtocol => write!(f, "Unsupported protocol version"),
            Error::UnsupportedCompression => write!(f, "Unsupported compression format"),
            #[cfg(feature = "snappy")]
            Error::InvalidInputSnappy => write!(f, "Snappy decode error"),
            Error::UnexpectedEOF => write!(f, "Unexpected EOF"),
            Error::CodecError => write!(f, "Encoding/Decoding Error"),
            Error::StringDecodeError => write!(f, "String decoding error"),
            Error::NoHostReachable => write!(f, "No Host Reachable"),
        }
    }
}