1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
//! Error struct and methods
use std::result;
use std::error;
use std::io;
use std::fmt;
use byteorder;
#[cfg(feature = "security")]
use openssl::ssl;
/// A specialized `Result` type for this crate's functions, with the `Err`
/// type hard-wired to the `Error` enum defined in this module.
///
/// This alias avoids having to spell out the error type in every signature;
/// it is otherwise a direct mapping to `std::result::Result`.
pub type Result<T> = result::Result<T, Error>;
/// The various errors this library can produce.
///
/// The `Ssl` variant exists only when the crate is built with the
/// `security` feature, and `InvalidInputSnappy` only with the `snappy`
/// feature.
#[derive(Debug)]
pub enum Error {
/// Input/Output error while communicating with Kafka.
/// Also the target of the `From<io::Error>` conversion.
Io(io::Error),
/// An error as reported by a remote Kafka server
Kafka(KafkaCode),
/// An error as reported by OpenSsl
#[cfg(feature = "security")]
Ssl(ssl::error::SslError),
/// Failure to correctly parse the server response due to the
/// server speaking a newer protocol version (than the one this
/// library supports)
UnsupportedProtocol,
/// Failure to correctly parse the server response by this library
/// due to an unsupported compression format of the data
UnsupportedCompression,
/// Failure to decode a snappy compressed response from Kafka
#[cfg(feature = "snappy")]
InvalidInputSnappy,
/// Failure to decode a response due to an insufficient number of
/// bytes available. `byteorder::Error::UnexpectedEOF` converts into
/// this variant.
UnexpectedEOF,
/// Failure to decode or encode a response or request respectively
CodecError,
/// Failure to decode a string into a valid utf8 byte sequence
StringDecodeError,
/// Unable to reach any host
NoHostReachable,
}
/// Various errors reported by a remote Kafka server.
///
/// Codes are plain, field-less values: they are `Copy` and can be compared
/// with `==` / `!=`.
///
/// See also [Kafka Errors](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KafkaCode {
    /// An unexpected server error
    Unknown,
    /// The requested offset is outside the range of offsets
    /// maintained by the server for the given topic/partition
    OffsetOutOfRange,
    /// This indicates that a message's contents do not match its CRC
    CorruptMessage,
    /// This request is for a topic or partition that does not exist
    /// on this broker.
    UnknownTopicOrPartition,
    /// The message has a negative size
    InvalidMessageSize,
    /// This error is thrown if we are in the middle of a leadership
    /// election and there is currently no leader for this partition
    /// and hence it is unavailable for writes.
    LeaderNotAvailable,
    /// This error is thrown if the client attempts to send messages
    /// to a replica that is not the leader for some partition. It
    /// indicates that the client's metadata is out of date.
    NotLeaderForPartition,
    /// This error is thrown if the request exceeds the user-specified
    /// time limit in the request.
    RequestTimedOut,
    /// This is not a client facing error and is used mostly by tools
    /// when a broker is not alive.
    BrokerNotAvailable,
    /// If replica is expected on a broker, but is not (this can be
    /// safely ignored).
    ReplicaNotAvailable,
    /// The server has a configurable maximum message size to avoid
    /// unbounded memory allocation. This error is thrown if the
    /// client attempts to produce a message larger than this maximum.
    MessageSizeTooLarge,
    /// Internal error code for broker-to-broker communication.
    StaleControllerEpochCode,
    /// If you specify a string larger than configured maximum for
    /// offset metadata
    OffsetMetadataTooLargeCode,
    /// The broker returns this error code for an offset fetch request
    /// if it is still loading offsets (after a leader change for that
    /// offsets topic partition), or in response to group membership
    /// requests (such as heartbeats) when group metadata is being
    /// loaded by the coordinator.
    OffsetsLoadInProgressCode,
    /// The broker returns this error code for group coordinator
    /// requests, offset commits, and most group management requests
    /// if the offsets topic has not yet been created, or if the group
    /// coordinator is not active.
    ConsumerCoordinatorNotAvailableCode,
    /// The broker returns this error code if it receives an offset
    /// fetch or commit request for a group that it is not a
    /// coordinator for.
    NotCoordinatorForConsumerCode,
    /// For a request which attempts to access an invalid topic
    /// (e.g. one which has an illegal name), or if an attempt is made
    /// to write to an internal topic (such as the consumer offsets
    /// topic).
    InvalidTopicCode,
    /// If a message batch in a produce request exceeds the maximum
    /// configured segment size.
    RecordListTooLargeCode,
    /// Returned from a produce request when the number of in-sync
    /// replicas is lower than the configured minimum and requiredAcks is
    /// -1.
    NotEnoughReplicasCode,
    /// Returned from a produce request when the message was written
    /// to the log, but with fewer in-sync replicas than required.
    NotEnoughReplicasAfterAppendCode,
    /// Returned from a produce request if the requested requiredAcks is
    /// invalid (anything other than -1, 1, or 0).
    InvalidRequiredAcksCode,
    /// Returned from group membership requests (such as heartbeats) when
    /// the generation id provided in the request is not the current
    /// generation.
    IllegalGenerationCode,
    /// Returned in join group when the member provides a protocol type or
    /// set of protocols which is not compatible with the current group.
    InconsistentGroupProtocolCode,
    /// Returned in join group when the groupId is empty or null.
    InvalidGroupIdCode,
    /// Returned from group requests (offset commits/fetches, heartbeats,
    /// etc) when the memberId is not in the current generation.
    UnknownMemberIdCode,
    /// Returned in join group when the requested session timeout is
    /// outside of the allowed range on the broker
    InvalidSessionTimeoutCode,
    /// Returned in heartbeat requests when the coordinator has begun
    /// rebalancing the group. This indicates to the client that it
    /// should rejoin the group.
    RebalanceInProgressCode,
    /// This error indicates that an offset commit was rejected because of
    /// oversize metadata.
    InvalidCommitOffsetSizeCode,
    /// Returned by the broker when the client is not authorized to access
    /// the requested topic.
    TopicAuthorizationFailedCode,
    /// Returned by the broker when the client is not authorized to access
    /// a particular groupId.
    GroupAuthorizationFailedCode,
    /// Returned by the broker when the client is not authorized to use an
    /// inter-broker or administrative API.
    ClusterAuthorizationFailedCode,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::Io(err) }
}
impl From<byteorder::Error> for Error {
fn from(err: byteorder::Error) -> Error {
match err {
byteorder::Error::UnexpectedEOF => Error::UnexpectedEOF,
byteorder::Error::Io(err) => Error::Io(err)
}
}
}
/// Lets OpenSSL errors be converted into the crate's `Error`
/// (compiled only with the `security` feature).
#[cfg(feature = "security")]
impl From<ssl::error::SslError> for Error {
    /// Stores `source` unchanged inside `Error::Ssl`.
    fn from(source: ssl::error::SslError) -> Self {
        Error::Ssl(source)
    }
}
impl Clone for Error {
fn clone(&self) -> Error {
match self {
&Error::Io(ref err) => Error::Io(io::Error::new(err.kind(), "Io Error")),
&Error::Kafka(x) => Error::Kafka(x),
#[cfg(feature = "security")]
&Error::Ssl(ref x) => match x {
&ssl::error::SslError::StreamError(ref e) =>
Error::Ssl(ssl::error::SslError::StreamError(
io::Error::new(e.kind(), "Stream Error"))
),
&ssl::error::SslError::SslSessionClosed =>
Error::Ssl(ssl::error::SslError::SslSessionClosed),
&ssl::error::SslError::OpenSslErrors(ref v) =>
Error::Ssl(ssl::error::SslError::OpenSslErrors(v.clone())),
},
&Error::UnsupportedProtocol => Error::UnsupportedProtocol,
&Error::UnsupportedCompression => Error::UnsupportedCompression,
#[cfg(feature = "snappy")]
&Error::InvalidInputSnappy => Error::InvalidInputSnappy,
&Error::UnexpectedEOF => Error::UnexpectedEOF,
&Error::CodecError => Error::CodecError,
&Error::StringDecodeError => Error::StringDecodeError,
&Error::NoHostReachable => Error::NoHostReachable,
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::Io(ref err) => error::Error::description(err),
Error::Kafka(_) => "Kafka Error",
#[cfg(feature = "security")]
Error::Ssl(ref err) => error::Error::description(err),
Error::UnsupportedProtocol => "Unsupported protocol version",
Error::UnsupportedCompression => "Unsupported compression format",
#[cfg(feature = "snappy")]
Error::InvalidInputSnappy => "Snappy decode error",
Error::UnexpectedEOF => "Unexpected EOF",
Error::CodecError => "Encoding/Decoding error",
Error::StringDecodeError => "String decoding error",
Error::NoHostReachable => "No host reachable",
}
}
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Io(ref err) => err.cause(),
_ => None
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Io(ref err) => err.fmt(f),
Error::Kafka(ref c) => write!(f, "Kafka Error ({:?})", c),
#[cfg(feature = "security")]
Error::Ssl(ref err) => err.fmt(f),
Error::UnsupportedProtocol => write!(f, "Unsupported protocol version"),
Error::UnsupportedCompression => write!(f, "Unsupported compression format"),
#[cfg(feature = "snappy")]
Error::InvalidInputSnappy => write!(f, "Snappy decode error"),
Error::UnexpectedEOF => write!(f, "Unexpected EOF"),
Error::CodecError => write!(f, "Encoding/Decoding Error"),
Error::StringDecodeError => write!(f, "String decoding error"),
Error::NoHostReachable => write!(f, "No Host Reachable"),
}
}
}