Skip to main content

kafkit_client/core/
error.rs

1//! Error types returned by Kafkit.
2//!
3//! Most applications can use [`Result`] directly and match on [`crate::Error`] only
4//! when they need to handle a specific category.
5//!
6//! ```no_run
7//! # async fn example() -> kafkit_client::Result<()> {
8//! use kafkit_client::{Error, KafkaClient};
9//!
10//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
11//! if let Err(Error::Admin(error)) = admin.create_topics(Vec::<kafkit_client::NewTopic>::new()).await {
12//!     eprintln!("admin request failed: {error}");
13//! }
14//! # Ok(())
15//! # }
16//! ```
17//!
18use thiserror::Error;
19use tokio::task::JoinError;
20
21use kafka_protocol::error::ResponseError;
22
23/// Result type used by the client APIs.
24pub type Result<T> = std::result::Result<T, Error>;
25
26/// Operational classification for a client error.
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
28pub struct ErrorClassification {
29    /// The same operation may succeed if retried after backoff or metadata refresh.
30    pub retriable: bool,
31    /// The client instance, producer id, or transaction is no longer usable.
32    pub fatal: bool,
33    /// A transactional producer must abort the current transaction before more work.
34    pub transaction_abort_required: bool,
35}
36
37#[derive(Debug, Error)]
38/// Top-level error returned by the crate.
39pub enum Error {
40    /// Operation was cancelled before it completed.
41    #[error("operation cancelled")]
42    Cancelled,
43    /// Admin operation failed.
44    #[error(transparent)]
45    Admin(#[from] AdminError),
46    /// Consumer operation failed.
47    #[error(transparent)]
48    Consumer(#[from] ConsumerError),
49    /// Producer operation failed.
50    #[error(transparent)]
51    Producer(#[from] ProducerError),
52    /// Consumer group metadata was invalid.
53    #[error(transparent)]
54    ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
55    /// Transaction state prevented the operation.
56    #[error(transparent)]
57    TransactionState(#[from] TransactionStateError),
58    /// A Kafka broker returned a protocol error code.
59    #[error(transparent)]
60    Broker(#[from] BrokerError),
61    /// Public input validation failed before a broker request was sent.
62    #[error(transparent)]
63    Validation(#[from] ValidationError),
64    /// The client could not decode or validate a Kafka protocol response.
65    #[error(transparent)]
66    Protocol(#[from] ProtocolError),
67    /// Internal error from protocol handling, IO, or validation.
68    #[error(transparent)]
69    Internal(#[from] anyhow::Error),
70}
71
72impl Error {
73    /// Classifies this error for retry and transaction recovery decisions.
74    pub fn classification(&self) -> ErrorClassification {
75        match self {
76            Self::Cancelled => ErrorClassification::default(),
77            Self::Admin(error) => error.classification(),
78            Self::Consumer(error) => error.classification(),
79            Self::Producer(error) => error.classification(),
80            Self::ConsumerGroupMetadata(_) => ErrorClassification::default(),
81            Self::TransactionState(error) => error.classification(),
82            Self::Broker(error) => error.classification(),
83            Self::Validation(error) => error.classification(),
84            Self::Protocol(error) => error.classification(),
85            Self::Internal(_) => ErrorClassification::default(),
86        }
87    }
88
89    /// Returns whether retrying the same operation may succeed.
90    pub fn is_retriable(&self) -> bool {
91        self.classification().retriable
92    }
93
94    /// Returns whether the client or transaction is permanently unusable.
95    pub fn is_fatal(&self) -> bool {
96        self.classification().fatal
97    }
98
99    /// Returns whether the current transaction must be aborted before reuse.
100    pub fn transaction_abort_required(&self) -> bool {
101        self.classification().transaction_abort_required
102    }
103}
104
105#[derive(Debug, Clone, Error)]
106/// Kafka broker error returned in a response payload.
107pub enum BrokerError {
108    /// A broker rejected an operation with a Kafka error code.
109    #[error("{operation} failed with broker error {name} ({code}){resource}")]
110    Response {
111        /// Client operation that received the error.
112        operation: &'static str,
113        /// Optional topic, partition, group, or coordinator context.
114        resource: String,
115        /// Numeric Kafka protocol error code.
116        code: i16,
117        /// Kafka protocol error name.
118        name: String,
119        /// Whether Kafka marks the error retriable.
120        retriable: bool,
121        /// Whether this error makes the client or transaction unusable.
122        fatal: bool,
123        /// Whether a transactional producer must abort before reuse.
124        transaction_abort_required: bool,
125    },
126}
127
128impl BrokerError {
129    /// Builds a broker-response error from a Kafka protocol error.
130    pub fn response(
131        operation: &'static str,
132        resource: impl Into<Option<String>>,
133        error: ResponseError,
134    ) -> Self {
135        let resource = resource
136            .into()
137            .map(|value| format!(" for {value}"))
138            .unwrap_or_default();
139        Self::Response {
140            operation,
141            resource,
142            code: error.code(),
143            name: error.to_string(),
144            retriable: error.is_retriable(),
145            fatal: false,
146            transaction_abort_required: false,
147        }
148    }
149
150    /// Marks this broker error as fatal to the current client or transaction.
151    pub fn fatal(mut self) -> Self {
152        let Self::Response { fatal, .. } = &mut self;
153        *fatal = true;
154        self
155    }
156
157    /// Marks this broker error as requiring transaction abort before reuse.
158    pub fn transaction_abort_required(mut self) -> Self {
159        let Self::Response {
160            transaction_abort_required,
161            ..
162        } = &mut self;
163        *transaction_abort_required = true;
164        self
165    }
166
167    fn classification(&self) -> ErrorClassification {
168        match self {
169            Self::Response {
170                retriable,
171                fatal,
172                transaction_abort_required,
173                ..
174            } => ErrorClassification {
175                retriable: *retriable,
176                fatal: *fatal,
177                transaction_abort_required: *transaction_abort_required,
178            },
179        }
180    }
181}
182
183#[derive(Debug, Clone, Error)]
184/// Errors raised while validating public API inputs.
185pub enum ValidationError {
186    /// A topic name was empty after trimming.
187    #[error("{operation} requires a non-empty topic name")]
188    EmptyTopicName {
189        /// Operation validating the topic.
190        operation: &'static str,
191    },
192    /// A message did not provide a topic and no default topic was configured.
193    #[error("{operation} requires a topic")]
194    MissingTopic {
195        /// Operation validating the message.
196        operation: &'static str,
197    },
198    /// A partition was negative.
199    #[error("{operation} requires a non-negative partition: {partition}")]
200    NegativePartition {
201        /// Operation validating the partition.
202        operation: &'static str,
203        /// Requested partition.
204        partition: i32,
205    },
206    /// A resource name was empty after trimming.
207    #[error("{resource} names must be non-empty")]
208    EmptyResourceName {
209        /// Resource type being validated.
210        resource: &'static str,
211    },
212    /// An ACL resource type cannot be used in a concrete ACL binding.
213    #[error("ACL resource_type must not be {resource_type}")]
214    InvalidAclResourceType {
215        /// Invalid resource type.
216        resource_type: String,
217    },
218    /// An ACL pattern type cannot be used in a concrete ACL binding.
219    #[error("ACL pattern_type must not be {pattern_type}")]
220    InvalidAclPatternType {
221        /// Invalid pattern type.
222        pattern_type: String,
223    },
224    /// An ACL principal was empty after trimming.
225    #[error("ACL principal must be non-empty")]
226    EmptyAclPrincipal,
227    /// An ACL host was empty after trimming.
228    #[error("ACL host must be non-empty")]
229    EmptyAclHost,
230    /// An ACL operation cannot be used in a concrete ACL binding.
231    #[error("ACL operation must not be {operation}")]
232    InvalidAclOperation {
233        /// Invalid ACL operation.
234        operation: String,
235    },
236    /// An ACL permission type cannot be used in a concrete ACL binding.
237    #[error("ACL permission_type must not be {permission_type}")]
238    InvalidAclPermissionType {
239        /// Invalid permission type.
240        permission_type: String,
241    },
242    /// A consumer group id was empty after trimming.
243    #[error("consumer group id must be non-empty")]
244    EmptyConsumerGroupId,
245    /// A finalized feature name was empty after trimming.
246    #[error("feature names must be non-empty")]
247    EmptyFeatureName,
248}
249
250impl ValidationError {
251    fn classification(&self) -> ErrorClassification {
252        ErrorClassification::default()
253    }
254}
255
256#[derive(Debug, Clone, Error)]
257/// Kafka protocol-level errors raised before a typed broker error is available.
258pub enum ProtocolError {
259    /// A response omitted data the client needs to complete the operation.
260    #[error("{operation} response was missing required data: {detail}")]
261    MissingResponseData {
262        /// Client operation being decoded.
263        operation: &'static str,
264        /// Missing or malformed response detail.
265        detail: String,
266    },
267    /// A broker advertised an API version lower than required.
268    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
269    UnsupportedApiVersion {
270        /// API name.
271        api: &'static str,
272        /// Minimum required API version.
273        min_version: i16,
274        /// Version advertised by the broker.
275        broker_version: i16,
276    },
277}
278
279impl ProtocolError {
280    fn classification(&self) -> ErrorClassification {
281        match self {
282            Self::MissingResponseData { .. } | Self::UnsupportedApiVersion { .. } => {
283                ErrorClassification {
284                    fatal: true,
285                    ..ErrorClassification::default()
286                }
287            }
288        }
289    }
290}
291
292#[derive(Debug, Error)]
293/// Errors raised before or during admin operations.
294pub enum AdminError {
295    /// A topic name was empty.
296    #[error("topic names must be non-empty")]
297    EmptyTopicName,
298    /// A topic was requested with an invalid partition count.
299    #[error("topic partition count must be positive: {partitions}")]
300    InvalidPartitionCount {
301        /// Requested partition count.
302        partitions: i32,
303    },
304    /// A topic was requested with an invalid replication factor.
305    #[error("topic replication factor must be positive: {replication_factor}")]
306    InvalidReplicationFactor {
307        /// Requested replication factor.
308        replication_factor: i16,
309    },
310}
311
312impl AdminError {
313    fn classification(&self) -> ErrorClassification {
314        ErrorClassification::default()
315    }
316}
317
318#[derive(Debug, Error)]
319/// Errors raised by the consumer API.
320pub enum ConsumerError {
321    /// The runtime stopped before the operation was sent.
322    #[error("consumer runtime stopped before {operation}")]
323    ThreadStoppedBefore {
324        /// Operation that was waiting to be sent.
325        operation: &'static str,
326    },
327    /// The runtime stopped while the operation was waiting for a reply.
328    #[error("consumer runtime stopped during {operation}")]
329    ThreadStoppedDuring {
330        /// Operation that was in flight.
331        operation: &'static str,
332    },
333    /// The background consumer task could not be joined.
334    #[error("failed to join consumer runtime: {0}")]
335    Join(#[source] JoinError),
336    /// A subscription was requested without any topic names.
337    #[error("subscribe requires at least one non-empty topic name")]
338    EmptySubscription,
339    /// A regex subscription was requested with an empty pattern.
340    #[error("subscribe_pattern requires a non-empty pattern")]
341    EmptySubscriptionPattern,
342    /// A regex subscription could not be compiled locally.
343    #[error("subscribe_regex requires a valid regular expression: {message}")]
344    InvalidSubscriptionRegex {
345        /// Regex parser error message.
346        message: String,
347    },
348    /// Another poll call was already active.
349    #[error("concurrent poll calls are not supported by this simple consumer")]
350    ConcurrentPoll,
351    /// A blocking poll was interrupted with `wakeup`.
352    #[error("poll was interrupted by wakeup()")]
353    Wakeup,
354    /// A seek offset was negative.
355    #[error("seek offset must be non-negative: {offset}")]
356    InvalidSeekOffset {
357        /// Requested offset.
358        offset: i64,
359    },
360    /// A topic partition had an empty topic name.
361    #[error("topic partition names must be non-empty")]
362    EmptyTopicPartition,
363    /// The operation needs a partition currently assigned to this consumer.
364    #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
365    PartitionNotAssigned {
366        /// Operation that needed the assignment.
367        operation: &'static str,
368        /// Topic name.
369        topic: String,
370        /// Partition number.
371        partition: i32,
372    },
373    /// The broker rejected the subscription regex.
374    #[error("broker rejected the subscription regex: {message}")]
375    InvalidRegularExpression {
376        /// Broker error message.
377        message: String,
378    },
379    /// The broker rejected the configured server-side assignor.
380    #[error("broker rejected the configured server assignor '{assignor}': {message}")]
381    UnsupportedAssignor {
382        /// Requested assignor name.
383        assignor: String,
384        /// Broker error message.
385        message: String,
386    },
387    /// A static member id is still owned by another consumer instance.
388    #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
389    UnreleasedInstanceId {
390        /// Static member instance id.
391        instance_id: String,
392        /// Broker error message.
393        message: String,
394    },
395    /// The broker fenced this static member instance.
396    #[error("static member '{instance_id}' was fenced: {message}")]
397    FencedInstanceId {
398        /// Static member instance id.
399        instance_id: String,
400        /// Broker error message.
401        message: String,
402    },
403    /// The consumer runtime is already shutting down.
404    #[error("consumer is shutting down")]
405    ShuttingDown,
406    /// The consumer runtime is in a fatal state.
407    #[error("consumer runtime is fatal: {message}")]
408    Fatal {
409        /// Fatal state reason.
410        message: String,
411    },
412}
413
414impl ConsumerError {
415    fn classification(&self) -> ErrorClassification {
416        match self {
417            Self::ThreadStoppedBefore { .. } | Self::ThreadStoppedDuring { .. } | Self::Join(_) => {
418                ErrorClassification {
419                    fatal: true,
420                    ..ErrorClassification::default()
421                }
422            }
423            Self::UnreleasedInstanceId { .. } => ErrorClassification {
424                fatal: true,
425                ..ErrorClassification::default()
426            },
427            Self::FencedInstanceId { .. } | Self::Fatal { .. } => ErrorClassification {
428                fatal: true,
429                ..ErrorClassification::default()
430            },
431            Self::EmptySubscription
432            | Self::EmptySubscriptionPattern
433            | Self::InvalidSubscriptionRegex { .. }
434            | Self::ConcurrentPoll
435            | Self::Wakeup
436            | Self::InvalidSeekOffset { .. }
437            | Self::EmptyTopicPartition
438            | Self::PartitionNotAssigned { .. }
439            | Self::InvalidRegularExpression { .. }
440            | Self::UnsupportedAssignor { .. }
441            | Self::ShuttingDown => ErrorClassification::default(),
442        }
443    }
444}
445
446#[derive(Debug, Error)]
447/// Errors raised by the producer API.
448pub enum ProducerError {
449    /// Idempotence was enabled without `acks=-1`.
450    #[error("idempotent producers require acks=-1")]
451    IdempotenceRequiresAcksAll,
452    /// Idempotence was enabled without retries.
453    #[error("idempotent producers require max_retries > 0")]
454    IdempotenceRequiresRetries,
455    /// Transactions require all replicas to acknowledge records.
456    #[error(
457        "transactional producers require acks=-1 so the broker can commit the full transaction"
458    )]
459    TransactionalRequiresAcksAll,
460    /// The broker did not report the transaction feature level.
461    #[error("broker did not advertise finalized feature level for transaction.version")]
462    MissingTransactionVersionFeature,
463    /// The broker transaction feature level is too old.
464    #[error(
465        "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
466    )]
467    UnsupportedTransactionVersion {
468        /// Broker feature level.
469        level: i16,
470    },
471    /// A broker API version is older than this client needs.
472    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
473    UnsupportedApiVersion {
474        /// API name.
475        api: &'static str,
476        /// Minimum required API version.
477        min_version: i16,
478        /// Version advertised by the broker.
479        broker_version: i16,
480    },
481    /// The operation requires a configured transactional id.
482    #[error("producer is not configured with a transactional_id")]
483    NotTransactional,
484    /// Metadata reported no partitions for the topic.
485    #[error("topic '{topic}' has no partitions in metadata")]
486    TopicHasNoPartitions {
487        /// Topic name.
488        topic: String,
489    },
490    /// Metadata did not include partition data for a topic.
491    #[error("missing partition metadata for topic '{topic}'")]
492    MissingPartitionMetadata {
493        /// Topic name.
494        topic: String,
495    },
496    /// Metadata was not available before `max_block` elapsed.
497    #[error("metadata for topic '{topic}' was not available within max_block={max_block_ms}ms")]
498    MetadataTimeout {
499        /// Topic name.
500        topic: String,
501        /// Configured max block in milliseconds.
502        max_block_ms: u128,
503    },
504    /// The transaction manager unexpectedly disappeared.
505    #[error("transaction manager is unavailable during {operation}")]
506    TransactionManagerUnavailable {
507        /// Operation that needed the transaction manager.
508        operation: &'static str,
509    },
510    /// A transactional producer id was needed but had not been initialized.
511    #[error("transactional producer id is not initialized during {operation}")]
512    TransactionalProducerNotInitialized {
513        /// Operation that needed the producer id.
514        operation: &'static str,
515    },
516    /// A coordinator connection was needed but had not been opened.
517    #[error("transaction coordinator connection is missing")]
518    TransactionCoordinatorConnectionMissing,
519    /// A producer coordinator request exhausted all attempts.
520    #[error("{operation} exhausted {attempts} attempts{resource}")]
521    AttemptsExhausted {
522        /// Operation that exhausted retries.
523        operation: &'static str,
524        /// Optional transactional id, group id, topic, or partition context.
525        resource: String,
526        /// Number of attempts made.
527        attempts: usize,
528    },
529    /// A produce batch failed before the broker accepted it.
530    #[error("produce batch failed for {topic}:{partition}: {message}")]
531    BatchFailed {
532        /// Topic name.
533        topic: String,
534        /// Partition number.
535        partition: i32,
536        /// Failure detail.
537        message: String,
538    },
539    /// A record is too large for the configured producer limits.
540    #[error("producer record is {size} bytes, larger than {limit_name}={limit}")]
541    RecordTooLarge {
542        /// Estimated serialized record size.
543        size: usize,
544        /// Config limit name.
545        limit_name: &'static str,
546        /// Configured limit.
547        limit: usize,
548    },
549    /// A Produce request is too large for the configured producer limits.
550    #[error("produce request is {size} bytes, larger than max_request_size={limit}")]
551    RequestTooLarge {
552        /// Estimated serialized request size.
553        size: usize,
554        /// Configured limit.
555        limit: usize,
556    },
557    /// The accumulator did not have room for a record before `max_block` elapsed.
558    #[error(
559        "producer buffer has {buffered} of {limit} bytes queued and cannot accept {required} more bytes within max_block={max_block_ms}ms"
560    )]
561    BufferExhausted {
562        /// Current estimated queued bytes.
563        buffered: usize,
564        /// Additional bytes needed for the record.
565        required: usize,
566        /// Configured buffer size.
567        limit: usize,
568        /// Configured max block in milliseconds.
569        max_block_ms: u128,
570    },
571    /// A transactional operation failed and the open transaction must be aborted.
572    #[error("{operation} failed and the transaction must be aborted: {message}")]
573    TransactionAbortRequired {
574        /// Operation that failed.
575        operation: &'static str,
576        /// Failure detail.
577        message: String,
578    },
579    /// A transactional operation failed and the producer is no longer usable.
580    #[error("{operation} failed and the transactional producer is no longer usable: {message}")]
581    TransactionFatal {
582        /// Operation that failed.
583        operation: &'static str,
584        /// Failure detail.
585        message: String,
586    },
587    /// The sender task stopped before the operation was sent.
588    #[error("producer runtime stopped before {operation}")]
589    RuntimeStoppedBefore {
590        /// Operation that was waiting to be sent.
591        operation: &'static str,
592    },
593    /// The sender task stopped while the operation was waiting for a reply.
594    #[error("producer runtime stopped during {operation}")]
595    RuntimeStoppedDuring {
596        /// Operation that was in flight.
597        operation: &'static str,
598    },
599    /// The sender task could not be joined.
600    #[error("failed to join producer runtime: {0}")]
601    Join(#[source] JoinError),
602}
603
604impl ProducerError {
605    fn classification(&self) -> ErrorClassification {
606        match self {
607            Self::MissingTransactionVersionFeature
608            | Self::UnsupportedTransactionVersion { .. }
609            | Self::UnsupportedApiVersion { .. }
610            | Self::TransactionFatal { .. }
611            | Self::TransactionManagerUnavailable { .. }
612            | Self::TransactionalProducerNotInitialized { .. }
613            | Self::TransactionCoordinatorConnectionMissing
614            | Self::RuntimeStoppedBefore { .. }
615            | Self::RuntimeStoppedDuring { .. }
616            | Self::Join(_) => ErrorClassification {
617                fatal: true,
618                ..ErrorClassification::default()
619            },
620            Self::TransactionAbortRequired { .. } => ErrorClassification {
621                transaction_abort_required: true,
622                ..ErrorClassification::default()
623            },
624            Self::AttemptsExhausted { .. } => ErrorClassification {
625                retriable: true,
626                ..ErrorClassification::default()
627            },
628            Self::IdempotenceRequiresAcksAll
629            | Self::IdempotenceRequiresRetries
630            | Self::TransactionalRequiresAcksAll
631            | Self::NotTransactional
632            | Self::TopicHasNoPartitions { .. }
633            | Self::MissingPartitionMetadata { .. }
634            | Self::MetadataTimeout { .. }
635            | Self::BatchFailed { .. }
636            | Self::RecordTooLarge { .. }
637            | Self::RequestTooLarge { .. }
638            | Self::BufferExhausted { .. } => ErrorClassification::default(),
639        }
640    }
641}
642
643#[derive(Debug, Error)]
644/// Errors found while validating consumer group metadata.
645pub enum ConsumerGroupMetadataError {
646    /// The group id was empty.
647    #[error("consumer group metadata requires a non-empty group_id")]
648    EmptyGroupId,
649    /// Active group metadata needs a member id.
650    #[error("consumer group metadata has generation_id > 0 but no member_id")]
651    MissingMemberId,
652}
653
654#[derive(Debug, Error)]
655/// Errors raised by the producer transaction state machine.
656pub enum TransactionStateError {
657    /// A transaction is already open.
658    #[error("transaction already in progress")]
659    AlreadyInProgress,
660    /// The transaction is already completing.
661    #[error("transaction is already completing with {0}")]
662    Completing(&'static str),
663    /// The transaction must be aborted before it can be used again.
664    #[error("transaction must be aborted before reuse: {0}")]
665    MustAbortBeforeReuse(String),
666    /// The transaction is permanently failed.
667    #[error("transaction is unusable: {0}")]
668    Fatal(String),
669    /// Records were sent before a transaction was started.
670    #[error("transactional send requires begin_transaction() before send()")]
671    AppendWithoutBegin,
672    /// Offsets were sent before a transaction was started.
673    #[error("send_offsets_to_transaction requires begin_transaction() first")]
674    SendOffsetsWithoutBegin,
675    /// The transaction needs an abort before more work can happen.
676    #[error("transaction has failed and must be aborted: {0}")]
677    AbortRequired(String),
678    /// The transaction cannot be committed and must be aborted.
679    #[error("transaction cannot be committed and must be aborted: {0}")]
680    CommitRequiresAbort(String),
681    /// There is no active transaction to finish.
682    #[error("no active transaction to complete")]
683    NoActiveTransaction,
684    /// Shutdown found an active transaction.
685    #[error("shutdown stopped with an active transaction still in progress")]
686    ShutdownWithActiveTransaction,
687    /// Shutdown found a transaction completion in progress.
688    #[error("shutdown stopped while transaction was still completing with {0}")]
689    ShutdownWhileCompleting(&'static str),
690    /// Shutdown found a failed transaction that still needs aborting.
691    #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
692    ShutdownAbortRequired(String),
693    /// Shutdown found a fatal transaction error.
694    #[error("transaction failed before shutdown: {0}")]
695    ShutdownFatal(String),
696}
697
698impl TransactionStateError {
699    fn classification(&self) -> ErrorClassification {
700        match self {
701            Self::Fatal(_) | Self::ShutdownFatal(_) => ErrorClassification {
702                fatal: true,
703                ..ErrorClassification::default()
704            },
705            Self::MustAbortBeforeReuse(_)
706            | Self::AbortRequired(_)
707            | Self::CommitRequiresAbort(_)
708            | Self::ShutdownAbortRequired(_) => ErrorClassification {
709                transaction_abort_required: true,
710                ..ErrorClassification::default()
711            },
712            Self::AlreadyInProgress
713            | Self::Completing(_)
714            | Self::AppendWithoutBegin
715            | Self::SendOffsetsWithoutBegin
716            | Self::NoActiveTransaction
717            | Self::ShutdownWithActiveTransaction
718            | Self::ShutdownWhileCompleting(_) => ErrorClassification::default(),
719        }
720    }
721}
722
723#[cfg(test)]
724mod tests {
725    use kafka_protocol::error::ResponseError;
726
727    use super::{
728        BrokerError, ConsumerError, Error, ProducerError, TransactionStateError, ValidationError,
729    };
730
731    #[test]
732    fn broker_errors_preserve_retriable_flag() {
733        let error = Error::Broker(BrokerError::response(
734            "produce",
735            Some("orders:0".to_owned()),
736            ResponseError::NotLeaderOrFollower,
737        ));
738
739        assert!(error.is_retriable());
740        assert!(!error.is_fatal());
741        assert!(!error.transaction_abort_required());
742    }
743
744    #[test]
745    fn broker_errors_can_mark_transaction_abort_required() {
746        let error = Error::Broker(
747            BrokerError::response(
748                "send_offsets_to_transaction",
749                Some("orders-reader".to_owned()),
750                ResponseError::UnknownServerError,
751            )
752            .transaction_abort_required(),
753        );
754
755        assert!(!error.is_fatal());
756        assert!(error.transaction_abort_required());
757    }
758
759    #[test]
760    fn broker_errors_can_mark_fatal() {
761        let error = Error::Broker(
762            BrokerError::response(
763                "end_transaction",
764                None::<String>,
765                ResponseError::ProducerFenced,
766            )
767            .fatal(),
768        );
769
770        assert!(error.is_fatal());
771        assert!(!error.transaction_abort_required());
772    }
773
774    #[test]
775    fn static_member_ownership_errors_are_fatal() {
776        let unreleased = Error::Consumer(ConsumerError::UnreleasedInstanceId {
777            instance_id: "instance-a".to_owned(),
778            message: "still owned".to_owned(),
779        });
780        let fenced = Error::Consumer(ConsumerError::FencedInstanceId {
781            instance_id: "instance-a".to_owned(),
782            message: "fenced".to_owned(),
783        });
784
785        assert!(unreleased.is_fatal());
786        assert!(!unreleased.is_retriable());
787        assert!(fenced.is_fatal());
788        assert!(!fenced.is_retriable());
789    }
790
791    #[test]
792    fn transaction_state_errors_are_classified() {
793        let abort_required = Error::TransactionState(TransactionStateError::AbortRequired(
794            "send failed".to_owned(),
795        ));
796        let fatal = Error::TransactionState(TransactionStateError::Fatal("fenced".to_owned()));
797
798        assert!(abort_required.transaction_abort_required());
799        assert!(!abort_required.is_fatal());
800        assert!(fatal.is_fatal());
801    }
802
803    #[test]
804    fn producer_transaction_runtime_errors_are_classified() {
805        let abort_required = Error::Producer(ProducerError::TransactionAbortRequired {
806            operation: "send_offsets_to_transaction",
807            message: "connection reset".to_owned(),
808        });
809        let fatal = Error::Producer(ProducerError::TransactionFatal {
810            operation: "commit_transaction",
811            message: "connection reset".to_owned(),
812        });
813
814        assert!(abort_required.transaction_abort_required());
815        assert!(!abort_required.is_fatal());
816        assert!(fatal.is_fatal());
817        assert!(!fatal.transaction_abort_required());
818    }
819
820    #[test]
821    fn validation_errors_are_not_retriable_or_fatal() {
822        let error = Error::Validation(ValidationError::MissingTopic {
823            operation: "message conversion",
824        });
825
826        assert!(!error.is_retriable());
827        assert!(!error.is_fatal());
828        assert!(!error.transaction_abort_required());
829    }
830
831    #[test]
832    fn producer_attempt_exhaustion_is_retriable_and_typed() {
833        let error = Error::Producer(ProducerError::AttemptsExhausted {
834            operation: "find_transaction_coordinator",
835            resource: " for transactional_id 'tx-a'".to_owned(),
836            attempts: 11,
837        });
838
839        assert!(error.is_retriable());
840        assert!(!error.is_fatal());
841        assert_eq!(
842            error.to_string(),
843            "find_transaction_coordinator exhausted 11 attempts for transactional_id 'tx-a'"
844        );
845    }
846}