use std::fmt;
use std::io;
use std::sync::Arc;
use thiserror::Error;
/// Cheaply cloneable wrapper around a type-erased error.
///
/// The wrapped error lives behind an [`Arc`], so cloning an `ArcError` only
/// bumps a reference count; every clone refers to the same underlying error.
#[derive(Debug, Clone)]
pub struct ArcError(Arc<dyn std::error::Error + Send + Sync>);

impl ArcError {
    /// Wraps `err` in a shared, type-erased handle.
    pub fn new<E: std::error::Error + Send + Sync + 'static>(err: E) -> Self {
        Self(Arc::new(err))
    }

    /// Borrows the wrapped error.
    pub fn inner(&self) -> &(dyn std::error::Error + Send + Sync) {
        self.0.as_ref()
    }
}

impl fmt::Display for ArcError {
    /// Renders exactly what the wrapped error's `Display` implementation renders.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully qualified on purpose: `std::error::Error` has both `Debug` and
        // `Display` as supertraits, so a bare `.fmt(f)` on the trait object does
        // not identify a single method.
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ArcError {
    /// Exposes the wrapped error as the source, so code walking the error
    /// chain can still reach (and downcast) the original error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.0.as_ref())
    }
}
/// Error type used by this crate's `Result` alias.
///
/// Each variant's `#[error(...)]` attribute defines its `Display` output.
/// The enum is `#[non_exhaustive]`, so code in other crates must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum KrafkaError {
/// I/O failure. The `io::Error` is held behind an `Arc` and is reported as
/// this error's source.
#[error("network error: {0}")]
Network(#[source] Arc<io::Error>),
/// Protocol-level failure, classified by `kind` and described by `message`.
#[error("protocol error ({kind:?}): {message}")]
Protocol {
kind: ProtocolErrorKind,
message: String,
},
/// Authentication failure described by `message`.
#[error("authentication error: {message}")]
Auth {
message: String,
},
/// A timeout; `operation` names what timed out.
#[error("operation timed out: {operation}")]
Timeout {
operation: String,
},
/// Error reported with a broker `ErrorCode` plus a descriptive `message`.
#[error("broker error: {code:?} - {message}")]
Broker {
code: ErrorCode,
message: String,
},
/// Configuration problem described by `message`.
#[error("configuration error: {message}")]
Config {
message: String,
},
/// Compression failure described by `message`.
#[error("compression error: {message}")]
Compression {
message: String,
},
/// Invalid-state condition described by `message`.
#[error("invalid state: {message}")]
InvalidState {
message: String,
},
/// Serialization failure described by `message`.
#[error("serialization error: {message}")]
Serialization {
message: String,
},
/// Schema registry failure described by `message`, optionally carrying the
/// underlying cause in `source` (reported as this error's source when present).
#[error("schema registry error: {message}")]
SchemaRegistry {
message: String,
#[source]
source: Option<ArcError>,
},
}
impl Clone for KrafkaError {
    /// Produces a copy of the error.
    ///
    /// String payloads are duplicated. The `Network` I/O error is shared
    /// through its `Arc` rather than copied, and the optional
    /// `SchemaRegistry` source is cloned through `Option<ArcError>`.
    fn clone(&self) -> Self {
        match self {
            Self::Network(shared) => Self::Network(Arc::clone(shared)),
            Self::Protocol { kind, message } => {
                let (kind, message) = (*kind, message.clone());
                Self::Protocol { kind, message }
            }
            Self::Auth { message } => {
                let message = message.clone();
                Self::Auth { message }
            }
            Self::Timeout { operation } => {
                let operation = operation.clone();
                Self::Timeout { operation }
            }
            Self::Broker { code, message } => {
                let (code, message) = (*code, message.clone());
                Self::Broker { code, message }
            }
            Self::Config { message } => {
                let message = message.clone();
                Self::Config { message }
            }
            Self::Compression { message } => {
                let message = message.clone();
                Self::Compression { message }
            }
            Self::InvalidState { message } => {
                let message = message.clone();
                Self::InvalidState { message }
            }
            Self::Serialization { message } => {
                let message = message.clone();
                Self::Serialization { message }
            }
            Self::SchemaRegistry { message, source } => {
                let (message, source) = (message.clone(), source.clone());
                Self::SchemaRegistry { message, source }
            }
        }
    }
}
impl From<io::Error> for KrafkaError {
fn from(err: io::Error) -> Self {
Self::Network(Arc::new(err))
}
}
impl KrafkaError {
    /// Builds a [`KrafkaError::Network`], moving the I/O error behind an `Arc`.
    #[cold]
    pub fn network(err: io::Error) -> Self {
        let shared = Arc::new(err);
        Self::Network(shared)
    }

    /// Builds a [`KrafkaError::Protocol`] with the given kind and message.
    #[cold]
    pub fn protocol_kind(kind: ProtocolErrorKind, message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Protocol { kind, message }
    }

    /// Builds a [`KrafkaError::Auth`] with the given message.
    #[cold]
    pub fn auth(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Auth { message }
    }

    /// Builds a [`KrafkaError::Timeout`]; `operation` names what timed out.
    #[cold]
    pub fn timeout(operation: impl Into<String>) -> Self {
        let operation = operation.into();
        Self::Timeout { operation }
    }

