Skip to main content

rustfs_kafka/
error.rs

1//! Error types and error handling utilities.
2//!
3//! This module provides the main [`enum@Error`] enum, sub-error types for each
4//! layer ([`ConnectionError`], [`ProtocolError`], [`ConsumerError`]), and
5//! the [`KafkaCode`] enum for Kafka server error codes.
6//!
7//! # Retriable Errors
8//!
9//! Use [`Error::is_retriable()`] to determine if an error can be resolved
10//! by retrying. Retriable conditions include:
11//! - `LeaderNotAvailable`, `NotLeaderForPartition`
12//! - `GroupLoadInProgress`, `GroupCoordinatorNotAvailable`
13//! - `NetworkException`, `RequestTimedOut`
14//! - Connection I/O errors and timeouts
15//!
16//! # Broker Context
17//!
18//! Errors from broker operations can be enriched with context using
19//! [`Error::with_broker_context()`], which captures the broker host and
20//! API key for improved debuggability.
21
22use std::time::Duration;
23use std::{io, result, sync::Arc};
24
25use thiserror::Error;
26
27/// Alias for results returned by functions in this crate.
28pub type Result<T> = result::Result<T, Error>;
29
30// --------------------------------------------------------------------
31// Sub-error types
32// --------------------------------------------------------------------
33
34/// Errors originating from the network/connection layer.
35#[derive(Debug, Error)]
36pub enum ConnectionError {
37    /// Underlying I/O error.
38    #[error("I/O error: {0}")]
39    Io(#[from] io::Error),
40
41    /// TLS-related error (available when `security` feature is enabled).
42    #[cfg(feature = "security")]
43    #[error("TLS error: {0}")]
44    Tls(String),
45
46    /// A connection-level timeout occurred.
47    #[error("Connection timeout after {0:?}")]
48    Timeout(Duration),
49
50    /// No reachable host was found for the requested operation.
51    #[error("No host reachable")]
52    NoHostReachable,
53}
54
55/// Errors from the protocol layer (encoding, decoding, and version negotiation).
56#[derive(Debug, Clone, Error)]
57pub enum ProtocolError {
58    /// Protocol version is not supported.
59    #[error("Unsupported protocol version")]
60    UnsupportedVersion,
61
62    /// Compression format is not supported by this client.
63    #[error("Unsupported compression format")]
64    UnsupportedCompression,
65
66    /// Unexpected end of input while decoding protocol data.
67    #[error("Unexpected end of data")]
68    UnexpectedEOF,
69
70    /// Generic encoding/decoding error.
71    #[error("Encoding/decoding error")]
72    Codec,
73
74    /// Error decoding a string from bytes.
75    #[error("String decoding error")]
76    StringDecode,
77
78    /// Invalid duration value encountered while decoding.
79    #[error("Invalid duration")]
80    InvalidDuration,
81}
82
83/// Errors specific to the consumer high-level API.
84#[derive(Debug, Clone, Error)]
85pub enum ConsumerError {
86    /// No topics have been assigned to the consumer.
87    #[error("No topic assigned")]
88    NoTopicsAssigned,
89
90    /// Offset storage (Kafka/Zookeeper) is not configured for the consumer.
91    #[error("Offset storage not configured")]
92    UnsetOffsetStorage,
93
94    /// Consumer group id is not set for operations that require it.
95    #[error("Group ID not configured")]
96    UnsetGroupId,
97}
98
99// --------------------------------------------------------------------
100// Main Error type
101// --------------------------------------------------------------------
102
103/// The crate's primary error type, encompassing network, protocol,
104/// server-side and client-side errors.
105#[derive(Debug, Error)]
106pub enum Error {
107    // === Network layer ===
108    /// Wrapper for connection-level errors.
109    #[error("Connection error: {0}")]
110    Connection(#[source] ConnectionError),
111
112    // === Protocol layer ===
113    /// Wrapper for protocol encoding/decoding/version negotiation errors.
114    #[error("Protocol error: {0}")]
115    Protocol(#[source] ProtocolError),
116
117    // === Kafka server ===
118    /// Represents an error code returned by a Kafka broker.
119    #[error("Kafka Error ({0:?})")]
120    Kafka(KafkaCode),
121
122    // === Client configuration ===
123    /// Configuration-related error with a human message.
124    #[error("Configuration error: {0}")]
125    Config(String),
126
127    // === Consumer ===
128    /// Errors arising from the consumer high-level API.
129    #[error("Consumer error: {0}")]
130    Consumer(#[source] ConsumerError),
131
132    // === Context wrappers ===
133    /// Error contextualized to a specific topic and partition.
134    #[error("Topic Partition Error ({topic_name:?}, {partition_id:?}, {error_code:?})")]
135    TopicPartitionError {
136        /// Topic name associated with the error.
137        topic_name: String,
138        /// Partition id associated with the error.
139        partition_id: i32,
140        /// The Kafka error code returned by the broker.
141        error_code: KafkaCode,
142    },
143
144    /// Error from a broker request which preserves broker context for debugging.
145    #[error("Broker request to {broker} failed ({api_key}): {source}")]
146    BrokerRequestError {
147        /// Broker host (host:port) where the request failed.
148        broker: String,
149        /// The API key name for the failing request.
150        api_key: &'static str,
151        /// The wrapped underlying error (source).
152        #[source]
153        source: Box<Self>,
154    },
155}
156
157// Allow io::Error to convert to Error via Connection(Io(..))
158impl From<io::Error> for Error {
159    fn from(e: io::Error) -> Self {
160        Self::Connection(ConnectionError::Io(e))
161    }
162}
163
164impl From<Arc<Self>> for Error {
165    fn from(e: Arc<Self>) -> Self {
166        match Arc::try_unwrap(e) {
167            Ok(err) => err,
168            Err(arc) => match &*arc {
169                Self::Connection(ConnectionError::Io(e)) => {
170                    Self::Connection(ConnectionError::Io(io::Error::new(e.kind(), e.to_string())))
171                }
172                #[cfg(feature = "security")]
173                Self::Connection(ConnectionError::Tls(s)) => {
174                    Self::Connection(ConnectionError::Tls(s.clone()))
175                }
176                Self::Connection(ConnectionError::Timeout(d)) => {
177                    Self::Connection(ConnectionError::Timeout(*d))
178                }
179                Self::Connection(ConnectionError::NoHostReachable) => {
180                    Self::Connection(ConnectionError::NoHostReachable)
181                }
182                Self::Protocol(p) => Self::Protocol(p.clone()),
183                Self::Kafka(c) => Self::Kafka(*c),
184                Self::Config(s) => Self::Config(s.clone()),
185                Self::Consumer(c) => Self::Consumer(c.clone()),
186                Self::TopicPartitionError {
187                    topic_name,
188                    partition_id,
189                    error_code,
190                } => Self::TopicPartitionError {
191                    topic_name: topic_name.clone(),
192                    partition_id: *partition_id,
193                    error_code: *error_code,
194                },
195                Self::BrokerRequestError {
196                    broker,
197                    api_key,
198                    source: _,
199                } => Self::BrokerRequestError {
200                    broker: broker.clone(),
201                    api_key,
202                    source: Box::new(Self::from(Arc::clone(&arc))),
203                },
204            },
205        }
206    }
207}
208
209// --------------------------------------------------------------------
210// Convenience constructors
211// --------------------------------------------------------------------
212
213impl Error {
214    #[inline]
215    pub(crate) fn codec() -> Self {
216        Self::Protocol(ProtocolError::Codec)
217    }
218
219    #[inline]
220    #[allow(dead_code)]
221    pub(crate) fn unexpected_eof() -> Self {
222        Self::Protocol(ProtocolError::UnexpectedEOF)
223    }
224
225    #[inline]
226    pub(crate) fn no_host_reachable() -> Self {
227        Self::Connection(ConnectionError::NoHostReachable)
228    }
229
230    #[inline]
231    pub(crate) fn invalid_duration() -> Self {
232        Self::Protocol(ProtocolError::InvalidDuration)
233    }
234
235    #[inline]
236    pub(crate) fn no_topics_assigned() -> Self {
237        Self::Consumer(ConsumerError::NoTopicsAssigned)
238    }
239
240    #[inline]
241    pub(crate) fn unset_group_id() -> Self {
242        Self::Consumer(ConsumerError::UnsetGroupId)
243    }
244
245    #[cfg(feature = "security")]
246    #[inline]
247    #[allow(dead_code)]
248    pub(crate) fn tls(msg: impl Into<String>) -> Self {
249        Self::Connection(ConnectionError::Tls(msg.into()))
250    }
251
252    /// Wraps this error with broker request context (broker host and API key name).
253    #[must_use]
254    pub fn with_broker_context(self, broker: impl Into<String>, api_key: &'static str) -> Self {
255        Error::BrokerRequestError {
256            broker: broker.into(),
257            api_key,
258            source: Box::new(self),
259        }
260    }
261
262    /// Returns `true` if this error is likely transient and can be resolved by retrying.
263    #[must_use]
264    pub fn is_retriable(&self) -> bool {
265        match self {
266            Self::Kafka(code) => code.is_retriable(),
267            Self::Connection(conn_err) => matches!(
268                conn_err,
269                ConnectionError::Io(_)
270                    | ConnectionError::Timeout(_)
271                    | ConnectionError::NoHostReachable
272            ),
273            Self::TopicPartitionError { error_code, .. } => error_code.is_retriable(),
274            Self::BrokerRequestError { source, .. } => source.is_retriable(),
275            _ => false,
276        }
277    }
278
279    /// Returns `true` if this error originates from the connection/network layer.
280    #[must_use]
281    pub fn is_connection_error(&self) -> bool {
282        matches!(self, Self::Connection(_))
283    }
284
285    /// Returns `true` if this error originates from the protocol layer.
286    #[must_use]
287    pub fn is_protocol_error(&self) -> bool {
288        matches!(self, Self::Protocol(_))
289    }
290
291    /// Returns `true` if this error originates from the consumer layer.
292    #[must_use]
293    pub fn is_consumer_error(&self) -> bool {
294        matches!(self, Self::Consumer(_))
295    }
296
297    pub(crate) fn from_protocol(n: i16) -> Option<Error> {
298        KafkaCode::from_protocol(n).map(Error::Kafka)
299    }
300}
301
302// --------------------------------------------------------------------
303// KafkaCode
304// --------------------------------------------------------------------
305
306/// Various errors reported by a remote Kafka server.
307/// See also [Kafka Errors](http://kafka.apache.org/protocol.html)
308#[derive(Debug, Copy, Clone, PartialEq, Eq)]
309pub enum KafkaCode {
310    /// An unexpected server error
311    Unknown = -1,
312    /// The requested offset is outside the range of offsets
313    /// maintained by the server for the given topic/partition
314    OffsetOutOfRange = 1,
315    /// This indicates that a message contents does not match its CRC
316    CorruptMessage = 2,
317    /// This request is for a topic or partition that does not exist
318    /// on this broker.
319    UnknownTopicOrPartition = 3,
320    /// The message has a negative size
321    InvalidMessageSize = 4,
322    /// This error is thrown if we are in the middle of a leadership
323    /// election and there is currently no leader for this partition
324    /// and hence it is unavailable for writes.
325    LeaderNotAvailable = 5,
326    /// This error is thrown if the client attempts to send messages
327    /// to a replica that is not the leader for some partition. It
328    /// indicates that the clients metadata is out of date.
329    NotLeaderForPartition = 6,
330    /// This error is thrown if the request exceeds the user-specified
331    /// time limit in the request.
332    RequestTimedOut = 7,
333    /// This is not a client facing error and is used mostly by tools
334    /// when a broker is not alive.
335    BrokerNotAvailable = 8,
336    /// If replica is expected on a broker, but is not (this can be
337    /// safely ignored).
338    ReplicaNotAvailable = 9,
339    /// The server has a configurable maximum message size to avoid
340    /// unbounded memory allocation. This error is thrown if the
341    /// client attempt to produce a message larger than this maximum.
342    MessageSizeTooLarge = 10,
343    /// Internal error code for broker-to-broker communication.
344    StaleControllerEpoch = 11,
345    /// If you specify a string larger than configured maximum for
346    /// offset metadata
347    OffsetMetadataTooLarge = 12,
348    /// The server disconnected before a response was received.
349    NetworkException = 13,
350    /// The broker returns this error code for an offset fetch request
351    /// if it is still loading offsets (after a leader change for that
352    /// offsets topic partition), or in response to group membership
353    /// requests (such as heartbeats) when group metadata is being
354    /// loaded by the coordinator.
355    GroupLoadInProgress = 14,
356    /// The broker returns this error code for group coordinator
357    /// requests, offset commits, and most group management requests
358    /// if the offsets topic has not yet been created, or if the group
359    /// coordinator is not active.
360    GroupCoordinatorNotAvailable = 15,
361    /// The broker returns this error code if it receives an offset
362    /// fetch or commit request for a group that it is not a
363    /// coordinator for.
364    NotCoordinatorForGroup = 16,
365    /// For a request which attempts to access an invalid topic
366    /// (e.g. one which has an illegal name), or if an attempt is made
367    /// to write to an internal topic (such as the consumer offsets
368    /// topic).
369    InvalidTopic = 17,
370    /// If a message batch in a produce request exceeds the maximum
371    /// configured segment size.
372    RecordListTooLarge = 18,
373    /// Returned from a produce request when the number of in-sync
374    /// replicas is lower than the configured minimum and requiredAcks is
375    /// -1.
376    NotEnoughReplicas = 19,
377    /// Returned from a produce request when the message was written
378    /// to the log, but with fewer in-sync replicas than required.
379    NotEnoughReplicasAfterAppend = 20,
380    /// Returned from a produce request if the requested requiredAcks is
381    /// invalid (anything other than -1, 1, or 0).
382    InvalidRequiredAcks = 21,
383    /// Returned from group membership requests (such as heartbeats) when
384    /// the generation id provided in the request is not the current
385    /// generation.
386    IllegalGeneration = 22,
387    /// Returned in join group when the member provides a protocol type or
388    /// set of protocols which is not compatible with the current group.
389    InconsistentGroupProtocol = 23,
390    /// Returned in join group when the groupId is empty or null.
391    InvalidGroupId = 24,
392    /// Returned from group requests (offset commits/fetches, heartbeats,
393    /// etc) when the memberId is not in the current generation.
394    UnknownMemberId = 25,
395    /// Return in join group when the requested session timeout is outside
396    /// of the allowed range on the broker
397    InvalidSessionTimeout = 26,
398    /// Returned in heartbeat requests when the coordinator has begun
399    /// rebalancing the group. This indicates to the client that it
400    /// should rejoin the group.
401    RebalanceInProgress = 27,
402    /// This error indicates that an offset commit was rejected because of
403    /// oversize metadata.
404    InvalidCommitOffsetSize = 28,
405    /// Returned by the broker when the client is not authorized to access
406    /// the requested topic.
407    TopicAuthorizationFailed = 29,
408    /// Returned by the broker when the client is not authorized to access
409    /// a particular groupId.
410    GroupAuthorizationFailed = 30,
411    /// Returned by the broker when the client is not authorized to use an
412    /// inter-broker or administrative API.
413    ClusterAuthorizationFailed = 31,
414    /// The timestamp of the message is out of acceptable range.
415    InvalidTimestamp = 32,
416    /// The broker does not support the requested SASL mechanism.
417    UnsupportedSaslMechanism = 33,
418    /// Request is not valid given the current SASL state.
419    IllegalSaslState = 34,
420    /// The version of API is not supported.
421    UnsupportedVersion = 35,
422    // CAUTION! When adding to this list, KafkaCode::from_protocol must be updated. If there's a better way, please open an issue for it!
423}
424
425impl KafkaCode {
426    /// Returns `true` if this error code represents a transient, retriable condition.
427    #[must_use]
428    pub fn is_retriable(self) -> bool {
429        matches!(
430            self,
431            KafkaCode::LeaderNotAvailable
432                | KafkaCode::NotLeaderForPartition
433                | KafkaCode::GroupLoadInProgress
434                | KafkaCode::GroupCoordinatorNotAvailable
435                | KafkaCode::NotCoordinatorForGroup
436                | KafkaCode::NetworkException
437                | KafkaCode::RequestTimedOut
438                | KafkaCode::RebalanceInProgress
439        )
440    }
441
442    pub(crate) fn from_protocol(n: i16) -> Option<KafkaCode> {
443        if n == 0 {
444            return None;
445        }
446        Some(match n {
447            -1 => KafkaCode::Unknown,
448            1 => KafkaCode::OffsetOutOfRange,
449            2 => KafkaCode::CorruptMessage,
450            3 => KafkaCode::UnknownTopicOrPartition,
451            4 => KafkaCode::InvalidMessageSize,
452            5 => KafkaCode::LeaderNotAvailable,
453            6 => KafkaCode::NotLeaderForPartition,
454            7 => KafkaCode::RequestTimedOut,
455            8 => KafkaCode::BrokerNotAvailable,
456            9 => KafkaCode::ReplicaNotAvailable,
457            10 => KafkaCode::MessageSizeTooLarge,
458            11 => KafkaCode::StaleControllerEpoch,
459            12 => KafkaCode::OffsetMetadataTooLarge,
460            13 => KafkaCode::NetworkException,
461            14 => KafkaCode::GroupLoadInProgress,
462            15 => KafkaCode::GroupCoordinatorNotAvailable,
463            16 => KafkaCode::NotCoordinatorForGroup,
464            17 => KafkaCode::InvalidTopic,
465            18 => KafkaCode::RecordListTooLarge,
466            19 => KafkaCode::NotEnoughReplicas,
467            20 => KafkaCode::NotEnoughReplicasAfterAppend,
468            21 => KafkaCode::InvalidRequiredAcks,
469            22 => KafkaCode::IllegalGeneration,
470            23 => KafkaCode::InconsistentGroupProtocol,
471            24 => KafkaCode::InvalidGroupId,
472            25 => KafkaCode::UnknownMemberId,
473            26 => KafkaCode::InvalidSessionTimeout,
474            27 => KafkaCode::RebalanceInProgress,
475            28 => KafkaCode::InvalidCommitOffsetSize,
476            29 => KafkaCode::TopicAuthorizationFailed,
477            30 => KafkaCode::GroupAuthorizationFailed,
478            31 => KafkaCode::ClusterAuthorizationFailed,
479            32 => KafkaCode::InvalidTimestamp,
480            33 => KafkaCode::UnsupportedSaslMechanism,
481            34 => KafkaCode::IllegalSaslState,
482            35 => KafkaCode::UnsupportedVersion,
483            _ => KafkaCode::Unknown,
484        })
485    }
486}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491    use std::io::ErrorKind;
492
493    #[test]
494    fn test_kafka_code_from_i32() {
495        assert!(KafkaCode::from_protocol(0).is_none());
496        assert_eq!(
497            KafkaCode::from_protocol(1),
498            Some(KafkaCode::OffsetOutOfRange)
499        );
500        assert_eq!(
501            KafkaCode::from_protocol(6),
502            Some(KafkaCode::NotLeaderForPartition)
503        );
504        assert_eq!(KafkaCode::from_protocol(999), Some(KafkaCode::Unknown));
505    }
506
507    #[test]
508    fn test_error_display() {
509        let msg = Error::Kafka(KafkaCode::LeaderNotAvailable).to_string();
510        assert!(msg.contains("LeaderNotAvailable"), "got: {msg}");
511
512        let msg = Error::no_host_reachable().to_string();
513        assert!(msg.contains("host"), "got: {msg}");
514
515        let msg = Error::unexpected_eof().to_string();
516        assert!(msg.contains("end of data"), "got: {msg}");
517    }
518
519    #[test]
520    fn test_error_with_broker_context() {
521        let err = Error::Kafka(KafkaCode::NotLeaderForPartition)
522            .with_broker_context("broker1:9092", "Produce");
523        let msg = err.to_string();
524        assert!(msg.contains("broker1:9092"), "got: {msg}");
525        assert!(msg.contains("Produce"), "got: {msg}");
526    }
527
528    #[test]
529    fn test_error_io_conversion() {
530        let io_err = io::Error::new(ErrorKind::ConnectionRefused, "refused");
531        let err: Error = io_err.into();
532        assert!(matches!(err, Error::Connection(ConnectionError::Io(_))));
533    }
534
535    #[test]
536    fn test_topic_partition_error() {
537        let err = Error::TopicPartitionError {
538            topic_name: "test-topic".into(),
539            partition_id: 0,
540            error_code: KafkaCode::LeaderNotAvailable,
541        };
542        let msg = err.to_string();
543        assert!(msg.contains("test-topic"), "got: {msg}");
544    }
545
546    #[test]
547    fn test_is_retriable_kafka_errors() {
548        assert!(Error::Kafka(KafkaCode::LeaderNotAvailable).is_retriable());
549        assert!(Error::Kafka(KafkaCode::NotLeaderForPartition).is_retriable());
550        assert!(Error::Kafka(KafkaCode::GroupLoadInProgress).is_retriable());
551        assert!(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable).is_retriable());
552        assert!(Error::Kafka(KafkaCode::NotCoordinatorForGroup).is_retriable());
553        assert!(Error::Kafka(KafkaCode::NetworkException).is_retriable());
554        assert!(Error::Kafka(KafkaCode::RequestTimedOut).is_retriable());
555        assert!(Error::Kafka(KafkaCode::RebalanceInProgress).is_retriable());
556    }
557
558    #[test]
559    fn test_is_not_retriable_kafka_errors() {
560        assert!(!Error::Kafka(KafkaCode::UnknownTopicOrPartition).is_retriable());
561        assert!(!Error::Kafka(KafkaCode::MessageSizeTooLarge).is_retriable());
562        assert!(!Error::Kafka(KafkaCode::Unknown).is_retriable());
563    }
564
565    #[test]
566    fn test_is_retriable_connection_errors() {
567        assert!(
568            Error::Connection(ConnectionError::Io(io::Error::new(
569                ErrorKind::ConnectionReset,
570                "reset"
571            )))
572            .is_retriable()
573        );
574        assert!(Error::Connection(ConnectionError::Timeout(Duration::from_secs(5))).is_retriable());
575        assert!(Error::Connection(ConnectionError::NoHostReachable).is_retriable());
576        #[cfg(feature = "security")]
577        assert!(!Error::Connection(ConnectionError::Tls("bad cert".into())).is_retriable());
578    }
579
580    #[test]
581    fn test_is_retriable_topic_partition_error() {
582        let err = Error::TopicPartitionError {
583            topic_name: "t".into(),
584            partition_id: 0,
585            error_code: KafkaCode::LeaderNotAvailable,
586        };
587        assert!(err.is_retriable());
588
589        let err = Error::TopicPartitionError {
590            topic_name: "t".into(),
591            partition_id: 0,
592            error_code: KafkaCode::UnknownTopicOrPartition,
593        };
594        assert!(!err.is_retriable());
595    }
596
597    #[test]
598    fn test_is_retriable_broker_request_error() {
599        let inner = Error::Kafka(KafkaCode::NotLeaderForPartition);
600        let err = inner.with_broker_context("broker:9092", "Produce");
601        assert!(err.is_retriable());
602    }
603
604    #[test]
605    fn test_is_retriable_non_retriable_errors() {
606        assert!(!Error::codec().is_retriable());
607        assert!(!Error::no_topics_assigned().is_retriable());
608        assert!(!Error::Config("bad".into()).is_retriable());
609    }
610
611    #[test]
612    fn test_kafka_code_is_retriable() {
613        assert!(KafkaCode::LeaderNotAvailable.is_retriable());
614        assert!(KafkaCode::NetworkException.is_retriable());
615        assert!(!KafkaCode::UnknownTopicOrPartition.is_retriable());
616        assert!(!KafkaCode::OffsetOutOfRange.is_retriable());
617    }
618
619    #[test]
620    fn test_error_category_queries() {
621        assert!(
622            Error::Connection(ConnectionError::Io(io::Error::other("err"))).is_connection_error()
623        );
624        assert!(!Error::codec().is_connection_error());
625        assert!(Error::codec().is_protocol_error());
626        assert!(Error::no_topics_assigned().is_consumer_error());
627    }
628
629    #[test]
630    fn test_protocol_error_variants() {
631        assert!(
632            Error::Protocol(ProtocolError::UnsupportedVersion)
633                .to_string()
634                .contains("version")
635        );
636        assert!(
637            Error::Protocol(ProtocolError::UnsupportedCompression)
638                .to_string()
639                .contains("compression")
640        );
641        assert!(
642            Error::Protocol(ProtocolError::Codec)
643                .to_string()
644                .contains("Encoding")
645        );
646        assert!(
647            Error::Protocol(ProtocolError::StringDecode)
648                .to_string()
649                .contains("String")
650        );
651        assert!(
652            Error::Protocol(ProtocolError::InvalidDuration)
653                .to_string()
654                .contains("duration")
655        );
656    }
657
658    #[test]
659    fn test_consumer_error_variants() {
660        assert!(
661            Error::Consumer(ConsumerError::NoTopicsAssigned)
662                .to_string()
663                .contains("topic")
664        );
665        assert!(
666            Error::Consumer(ConsumerError::UnsetOffsetStorage)
667                .to_string()
668                .contains("Offset")
669        );
670        assert!(
671            Error::Consumer(ConsumerError::UnsetGroupId)
672                .to_string()
673                .contains("Group")
674        );
675    }
676}
677
678#[cfg(test)]
679mod proptests {
680    use super::*;
681    use proptest::prelude::*;
682
683    proptest! {
684        /// KafkaCode::from_protocol never panics for any i16 input
685        #[test]
686        fn kafka_code_from_any_i16(code in proptest::num::i16::ANY) {
687            let _ = KafkaCode::from_protocol(code);
688        }
689
690        /// Out-of-range error codes map to Unknown
691        #[test]
692        fn kafka_code_unknown_for_out_of_range(code in 36i16..=1000i16) {
693            assert_eq!(KafkaCode::from_protocol(code), Some(KafkaCode::Unknown));
694        }
695
696        /// Error::with_broker_context always produces a string containing broker
697        #[test]
698        fn broker_context_chainable(broker in "[a-z]{1,20}") {
699            let err = Error::no_host_reachable();
700            let wrapped = err.with_broker_context(broker.clone(), "Produce");
701            let msg = wrapped.to_string();
702            assert!(msg.contains(&broker));
703            assert!(msg.contains("Produce"));
704        }
705
706        /// is_retriable is safe to call on any error variant
707        #[test]
708        fn is_retriable_safe(code in proptest::num::i16::ANY) {
709            if let Some(kafka_code) = KafkaCode::from_protocol(code) {
710                let err = Error::Kafka(kafka_code);
711                let _ = err.is_retriable();
712            }
713        }
714
715        /// Error display never panics
716        #[test]
717        fn error_display_safe(code in proptest::num::i16::ANY) {
718            if let Some(kafka_code) = KafkaCode::from_protocol(code) {
719                let err = Error::Kafka(kafka_code);
720                let msg = err.to_string();
721                assert!(!msg.is_empty());
722            }
723        }
724    }
725}