1use crate::bindings::{
2 self, rd_kafka_conf_res_t, rd_kafka_err2name, rd_kafka_err2str, rd_kafka_resp_err_t,
3 rd_kafka_type_t,
4};
5use std::{convert::TryFrom, error::Error, ffi::CStr, fmt};
6
7pub const RD_KAFKA_PARTITION_UA: i32 = -1;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
12#[repr(u32)]
13pub enum RDKafkaType {
14 Producer = rd_kafka_type_t::RD_KAFKA_PRODUCER as u32,
16 Consumer = rd_kafka_type_t::RD_KAFKA_CONSUMER as u32,
18}
19
20impl From<RDKafkaType> for rd_kafka_type_t {
21 fn from(value: RDKafkaType) -> Self {
22 match value {
23 RDKafkaType::Producer => rd_kafka_type_t::RD_KAFKA_PRODUCER,
24 RDKafkaType::Consumer => rd_kafka_type_t::RD_KAFKA_CONSUMER,
25 }
26 }
27}
28
29impl From<rd_kafka_type_t> for RDKafkaType {
30 fn from(value: rd_kafka_type_t) -> Self {
31 match value {
32 rd_kafka_type_t::RD_KAFKA_PRODUCER => Self::Producer,
33 rd_kafka_type_t::RD_KAFKA_CONSUMER => Self::Consumer,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
40#[repr(i32)]
41#[non_exhaustive]
42pub enum RDKafkaErrorCode {
43 #[doc(hidden)]
44 Begin = -200,
45 BadMessage = -199,
47 BadCompression = -198,
49 BrokerDestroy = -197,
51 Fail = -196,
53 BrokerTransportFailure = -195,
55 CriticalSystemResource = -194,
57 Resolve = -193,
59 MessageTimedOut = -192,
61 PartitionEOF = -191,
63 UnknownPartition = -190,
65 FileSystem = -189,
67 UnknownTopic = -188,
69 AllBrokersDown = -187,
71 InvalidArgument = -186,
73 OperationTimedOut = -185,
75 QueueFull = -184,
77 ISRInsufficient = -183,
79 NodeUpdate = -182,
81 SSL = -181,
83 WaitingForCoordinator = -180,
85 UnknownGroup = -179,
87 InProgress = -178,
89 PreviousInProgress = -177,
91 ExistingSubscription = -176,
93 AssignPartitions = -175,
95 RevokePartitions = -174,
97 Conflict = -173,
99 State = -172,
101 UnknownProtocol = -171,
103 NotImplemented = -170,
105 Authentication = -169,
107 NoOffset = -168,
109 Outdated = -167,
111 TimedOutQueue = -166,
113 UnsupportedFeature = -165,
115 WaitCache = -164,
117 Interrupted = -163,
119 KeySerialization = -162,
121 ValueSerialization = -161,
123 KeyDeserialization = -160,
125 ValueDeserialization = -159,
127 Partial = -158,
129 ReadOnly = -157,
131 NoEnt = -156,
133 Underflow = -155,
135 InvalidType = -154,
137 Retry = -153,
139 PurgeQueue = -152,
141 PurgeInflight = -151,
143 Fatal = -150,
145 Inconsistent = -149,
147 GaplessGuarantee = -148,
149 PollExceeded = -147,
151 UnknownBroker = -146,
153 NotConfigured = -145,
155 Fenced = -144,
157 Application = -143,
159 AssignmentLost = -142,
161 Noop = -141,
163 AutoOffsetReset = -140,
165 LogTruncation = -139,
167 InvalidDifferentRecord = -138,
169 DestroyBroker = -137,
171 #[doc(hidden)]
172 End = -100,
173 Unknown = -1,
175 NoError = 0,
177 OffsetOutOfRange = 1,
179 InvalidMessage = 2,
181 UnknownTopicOrPartition = 3,
183 InvalidMessageSize = 4,
185 LeaderNotAvailable = 5,
187 NotLeaderForPartition = 6,
189 RequestTimedOut = 7,
191 BrokerNotAvailable = 8,
193 ReplicaNotAvailable = 9,
195 MessageSizeTooLarge = 10,
197 StaleControllerEpoch = 11,
199 OffsetMetadataTooLarge = 12,
201 NetworkException = 13,
203 CoordinatorLoadInProgress = 14,
205 CoordinatorNotAvailable = 15,
207 NotCoordinator = 16,
209 InvalidTopic = 17,
211 MessageBatchTooLarge = 18,
213 NotEnoughReplicas = 19,
215 NotEnoughReplicasAfterAppend = 20,
217 InvalidRequiredAcks = 21,
219 IllegalGeneration = 22,
221 InconsistentGroupProtocol = 23,
223 InvalidGroupId = 24,
225 UnknownMemberId = 25,
227 InvalidSessionTimeout = 26,
229 RebalanceInProgress = 27,
231 InvalidCommitOffsetSize = 28,
233 TopicAuthorizationFailed = 29,
235 GroupAuthorizationFailed = 30,
237 ClusterAuthorizationFailed = 31,
239 InvalidTimestamp = 32,
241 UnsupportedSASLMechanism = 33,
243 IllegalSASLState = 34,
245 UnsupportedVersion = 35,
247 TopicAlreadyExists = 36,
249 InvalidPartitions = 37,
251 InvalidReplicationFactor = 38,
253 InvalidReplicaAssignment = 39,
255 InvalidConfig = 40,
257 NotController = 41,
259 InvalidRequest = 42,
261 UnsupportedForMessageFormat = 43,
263 PolicyViolation = 44,
265 OutOfOrderSequenceNumber = 45,
267 DuplicateSequenceNumber = 46,
269 InvalidProducerEpoch = 47,
271 InvalidTransactionalState = 48,
273 InvalidProducerIdMapping = 49,
276 InvalidTransactionTimeout = 50,
279 ConcurrentTransactions = 51,
282 TransactionCoordinatorFenced = 52,
285 TransactionalIdAuthorizationFailed = 53,
287 SecurityDisabled = 54,
289 OperationNotAttempted = 55,
291 KafkaStorageError = 56,
293 LogDirNotFound = 57,
295 SaslAuthenticationFailed = 58,
297 UnknownProducerId = 59,
299 ReassignmentInProgress = 60,
301 DelegationTokenAuthDisabled = 61,
303 DelegationTokenNotFound = 62,
305 DelegationTokenOwnerMismatch = 63,
307 DelegationTokenRequestNotAllowed = 64,
309 DelegationTokenAuthorizationFailed = 65,
311 DelegationTokenExpired = 66,
313 InvalidPrincipalType = 67,
315 NonEmptyGroup = 68,
317 GroupIdNotFound = 69,
319 FetchSessionIdNotFound = 70,
321 InvalidFetchSessionEpoch = 71,
323 ListenerNotFound = 72,
325 TopicDeletionDisabled = 73,
327 FencedLeaderEpoch = 74,
329 UnknownLeaderEpoch = 75,
331 UnsupportedCompressionType = 76,
333 StaleBrokerEpoch = 77,
335 OffsetNotAvailable = 78,
337 MemberIdRequired = 79,
339 PreferredLeaderNotAvailable = 80,
341 GroupMaxSizeReached = 81,
343 FencedInstanceId = 82,
345 EligibleLeadersNotAvailable = 83,
347 ElectionNotNeeded = 84,
349 NoReassignmentInProgress = 85,
351 GroupSubscribedToTopic = 86,
354 InvalidRecord = 87,
356 UnstableOffsetCommit = 88,
358 ThrottlingQuotaExceeded = 89,
360 ProducerFenced = 90,
363 ResourceNotFound = 91,
365 DuplicateResource = 92,
367 UnacceptableCredential = 93,
369 InconsistentVoterSet = 94,
372 InvalidUpdateVersion = 95,
374 FeatureUpdateFailed = 96,
376 PrincipalDeserializationFailure = 97,
378 UnknownTopicId = 100,
380 FencedMemberEpoch = 110,
382 UnreleasedInstanceId = 111,
384 UnsupportedAssignor = 112,
386 StaleMemberEpoch = 113,
388 UnknownSubscriptionId = 117,
390 TelemetryTooLarge = 118,
392 RebootstrapRequired = 129,
395 #[doc(hidden)]
396 EndAll = 130,
397}
398
399impl RDKafkaErrorCode {
400 pub fn name(&self) -> String {
402 let cstr = unsafe { rd_kafka_err2name((*self).into()) };
403 unsafe { CStr::from_ptr(cstr) }
404 .to_string_lossy()
405 .into_owned()
406 }
407
408 pub fn error(self) -> Option<Self> {
410 (!matches!(self, RDKafkaErrorCode::NoError)).then_some(self)
411 }
412}
413
414impl From<RDKafkaErrorCode> for rd_kafka_resp_err_t {
415 fn from(err: RDKafkaErrorCode) -> Self {
416 Self::try_from(err as i32).unwrap()
418 }
419}
420
421impl From<rd_kafka_resp_err_t> for RDKafkaErrorCode {
422 fn from(err: rd_kafka_resp_err_t) -> RDKafkaErrorCode {
423 Self::try_from(err as i32).unwrap()
425 }
426}
427
428impl fmt::Display for RDKafkaErrorCode {
429 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
430 let cstr = unsafe { rd_kafka_err2str((*self).into()) };
431 let description = unsafe { CStr::from_ptr(cstr) }
432 .to_string_lossy()
433 .into_owned();
434
435 write!(f, "{:?} ({})", self, description)
436 }
437}
438
439impl Error for RDKafkaErrorCode {}
440
441#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
443#[repr(i32)]
444pub enum RDKafkaConfErrorCode {
445 UnknownKey = -2,
447 InvalidValue = -1,
451 Ok = 0,
453}
454
455impl RDKafkaConfErrorCode {
456 pub fn error(&self) -> Option<&Self> {
458 (!matches!(self, RDKafkaConfErrorCode::Ok)).then_some(self)
459 }
460}
461
462impl From<rd_kafka_conf_res_t> for RDKafkaConfErrorCode {
463 fn from(err: rd_kafka_conf_res_t) -> RDKafkaConfErrorCode {
464 Self::try_from(err as i32).unwrap()
466 }
467}
468
469impl fmt::Display for RDKafkaConfErrorCode {
470 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471 write!(
472 f,
473 "{}",
474 match self {
475 RDKafkaConfErrorCode::UnknownKey => "Unknown configuration key name",
476 RDKafkaConfErrorCode::InvalidValue =>
477 "Invalid configuration value or not supported",
478 RDKafkaConfErrorCode::Ok => "Ok",
479 }
480 )
481 }
482}
483
484impl Error for RDKafkaConfErrorCode {}
485
486#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
488#[repr(i32)]
489#[non_exhaustive]
490pub enum RDKafkaEventType {
491 None = bindings::RD_KAFKA_EVENT_NONE,
492 Dr = bindings::RD_KAFKA_EVENT_DR,
493 Fetch = bindings::RD_KAFKA_EVENT_FETCH,
494 Log = bindings::RD_KAFKA_EVENT_LOG,
495 Error = bindings::RD_KAFKA_EVENT_ERROR,
496 Rebalance = bindings::RD_KAFKA_EVENT_REBALANCE,
497 OffsetCommit = bindings::RD_KAFKA_EVENT_OFFSET_COMMIT,
499 Stats = bindings::RD_KAFKA_EVENT_STATS,
500 CreateTopicsResult = bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT,
502 DeleteTopicsResult = bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT,
503 CreatePartitionsResult = bindings::RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
504 AlterConfigsResult = bindings::RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
505 DescribeConfigsResult = bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
506 DeleteRecordsResult = bindings::RD_KAFKA_EVENT_DELETERECORDS_RESULT,
507 DeleteGroupsResult = bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
508 DeleteConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
509 OauthbearerTokenRefresh = bindings::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
511 Background = 0x200,
512 CreateAclsResult = bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT,
514 DescribeAclsResult = bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT,
515 DeleteAclsResult = bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT,
516 ListConsumerGroupsResult = bindings::RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT,
517 DescribeConsumerGroupsResult = bindings::RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT,
518 ListConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT,
519 AlterConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT,
520 IncrementalAlterConfigsResult = bindings::RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT,
521 DescribeUserScramCredentialsResult =
523 bindings::RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT,
524 AlterUserScramCredentialsResult = bindings::RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT,
525 DescribeTopicsResult = bindings::RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT,
527 DescribeClusterResult = bindings::RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT,
528 ListOffsetsResult = bindings::RD_KAFKA_EVENT_LISTOFFSETS_RESULT,
529 ElectLeadersResult = 0x800000,
530}