    /// Builds a [`KrafkaError::Broker`] from an error code and a message.
    #[cold]
    pub fn broker(code: ErrorCode, message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Broker { code, message }
    }

    /// Builds a [`KrafkaError::Config`] with the given message.
    #[cold]
    pub fn config(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Config { message }
    }

    /// Builds a [`KrafkaError::Compression`] with the given message.
    #[cold]
    pub fn compression(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Compression { message }
    }

    /// Builds a [`KrafkaError::InvalidState`] with the given message.
    #[cold]
    pub fn invalid_state(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::InvalidState { message }
    }

    /// Builds a [`KrafkaError::Serialization`] with the given message.
    #[cold]
    pub fn serialization(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::Serialization { message }
    }

    /// Builds a [`KrafkaError::SchemaRegistry`] without an underlying cause.
    #[cold]
    pub fn schema_registry(message: impl Into<String>) -> Self {
        let message = message.into();
        Self::SchemaRegistry {
            message,
            source: None,
        }
    }

    /// Builds a [`KrafkaError::SchemaRegistry`] that records `source` as its
    /// underlying cause (wrapped in an [`ArcError`]).
    #[cold]
    pub fn schema_registry_with_source<E>(message: impl Into<String>, source: E) -> Self
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        let message = message.into();
        let source = Some(ArcError::new(source));
        Self::SchemaRegistry { message, source }
    }

    /// Reports whether the error is classified as retriable.
    ///
    /// `Network` and `Timeout` are always retriable; `Broker` and `Protocol`
    /// defer to their code / kind; every other variant is not retriable.
    pub fn is_retriable(&self) -> bool {
        match self {
            Self::Network(_) | Self::Timeout { .. } => true,
            Self::Broker { code, .. } => code.is_retriable(),
            Self::Protocol { kind, .. } => kind.is_retriable(),
            Self::Auth { .. }
            | Self::Config { .. }
            | Self::Compression { .. }
            | Self::InvalidState { .. }
            | Self::Serialization { .. }
            | Self::SchemaRegistry { .. } => false,
        }
    }

    /// Returns the protocol error kind for `Protocol` errors and `None` for
    /// every other variant.
    #[must_use]
    pub fn protocol_error_kind(&self) -> Option<ProtocolErrorKind> {
        if let Self::Protocol { kind, .. } = self {
            Some(*kind)
        } else {
            None
        }
    }
}
/// Classification attached to [`KrafkaError::Protocol`] errors.
///
/// `TruncatedFrame`, `CrcMismatch` and `Malformed` are reported as retriable
/// by [`ProtocolErrorKind::is_retriable`]; all other kinds are not.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProtocolErrorKind {
    TruncatedFrame,
    CrcMismatch,
    UnknownApiVersion,
    InvalidLength,
    InvalidUtf8,
    UnsupportedMagic,
    InvalidValue,
    Malformed,
    Other,
}

