rdkafka_sys/
types.rs

1//! Aliases for types defined in the auto-generated bindings.
2
3use std::convert::TryFrom;
4use std::error::Error;
5use std::ffi::CStr;
6use std::fmt;
7
8use num_enum::IntoPrimitive;
9
10use crate::bindings;
11use crate::helpers;
12
13// TYPES
14
15/// Native rdkafka client.
16pub type RDKafka = bindings::rd_kafka_t;
17
18/// Native rdkafka configuration.
19pub type RDKafkaConf = bindings::rd_kafka_conf_t;
20
21/// Native rdkafka message.
22pub type RDKafkaMessage = bindings::rd_kafka_message_t;
23
24/// Native rdkafka topic.
25pub type RDKafkaTopic = bindings::rd_kafka_topic_t;
26
27/// Native rdkafka topic configuration.
28pub type RDKafkaTopicConf = bindings::rd_kafka_topic_conf_t;
29
30/// Native rdkafka topic partition.
31pub type RDKafkaTopicPartition = bindings::rd_kafka_topic_partition_t;
32
33/// Native rdkafka topic partition list.
34pub type RDKafkaTopicPartitionList = bindings::rd_kafka_topic_partition_list_t;
35
36/// Native rdkafka metadata container.
37pub type RDKafkaMetadata = bindings::rd_kafka_metadata_t;
38
39/// Native rdkafka topic information.
40pub type RDKafkaMetadataTopic = bindings::rd_kafka_metadata_topic_t;
41
42/// Native rdkafka partition information.
43pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t;
44
45/// Native rdkafka broker information.
46pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t;
47
48/// Native rdkafka consumer group metadata.
49pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t;
50
51/// Native rdkafka state.
52pub type RDKafkaState = bindings::rd_kafka_s;
53
54/// Native rdkafka list of groups.
55pub type RDKafkaGroupList = bindings::rd_kafka_group_list;
56
57/// Native rdkafka group information.
58pub type RDKafkaGroupInfo = bindings::rd_kafka_group_info;
59
60/// Native rdkafka group member information.
61pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info;
62
63/// Native rdkafka group member information.
64pub type RDKafkaHeaders = bindings::rd_kafka_headers_t;
65
66/// Native rdkafka queue.
67pub type RDKafkaQueue = bindings::rd_kafka_queue_t;
68
69/// Native rdkafka new topic object.
70pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
71
72/// Native rdkafka delete topic object.
73pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;
74
75/// Native rdkafka delete group object.
76pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;
77
78/// Native rdkafka new partitions object.
79pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
80
81/// Native rdkafka delete records object.
82pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;
83
84/// Native rdkafka config resource.
85pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
86
87/// Native rdkafka event.
88pub type RDKafkaEvent = bindings::rd_kafka_event_t;
89
90/// Native rdkafka admin options.
91pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
92
93/// Native rdkafka topic result.
94pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
95
96/// Native rdkafka group result.
97pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;
98
99/// Native rdkafka mock cluster.
100pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;
101
102// ENUMS
103
104/// Client types.
105pub use bindings::rd_kafka_type_t as RDKafkaType;
106
107/// Configuration result.
108pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes;
109
110/// Response error.
111pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr;
112
113/// Admin operation.
114pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp;
115
116/// Config resource type.
117pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
118
119/// Config source.
120pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
121
122// Errors enum
123
124/// Native rdkafka error code.
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126#[non_exhaustive]
127pub enum RDKafkaErrorCode {
128    #[doc(hidden)]
129    Begin = -200,
130    /// Received message is incorrect.
131    BadMessage = -199,
132    /// Bad/unknown compression.
133    BadCompression = -198,
134    /// Broker is going away.
135    BrokerDestroy = -197,
136    /// Generic failure.
137    Fail = -196,
138    /// Broker transport failure.
139    BrokerTransportFailure = -195,
140    /// Critical system resource.
141    CriticalSystemResource = -194,
142    /// Failed to resolve broker.
143    Resolve = -193,
144    /// Produced message timed out.
145    MessageTimedOut = -192,
146    /// Reached the end of the topic+partition queue on the broker. Not really an error.
147    PartitionEOF = -191,
148    /// Permanent: Partition does not exist in cluster.
149    UnknownPartition = -190,
150    /// File or filesystem error.
151    FileSystem = -189,
152    /// Permanent: Topic does not exist in cluster.
153    UnknownTopic = -188,
154    /// All broker connections are down.
155    AllBrokersDown = -187,
156    /// Invalid argument, or invalid configuration.
157    InvalidArgument = -186,
158    /// Operation timed out.
159    OperationTimedOut = -185,
160    /// Queue is full.
161    QueueFull = -184,
162    /// ISR count < required.acks.
163    ISRInsufficient = -183,
164    /// Broker node update.
165    NodeUpdate = -182,
166    /// SSL error.
167    SSL = -181,
168    /// Waiting for coordinator to become available.
169    WaitingForCoordinator = -180,
170    /// Unknown client group.
171    UnknownGroup = -179,
172    /// Operation in progress.
173    InProgress = -178,
174    /// Previous operation in progress, wait for it to finish.
175    PreviousInProgress = -177,
176    /// This operation would interfere with an existing subscription.
177    ExistingSubscription = -176,
178    /// Assigned partitions (rebalance_cb).
179    AssignPartitions = -175,
180    /// Revoked partitions (rebalance_cb).
181    RevokePartitions = -174,
182    /// Conflicting use.
183    Conflict = -173,
184    /// Wrong state.
185    State = -172,
186    /// Unknown protocol.
187    UnknownProtocol = -171,
188    /// Not implemented.
189    NotImplemented = -170,
190    /// Authentication failure.
191    Authentication = -169,
192    /// No stored offset.
193    NoOffset = -168,
194    /// Outdated.
195    Outdated = -167,
196    /// Timed out in queue.
197    TimedOutQueue = -166,
198    /// Feature not supported by broker.
199    UnsupportedFeature = -165,
200    /// Awaiting cache update.
201    WaitCache = -164,
202    /// Operation interrupted (e.g., due to yield).
203    Interrupted = -163,
204    /// Key serialization error.
205    KeySerialization = -162,
206    /// Value serialization error.
207    ValueSerialization = -161,
208    /// Key deserialization error.
209    KeyDeserialization = -160,
210    /// Value deserialization error.
211    ValueDeserialization = -159,
212    /// Partial response.
213    Partial = -158,
214    /// Modification attempted on read-only object.
215    ReadOnly = -157,
216    /// No such entry or item not found.
217    NoEnt = -156,
218    /// Read underflow.
219    Underflow = -155,
220    /// Invalid type.
221    InvalidType = -154,
222    /// Retry operation.
223    Retry = -153,
224    /// Purged in queue.
225    PurgeQueue = -152,
226    /// Purged in flight.
227    PurgeInflight = -151,
228    /// Fatal error: see rd_kafka_fatal_error().
229    Fatal = -150,
230    /// Inconsistent state.
231    Inconsistent = -149,
232    /// Gap-less ordering would not be guaranteed if proceeding.
233    GaplessGuarantee = -148,
234    /// Maximum poll interval exceeded.
235    PollExceeded = -147,
236    /// Unknown broker.
237    UnknownBroker = -146,
238    /// Functionality not configured.
239    NotConfigured = -145,
240    /// Instance has been fenced.
241    Fenced = -144,
242    /// Application generated error.
243    Application = -143,
244    /// Assignment lost.
245    AssignmentLost = -142,
246    /// No operation performed.
247    Noop = -141,
248    /// No offset to automatically reset to.
249    AutoOffsetReset = -140,
250    /// Partition log truncation detected
251    LogTruncation = -139,
252    /// A different record in the batch was invalid and this message failed persisting.
253    InvalidDifferentRecord = -138,
254    /// Broker is going away but client isn't terminating */
255    DestroyBroker = -137,
256    #[doc(hidden)]
257    End = -100,
258    /// Unknown broker error.
259    Unknown = -1,
260    /// Success.
261    NoError = 0,
262    /// Offset out of range.
263    OffsetOutOfRange = 1,
264    /// Invalid message.
265    InvalidMessage = 2,
266    /// Unknown topic or partition.
267    UnknownTopicOrPartition = 3,
268    /// Invalid message size.
269    InvalidMessageSize = 4,
270    /// Leader not available.
271    LeaderNotAvailable = 5,
272    /// Not leader for partition.
273    NotLeaderForPartition = 6,
274    /// Request timed out.
275    RequestTimedOut = 7,
276    /// Broker not available.
277    BrokerNotAvailable = 8,
278    /// Replica not available.
279    ReplicaNotAvailable = 9,
280    /// Message size too large.
281    MessageSizeTooLarge = 10,
282    /// Stale controller epoch code.
283    StaleControllerEpoch = 11,
284    /// Offset metadata string too large.
285    OffsetMetadataTooLarge = 12,
286    /// Broker disconnected before response received.
287    NetworkException = 13,
288    /// Coordinator load in progress.
289    CoordinatorLoadInProgress = 14,
290    /// Coordinator not available.
291    CoordinatorNotAvailable = 15,
292    /// Not coordinator.
293    NotCoordinator = 16,
294    /// Invalid topic.
295    InvalidTopic = 17,
296    /// Message batch larger than configured server segment size.
297    MessageBatchTooLarge = 18,
298    /// Not enough in-sync replicas.
299    NotEnoughReplicas = 19,
300    /// Message(s) written to insufficient number of in-sync replicas.
301    NotEnoughReplicasAfterAppend = 20,
302    /// Invalid required acks value.
303    InvalidRequiredAcks = 21,
304    /// Specified group generation id is not valid.
305    IllegalGeneration = 22,
306    /// Inconsistent group protocol.
307    InconsistentGroupProtocol = 23,
308    /// Invalid group.id.
309    InvalidGroupId = 24,
310    /// Unknown member.
311    UnknownMemberId = 25,
312    /// Invalid session timeout.
313    InvalidSessionTimeout = 26,
314    /// Group rebalance in progress.
315    RebalanceInProgress = 27,
316    /// Commit offset data size is not valid.
317    InvalidCommitOffsetSize = 28,
318    /// Topic authorization failed.
319    TopicAuthorizationFailed = 29,
320    /// Group authorization failed.
321    GroupAuthorizationFailed = 30,
322    /// Cluster authorization failed.
323    ClusterAuthorizationFailed = 31,
324    /// Invalid timestamp.
325    InvalidTimestamp = 32,
326    /// Unsupported SASL mechanism.
327    UnsupportedSASLMechanism = 33,
328    /// Illegal SASL state.
329    IllegalSASLState = 34,
330    /// Unsupported version.
331    UnsupportedVersion = 35,
332    /// Topic already exists.
333    TopicAlreadyExists = 36,
334    /// Invalid number of partitions.
335    InvalidPartitions = 37,
336    /// Invalid replication factor.
337    InvalidReplicationFactor = 38,
338    /// Invalid replica assignment.
339    InvalidReplicaAssignment = 39,
340    /// Invalid config.
341    InvalidConfig = 40,
342    /// Not controller for cluster.
343    NotController = 41,
344    /// Invalid request.
345    InvalidRequest = 42,
346    /// Message format on broker does not support request.
347    UnsupportedForMessageFormat = 43,
348    /// Policy violation.
349    PolicyViolation = 44,
350    /// Broker received an out of order sequence number.
351    OutOfOrderSequenceNumber = 45,
352    /// Broker received a duplicate sequence number.
353    DuplicateSequenceNumber = 46,
354    /// Producer attempted an operation with an old epoch.
355    InvalidProducerEpoch = 47,
356    /// Producer attempted a transactional operation in an invalid state.
357    InvalidTransactionalState = 48,
358    /// Producer attempted to use a producer id which is currently assigned to
359    /// its transactional id.
360    InvalidProducerIdMapping = 49,
361    /// Transaction timeout is larger than the maxi value allowed by the
362    /// broker's max.transaction.timeout.ms.
363    InvalidTransactionTimeout = 50,
364    /// Producer attempted to update a transaction while another concurrent
365    /// operation on the same transaction was ongoing.
366    ConcurrentTransactions = 51,
367    /// Indicates that the transaction coordinator sending a WriteTxnMarker is
368    /// no longer the current coordinator for a given producer.
369    TransactionCoordinatorFenced = 52,
370    /// Transactional Id authorization failed.
371    TransactionalIdAuthorizationFailed = 53,
372    /// Security features are disabled.
373    SecurityDisabled = 54,
374    /// Operation not attempted.
375    OperationNotAttempted = 55,
376    /// Disk error when trying to access log file on the disk.
377    KafkaStorageError = 56,
378    /// The user-specified log directory is not found in the broker config.
379    LogDirNotFound = 57,
380    /// SASL Authentication failed.
381    SaslAuthenticationFailed = 58,
382    /// Unknown Producer Id.
383    UnknownProducerId = 59,
384    /// Partition reassignment is in progress.
385    ReassignmentInProgress = 60,
386    /// Delegation Token feature is not enabled.
387    DelegationTokenAuthDisabled = 61,
388    /// Delegation Token is not found on server.
389    DelegationTokenNotFound = 62,
390    /// Specified Principal is not valid Owner/Renewer.
391    DelegationTokenOwnerMismatch = 63,
392    /// Delegation Token requests are not allowed on this connection.
393    DelegationTokenRequestNotAllowed = 64,
394    /// Delegation Token authorization failed.
395    DelegationTokenAuthorizationFailed = 65,
396    /// Delegation Token is expired.
397    DelegationTokenExpired = 66,
398    /// Supplied principalType is not supported.
399    InvalidPrincipalType = 67,
400    /// The group is not empty.
401    NonEmptyGroup = 68,
402    /// The group id does not exist.
403    GroupIdNotFound = 69,
404    /// The fetch session ID was not found.
405    FetchSessionIdNotFound = 70,
406    /// The fetch session epoch is invalid.
407    InvalidFetchSessionEpoch = 71,
408    /// No matching listener.
409    ListenerNotFound = 72,
410    /// Topic deletion is disabled.
411    TopicDeletionDisabled = 73,
412    /// Leader epoch is older than broker epoch.
413    FencedLeaderEpoch = 74,
414    /// Leader epoch is newer than broker epoch.
415    UnknownLeaderEpoch = 75,
416    /// Unsupported compression type.
417    UnsupportedCompressionType = 76,
418    /// Broker epoch has changed.
419    StaleBrokerEpoch = 77,
420    /// Leader high watermark is not caught up.
421    OffsetNotAvailable = 78,
422    /// Group member needs a valid member ID.
423    MemberIdRequired = 79,
424    /// Preferred leader was not available.
425    PreferredLeaderNotAvailable = 80,
426    /// Consumer group has reached maximum size.
427    GroupMaxSizeReached = 81,
428    /// Static consumer fenced by other consumer with same group.instance.id.
429    FencedInstanceId = 82,
430    /// Eligible partition leaders are not available.
431    EligibleLeadersNotAvailable = 83,
432    /// Leader election not needed for topic partition.
433    ElectionNotNeeded = 84,
434    /// No partition reassignment is in progress.
435    NoReassignmentInProgress = 85,
436    /// Deleting offsets of a topic while the consumer group is subscribed to
437    /// it.
438    GroupSubscribedToTopic = 86,
439    /// Broker failed to validate record.
440    InvalidRecord = 87,
441    /// There are unstable offsets that need to be cleared.
442    UnstableOffsetCommit = 88,
443    /// Throttling quota has been exceeded.
444    ThrottlingQuotaExceeded = 89,
445    /// There is a newer producer with the same transactional ID which fences
446    /// the current one.
447    ProducerFenced = 90,
448    /// Request illegally referred to resource that does not exist.
449    ResourceNotFound = 91,
450    /// Request illegally referred to the same resource twice.
451    DuplicateResource = 92,
452    /// Requested credential would not meet criteria for acceptability.
453    UnacceptableCredential = 93,
454    /// Either the sender or recipient of a voter-only request is not one of the
455    /// expected voters.
456    InconsistentVoterSet = 94,
457    /// Invalid update version.
458    InvalidUpdateVersion = 95,
459    /// Unable to update finalized features due to server error.
460    FeatureUpdateFailed = 96,
461    /// Request principal deserialization failed during forwarding.
462    PrincipalDeserializationFailure = 97,
463    /// Unknown Topic Id
464    UnknownTopicId = 100,
465    /// The member epoch is fenced by the group coordinator
466    FencedMemberEpoch = 110,
467    /// The instance ID is still used by another member in the consumer group
468    UnreleasedInstanceId = 111,
469    /// The assignor or its version range is not supported by the consumer group
470    UnsupportedAssignor = 112,
471    /// The member epoch is stale
472    StaleMemberEpoch = 113,
473    /// Client sent a push telemetry request with an invalid or outdated
474    /// subscription ID.
475    UnknownSubscriptionId = 117,
476    /// Client sent a push telemetry request larger than the maximum size
477    /// the broker will accept.
478    TelemetryTooLarge = 118,
479    /// Client metadata is stale,
480    /// client should rebootstrap to obtain new metadata.
481    RebootstrapRequired = 129,
482    #[doc(hidden)]
483    EndAll,
484}
485
486impl From<RDKafkaRespErr> for RDKafkaErrorCode {
487    fn from(err: RDKafkaRespErr) -> RDKafkaErrorCode {
488        helpers::rd_kafka_resp_err_t_to_rdkafka_error(err)
489    }
490}
491
492impl fmt::Display for RDKafkaErrorCode {
493    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
494        let description = match RDKafkaRespErr::try_from(*self as i32) {
495            Ok(err) => {
496                let cstr = unsafe { bindings::rd_kafka_err2str(err) };
497                unsafe { CStr::from_ptr(cstr) }
498                    .to_string_lossy()
499                    .into_owned()
500            }
501            Err(_) => "Unknown error".to_owned(),
502        };
503
504        write!(f, "{:?} ({})", self, description)
505    }
506}
507
508impl Error for RDKafkaErrorCode {}
509
510/// Native rdkafka ApiKeys / protocol requests
511#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive)]
512#[repr(i16)]
513#[non_exhaustive]
514pub enum RDKafkaApiKey {
515    Produce = 0,
516    Fetch = 1,
517    ListOffsets = 2,
518    Metadata = 3,
519    LeaderAndIsr = 4,
520    StopReplica = 5,
521    UpdateMetadata = 6,
522    ControlledShutdown = 7,
523    OffsetCommit = 8,
524    OffsetFetch = 9,
525    FindCoordinator = 10,
526    JoinGroup = 11,
527    Heartbeat = 12,
528    LeaveGroup = 13,
529    SyncGroup = 14,
530    DescribeGroups = 15,
531    ListGroups = 16,
532    SaslHandshake = 17,
533    ApiVersion = 18,
534    CreateTopics = 19,
535    DeleteTopics = 20,
536    DeleteRecords = 21,
537    InitProducerId = 22,
538    OffsetForLeaderEpoch = 23,
539    AddPartitionsToTxn = 24,
540    AddOffsetsToTxn = 25,
541    EndTxn = 26,
542    WriteTxnMarkers = 27,
543    TxnOffsetCommit = 28,
544    DescribeAcls = 29,
545    CreateAcls = 30,
546    DeleteAcls = 31,
547    DescribeConfigs = 32,
548    AlterConfigs = 33,
549    AlterReplicaLogDirs = 34,
550    DescribeLogDirs = 35,
551    SaslAuthenticate = 36,
552    CreatePartitions = 37,
553    CreateDelegationToken = 38,
554    RenewDelegationToken = 39,
555    ExpireDelegationToken = 40,
556    DescribeDelegationToken = 41,
557    DeleteGroups = 42,
558    ElectLeaders = 43,
559    IncrementalAlterConfigs = 44,
560    AlterPartitionReassignments = 45,
561    ListPartitionReassignments = 46,
562    OffsetDelete = 47,
563    DescribeClientQuotas = 48,
564    AlterClientQuotas = 49,
565    DescribeUserScramCredentials = 50,
566    AlterUserScramCredentials = 51,
567    Vote = 52,
568    BeginQuorumEpoch = 53,
569    EndQuorumEpoch = 54,
570    DescribeQuorum = 55,
571    AlterIsr = 56,
572    UpdateFeatures = 57,
573    Envelope = 58,
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579
580    #[test]
581    fn test_display_error() {
582        let error: RDKafkaErrorCode = RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF.into();
583        assert_eq!(
584            "PartitionEOF (Broker: No more messages)",
585            format!("{}", error)
586        );
587        assert_eq!("PartitionEOF", format!("{:?}", error));
588    }
589}