1use thiserror::Error;
19use tokio::task::JoinError;
20
21use kafka_protocol::error::ResponseError;
22
23pub type Result<T> = std::result::Result<T, Error>;
25
/// Flags describing how a caller should react to an error.
///
/// Every flag is `false` in the `Default` value, which is what most
/// configuration and validation errors report.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ErrorClassification {
    /// Attempting the same operation again may succeed.
    pub retriable: bool,
    /// The component that produced the error should be treated as no longer usable.
    pub fatal: bool,
    /// The in-flight transaction has to be aborted before the producer can continue.
    pub transaction_abort_required: bool,
}
36
37#[derive(Debug, Error)]
38pub enum Error {
40 #[error("operation cancelled")]
42 Cancelled,
43 #[error(transparent)]
45 Admin(#[from] AdminError),
46 #[error(transparent)]
48 Consumer(#[from] ConsumerError),
49 #[error(transparent)]
51 Producer(#[from] ProducerError),
52 #[error(transparent)]
54 ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
55 #[error(transparent)]
57 TransactionState(#[from] TransactionStateError),
58 #[error(transparent)]
60 Broker(#[from] BrokerError),
61 #[error(transparent)]
63 Validation(#[from] ValidationError),
64 #[error(transparent)]
66 Protocol(#[from] ProtocolError),
67 #[error(transparent)]
69 Internal(#[from] anyhow::Error),
70}
71
72impl Error {
73 pub fn classification(&self) -> ErrorClassification {
75 match self {
76 Self::Cancelled => ErrorClassification::default(),
77 Self::Admin(error) => error.classification(),
78 Self::Consumer(error) => error.classification(),
79 Self::Producer(error) => error.classification(),
80 Self::ConsumerGroupMetadata(_) => ErrorClassification::default(),
81 Self::TransactionState(error) => error.classification(),
82 Self::Broker(error) => error.classification(),
83 Self::Validation(error) => error.classification(),
84 Self::Protocol(error) => error.classification(),
85 Self::Internal(_) => ErrorClassification::default(),
86 }
87 }
88
89 pub fn is_retriable(&self) -> bool {
91 self.classification().retriable
92 }
93
94 pub fn is_fatal(&self) -> bool {
96 self.classification().fatal
97 }
98
99 pub fn transaction_abort_required(&self) -> bool {
101 self.classification().transaction_abort_required
102 }
103}
104
105#[derive(Debug, Clone, Error)]
106pub enum BrokerError {
108 #[error("{operation} failed with broker error {name} ({code}){resource}")]
110 Response {
111 operation: &'static str,
113 resource: String,
115 code: i16,
117 name: String,
119 retriable: bool,
121 fatal: bool,
123 transaction_abort_required: bool,
125 },
126}
127
128impl BrokerError {
129 pub fn response(
131 operation: &'static str,
132 resource: impl Into<Option<String>>,
133 error: ResponseError,
134 ) -> Self {
135 let resource = resource
136 .into()
137 .map(|value| format!(" for {value}"))
138 .unwrap_or_default();
139 Self::Response {
140 operation,
141 resource,
142 code: error.code(),
143 name: error.to_string(),
144 retriable: error.is_retriable(),
145 fatal: false,
146 transaction_abort_required: false,
147 }
148 }
149
150 pub fn fatal(mut self) -> Self {
152 let Self::Response { fatal, .. } = &mut self;
153 *fatal = true;
154 self
155 }
156
157 pub fn transaction_abort_required(mut self) -> Self {
159 let Self::Response {
160 transaction_abort_required,
161 ..
162 } = &mut self;
163 *transaction_abort_required = true;
164 self
165 }
166
167 fn classification(&self) -> ErrorClassification {
168 match self {
169 Self::Response {
170 retriable,
171 fatal,
172 transaction_abort_required,
173 ..
174 } => ErrorClassification {
175 retriable: *retriable,
176 fatal: *fatal,
177 transaction_abort_required: *transaction_abort_required,
178 },
179 }
180 }
181}
182
183#[derive(Debug, Clone, Error)]
184pub enum ValidationError {
186 #[error("{operation} requires a non-empty topic name")]
188 EmptyTopicName {
189 operation: &'static str,
191 },
192 #[error("{operation} requires a topic")]
194 MissingTopic {
195 operation: &'static str,
197 },
198 #[error("{operation} requires a non-negative partition: {partition}")]
200 NegativePartition {
201 operation: &'static str,
203 partition: i32,
205 },
206 #[error("{resource} names must be non-empty")]
208 EmptyResourceName {
209 resource: &'static str,
211 },
212 #[error("ACL resource_type must not be {resource_type}")]
214 InvalidAclResourceType {
215 resource_type: String,
217 },
218 #[error("ACL pattern_type must not be {pattern_type}")]
220 InvalidAclPatternType {
221 pattern_type: String,
223 },
224 #[error("ACL principal must be non-empty")]
226 EmptyAclPrincipal,
227 #[error("ACL host must be non-empty")]
229 EmptyAclHost,
230 #[error("ACL operation must not be {operation}")]
232 InvalidAclOperation {
233 operation: String,
235 },
236 #[error("ACL permission_type must not be {permission_type}")]
238 InvalidAclPermissionType {
239 permission_type: String,
241 },
242 #[error("consumer group id must be non-empty")]
244 EmptyConsumerGroupId,
245 #[error("feature names must be non-empty")]
247 EmptyFeatureName,
248}
249
250impl ValidationError {
251 fn classification(&self) -> ErrorClassification {
252 ErrorClassification::default()
253 }
254}
255
256#[derive(Debug, Clone, Error)]
257pub enum ProtocolError {
259 #[error("{operation} response was missing required data: {detail}")]
261 MissingResponseData {
262 operation: &'static str,
264 detail: String,
266 },
267 #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
269 UnsupportedApiVersion {
270 api: &'static str,
272 min_version: i16,
274 broker_version: i16,
276 },
277}
278
279impl ProtocolError {
280 fn classification(&self) -> ErrorClassification {
281 match self {
282 Self::MissingResponseData { .. } | Self::UnsupportedApiVersion { .. } => {
283 ErrorClassification {
284 fatal: true,
285 ..ErrorClassification::default()
286 }
287 }
288 }
289 }
290}
291
292#[derive(Debug, Error)]
293pub enum AdminError {
295 #[error("topic names must be non-empty")]
297 EmptyTopicName,
298 #[error("topic partition count must be positive: {partitions}")]
300 InvalidPartitionCount {
301 partitions: i32,
303 },
304 #[error("topic replication factor must be positive: {replication_factor}")]
306 InvalidReplicationFactor {
307 replication_factor: i16,
309 },
310}
311
312impl AdminError {
313 fn classification(&self) -> ErrorClassification {
314 ErrorClassification::default()
315 }
316}
317
318#[derive(Debug, Error)]
319pub enum ConsumerError {
321 #[error("consumer runtime stopped before {operation}")]
323 ThreadStoppedBefore {
324 operation: &'static str,
326 },
327 #[error("consumer runtime stopped during {operation}")]
329 ThreadStoppedDuring {
330 operation: &'static str,
332 },
333 #[error("failed to join consumer runtime: {0}")]
335 Join(#[source] JoinError),
336 #[error("subscribe requires at least one non-empty topic name")]
338 EmptySubscription,
339 #[error("subscribe_pattern requires a non-empty pattern")]
341 EmptySubscriptionPattern,
342 #[error("subscribe_regex requires a valid regular expression: {message}")]
344 InvalidSubscriptionRegex {
345 message: String,
347 },
348 #[error("concurrent poll calls are not supported by this simple consumer")]
350 ConcurrentPoll,
351 #[error("poll was interrupted by wakeup()")]
353 Wakeup,
354 #[error("seek offset must be non-negative: {offset}")]
356 InvalidSeekOffset {
357 offset: i64,
359 },
360 #[error("topic partition names must be non-empty")]
362 EmptyTopicPartition,
363 #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
365 PartitionNotAssigned {
366 operation: &'static str,
368 topic: String,
370 partition: i32,
372 },
373 #[error("broker rejected the subscription regex: {message}")]
375 InvalidRegularExpression {
376 message: String,
378 },
379 #[error("broker rejected the configured server assignor '{assignor}': {message}")]
381 UnsupportedAssignor {
382 assignor: String,
384 message: String,
386 },
387 #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
389 UnreleasedInstanceId {
390 instance_id: String,
392 message: String,
394 },
395 #[error("static member '{instance_id}' was fenced: {message}")]
397 FencedInstanceId {
398 instance_id: String,
400 message: String,
402 },
403 #[error("consumer is shutting down")]
405 ShuttingDown,
406 #[error("consumer runtime is fatal: {message}")]
408 Fatal {
409 message: String,
411 },
412}
413
414impl ConsumerError {
415 fn classification(&self) -> ErrorClassification {
416 match self {
417 Self::ThreadStoppedBefore { .. } | Self::ThreadStoppedDuring { .. } | Self::Join(_) => {
418 ErrorClassification {
419 fatal: true,
420 ..ErrorClassification::default()
421 }
422 }
423 Self::UnreleasedInstanceId { .. } => ErrorClassification {
424 fatal: true,
425 ..ErrorClassification::default()
426 },
427 Self::FencedInstanceId { .. } | Self::Fatal { .. } => ErrorClassification {
428 fatal: true,
429 ..ErrorClassification::default()
430 },
431 Self::EmptySubscription
432 | Self::EmptySubscriptionPattern
433 | Self::InvalidSubscriptionRegex { .. }
434 | Self::ConcurrentPoll
435 | Self::Wakeup
436 | Self::InvalidSeekOffset { .. }
437 | Self::EmptyTopicPartition
438 | Self::PartitionNotAssigned { .. }
439 | Self::InvalidRegularExpression { .. }
440 | Self::UnsupportedAssignor { .. }
441 | Self::ShuttingDown => ErrorClassification::default(),
442 }
443 }
444}
445
446#[derive(Debug, Error)]
447pub enum ProducerError {
449 #[error("idempotent producers require acks=-1")]
451 IdempotenceRequiresAcksAll,
452 #[error("idempotent producers require max_retries > 0")]
454 IdempotenceRequiresRetries,
455 #[error(
457 "transactional producers require acks=-1 so the broker can commit the full transaction"
458 )]
459 TransactionalRequiresAcksAll,
460 #[error("broker did not advertise finalized feature level for transaction.version")]
462 MissingTransactionVersionFeature,
463 #[error(
465 "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
466 )]
467 UnsupportedTransactionVersion {
468 level: i16,
470 },
471 #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
473 UnsupportedApiVersion {
474 api: &'static str,
476 min_version: i16,
478 broker_version: i16,
480 },
481 #[error("producer is not configured with a transactional_id")]
483 NotTransactional,
484 #[error("topic '{topic}' has no partitions in metadata")]
486 TopicHasNoPartitions {
487 topic: String,
489 },
490 #[error("missing partition metadata for topic '{topic}'")]
492 MissingPartitionMetadata {
493 topic: String,
495 },
496 #[error("metadata for topic '{topic}' was not available within max_block={max_block_ms}ms")]
498 MetadataTimeout {
499 topic: String,
501 max_block_ms: u128,
503 },
504 #[error("transaction manager is unavailable during {operation}")]
506 TransactionManagerUnavailable {
507 operation: &'static str,
509 },
510 #[error("transactional producer id is not initialized during {operation}")]
512 TransactionalProducerNotInitialized {
513 operation: &'static str,
515 },
516 #[error("transaction coordinator connection is missing")]
518 TransactionCoordinatorConnectionMissing,
519 #[error("{operation} exhausted {attempts} attempts{resource}")]
521 AttemptsExhausted {
522 operation: &'static str,
524 resource: String,
526 attempts: usize,
528 },
529 #[error("produce batch failed for {topic}:{partition}: {message}")]
531 BatchFailed {
532 topic: String,
534 partition: i32,
536 message: String,
538 },
539 #[error("producer record is {size} bytes, larger than {limit_name}={limit}")]
541 RecordTooLarge {
542 size: usize,
544 limit_name: &'static str,
546 limit: usize,
548 },
549 #[error("produce request is {size} bytes, larger than max_request_size={limit}")]
551 RequestTooLarge {
552 size: usize,
554 limit: usize,
556 },
557 #[error(
559 "producer buffer has {buffered} of {limit} bytes queued and cannot accept {required} more bytes within max_block={max_block_ms}ms"
560 )]
561 BufferExhausted {
562 buffered: usize,
564 required: usize,
566 limit: usize,
568 max_block_ms: u128,
570 },
571 #[error("{operation} failed and the transaction must be aborted: {message}")]
573 TransactionAbortRequired {
574 operation: &'static str,
576 message: String,
578 },
579 #[error("{operation} failed and the transactional producer is no longer usable: {message}")]
581 TransactionFatal {
582 operation: &'static str,
584 message: String,
586 },
587 #[error("producer runtime stopped before {operation}")]
589 RuntimeStoppedBefore {
590 operation: &'static str,
592 },
593 #[error("producer runtime stopped during {operation}")]
595 RuntimeStoppedDuring {
596 operation: &'static str,
598 },
599 #[error("failed to join producer runtime: {0}")]
601 Join(#[source] JoinError),
602}
603
604impl ProducerError {
605 fn classification(&self) -> ErrorClassification {
606 match self {
607 Self::MissingTransactionVersionFeature
608 | Self::UnsupportedTransactionVersion { .. }
609 | Self::UnsupportedApiVersion { .. }
610 | Self::TransactionFatal { .. }
611 | Self::TransactionManagerUnavailable { .. }
612 | Self::TransactionalProducerNotInitialized { .. }
613 | Self::TransactionCoordinatorConnectionMissing
614 | Self::RuntimeStoppedBefore { .. }
615 | Self::RuntimeStoppedDuring { .. }
616 | Self::Join(_) => ErrorClassification {
617 fatal: true,
618 ..ErrorClassification::default()
619 },
620 Self::TransactionAbortRequired { .. } => ErrorClassification {
621 transaction_abort_required: true,
622 ..ErrorClassification::default()
623 },
624 Self::AttemptsExhausted { .. } => ErrorClassification {
625 retriable: true,
626 ..ErrorClassification::default()
627 },
628 Self::IdempotenceRequiresAcksAll
629 | Self::IdempotenceRequiresRetries
630 | Self::TransactionalRequiresAcksAll
631 | Self::NotTransactional
632 | Self::TopicHasNoPartitions { .. }
633 | Self::MissingPartitionMetadata { .. }
634 | Self::MetadataTimeout { .. }
635 | Self::BatchFailed { .. }
636 | Self::RecordTooLarge { .. }
637 | Self::RequestTooLarge { .. }
638 | Self::BufferExhausted { .. } => ErrorClassification::default(),
639 }
640 }
641}
642
643#[derive(Debug, Error)]
644pub enum ConsumerGroupMetadataError {
646 #[error("consumer group metadata requires a non-empty group_id")]
648 EmptyGroupId,
649 #[error("consumer group metadata has generation_id > 0 but no member_id")]
651 MissingMemberId,
652}
653
654#[derive(Debug, Error)]
655pub enum TransactionStateError {
657 #[error("transaction already in progress")]
659 AlreadyInProgress,
660 #[error("transaction is already completing with {0}")]
662 Completing(&'static str),
663 #[error("transaction must be aborted before reuse: {0}")]
665 MustAbortBeforeReuse(String),
666 #[error("transaction is unusable: {0}")]
668 Fatal(String),
669 #[error("transactional send requires begin_transaction() before send()")]
671 AppendWithoutBegin,
672 #[error("send_offsets_to_transaction requires begin_transaction() first")]
674 SendOffsetsWithoutBegin,
675 #[error("transaction has failed and must be aborted: {0}")]
677 AbortRequired(String),
678 #[error("transaction cannot be committed and must be aborted: {0}")]
680 CommitRequiresAbort(String),
681 #[error("no active transaction to complete")]
683 NoActiveTransaction,
684 #[error("shutdown stopped with an active transaction still in progress")]
686 ShutdownWithActiveTransaction,
687 #[error("shutdown stopped while transaction was still completing with {0}")]
689 ShutdownWhileCompleting(&'static str),
690 #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
692 ShutdownAbortRequired(String),
693 #[error("transaction failed before shutdown: {0}")]
695 ShutdownFatal(String),
696}
697
698impl TransactionStateError {
699 fn classification(&self) -> ErrorClassification {
700 match self {
701 Self::Fatal(_) | Self::ShutdownFatal(_) => ErrorClassification {
702 fatal: true,
703 ..ErrorClassification::default()
704 },
705 Self::MustAbortBeforeReuse(_)
706 | Self::AbortRequired(_)
707 | Self::CommitRequiresAbort(_)
708 | Self::ShutdownAbortRequired(_) => ErrorClassification {
709 transaction_abort_required: true,
710 ..ErrorClassification::default()
711 },
712 Self::AlreadyInProgress
713 | Self::Completing(_)
714 | Self::AppendWithoutBegin
715 | Self::SendOffsetsWithoutBegin
716 | Self::NoActiveTransaction
717 | Self::ShutdownWithActiveTransaction
718 | Self::ShutdownWhileCompleting(_) => ErrorClassification::default(),
719 }
720 }
721}
722
#[cfg(test)]
mod tests {
    //! Classification and message tests for the error types in this module.

    use kafka_protocol::error::ResponseError;

    use super::{
        BrokerError, ConsumerError, ConsumerGroupMetadataError, Error, ErrorClassification,
        ProducerError, ProtocolError, TransactionStateError, ValidationError,
    };

    #[test]
    fn broker_errors_preserve_retriable_flag() {
        let error = Error::Broker(BrokerError::response(
            "produce",
            Some("orders:0".to_owned()),
            ResponseError::NotLeaderOrFollower,
        ));

        assert!(error.is_retriable());
        assert!(!error.is_fatal());
        assert!(!error.transaction_abort_required());
    }

    #[test]
    fn broker_errors_can_mark_transaction_abort_required() {
        let error = Error::Broker(
            BrokerError::response(
                "send_offsets_to_transaction",
                Some("orders-reader".to_owned()),
                ResponseError::UnknownServerError,
            )
            .transaction_abort_required(),
        );

        assert!(!error.is_fatal());
        assert!(error.transaction_abort_required());
    }

    #[test]
    fn broker_errors_can_mark_fatal() {
        let error = Error::Broker(
            BrokerError::response(
                "end_transaction",
                None::<String>,
                ResponseError::ProducerFenced,
            )
            .fatal(),
        );

        assert!(error.is_fatal());
        assert!(!error.transaction_abort_required());
    }

    #[test]
    fn broker_error_display_includes_code_and_resource() {
        let with_resource = BrokerError::response(
            "produce",
            Some("orders:0".to_owned()),
            ResponseError::NotLeaderOrFollower,
        )
        .to_string();
        let without_resource =
            BrokerError::response("produce", None::<String>, ResponseError::UnknownServerError)
                .to_string();

        let leader_code = ResponseError::NotLeaderOrFollower.code();
        let unknown_code = ResponseError::UnknownServerError.code();
        assert!(with_resource.starts_with("produce failed with broker error "));
        assert!(with_resource.ends_with(&format!("({leader_code}) for orders:0")));
        assert!(without_resource.ends_with(&format!("({unknown_code})")));
    }

    #[test]
    fn static_member_ownership_errors_are_fatal() {
        let unreleased = Error::Consumer(ConsumerError::UnreleasedInstanceId {
            instance_id: "instance-a".to_owned(),
            message: "still owned".to_owned(),
        });
        let fenced = Error::Consumer(ConsumerError::FencedInstanceId {
            instance_id: "instance-a".to_owned(),
            message: "fenced".to_owned(),
        });

        assert!(unreleased.is_fatal());
        assert!(!unreleased.is_retriable());
        assert!(fenced.is_fatal());
        assert!(!fenced.is_retriable());
    }

    #[test]
    fn runtime_stopped_errors_are_fatal() {
        let consumer = Error::Consumer(ConsumerError::ThreadStoppedDuring { operation: "poll" });
        let producer = Error::Producer(ProducerError::RuntimeStoppedBefore { operation: "send" });

        assert!(consumer.is_fatal());
        assert!(!consumer.is_retriable());
        assert!(producer.is_fatal());
        assert!(!producer.is_retriable());
        assert_eq!(consumer.to_string(), "consumer runtime stopped during poll");
    }

    #[test]
    fn transaction_state_errors_are_classified() {
        let abort_required = Error::TransactionState(TransactionStateError::AbortRequired(
            "send failed".to_owned(),
        ));
        let fatal = Error::TransactionState(TransactionStateError::Fatal("fenced".to_owned()));

        assert!(abort_required.transaction_abort_required());
        assert!(!abort_required.is_fatal());
        assert!(fatal.is_fatal());
    }

    #[test]
    fn producer_transaction_runtime_errors_are_classified() {
        let abort_required = Error::Producer(ProducerError::TransactionAbortRequired {
            operation: "send_offsets_to_transaction",
            message: "connection reset".to_owned(),
        });
        let fatal = Error::Producer(ProducerError::TransactionFatal {
            operation: "commit_transaction",
            message: "connection reset".to_owned(),
        });

        assert!(abort_required.transaction_abort_required());
        assert!(!abort_required.is_fatal());
        assert!(fatal.is_fatal());
        assert!(!fatal.transaction_abort_required());
    }

    #[test]
    fn protocol_errors_are_fatal() {
        let error = Error::Protocol(ProtocolError::UnsupportedApiVersion {
            api: "Produce",
            min_version: 9,
            broker_version: 7,
        });

        assert!(error.is_fatal());
        assert!(!error.is_retriable());
        assert!(!error.transaction_abort_required());
        assert_eq!(
            error.to_string(),
            "Produce v9+ is required, broker only supports v7"
        );
    }

    #[test]
    fn validation_errors_are_not_retriable_or_fatal() {
        let error = Error::Validation(ValidationError::MissingTopic {
            operation: "message conversion",
        });

        assert!(!error.is_retriable());
        assert!(!error.is_fatal());
        assert!(!error.transaction_abort_required());
    }

    #[test]
    fn unclassified_errors_carry_no_flags() {
        let cancelled = Error::Cancelled;
        let metadata = Error::ConsumerGroupMetadata(ConsumerGroupMetadataError::EmptyGroupId);
        let internal = Error::Internal(anyhow::anyhow!("unexpected state"));

        for error in [&cancelled, &metadata, &internal] {
            assert_eq!(error.classification(), ErrorClassification::default());
        }
    }

    #[test]
    fn producer_attempt_exhaustion_is_retriable_and_typed() {
        let error = Error::Producer(ProducerError::AttemptsExhausted {
            operation: "find_transaction_coordinator",
            resource: " for transactional_id 'tx-a'".to_owned(),
            attempts: 11,
        });

        assert!(error.is_retriable());
        assert!(!error.is_fatal());
        assert_eq!(
            error.to_string(),
            "find_transaction_coordinator exhausted 11 attempts for transactional_id 'tx-a'"
        );
    }
}