impl ProtocolErrorKind {
    /// Returns `true` for the kinds classified as retriable.
    pub fn is_retriable(self) -> bool {
        match self {
            Self::TruncatedFrame | Self::CrcMismatch | Self::Malformed => true,
            Self::UnknownApiVersion
            | Self::InvalidLength
            | Self::InvalidUtf8
            | Self::UnsupportedMagic
            | Self::InvalidValue
            | Self::Other => false,
        }
    }
}
/// Numeric error code reported by a Kafka broker, as a typed enum.
///
/// Each named variant's discriminant is its wire value. Any value without a
/// named variant is preserved losslessly in [`ErrorCode::Unknown`], so
/// `from_i16` followed by `to_i16` always returns the original number.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[repr(i16)]
pub enum ErrorCode {
    /// No error; this is the default value.
    #[default]
    None = 0,
    UnknownServerError = -1,
    OffsetOutOfRange = 1,
    CorruptMessage = 2,
    UnknownTopicOrPartition = 3,
    InvalidMessageSize = 4,
    LeaderNotAvailable = 5,
    NotLeaderForPartition = 6,
    RequestTimedOut = 7,
    BrokerNotAvailable = 8,
    ReplicaNotAvailable = 9,
    MessageTooLarge = 10,
    StaleControllerEpoch = 11,
    OffsetMetadataTooLarge = 12,
    NetworkException = 13,
    CoordinatorLoadInProgress = 14,
    CoordinatorNotAvailable = 15,
    NotCoordinator = 16,
    InvalidTopic = 17,
    RecordListTooLarge = 18,
    NotEnoughReplicas = 19,
    NotEnoughReplicasAfterAppend = 20,
    InvalidRequiredAcks = 21,
    IllegalGeneration = 22,
    InconsistentGroupProtocol = 23,
    InvalidGroupId = 24,
    UnknownMemberId = 25,
    InvalidSessionTimeout = 26,
    RebalanceInProgress = 27,
    InvalidCommitOffsetSize = 28,
    TopicAuthorizationFailed = 29,
    GroupAuthorizationFailed = 30,
    ClusterAuthorizationFailed = 31,
    InvalidTimestamp = 32,
    UnsupportedSaslMechanism = 33,
    IllegalSaslState = 34,
    UnsupportedVersion = 35,
    TopicAlreadyExists = 36,
    InvalidPartitions = 37,
    InvalidReplicationFactor = 38,
    InvalidReplicaAssignment = 39,
    InvalidConfig = 40,
    NotController = 41,
    InvalidRequest = 42,
    UnsupportedForMessageFormat = 43,
    PolicyViolation = 44,
    OutOfOrderSequenceNumber = 45,
    DuplicateSequenceNumber = 46,
    InvalidProducerEpoch = 47,
    InvalidTxnState = 48,
    InvalidProducerIdMapping = 49,
    InvalidTransactionTimeout = 50,
    ConcurrentTransactions = 51,
    TransactionCoordinatorFenced = 52,
    TransactionalIdAuthorizationFailed = 53,
    SecurityDisabled = 54,
    OperationNotAttempted = 55,
    KafkaStorageException = 56,
    LogDirNotFound = 57,
    SaslAuthenticationFailed = 58,
    UnknownProducerId = 59,
    ReassignmentInProgress = 60,
    DelegationTokenAuthDisabled = 61,
    DelegationTokenNotFound = 62,
    DelegationTokenOwnerMismatch = 63,
    DelegationTokenRequestNotAllowed = 64,
    DelegationTokenAuthorizationFailed = 65,
    DelegationTokenExpired = 66,
    InvalidPrincipalType = 67,
    NonEmptyGroup = 68,
    GroupIdNotFound = 69,
    FetchSessionIdNotFound = 70,
    InvalidFetchSessionEpoch = 71,
    ListenerNotFound = 72,
    TopicDeletionDisabled = 73,
    FencedLeaderEpoch = 74,
    UnknownLeaderEpoch = 75,
    UnsupportedCompressionType = 76,
    StaleBrokerEpoch = 77,
    OffsetNotAvailable = 78,
    MemberIdRequired = 79,
    PreferredLeaderNotAvailable = 80,
    GroupMaxSizeReached = 81,
    FencedInstanceId = 82,
    EligibleLeadersNotAvailable = 83,
    ElectionNotNeeded = 84,
    NoReassignmentInProgress = 85,
    GroupSubscribedToTopic = 86,
    InvalidRecord = 87,
    UnstableOffsetCommit = 88,
    ThrottlingQuotaExceeded = 89,
    ProducerFenced = 90,
    ResourceNotFound = 91,
    DuplicateResource = 92,
    UnacceptableCredential = 93,
    InconsistentVoterSet = 94,
    InvalidUpdateVersion = 95,
    FeatureUpdateFailed = 96,
    PrincipalDeserializationFailure = 97,
    SnapshotNotFound = 98,
    PositionOutOfRange = 99,
    UnknownTopicId = 100,
    DuplicateBrokerRegistration = 101,
    BrokerIdNotRegistered = 102,
    InconsistentTopicId = 103,
    InconsistentClusterId = 104,
    TransactionalIdNotFound = 105,
    FetchSessionTopicIdError = 106,
    IneligibleReplica = 107,
    NewLeaderElected = 108,
    OffsetMovedToTieredStorage = 109,
    FencedMemberEpoch = 110,
    UnreleasedInstanceId = 111,
    UnsupportedAssignor = 112,
    StaleMemberEpoch = 113,
    MismatchedEndpointType = 114,
    UnsupportedEndpointType = 115,
    UnknownControllerId = 116,
    UnknownSubscriptionId = 117,
    TelemetryTooLarge = 118,
    InvalidRegistration = 119,
    TransactionAbortable = 120,
    InvalidRecordState = 121,
    ShareSessionNotFound = 122,
    InvalidShareSessionEpoch = 123,
    // Codes 124-127 and 130-132 have no named variant here; they decode to
    // `Unknown(code)`.
    InvalidRegularExpression = 128,
    /// Wire value 129 per the Kafka protocol error table (124, which this
    /// variant was previously bound to, is a different Kafka error).
    RebootstrapRequired = 129,
    ShareSessionLimitReached = 133,
    /// Any code without a named variant; carries the raw wire value.
    ///
    /// Kept last, after the highest named code, so its implicit discriminant
    /// (previous + 1 = 134) cannot collide with a named variant.
    Unknown(i16),
}

