1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
use flv_util::string_helper::upper_cammel_case_to_sentence; use serde::{Serialize, Deserialize}; use kf_protocol_derive::Decode; use kf_protocol_derive::Encode; /// kafka error /// https://kafka.apache.org/protocol#protocol_types #[repr(i16)] #[derive(PartialEq, Debug, Clone, Copy, Serialize, Deserialize, Encode, Decode)] pub enum ErrorCode { // The server experienced an unexpected error when processing the request UnknownServerError = -1, None = 0, // The requested offset is not within the range of offsets maintained by the server. OffsetOutOfRange = 1, // This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. CorruptMessage = 2, //This server does not host this topic-partition. UnknownTopicOrPartition = 3, // The requested fetch size is invalid. InvalidFetchSize = 4, // There is no leader for this topic-partition as we are in the middle of a leadership election. LeaderNotAvailable = 5, // This server is not the leader for that topic-partition. NotLeaderForPartition = 6, // The request timed out. RequestTimedOut = 7, // The broker is not available. BrokerNotAvailable = 8, // The replica is not available for the requested topic-partition ReplicaNotAvailable = 9, // The request included a message larger than the max message size the server will accept. MessageTooLarge = 10, // The controller moved to another broker. StaleControllerEpoch = 11, // The metadata field of the offset request was too large. OffsetMetadataTooLarge = 12, // The server disconnected before a response was received. NetworkException = 13, // The coordinator is loading and hence can't process requests. CoordinatorLoadInProgress = 14, // The coordinator is not available. CoordinatorNotAvailable = 15, // This is not the correct coordinato NotCoordinator = 16, // The request attempted to perform an operation on an invalid topic. InvalidTopicException = 17, // The request included message batch larger than the configured segment size on the server. RecordListTooLarge = 18, // Messages are rejected since there are fewer in-sync replicas than required. NotEnoughReplicas = 19, // Messages are written to the log, but to fewer in-sync replicas than required. NotEnougReplicasAfterAppend = 20, // Produce request specified an invalid value for required acks. InvalidRequiredAcks = 21, // Specified group generation id is not valid IllegalGeneration = 22, // The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list. InconsistentGroupProtocol = 23, // The configured groupId is invalid InvalidGroupId = 24, // The coordinator is not aware of this member. UnknownMemberId = 25, // The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms). InvalidSessionTimeout = 26, // The group is rebalancing, so a rejoin is needed. RebalanceInProgress = 27, // The committing offset data size is not valid InvalidCommitOffsetSize = 28, // Not authorized to access topics: [Topic authorization failed.] TopicAuthorizationFailed = 29, // Not authorized to access group: Group authorization failed. GroupAuthorizationFailed = 30, // Cluster authorization failed. ClusterAuthorizationFailed = 31, // The timestamp of the message is out of acceptable range. InvalidTimestamp = 32, // The broker does not support the requested SASL mechanism. UnsupportedSaslMechanism = 33, // Request is not valid given the current SASL state. IllegalSaslState = 34, // The version of API is not supported. UnsupportedVersion = 35, // Topic with this name already exists. TopicAlreadyExists = 36, // Number of partitions is invalid. InvalidPartitions = 37, // Replication-factor is invalid. InvalidReplicationFactor = 38, // Replica assignment is invalid. InvalidReplicaAssignment = 39, // Configuration is invalid. InvalidConfig = 40, // This is not the correct controller for this cluster. NotController = 41, // This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details InvalidRequest = 42, // The message format version on the broker does not support the request. UnsupportedForMessageFormat = 43, // Request parameters do not satisfy the configured policy. PolicyViolation = 44, // The broker received an out of order sequence number OutOfOrderSequenceNumber = 45, // The broker received a duplicate sequence number DuplicateSequenceNumber = 46, // Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. InvalidProducerEpoch = 47, // The producer attempted a transactional operation in an invalid state InvalidTxnState = 48, // The producer attempted to use a producer id which is not currently assigned to its transactional id InvalidProducerIdMapping = 49, // The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms). InvalidTransactionTimeout = 50, // The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing ConcurrentTransactions = 51, // Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer TransactionCoordinatorFenced = 52, // Transactional Id authorization failed TransactionalIdAuthorizationFailed = 53, // Security features are disabled. SecurityDisabled = 54, // The broker did not attempt to execute this operation. This may happen for batched RPCs where some operations in the batch failed, causing the broker to respond without trying the rest. OperationNotAttempted = 55, // Disk error when trying to access log file on the disk. KafkaStorageError = 56, // The user-specified log directory is not found in the broker config. LogDirNotFound = 57, // SASL Authentication failed SaslAuthenticationFailed = 58, // This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. UnknownProducerId = 59, // A partition reassignment is in progress ReassignmentInProgress = 60, // Delegation Token feature is not enabled. DelegationTokenAuthDisabled = 61, // Delegation Token is not found on server. DelegationTokenNotFound = 62, // Specified Principal is not valid Owner/Renewer. DelegationTokenOwnerMismatch = 63, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels. DelegationTokenRequestNotAllowed = 64, // Delegation Token authorization failed. DelegationTokenAuthorizationFailed = 65, // Delegation Token is expired. DelegationTokenExpired = 66, // Supplied principalType is not supported InvalidPrincipleType = 67, // The group The group is not empty is not empty NonEmptyGroup = 68, // The group id The group id does not exist was not found GroupIdNotFound = 69, // The fetch session ID was not found FetchSessionIdNotFound = 70, // The fetch session epoch is invalid InvalidFetchSessionEpoch = 71, // There is no listener on the leader broker that matches the listener // on which metadata request was processed. ListenerNotFound = 72, // Topic deletion is disabled TopicDeletionDisabled = 73, // The leader epoch in the request is older than the epoch on the broker FencedLeaderEpoch = 74, // The leader epoch in the request is newer than the epoch on the broker UnknownLeaderEpoch = 75, // The requesting client does not support the compression type of given partition UnsupportedCompressionType = 76, // Broker epoch has changed StaleBrokerEpoch = 77, // The leader high watermark has not caught up from a recent leader election // so the offsets cannot be guaranteed to be monotonically increasing OffsetNotAvailable = 78, // The group member needs to have a valid member id before actually // entering a consumer group MemberIdRequired = 79, // The preferred leader was not available PreferredLeaderNotAvailable = 80, // Consumer group The consumer group has reached maximum number of members allowed GroupMaxSizeReached = 81, } impl Default for ErrorCode { fn default() -> ErrorCode { ErrorCode::None } } impl ErrorCode { pub fn is_ok(&self) -> bool { match self { Self::None => true, _ => false } } pub fn to_string(&self) -> String { match self { ErrorCode::None => "Ok".to_owned(), _ => format!("{:?}", self), } } pub fn to_sentence(&self) -> String { match self { ErrorCode::None => "".to_owned(), _ => upper_cammel_case_to_sentence(format!("{:?}", self), false), } } pub fn is_error(&self) -> bool { match self { ErrorCode::None => false, _ => true, } } } #[cfg(test)] mod test { use std::convert::TryInto; use super::ErrorCode; #[test] fn test_error_code_from_conversion() { let val: i16 = 6; let erro_code: ErrorCode = val.try_into().expect("convert"); assert_eq!(erro_code, ErrorCode::NotLeaderForPartition); } }