// kf_protocol_api/error.rs

1use flv_util::string_helper::upper_cammel_case_to_sentence;
2use serde::{Serialize, Deserialize};
3
4use kf_protocol_derive::Decode;
5use kf_protocol_derive::Encode;
6
7/// kafka error
8/// https://kafka.apache.org/protocol#protocol_types
9
10#[fluvio_kf(encode_discriminant)]
11#[fluvio_kf(default)]
12#[repr(i16)]
13#[derive(PartialEq, Debug, Clone, Copy, Serialize, Deserialize, Encode, Decode)]
14pub enum ErrorCode {
15    // The server experienced an unexpected error when processing the request
16    UnknownServerError = -1,
17
18    None = 0,
19
20    // The requested offset is not within the range of offsets maintained by the server.
21    OffsetOutOfRange = 1,
22
23    // This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
24    CorruptMessage = 2,
25
26    //This server does not host this topic-partition.
27    UnknownTopicOrPartition = 3,
28
29    // The requested fetch size is invalid.
30    InvalidFetchSize = 4,
31
32    // There is no leader for this topic-partition as we are in the middle of a leadership election.
33    LeaderNotAvailable = 5,
34
35    // This server is not the leader for that topic-partition.
36    NotLeaderForPartition = 6,
37
38    // The request timed out.
39    RequestTimedOut = 7,
40
41    // The broker is not available.
42    BrokerNotAvailable = 8,
43
44    // The replica is not available for the requested topic-partition
45    ReplicaNotAvailable = 9,
46
47    // The request included a message larger than the max message size the server will accept.
48    MessageTooLarge = 10,
49
50    // The controller moved to another broker.
51    StaleControllerEpoch = 11,
52
53    // The metadata field of the offset request was too large.
54    OffsetMetadataTooLarge = 12,
55
56    // The server disconnected before a response was received.
57    NetworkException = 13,
58
59    // The coordinator is loading and hence can't process requests.
60    CoordinatorLoadInProgress = 14,
61
62    // The coordinator is not available.
63    CoordinatorNotAvailable = 15,
64
65    // This is not the correct coordinato
66    NotCoordinator = 16,
67
68    // The request attempted to perform an operation on an invalid topic.
69    InvalidTopicException = 17,
70
71    // The request included message batch larger than the configured segment size on the server.
72    RecordListTooLarge = 18,
73
74    // Messages are rejected since there are fewer in-sync replicas than required.
75    NotEnoughReplicas = 19,
76
77    // Messages are written to the log, but to fewer in-sync replicas than required.
78    NotEnougReplicasAfterAppend = 20,
79
80    // Produce request specified an invalid value for required acks.
81    InvalidRequiredAcks = 21,
82
83    // Specified group generation id is not valid
84    IllegalGeneration = 22,
85
86    // The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
87    InconsistentGroupProtocol = 23,
88
89    // The configured groupId is invalid
90    InvalidGroupId = 24,
91
92    // The coordinator is not aware of this member.
93    UnknownMemberId = 25,
94
95    // The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).
96    InvalidSessionTimeout = 26,
97
98    // The group is rebalancing, so a rejoin is needed.
99    RebalanceInProgress = 27,
100
101    // The committing offset data size is not valid
102    InvalidCommitOffsetSize = 28,
103
104    // Not authorized to access topics: [Topic authorization failed.]
105    TopicAuthorizationFailed = 29,
106
107    // Not authorized to access group: Group authorization failed.
108    GroupAuthorizationFailed = 30,
109
110    // Cluster authorization failed.
111    ClusterAuthorizationFailed = 31,
112
113    // The timestamp of the message is out of acceptable range.
114    InvalidTimestamp = 32,
115
116    // The broker does not support the requested SASL mechanism.
117    UnsupportedSaslMechanism = 33,
118
119    // Request is not valid given the current SASL state.
120    IllegalSaslState = 34,
121
122    // The version of API is not supported.
123    UnsupportedVersion = 35,
124
125    // Topic with this name already exists.
126    TopicAlreadyExists = 36,
127
128    // Number of partitions is invalid.
129    InvalidPartitions = 37,
130
131    // Replication-factor is invalid.
132    InvalidReplicationFactor = 38,
133
134    // Replica assignment is invalid.
135    InvalidReplicaAssignment = 39,
136
137    // Configuration is invalid.
138    InvalidConfig = 40,
139
140    // This is not the correct controller for this cluster.
141    NotController = 41,
142
143    // This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details
144    InvalidRequest = 42,
145
146    // The message format version on the broker does not support the request.
147    UnsupportedForMessageFormat = 43,
148
149    // Request parameters do not satisfy the configured policy.
150    PolicyViolation = 44,
151
152    // The broker received an out of order sequence number
153    OutOfOrderSequenceNumber = 45,
154
155    // The broker received a duplicate sequence number
156    DuplicateSequenceNumber = 46,
157
158    // Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
159    InvalidProducerEpoch = 47,
160
161    // The producer attempted a transactional operation in an invalid state
162    InvalidTxnState = 48,
163
164    // The producer attempted to use a producer id which is not currently assigned to its transactional id
165    InvalidProducerIdMapping = 49,
166
167    // The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
168    InvalidTransactionTimeout = 50,
169
170    // The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
171    ConcurrentTransactions = 51,
172
173    // Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
174    TransactionCoordinatorFenced = 52,
175
176    // Transactional Id authorization failed
177    TransactionalIdAuthorizationFailed = 53,
178
179    // Security features are disabled.
180    SecurityDisabled = 54,
181
182    // The broker did not attempt to execute this operation. This may happen for batched RPCs where some operations in the batch failed, causing the broker to respond without trying the rest.
183    OperationNotAttempted = 55,
184
185    // Disk error when trying to access log file on the disk.
186    KafkaStorageError = 56,
187
188    // The user-specified log directory is not found in the broker config.
189    LogDirNotFound = 57,
190
191    // SASL Authentication failed
192    SaslAuthenticationFailed = 58,
193
194    // This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
195    UnknownProducerId = 59,
196
197    // A partition reassignment is in progress
198    ReassignmentInProgress = 60,
199
200    // Delegation Token feature is not enabled.
201    DelegationTokenAuthDisabled = 61,
202
203    // Delegation Token is not found on server.
204    DelegationTokenNotFound = 62,
205
206    // Specified Principal is not valid Owner/Renewer.
207    DelegationTokenOwnerMismatch = 63,
208
209    // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels.
210    DelegationTokenRequestNotAllowed = 64,
211
212    // Delegation Token authorization failed.
213    DelegationTokenAuthorizationFailed = 65,
214
215    // Delegation Token is expired.
216    DelegationTokenExpired = 66,
217
218    // Supplied principalType is not supported
219    InvalidPrincipleType = 67,
220
221    // The group The group is not empty is not empty
222    NonEmptyGroup = 68,
223
224    // The group id The group id does not exist was not found
225    GroupIdNotFound = 69,
226
227    // The fetch session ID was not found
228    FetchSessionIdNotFound = 70,
229
230    // The fetch session epoch is invalid
231    InvalidFetchSessionEpoch = 71,
232
233    // There is no listener on the leader broker that matches the listener
234    // on which metadata request was processed.
235    ListenerNotFound = 72,
236
237    // Topic deletion is disabled
238    TopicDeletionDisabled = 73,
239
240    // The leader epoch in the request is older than the epoch on the broker
241    FencedLeaderEpoch = 74,
242
243    // The leader epoch in the request is newer than the epoch on the broker
244    UnknownLeaderEpoch = 75,
245
246    // The requesting client does not support the compression type of given partition
247    UnsupportedCompressionType = 76,
248
249    // Broker epoch has changed
250    StaleBrokerEpoch = 77,
251
252    // The leader high watermark has not caught up from a recent leader election
253    // so the offsets cannot be guaranteed to be monotonically increasing
254    OffsetNotAvailable = 78,
255
256    // The group member needs to have a valid member id before actually
257    // entering a consumer group
258    MemberIdRequired = 79,
259
260    // The preferred leader was not available
261    PreferredLeaderNotAvailable = 80,
262
263    // Consumer group The consumer group has reached maximum number of members allowed
264    GroupMaxSizeReached = 81,
265}
266
267impl Default for ErrorCode {
268    fn default() -> ErrorCode {
269        ErrorCode::None
270    }
271}
272
273impl ErrorCode {
274
275    pub fn is_ok(&self) -> bool {
276        match self {
277            Self::None => true,
278            _ => false
279        }
280    }
281
282    pub fn to_string(&self) -> String {
283        match self {
284            ErrorCode::None => "Ok".to_owned(),
285            _ => format!("{:?}", self),
286        }
287    }
288
289    pub fn to_sentence(&self) -> String {
290        match self {
291            ErrorCode::None => "".to_owned(),
292            _ => upper_cammel_case_to_sentence(format!("{:?}", self), false),
293        }
294    }
295
296    pub fn is_error(&self) -> bool {
297        match self {
298            ErrorCode::None => false,
299            _ => true,
300        }
301    }
302}
303
#[cfg(test)]
mod test {

    use std::convert::TryInto;

    use super::ErrorCode;

    /// A raw `i16` code converts into the matching `ErrorCode` variant, and the variant casts
    /// back to the same number.
    ///
    /// The `TryFrom<i16>` impl used by `try_into` is not written out in this file; presumably
    /// it is generated by one of the derives on `ErrorCode`.
    #[test]
    fn test_error_code_from_conversion() {
        let val: i16 = 6;
        let error_code: ErrorCode = val.try_into().expect("convert");
        assert_eq!(error_code, ErrorCode::NotLeaderForPartition);
        assert_eq!(ErrorCode::NotLeaderForPartition as i16, val);
    }
}