impl ErrorCode {
    /// Decodes a raw wire value; values without a named variant become
    /// [`ErrorCode::Unknown`] carrying the original number.
    #[inline]
    pub fn from_i16(code: i16) -> Self {
        match code {
            0 => Self::None,
            -1 => Self::UnknownServerError,
            1 => Self::OffsetOutOfRange,
            2 => Self::CorruptMessage,
            3 => Self::UnknownTopicOrPartition,
            4 => Self::InvalidMessageSize,
            5 => Self::LeaderNotAvailable,
            6 => Self::NotLeaderForPartition,
            7 => Self::RequestTimedOut,
            8 => Self::BrokerNotAvailable,
            9 => Self::ReplicaNotAvailable,
            10 => Self::MessageTooLarge,
            11 => Self::StaleControllerEpoch,
            12 => Self::OffsetMetadataTooLarge,
            13 => Self::NetworkException,
            14 => Self::CoordinatorLoadInProgress,
            15 => Self::CoordinatorNotAvailable,
            16 => Self::NotCoordinator,
            17 => Self::InvalidTopic,
            18 => Self::RecordListTooLarge,
            19 => Self::NotEnoughReplicas,
            20 => Self::NotEnoughReplicasAfterAppend,
            21 => Self::InvalidRequiredAcks,
            22 => Self::IllegalGeneration,
            23 => Self::InconsistentGroupProtocol,
            24 => Self::InvalidGroupId,
            25 => Self::UnknownMemberId,
            26 => Self::InvalidSessionTimeout,
            27 => Self::RebalanceInProgress,
            28 => Self::InvalidCommitOffsetSize,
            29 => Self::TopicAuthorizationFailed,
            30 => Self::GroupAuthorizationFailed,
            31 => Self::ClusterAuthorizationFailed,
            32 => Self::InvalidTimestamp,
            33 => Self::UnsupportedSaslMechanism,
            34 => Self::IllegalSaslState,
            35 => Self::UnsupportedVersion,
            36 => Self::TopicAlreadyExists,
            37 => Self::InvalidPartitions,
            38 => Self::InvalidReplicationFactor,
            39 => Self::InvalidReplicaAssignment,
            40 => Self::InvalidConfig,
            41 => Self::NotController,
            42 => Self::InvalidRequest,
            43 => Self::UnsupportedForMessageFormat,
            44 => Self::PolicyViolation,
            45 => Self::OutOfOrderSequenceNumber,
            46 => Self::DuplicateSequenceNumber,
            47 => Self::InvalidProducerEpoch,
            48 => Self::InvalidTxnState,
            49 => Self::InvalidProducerIdMapping,
            50 => Self::InvalidTransactionTimeout,
            51 => Self::ConcurrentTransactions,
            52 => Self::TransactionCoordinatorFenced,
            53 => Self::TransactionalIdAuthorizationFailed,
            54 => Self::SecurityDisabled,
            55 => Self::OperationNotAttempted,
            56 => Self::KafkaStorageException,
            57 => Self::LogDirNotFound,
            58 => Self::SaslAuthenticationFailed,
            59 => Self::UnknownProducerId,
            60 => Self::ReassignmentInProgress,
            61 => Self::DelegationTokenAuthDisabled,
            62 => Self::DelegationTokenNotFound,
            63 => Self::DelegationTokenOwnerMismatch,
            64 => Self::DelegationTokenRequestNotAllowed,
            65 => Self::DelegationTokenAuthorizationFailed,
            66 => Self::DelegationTokenExpired,
            67 => Self::InvalidPrincipalType,
            68 => Self::NonEmptyGroup,
            69 => Self::GroupIdNotFound,
            70 => Self::FetchSessionIdNotFound,
            71 => Self::InvalidFetchSessionEpoch,
            72 => Self::ListenerNotFound,
            73 => Self::TopicDeletionDisabled,
            74 => Self::FencedLeaderEpoch,
            75 => Self::UnknownLeaderEpoch,
            76 => Self::UnsupportedCompressionType,
            77 => Self::StaleBrokerEpoch,
            78 => Self::OffsetNotAvailable,
            79 => Self::MemberIdRequired,
            80 => Self::PreferredLeaderNotAvailable,
            81 => Self::GroupMaxSizeReached,
            82 => Self::FencedInstanceId,
            83 => Self::EligibleLeadersNotAvailable,
            84 => Self::ElectionNotNeeded,
            85 => Self::NoReassignmentInProgress,
            86 => Self::GroupSubscribedToTopic,
            87 => Self::InvalidRecord,
            88 => Self::UnstableOffsetCommit,
            89 => Self::ThrottlingQuotaExceeded,
            90 => Self::ProducerFenced,
            91 => Self::ResourceNotFound,
            92 => Self::DuplicateResource,
            93 => Self::UnacceptableCredential,
            94 => Self::InconsistentVoterSet,
            95 => Self::InvalidUpdateVersion,
            96 => Self::FeatureUpdateFailed,
            97 => Self::PrincipalDeserializationFailure,
            98 => Self::SnapshotNotFound,
            99 => Self::PositionOutOfRange,
            100 => Self::UnknownTopicId,
            101 => Self::DuplicateBrokerRegistration,
            102 => Self::BrokerIdNotRegistered,
            103 => Self::InconsistentTopicId,
            104 => Self::InconsistentClusterId,
            105 => Self::TransactionalIdNotFound,
            106 => Self::FetchSessionTopicIdError,
            107 => Self::IneligibleReplica,
            108 => Self::NewLeaderElected,
            109 => Self::OffsetMovedToTieredStorage,
            110 => Self::FencedMemberEpoch,
            111 => Self::UnreleasedInstanceId,
            112 => Self::UnsupportedAssignor,
            113 => Self::StaleMemberEpoch,
            114 => Self::MismatchedEndpointType,
            115 => Self::UnsupportedEndpointType,
            116 => Self::UnknownControllerId,
            117 => Self::UnknownSubscriptionId,
            118 => Self::TelemetryTooLarge,
            119 => Self::InvalidRegistration,
            120 => Self::TransactionAbortable,
            121 => Self::InvalidRecordState,
            122 => Self::ShareSessionNotFound,
            123 => Self::InvalidShareSessionEpoch,
            128 => Self::InvalidRegularExpression,
            // 129, not 124: see the note on `RebootstrapRequired`.
            129 => Self::RebootstrapRequired,
            133 => Self::ShareSessionLimitReached,
            other => Self::Unknown(other),
        }
    }

