use std::convert::TryFrom;
use std::error::Error;
use std::ffi::CStr;
use std::fmt;

use num_enum::IntoPrimitive;

use crate::bindings;
use crate::helpers;

13pub type RDKafka = bindings::rd_kafka_t;
17
18pub type RDKafkaConf = bindings::rd_kafka_conf_t;
20
21pub type RDKafkaMessage = bindings::rd_kafka_message_t;
23
24pub type RDKafkaTopic = bindings::rd_kafka_topic_t;
26
27pub type RDKafkaTopicConf = bindings::rd_kafka_topic_conf_t;
29
30pub type RDKafkaTopicPartition = bindings::rd_kafka_topic_partition_t;
32
33pub type RDKafkaTopicPartitionList = bindings::rd_kafka_topic_partition_list_t;
35
36pub type RDKafkaMetadata = bindings::rd_kafka_metadata_t;
38
39pub type RDKafkaMetadataTopic = bindings::rd_kafka_metadata_topic_t;
41
42pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t;
44
45pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t;
47
48pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t;
50
51pub type RDKafkaState = bindings::rd_kafka_s;
53
54pub type RDKafkaGroupList = bindings::rd_kafka_group_list;
56
57pub type RDKafkaGroupInfo = bindings::rd_kafka_group_info;
59
60pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info;
62
63pub type RDKafkaHeaders = bindings::rd_kafka_headers_t;
65
66pub type RDKafkaQueue = bindings::rd_kafka_queue_t;
68
69pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
71
72pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;
74
75pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;
77
78pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
80
81pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;
83
84pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
86
87pub type RDKafkaEvent = bindings::rd_kafka_event_t;
89
90pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
92
93pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
95
96pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;
98
99pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;
101
102pub use bindings::rd_kafka_type_t as RDKafkaType;
106
107pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes;
109
110pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr;
112
113pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp;
115
116pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
118
119pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
121
/// Native rdkafka error code.
///
/// Each variant carries an explicit integer discriminant so that the value
/// can be cast with `as i32` and matched against the corresponding
/// [`RDKafkaRespErr`] code. The variants fall into two numeric ranges:
///
/// * `-200..=-100`, delimited by the hidden `Begin` / `End` markers
///   (in librdkafka's `rd_kafka_resp_err_t` these are client-internal errors);
/// * `-1` (`Unknown`), `0` (`NoError`) and the positive codes up to the hidden
///   `EndAll` marker (in librdkafka these mirror Kafka broker error codes).
///
/// The enum is `#[non_exhaustive]`: new codes may be added, so downstream
/// `match` expressions need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RDKafkaErrorCode {
    // ---- Negative range: -200..=-100 ----
    /// Marker for the start of the negative error range; not a real error.
    #[doc(hidden)]
    Begin = -200,
    BadMessage = -199,
    BadCompression = -198,
    BrokerDestroy = -197,
    Fail = -196,
    BrokerTransportFailure = -195,
    CriticalSystemResource = -194,
    Resolve = -193,
    MessageTimedOut = -192,
    PartitionEOF = -191,
    UnknownPartition = -190,
    FileSystem = -189,
    UnknownTopic = -188,
    AllBrokersDown = -187,
    InvalidArgument = -186,
    OperationTimedOut = -185,
    QueueFull = -184,
    ISRInsufficient = -183,
    NodeUpdate = -182,
    SSL = -181,
    WaitingForCoordinator = -180,
    UnknownGroup = -179,
    InProgress = -178,
    PreviousInProgress = -177,
    ExistingSubscription = -176,
    AssignPartitions = -175,
    RevokePartitions = -174,
    Conflict = -173,
    State = -172,
    UnknownProtocol = -171,
    NotImplemented = -170,
    Authentication = -169,
    NoOffset = -168,
    Outdated = -167,
    TimedOutQueue = -166,
    UnsupportedFeature = -165,
    WaitCache = -164,
    Interrupted = -163,
    KeySerialization = -162,
    ValueSerialization = -161,
    KeyDeserialization = -160,
    ValueDeserialization = -159,
    Partial = -158,
    ReadOnly = -157,
    NoEnt = -156,
    Underflow = -155,
    InvalidType = -154,
    Retry = -153,
    PurgeQueue = -152,
    PurgeInflight = -151,
    Fatal = -150,
    Inconsistent = -149,
    GaplessGuarantee = -148,
    PollExceeded = -147,
    UnknownBroker = -146,
    NotConfigured = -145,
    Fenced = -144,
    Application = -143,
    AssignmentLost = -142,
    Noop = -141,
    AutoOffsetReset = -140,
    LogTruncation = -139,
    /// Marker for the end of the negative error range; not a real error.
    #[doc(hidden)]
    End = -100,

    // ---- Unknown / success / positive codes ----
    Unknown = -1,
    NoError = 0,
    OffsetOutOfRange = 1,
    InvalidMessage = 2,
    UnknownTopicOrPartition = 3,
    InvalidMessageSize = 4,
    LeaderNotAvailable = 5,
    NotLeaderForPartition = 6,
    RequestTimedOut = 7,
    BrokerNotAvailable = 8,
    ReplicaNotAvailable = 9,
    MessageSizeTooLarge = 10,
    StaleControllerEpoch = 11,
    OffsetMetadataTooLarge = 12,
    NetworkException = 13,
    CoordinatorLoadInProgress = 14,
    CoordinatorNotAvailable = 15,
    NotCoordinator = 16,
    InvalidTopic = 17,
    MessageBatchTooLarge = 18,
    NotEnoughReplicas = 19,
    NotEnoughReplicasAfterAppend = 20,
    InvalidRequiredAcks = 21,
    IllegalGeneration = 22,
    InconsistentGroupProtocol = 23,
    InvalidGroupId = 24,
    UnknownMemberId = 25,
    InvalidSessionTimeout = 26,
    RebalanceInProgress = 27,
    InvalidCommitOffsetSize = 28,
    TopicAuthorizationFailed = 29,
    GroupAuthorizationFailed = 30,
    ClusterAuthorizationFailed = 31,
    InvalidTimestamp = 32,
    UnsupportedSASLMechanism = 33,
    IllegalSASLState = 34,
    UnsupportedVersion = 35,
    TopicAlreadyExists = 36,
    InvalidPartitions = 37,
    InvalidReplicationFactor = 38,
    InvalidReplicaAssignment = 39,
    InvalidConfig = 40,
    NotController = 41,
    InvalidRequest = 42,
    UnsupportedForMessageFormat = 43,
    PolicyViolation = 44,
    OutOfOrderSequenceNumber = 45,
    DuplicateSequenceNumber = 46,
    InvalidProducerEpoch = 47,
    InvalidTransactionalState = 48,
    InvalidProducerIdMapping = 49,
    InvalidTransactionTimeout = 50,
    ConcurrentTransactions = 51,
    TransactionCoordinatorFenced = 52,
    TransactionalIdAuthorizationFailed = 53,
    SecurityDisabled = 54,
    OperationNotAttempted = 55,
    KafkaStorageError = 56,
    LogDirNotFound = 57,
    SaslAuthenticationFailed = 58,
    UnknownProducerId = 59,
    ReassignmentInProgress = 60,
    DelegationTokenAuthDisabled = 61,
    DelegationTokenNotFound = 62,
    DelegationTokenOwnerMismatch = 63,
    DelegationTokenRequestNotAllowed = 64,
    DelegationTokenAuthorizationFailed = 65,
    DelegationTokenExpired = 66,
    InvalidPrincipalType = 67,
    NonEmptyGroup = 68,
    GroupIdNotFound = 69,
    FetchSessionIdNotFound = 70,
    InvalidFetchSessionEpoch = 71,
    ListenerNotFound = 72,
    TopicDeletionDisabled = 73,
    FencedLeaderEpoch = 74,
    UnknownLeaderEpoch = 75,
    UnsupportedCompressionType = 76,
    StaleBrokerEpoch = 77,
    OffsetNotAvailable = 78,
    MemberIdRequired = 79,
    PreferredLeaderNotAvailable = 80,
    GroupMaxSizeReached = 81,
    FencedInstanceId = 82,
    EligibleLeadersNotAvailable = 83,
    ElectionNotNeeded = 84,
    NoReassignmentInProgress = 85,
    GroupSubscribedToTopic = 86,
    InvalidRecord = 87,
    UnstableOffsetCommit = 88,
    ThrottlingQuotaExceeded = 89,
    ProducerFenced = 90,
    ResourceNotFound = 91,
    DuplicateResource = 92,
    UnacceptableCredential = 93,
    InconsistentVoterSet = 94,
    InvalidUpdateVersion = 95,
    FeatureUpdateFailed = 96,
    PrincipalDeserializationFailure = 97,
    /// Marker one past the last positive code; not a real error.
    ///
    /// Deliberately left without an explicit discriminant so it always sits
    /// immediately after the final real variant.
    #[doc(hidden)]
    EndAll,
}
462
463impl From<RDKafkaRespErr> for RDKafkaErrorCode {
464 fn from(err: RDKafkaRespErr) -> RDKafkaErrorCode {
465 helpers::rd_kafka_resp_err_t_to_rdkafka_error(err)
466 }
467}
468
469impl fmt::Display for RDKafkaErrorCode {
470 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471 let description = match RDKafkaRespErr::try_from(*self as i32) {
472 Ok(err) => {
473 let cstr = unsafe { bindings::rd_kafka_err2str(err) };
474 unsafe { CStr::from_ptr(cstr) }
475 .to_string_lossy()
476 .into_owned()
477 }
478 Err(_) => "Unknown error".to_owned(),
479 };
480
481 write!(f, "{:?} ({})", self, description)
482 }
483}
484
485impl Error for RDKafkaErrorCode {}
486
487#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive)]
489#[repr(i16)]
490#[non_exhaustive]
491pub enum RDKafkaApiKey {
492 Produce = 0,
493 Fetch = 1,
494 ListOffsets = 2,
495 Metadata = 3,
496 LeaderAndIsr = 4,
497 StopReplica = 5,
498 UpdateMetadata = 6,
499 ControlledShutdown = 7,
500 OffsetCommit = 8,
501 OffsetFetch = 9,
502 FindCoordinator = 10,
503 JoinGroup = 11,
504 Heartbeat = 12,
505 LeaveGroup = 13,
506 SyncGroup = 14,
507 DescribeGroups = 15,
508 ListGroups = 16,
509 SaslHandshake = 17,
510 ApiVersion = 18,
511 CreateTopics = 19,
512 DeleteTopics = 20,
513 DeleteRecords = 21,
514 InitProducerId = 22,
515 OffsetForLeaderEpoch = 23,
516 AddPartitionsToTxn = 24,
517 AddOffsetsToTxn = 25,
518 EndTxn = 26,
519 WriteTxnMarkers = 27,
520 TxnOffsetCommit = 28,
521 DescribeAcls = 29,
522 CreateAcls = 30,
523 DeleteAcls = 31,
524 DescribeConfigs = 32,
525 AlterConfigs = 33,
526 AlterReplicaLogDirs = 34,
527 DescribeLogDirs = 35,
528 SaslAuthenticate = 36,
529 CreatePartitions = 37,
530 CreateDelegationToken = 38,
531 RenewDelegationToken = 39,
532 ExpireDelegationToken = 40,
533 DescribeDelegationToken = 41,
534 DeleteGroups = 42,
535 ElectLeaders = 43,
536 IncrementalAlterConfigs = 44,
537 AlterPartitionReassignments = 45,
538 ListPartitionReassignments = 46,
539 OffsetDelete = 47,
540 DescribeClientQuotas = 48,
541 AlterClientQuotas = 49,
542 DescribeUserScramCredentials = 50,
543 AlterUserScramCredentials = 51,
544 Vote = 52,
545 BeginQuorumEpoch = 53,
546 EndQuorumEpoch = 54,
547 DescribeQuorum = 55,
548 AlterIsr = 56,
549 UpdateFeatures = 57,
550 Envelope = 58,
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` renders `<Variant> (<librdkafka description>)`, while `Debug`
    /// renders only the variant name.
    #[test]
    fn test_display_error() {
        let error = RDKafkaErrorCode::from(RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF);
        assert_eq!(
            error.to_string(),
            "PartitionEOF (Broker: No more messages)"
        );
        assert_eq!(format!("{:?}", error), "PartitionEOF");
    }
}