1use std::time::Duration;
23use std::{io, result, sync::Arc};
24
25use thiserror::Error;
26
/// Convenience alias for `std::result::Result` whose error type is fixed to
/// the `Error` enum defined in this file.
pub type Result<T> = result::Result<T, Error>;
29
30#[derive(Debug, Error)]
36pub enum ConnectionError {
37 #[error("I/O error: {0}")]
39 Io(#[from] io::Error),
40
41 #[cfg(feature = "security")]
43 #[error("TLS error: {0}")]
44 Tls(String),
45
46 #[error("Connection timeout after {0:?}")]
48 Timeout(Duration),
49
50 #[error("No host reachable")]
52 NoHostReachable,
53}
54
55#[derive(Debug, Clone, Error)]
57pub enum ProtocolError {
58 #[error("Unsupported protocol version")]
60 UnsupportedVersion,
61
62 #[error("Unsupported compression format")]
64 UnsupportedCompression,
65
66 #[error("Unexpected end of data")]
68 UnexpectedEOF,
69
70 #[error("Encoding/decoding error")]
72 Codec,
73
74 #[error("String decoding error")]
76 StringDecode,
77
78 #[error("Invalid duration")]
80 InvalidDuration,
81}
82
83#[derive(Debug, Clone, Error)]
85pub enum ConsumerError {
86 #[error("No topic assigned")]
88 NoTopicsAssigned,
89
90 #[error("Offset storage not configured")]
92 UnsetOffsetStorage,
93
94 #[error("Group ID not configured")]
96 UnsetGroupId,
97}
98
99#[derive(Debug, Error)]
106pub enum Error {
107 #[error("Connection error: {0}")]
110 Connection(#[source] ConnectionError),
111
112 #[error("Protocol error: {0}")]
115 Protocol(#[source] ProtocolError),
116
117 #[error("Kafka Error ({0:?})")]
120 Kafka(KafkaCode),
121
122 #[error("Configuration error: {0}")]
125 Config(String),
126
127 #[error("Consumer error: {0}")]
130 Consumer(#[source] ConsumerError),
131
132 #[error("Topic Partition Error ({topic_name:?}, {partition_id:?}, {error_code:?})")]
135 TopicPartitionError {
136 topic_name: String,
138 partition_id: i32,
140 error_code: KafkaCode,
142 },
143
144 #[error("Broker request to {broker} failed ({api_key}): {source}")]
146 BrokerRequestError {
147 broker: String,
149 api_key: &'static str,
151 #[source]
153 source: Box<Self>,
154 },
155}
156
157impl From<io::Error> for Error {
159 fn from(e: io::Error) -> Self {
160 Self::Connection(ConnectionError::Io(e))
161 }
162}
163
164impl From<Arc<Self>> for Error {
165 fn from(e: Arc<Self>) -> Self {
166 match Arc::try_unwrap(e) {
167 Ok(err) => err,
168 Err(arc) => match &*arc {
169 Self::Connection(ConnectionError::Io(e)) => {
170 Self::Connection(ConnectionError::Io(io::Error::new(e.kind(), e.to_string())))
171 }
172 #[cfg(feature = "security")]
173 Self::Connection(ConnectionError::Tls(s)) => {
174 Self::Connection(ConnectionError::Tls(s.clone()))
175 }
176 Self::Connection(ConnectionError::Timeout(d)) => {
177 Self::Connection(ConnectionError::Timeout(*d))
178 }
179 Self::Connection(ConnectionError::NoHostReachable) => {
180 Self::Connection(ConnectionError::NoHostReachable)
181 }
182 Self::Protocol(p) => Self::Protocol(p.clone()),
183 Self::Kafka(c) => Self::Kafka(*c),
184 Self::Config(s) => Self::Config(s.clone()),
185 Self::Consumer(c) => Self::Consumer(c.clone()),
186 Self::TopicPartitionError {
187 topic_name,
188 partition_id,
189 error_code,
190 } => Self::TopicPartitionError {
191 topic_name: topic_name.clone(),
192 partition_id: *partition_id,
193 error_code: *error_code,
194 },
195 Self::BrokerRequestError {
196 broker,
197 api_key,
198 source: _,
199 } => Self::BrokerRequestError {
200 broker: broker.clone(),
201 api_key,
202 source: Box::new(Self::from(Arc::clone(&arc))),
203 },
204 },
205 }
206 }
207}
208
209impl Error {
214 #[inline]
215 pub(crate) fn codec() -> Self {
216 Self::Protocol(ProtocolError::Codec)
217 }
218
219 #[inline]
220 #[allow(dead_code)]
221 pub(crate) fn unexpected_eof() -> Self {
222 Self::Protocol(ProtocolError::UnexpectedEOF)
223 }
224
225 #[inline]
226 pub(crate) fn no_host_reachable() -> Self {
227 Self::Connection(ConnectionError::NoHostReachable)
228 }
229
230 #[inline]
231 pub(crate) fn invalid_duration() -> Self {
232 Self::Protocol(ProtocolError::InvalidDuration)
233 }
234
235 #[inline]
236 pub(crate) fn no_topics_assigned() -> Self {
237 Self::Consumer(ConsumerError::NoTopicsAssigned)
238 }
239
240 #[inline]
241 pub(crate) fn unset_group_id() -> Self {
242 Self::Consumer(ConsumerError::UnsetGroupId)
243 }
244
245 #[cfg(feature = "security")]
246 #[inline]
247 #[allow(dead_code)]
248 pub(crate) fn tls(msg: impl Into<String>) -> Self {
249 Self::Connection(ConnectionError::Tls(msg.into()))
250 }
251
252 #[must_use]
254 pub fn with_broker_context(self, broker: impl Into<String>, api_key: &'static str) -> Self {
255 Error::BrokerRequestError {
256 broker: broker.into(),
257 api_key,
258 source: Box::new(self),
259 }
260 }
261
262 #[must_use]
264 pub fn is_retriable(&self) -> bool {
265 match self {
266 Self::Kafka(code) => code.is_retriable(),
267 Self::Connection(conn_err) => matches!(
268 conn_err,
269 ConnectionError::Io(_)
270 | ConnectionError::Timeout(_)
271 | ConnectionError::NoHostReachable
272 ),
273 Self::TopicPartitionError { error_code, .. } => error_code.is_retriable(),
274 Self::BrokerRequestError { source, .. } => source.is_retriable(),
275 _ => false,
276 }
277 }
278
279 #[must_use]
281 pub fn is_connection_error(&self) -> bool {
282 matches!(self, Self::Connection(_))
283 }
284
285 #[must_use]
287 pub fn is_protocol_error(&self) -> bool {
288 matches!(self, Self::Protocol(_))
289 }
290
291 #[must_use]
293 pub fn is_consumer_error(&self) -> bool {
294 matches!(self, Self::Consumer(_))
295 }
296
297 pub(crate) fn from_protocol(n: i16) -> Option<Error> {
298 KafkaCode::from_protocol(n).map(Error::Kafka)
299 }
300}
301
/// Kafka error codes, one variant per numeric code.
///
/// Each explicit discriminant is the signed value that
/// `KafkaCode::from_protocol` maps onto that variant. There is no variant for
/// `0`: `from_protocol` returns `None` for it. Values without a dedicated
/// variant are decoded as `Unknown`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum KafkaCode {
    /// Code `-1`; also the fallback for any value without its own variant.
    Unknown = -1,
    OffsetOutOfRange = 1,
    CorruptMessage = 2,
    UnknownTopicOrPartition = 3,
    InvalidMessageSize = 4,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    LeaderNotAvailable = 5,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    NotLeaderForPartition = 6,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    RequestTimedOut = 7,
    BrokerNotAvailable = 8,
    ReplicaNotAvailable = 9,
    MessageSizeTooLarge = 10,
    StaleControllerEpoch = 11,
    OffsetMetadataTooLarge = 12,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    NetworkException = 13,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    GroupLoadInProgress = 14,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    GroupCoordinatorNotAvailable = 15,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    NotCoordinatorForGroup = 16,
    InvalidTopic = 17,
    RecordListTooLarge = 18,
    NotEnoughReplicas = 19,
    NotEnoughReplicasAfterAppend = 20,
    InvalidRequiredAcks = 21,
    IllegalGeneration = 22,
    InconsistentGroupProtocol = 23,
    InvalidGroupId = 24,
    UnknownMemberId = 25,
    InvalidSessionTimeout = 26,
    /// Treated as retriable by `KafkaCode::is_retriable`.
    RebalanceInProgress = 27,
    InvalidCommitOffsetSize = 28,
    TopicAuthorizationFailed = 29,
    GroupAuthorizationFailed = 30,
    ClusterAuthorizationFailed = 31,
    InvalidTimestamp = 32,
    UnsupportedSaslMechanism = 33,
    IllegalSaslState = 34,
    UnsupportedVersion = 35,
}
424
425impl KafkaCode {
426 #[must_use]
428 pub fn is_retriable(self) -> bool {
429 matches!(
430 self,
431 KafkaCode::LeaderNotAvailable
432 | KafkaCode::NotLeaderForPartition
433 | KafkaCode::GroupLoadInProgress
434 | KafkaCode::GroupCoordinatorNotAvailable
435 | KafkaCode::NotCoordinatorForGroup
436 | KafkaCode::NetworkException
437 | KafkaCode::RequestTimedOut
438 | KafkaCode::RebalanceInProgress
439 )
440 }
441
442 pub(crate) fn from_protocol(n: i16) -> Option<KafkaCode> {
443 if n == 0 {
444 return None;
445 }
446 Some(match n {
447 -1 => KafkaCode::Unknown,
448 1 => KafkaCode::OffsetOutOfRange,
449 2 => KafkaCode::CorruptMessage,
450 3 => KafkaCode::UnknownTopicOrPartition,
451 4 => KafkaCode::InvalidMessageSize,
452 5 => KafkaCode::LeaderNotAvailable,
453 6 => KafkaCode::NotLeaderForPartition,
454 7 => KafkaCode::RequestTimedOut,
455 8 => KafkaCode::BrokerNotAvailable,
456 9 => KafkaCode::ReplicaNotAvailable,
457 10 => KafkaCode::MessageSizeTooLarge,
458 11 => KafkaCode::StaleControllerEpoch,
459 12 => KafkaCode::OffsetMetadataTooLarge,
460 13 => KafkaCode::NetworkException,
461 14 => KafkaCode::GroupLoadInProgress,
462 15 => KafkaCode::GroupCoordinatorNotAvailable,
463 16 => KafkaCode::NotCoordinatorForGroup,
464 17 => KafkaCode::InvalidTopic,
465 18 => KafkaCode::RecordListTooLarge,
466 19 => KafkaCode::NotEnoughReplicas,
467 20 => KafkaCode::NotEnoughReplicasAfterAppend,
468 21 => KafkaCode::InvalidRequiredAcks,
469 22 => KafkaCode::IllegalGeneration,
470 23 => KafkaCode::InconsistentGroupProtocol,
471 24 => KafkaCode::InvalidGroupId,
472 25 => KafkaCode::UnknownMemberId,
473 26 => KafkaCode::InvalidSessionTimeout,
474 27 => KafkaCode::RebalanceInProgress,
475 28 => KafkaCode::InvalidCommitOffsetSize,
476 29 => KafkaCode::TopicAuthorizationFailed,
477 30 => KafkaCode::GroupAuthorizationFailed,
478 31 => KafkaCode::ClusterAuthorizationFailed,
479 32 => KafkaCode::InvalidTimestamp,
480 33 => KafkaCode::UnsupportedSaslMechanism,
481 34 => KafkaCode::IllegalSaslState,
482 35 => KafkaCode::UnsupportedVersion,
483 _ => KafkaCode::Unknown,
484 })
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::ErrorKind;

    // Decoding: 0 is "no error", known codes map to their variant, and an
    // unmapped value falls back to `Unknown`.
    #[test]
    fn test_kafka_code_from_i32() {
        assert_eq!(KafkaCode::from_protocol(0), None);

        let mapped = [
            (1, KafkaCode::OffsetOutOfRange),
            (6, KafkaCode::NotLeaderForPartition),
            (999, KafkaCode::Unknown),
        ];
        for (raw, expected) in mapped {
            assert_eq!(KafkaCode::from_protocol(raw), Some(expected));
        }
    }

    // Display output mentions the distinguishing part of each error.
    #[test]
    fn test_error_display() {
        let cases = [
            (
                Error::Kafka(KafkaCode::LeaderNotAvailable),
                "LeaderNotAvailable",
            ),
            (Error::no_host_reachable(), "host"),
            (Error::unexpected_eof(), "end of data"),
        ];
        for (err, needle) in cases {
            let msg = err.to_string();
            assert!(msg.contains(needle), "got: {msg}");
        }
    }

    // Broker name and request label both appear in the wrapped error's text.
    #[test]
    fn test_error_with_broker_context() {
        let wrapped = Error::Kafka(KafkaCode::NotLeaderForPartition)
            .with_broker_context("broker1:9092", "Produce");
        let msg = wrapped.to_string();
        for needle in ["broker1:9092", "Produce"] {
            assert!(msg.contains(needle), "got: {msg}");
        }
    }

    // An `io::Error` converts into the `Connection(Io(_))` variant.
    #[test]
    fn test_error_io_conversion() {
        let err = Error::from(io::Error::new(ErrorKind::ConnectionRefused, "refused"));
        assert!(matches!(err, Error::Connection(ConnectionError::Io(_))));
    }

    // The topic name shows up in a topic-partition error's text.
    #[test]
    fn test_topic_partition_error() {
        let msg = Error::TopicPartitionError {
            topic_name: "test-topic".into(),
            partition_id: 0,
            error_code: KafkaCode::LeaderNotAvailable,
        }
        .to_string();
        assert!(msg.contains("test-topic"), "got: {msg}");
    }

    #[test]
    fn test_is_retriable_kafka_errors() {
        let retriable = [
            KafkaCode::LeaderNotAvailable,
            KafkaCode::NotLeaderForPartition,
            KafkaCode::GroupLoadInProgress,
            KafkaCode::GroupCoordinatorNotAvailable,
            KafkaCode::NotCoordinatorForGroup,
            KafkaCode::NetworkException,
            KafkaCode::RequestTimedOut,
            KafkaCode::RebalanceInProgress,
        ];
        for code in retriable {
            assert!(
                Error::Kafka(code).is_retriable(),
                "{code:?} should be retriable"
            );
        }
    }

    #[test]
    fn test_is_not_retriable_kafka_errors() {
        let permanent = [
            KafkaCode::UnknownTopicOrPartition,
            KafkaCode::MessageSizeTooLarge,
            KafkaCode::Unknown,
        ];
        for code in permanent {
            assert!(
                !Error::Kafka(code).is_retriable(),
                "{code:?} should not be retriable"
            );
        }
    }

    #[test]
    fn test_is_retriable_connection_errors() {
        let reset = io::Error::new(ErrorKind::ConnectionReset, "reset");
        assert!(Error::Connection(ConnectionError::Io(reset)).is_retriable());

        let timeout = ConnectionError::Timeout(Duration::from_secs(5));
        assert!(Error::Connection(timeout).is_retriable());

        assert!(Error::Connection(ConnectionError::NoHostReachable).is_retriable());

        #[cfg(feature = "security")]
        assert!(!Error::Connection(ConnectionError::Tls("bad cert".into())).is_retriable());
    }

    // A topic-partition error is exactly as retriable as its error code.
    #[test]
    fn test_is_retriable_topic_partition_error() {
        let with_code = |error_code| Error::TopicPartitionError {
            topic_name: "t".into(),
            partition_id: 0,
            error_code,
        };

        assert!(with_code(KafkaCode::LeaderNotAvailable).is_retriable());
        assert!(!with_code(KafkaCode::UnknownTopicOrPartition).is_retriable());
    }

    // Broker context does not hide the retriability of the wrapped error.
    #[test]
    fn test_is_retriable_broker_request_error() {
        let wrapped = Error::Kafka(KafkaCode::NotLeaderForPartition)
            .with_broker_context("broker:9092", "Produce");
        assert!(wrapped.is_retriable());
    }

    #[test]
    fn test_is_retriable_non_retriable_errors() {
        let permanent = [
            Error::codec(),
            Error::no_topics_assigned(),
            Error::Config("bad".into()),
        ];
        for err in permanent {
            assert!(!err.is_retriable(), "{err:?} should not be retriable");
        }
    }

    #[test]
    fn test_kafka_code_is_retriable() {
        for code in [KafkaCode::LeaderNotAvailable, KafkaCode::NetworkException] {
            assert!(code.is_retriable(), "{code:?} should be retriable");
        }
        for code in [
            KafkaCode::UnknownTopicOrPartition,
            KafkaCode::OffsetOutOfRange,
        ] {
            assert!(!code.is_retriable(), "{code:?} should not be retriable");
        }
    }

    #[test]
    fn test_error_category_queries() {
        let io_failure = Error::Connection(ConnectionError::Io(io::Error::other("err")));
        assert!(io_failure.is_connection_error());

        let codec = Error::codec();
        assert!(!codec.is_connection_error());
        assert!(codec.is_protocol_error());

        assert!(Error::no_topics_assigned().is_consumer_error());
    }

    // Each protocol variant renders a message containing its keyword.
    #[test]
    fn test_protocol_error_variants() {
        let expectations = [
            (ProtocolError::UnsupportedVersion, "version"),
            (ProtocolError::UnsupportedCompression, "compression"),
            (ProtocolError::Codec, "Encoding"),
            (ProtocolError::StringDecode, "String"),
            (ProtocolError::InvalidDuration, "duration"),
        ];
        for (variant, needle) in expectations {
            assert!(Error::Protocol(variant).to_string().contains(needle));
        }
    }

    // Each consumer variant renders a message containing its keyword.
    #[test]
    fn test_consumer_error_variants() {
        let expectations = [
            (ConsumerError::NoTopicsAssigned, "topic"),
            (ConsumerError::UnsetOffsetStorage, "Offset"),
            (ConsumerError::UnsetGroupId, "Group"),
        ];
        for (variant, needle) in expectations {
            assert!(Error::Consumer(variant).to_string().contains(needle));
        }
    }
}
677
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // Decoding must not panic for any 16-bit value.
        #[test]
        fn kafka_code_from_any_i16(code in proptest::num::i16::ANY) {
            let _decoded = KafkaCode::from_protocol(code);
        }

        // Positive values past the last mapped code decode as `Unknown`.
        #[test]
        fn kafka_code_unknown_for_out_of_range(code in 36i16..=1000i16) {
            let decoded = KafkaCode::from_protocol(code);
            assert_eq!(decoded, Some(KafkaCode::Unknown));
        }

        // Broker name and request label survive into the display text.
        #[test]
        fn broker_context_chainable(broker in "[a-z]{1,20}") {
            let msg = Error::no_host_reachable()
                .with_broker_context(broker.clone(), "Produce")
                .to_string();
            assert!(msg.contains(&broker));
            assert!(msg.contains("Produce"));
        }

        // `is_retriable` must not panic for any decodable code.
        #[test]
        fn is_retriable_safe(code in proptest::num::i16::ANY) {
            if let Some(err) = KafkaCode::from_protocol(code).map(Error::Kafka) {
                let _ = err.is_retriable();
            }
        }

        // Display must yield non-empty text for any decodable code.
        #[test]
        fn error_display_safe(code in proptest::num::i16::ANY) {
            if let Some(err) = KafkaCode::from_protocol(code).map(Error::Kafka) {
                let msg = err.to_string();
                assert!(!msg.is_empty());
            }
        }
    }
}