    /// Encodes the code back to its raw wire value; the exact inverse of
    /// [`ErrorCode::from_i16`] for every value that function produces.
    #[inline]
    pub fn to_i16(self) -> i16 {
        match self {
            Self::None => 0,
            Self::UnknownServerError => -1,
            Self::OffsetOutOfRange => 1,
            Self::CorruptMessage => 2,
            Self::UnknownTopicOrPartition => 3,
            Self::InvalidMessageSize => 4,
            Self::LeaderNotAvailable => 5,
            Self::NotLeaderForPartition => 6,
            Self::RequestTimedOut => 7,
            Self::BrokerNotAvailable => 8,
            Self::ReplicaNotAvailable => 9,
            Self::MessageTooLarge => 10,
            Self::StaleControllerEpoch => 11,
            Self::OffsetMetadataTooLarge => 12,
            Self::NetworkException => 13,
            Self::CoordinatorLoadInProgress => 14,
            Self::CoordinatorNotAvailable => 15,
            Self::NotCoordinator => 16,
            Self::InvalidTopic => 17,
            Self::RecordListTooLarge => 18,
            Self::NotEnoughReplicas => 19,
            Self::NotEnoughReplicasAfterAppend => 20,
            Self::InvalidRequiredAcks => 21,
            Self::IllegalGeneration => 22,
            Self::InconsistentGroupProtocol => 23,
            Self::InvalidGroupId => 24,
            Self::UnknownMemberId => 25,
            Self::InvalidSessionTimeout => 26,
            Self::RebalanceInProgress => 27,
            Self::InvalidCommitOffsetSize => 28,
            Self::TopicAuthorizationFailed => 29,
            Self::GroupAuthorizationFailed => 30,
            Self::ClusterAuthorizationFailed => 31,
            Self::InvalidTimestamp => 32,
            Self::UnsupportedSaslMechanism => 33,
            Self::IllegalSaslState => 34,
            Self::UnsupportedVersion => 35,
            Self::TopicAlreadyExists => 36,
            Self::InvalidPartitions => 37,
            Self::InvalidReplicationFactor => 38,
            Self::InvalidReplicaAssignment => 39,
            Self::InvalidConfig => 40,
            Self::NotController => 41,
            Self::InvalidRequest => 42,
            Self::UnsupportedForMessageFormat => 43,
            Self::PolicyViolation => 44,
            Self::OutOfOrderSequenceNumber => 45,
            Self::DuplicateSequenceNumber => 46,
            Self::InvalidProducerEpoch => 47,
            Self::InvalidTxnState => 48,
            Self::InvalidProducerIdMapping => 49,
            Self::InvalidTransactionTimeout => 50,
            Self::ConcurrentTransactions => 51,
            Self::TransactionCoordinatorFenced => 52,
            Self::TransactionalIdAuthorizationFailed => 53,
            Self::SecurityDisabled => 54,
            Self::OperationNotAttempted => 55,
            Self::KafkaStorageException => 56,
            Self::LogDirNotFound => 57,
            Self::SaslAuthenticationFailed => 58,
            Self::UnknownProducerId => 59,
            Self::ReassignmentInProgress => 60,
            Self::DelegationTokenAuthDisabled => 61,
            Self::DelegationTokenNotFound => 62,
            Self::DelegationTokenOwnerMismatch => 63,
            Self::DelegationTokenRequestNotAllowed => 64,
            Self::DelegationTokenAuthorizationFailed => 65,
            Self::DelegationTokenExpired => 66,
            Self::InvalidPrincipalType => 67,
            Self::NonEmptyGroup => 68,
            Self::GroupIdNotFound => 69,
            Self::FetchSessionIdNotFound => 70,
            Self::InvalidFetchSessionEpoch => 71,
            Self::ListenerNotFound => 72,
            Self::TopicDeletionDisabled => 73,
            Self::FencedLeaderEpoch => 74,
            Self::UnknownLeaderEpoch => 75,
            Self::UnsupportedCompressionType => 76,
            Self::StaleBrokerEpoch => 77,
            Self::OffsetNotAvailable => 78,
            Self::MemberIdRequired => 79,
            Self::PreferredLeaderNotAvailable => 80,
            Self::GroupMaxSizeReached => 81,
            Self::FencedInstanceId => 82,
            Self::EligibleLeadersNotAvailable => 83,
            Self::ElectionNotNeeded => 84,
            Self::NoReassignmentInProgress => 85,
            Self::GroupSubscribedToTopic => 86,
            Self::InvalidRecord => 87,
            Self::UnstableOffsetCommit => 88,
            Self::ThrottlingQuotaExceeded => 89,
            Self::ProducerFenced => 90,
            Self::ResourceNotFound => 91,
            Self::DuplicateResource => 92,
            Self::UnacceptableCredential => 93,
            Self::InconsistentVoterSet => 94,
            Self::InvalidUpdateVersion => 95,
            Self::FeatureUpdateFailed => 96,
            Self::PrincipalDeserializationFailure => 97,
            Self::SnapshotNotFound => 98,
            Self::PositionOutOfRange => 99,
            Self::UnknownTopicId => 100,
            Self::DuplicateBrokerRegistration => 101,
            Self::BrokerIdNotRegistered => 102,
            Self::InconsistentTopicId => 103,
            Self::InconsistentClusterId => 104,
            Self::TransactionalIdNotFound => 105,
            Self::FetchSessionTopicIdError => 106,
            Self::IneligibleReplica => 107,
            Self::NewLeaderElected => 108,
            Self::OffsetMovedToTieredStorage => 109,
            Self::FencedMemberEpoch => 110,
            Self::UnreleasedInstanceId => 111,
            Self::UnsupportedAssignor => 112,
            Self::StaleMemberEpoch => 113,
            Self::MismatchedEndpointType => 114,
            Self::UnsupportedEndpointType => 115,
            Self::UnknownControllerId => 116,
            Self::UnknownSubscriptionId => 117,
            Self::TelemetryTooLarge => 118,
            Self::InvalidRegistration => 119,
            Self::TransactionAbortable => 120,
            Self::InvalidRecordState => 121,
            Self::ShareSessionNotFound => 122,
            Self::InvalidShareSessionEpoch => 123,
            Self::InvalidRegularExpression => 128,
            Self::RebootstrapRequired => 129,
            Self::ShareSessionLimitReached => 133,
            Self::Unknown(code) => code,
        }
    }

    /// Returns `true` for the fixed set of codes classified as retriable.
    ///
    /// Every code not listed below, including [`ErrorCode::None`] and
    /// [`ErrorCode::Unknown`], returns `false`.
    #[inline]
    pub fn is_retriable(&self) -> bool {
        matches!(
            self,
            Self::CorruptMessage
                | Self::UnknownTopicOrPartition
                | Self::LeaderNotAvailable
                | Self::NotLeaderForPartition
                | Self::RequestTimedOut
                | Self::BrokerNotAvailable
                | Self::ReplicaNotAvailable
                | Self::NetworkException
                | Self::CoordinatorLoadInProgress
                | Self::CoordinatorNotAvailable
                | Self::NotCoordinator
                | Self::NotEnoughReplicas
                | Self::NotEnoughReplicasAfterAppend
                | Self::OutOfOrderSequenceNumber
                | Self::ConcurrentTransactions
                | Self::OperationNotAttempted
                | Self::KafkaStorageException
                | Self::UnknownProducerId
                | Self::FetchSessionIdNotFound
                | Self::InvalidFetchSessionEpoch
                | Self::FencedLeaderEpoch
                | Self::UnknownLeaderEpoch
                | Self::OffsetNotAvailable
                | Self::PreferredLeaderNotAvailable
                | Self::EligibleLeadersNotAvailable
                | Self::UnstableOffsetCommit
                | Self::ThrottlingQuotaExceeded
                | Self::FencedMemberEpoch
                | Self::StaleMemberEpoch
                | Self::NewLeaderElected
        )
    }

    /// Returns `true` only for [`ErrorCode::None`].
    ///
    /// Note that `Unknown(0)` is a distinct value and is not considered ok;
    /// `from_i16(0)` never produces it.
    #[inline]
    pub fn is_ok(&self) -> bool {
        matches!(self, Self::None)
    }
}

impl From<i16> for ErrorCode {
    /// Same as [`ErrorCode::from_i16`].
    #[inline]
    fn from(code: i16) -> Self {
        Self::from_i16(code)
    }
}

impl From<ErrorCode> for i16 {
    /// Same as [`ErrorCode::to_i16`].
    #[inline]
    fn from(code: ErrorCode) -> Self {
        code.to_i16()
    }
}
/// Convenience alias for `std::result::Result` with [`KrafkaError`] as the error type.
pub type Result<T> = std::result::Result<T, KrafkaError>;
/// Error type with a dedicated "closed" case alongside a wrapped [`KrafkaError`].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum RecvError {
/// Rendered as `consumer closed`.
#[error("consumer closed")]
Closed,
/// Wraps a [`KrafkaError`]. `#[error(transparent)]` forwards `Display` and
/// `source()` to the wrapped error, and `#[from]` provides
/// `From<KrafkaError>` so `?` can convert automatically.
#[error(transparent)]
Error(#[from] KrafkaError),
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Every named code from 56 through 88, paired with its wire value.
    const CODES_56_TO_88: &[(i16, ErrorCode)] = &[
        (56, ErrorCode::KafkaStorageException),
        (57, ErrorCode::LogDirNotFound),
        (58, ErrorCode::SaslAuthenticationFailed),
        (59, ErrorCode::UnknownProducerId),
        (60, ErrorCode::ReassignmentInProgress),
        (61, ErrorCode::DelegationTokenAuthDisabled),
        (62, ErrorCode::DelegationTokenNotFound),
        (63, ErrorCode::DelegationTokenOwnerMismatch),
        (64, ErrorCode::DelegationTokenRequestNotAllowed),
        (65, ErrorCode::DelegationTokenAuthorizationFailed),
        (66, ErrorCode::DelegationTokenExpired),
        (67, ErrorCode::InvalidPrincipalType),
        (68, ErrorCode::NonEmptyGroup),
        (69, ErrorCode::GroupIdNotFound),
        (70, ErrorCode::FetchSessionIdNotFound),
        (71, ErrorCode::InvalidFetchSessionEpoch),
        (72, ErrorCode::ListenerNotFound),
        (73, ErrorCode::TopicDeletionDisabled),
        (74, ErrorCode::FencedLeaderEpoch),
        (75, ErrorCode::UnknownLeaderEpoch),
        (76, ErrorCode::UnsupportedCompressionType),
        (77, ErrorCode::StaleBrokerEpoch),
        (78, ErrorCode::OffsetNotAvailable),
        (79, ErrorCode::MemberIdRequired),
        (80, ErrorCode::PreferredLeaderNotAvailable),
        (81, ErrorCode::GroupMaxSizeReached),
        (82, ErrorCode::FencedInstanceId),
        (83, ErrorCode::EligibleLeadersNotAvailable),
        (84, ErrorCode::ElectionNotNeeded),
        (85, ErrorCode::NoReassignmentInProgress),
        (86, ErrorCode::GroupSubscribedToTopic),
        (87, ErrorCode::InvalidRecord),
        (88, ErrorCode::UnstableOffsetCommit),
    ];

    /// Asserts that every listed code reports itself as retriable.
    fn assert_all_retriable(codes: &[ErrorCode]) {
        for code in codes {
            assert!(code.is_retriable(), "{code:?} should be retriable");
        }
    }

    /// Asserts that none of the listed codes reports itself as retriable.
    fn assert_none_retriable(codes: &[ErrorCode]) {
        for code in codes {
            assert!(!code.is_retriable(), "{code:?} should not be retriable");
        }
    }

    /// Builds the connection-refused I/O error shared by the network tests.
    fn refused_io_error() -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "refused")
    }

    #[test]
    fn test_error_code_from_i16() {
        let cases = [
            (0, ErrorCode::None),
            (-1, ErrorCode::UnknownServerError),
            (3, ErrorCode::UnknownTopicOrPartition),
            (999, ErrorCode::Unknown(999)),
        ];
        for &(raw, expected) in cases.iter() {
            assert_eq!(ErrorCode::from_i16(raw), expected);
        }
    }

    #[test]
    fn test_error_code_to_i16() {
        let cases = [
            (ErrorCode::None, 0),
            (ErrorCode::UnknownServerError, -1),
            (ErrorCode::UnknownTopicOrPartition, 3),
            (ErrorCode::Unknown(999), 999),
        ];
        for &(code, expected) in cases.iter() {
            assert_eq!(code.to_i16(), expected);
        }
    }

    #[test]
    fn test_error_code_is_retriable() {
        assert_all_retriable(&[
            ErrorCode::LeaderNotAvailable,
            ErrorCode::RequestTimedOut,
            ErrorCode::CorruptMessage,
            ErrorCode::UnknownTopicOrPartition,
            ErrorCode::OutOfOrderSequenceNumber,
            ErrorCode::ConcurrentTransactions,
            ErrorCode::OperationNotAttempted,
            ErrorCode::BrokerNotAvailable,
            ErrorCode::NetworkException,
        ]);
        assert_none_retriable(&[
            ErrorCode::None,
            ErrorCode::InvalidTopic,
            ErrorCode::RebootstrapRequired,
        ]);
    }

    #[test]
    fn test_error_code_is_ok() {
        let ok = ErrorCode::None;
        let failed = ErrorCode::UnknownServerError;
        assert!(ok.is_ok());
        assert!(!failed.is_ok());
    }

    #[test]
    fn test_krafka_error_is_retriable() {
        let timeout = KrafkaError::timeout("test");
        assert!(timeout.is_retriable());

        let broker = KrafkaError::broker(ErrorCode::LeaderNotAvailable, "test");
        assert!(broker.is_retriable());

        let config = KrafkaError::config("test");
        assert!(!config.is_retriable());

        let network = KrafkaError::network(refused_io_error());
        assert!(network.is_retriable());
    }

    #[test]
    fn test_network_error_source_preserved_through_arc() {
        use std::error::Error;
        let krafka_err = KrafkaError::network(refused_io_error());
        let rendered = krafka_err
            .source()
            .expect("source() must not be None")
            .to_string();
        assert!(rendered.contains("refused"));
    }

    #[test]
    fn test_network_error_clone_preserves_source() {
        use std::error::Error;
        let original = KrafkaError::network(refused_io_error());
        let cloned = original.clone();
        let original_source = original.source();
        let cloned_source = cloned.source();
        assert!(original_source.is_some());
        assert!(cloned_source.is_some());
        assert_eq!(
            original_source.unwrap().to_string(),
            cloned_source.unwrap().to_string()
        );
    }

    #[test]
    fn test_error_code_from_i16_codes_56_to_88() {
        for &(raw, expected) in CODES_56_TO_88 {
            assert_eq!(ErrorCode::from_i16(raw), expected);
        }
    }

    #[test]
    fn test_error_code_to_i16_codes_56_to_88() {
        for &(raw, code) in CODES_56_TO_88 {
            assert_eq!(code.to_i16(), raw);
        }
    }

    #[test]
    fn test_r9_10_new_retriable_error_codes() {
        assert_all_retriable(&[
            ErrorCode::KafkaStorageException,
            ErrorCode::UnknownProducerId,
            ErrorCode::FetchSessionIdNotFound,
            ErrorCode::InvalidFetchSessionEpoch,
            ErrorCode::FencedLeaderEpoch,
            ErrorCode::UnknownLeaderEpoch,
            ErrorCode::OffsetNotAvailable,
            ErrorCode::PreferredLeaderNotAvailable,
            ErrorCode::EligibleLeadersNotAvailable,
            ErrorCode::UnstableOffsetCommit,
        ]);
    }

    #[test]
    fn test_r9_10_non_retriable_error_codes_56_to_88() {
        assert_none_retriable(&[
            ErrorCode::LogDirNotFound,
            ErrorCode::SaslAuthenticationFailed,
            ErrorCode::ReassignmentInProgress,
            ErrorCode::DelegationTokenAuthDisabled,
            ErrorCode::DelegationTokenNotFound,
            ErrorCode::DelegationTokenOwnerMismatch,
            ErrorCode::DelegationTokenRequestNotAllowed,
            ErrorCode::DelegationTokenAuthorizationFailed,
            ErrorCode::DelegationTokenExpired,
            ErrorCode::InvalidPrincipalType,
            ErrorCode::NonEmptyGroup,
            ErrorCode::GroupIdNotFound,
            ErrorCode::ListenerNotFound,
            ErrorCode::TopicDeletionDisabled,
            ErrorCode::UnsupportedCompressionType,
            ErrorCode::StaleBrokerEpoch,
            ErrorCode::MemberIdRequired,
            ErrorCode::GroupMaxSizeReached,
            ErrorCode::FencedInstanceId,
            ErrorCode::ElectionNotNeeded,
            ErrorCode::NoReassignmentInProgress,
            ErrorCode::GroupSubscribedToTopic,
            ErrorCode::InvalidRecord,
        ]);
    }

    #[test]
    fn test_error_code_round_trip_56_to_88() {
        for code in 56i16..=88 {
            let back = i16::from(ErrorCode::from(code));
            assert_eq!(back, code, "round-trip failed for code {}", code);
        }
    }

    #[test]
    fn test_protocol_error_kind_is_retriable() {
        let retriable = [
            ProtocolErrorKind::TruncatedFrame,
            ProtocolErrorKind::CrcMismatch,
            ProtocolErrorKind::Malformed,
        ];
        for kind in retriable.iter() {
            assert!(kind.is_retriable());
        }

        let terminal = [
            ProtocolErrorKind::UnknownApiVersion,
            ProtocolErrorKind::InvalidLength,
            ProtocolErrorKind::InvalidUtf8,
            ProtocolErrorKind::UnsupportedMagic,
            ProtocolErrorKind::InvalidValue,
            ProtocolErrorKind::Other,
        ];
        for kind in terminal.iter() {
            assert!(!kind.is_retriable());
        }
    }

    #[test]
    fn test_krafka_error_protocol_kind_explicit() {
        let err = KrafkaError::protocol_kind(
            ProtocolErrorKind::CrcMismatch,
            "record batch CRC check failed",
        );
        if let KrafkaError::Protocol { kind, message } = err {
            assert_eq!(kind, ProtocolErrorKind::CrcMismatch);
            assert_eq!(message, "record batch CRC check failed");
        } else {
            panic!("expected Protocol variant");
        }
    }

    #[test]
    fn test_krafka_error_protocol_retriable_via_kind() {
        let crc = KrafkaError::protocol_kind(ProtocolErrorKind::CrcMismatch, "CRC mismatch: a vs b");
        assert!(crc.is_retriable());

        let truncated = KrafkaError::protocol_kind(
            ProtocolErrorKind::TruncatedFrame,
            "not enough bytes for header",
        );
        assert!(truncated.is_retriable());

        let version = KrafkaError::protocol_kind(
            ProtocolErrorKind::UnknownApiVersion,
            "no mutually supported Produce API version",
        );
        assert!(!version.is_retriable());

        let utf8 = KrafkaError::protocol_kind(ProtocolErrorKind::InvalidUtf8, "invalid UTF-8 string");
        assert!(!utf8.is_retriable());
    }

    #[test]
    fn test_krafka_error_protocol_error_kind_accessor() {
        let protocol = KrafkaError::protocol_kind(ProtocolErrorKind::CrcMismatch, "details");
        assert_eq!(
            protocol.protocol_error_kind(),
            Some(ProtocolErrorKind::CrcMismatch)
        );

        let timeout = KrafkaError::timeout("fetch");
        assert_eq!(timeout.protocol_error_kind(), None);
    }

    #[test]
    fn test_krafka_error_protocol_display_includes_kind() {
        let s = KrafkaError::protocol_kind(ProtocolErrorKind::CrcMismatch, "details").to_string();
        assert!(s.contains("CrcMismatch"), "got: {s}");
        assert!(s.contains("details"), "got: {s}");
    }
}