rdkafka2_sys/
types.rs

1use crate::bindings::{
2    self, rd_kafka_conf_res_t, rd_kafka_err2name, rd_kafka_err2str, rd_kafka_resp_err_t,
3    rd_kafka_type_t,
4};
5use std::{convert::TryFrom, error::Error, ffi::CStr, fmt};
6
7/// Unknown partition constant
8pub const RD_KAFKA_PARTITION_UA: i32 = -1;
9
10/// Kafka client types
11#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
12#[repr(u32)]
13pub enum RDKafkaType {
14    /// Producer client
15    Producer = rd_kafka_type_t::RD_KAFKA_PRODUCER as u32,
16    /// Consumer client
17    Consumer = rd_kafka_type_t::RD_KAFKA_CONSUMER as u32,
18}
19
20impl From<RDKafkaType> for rd_kafka_type_t {
21    fn from(value: RDKafkaType) -> Self {
22        match value {
23            RDKafkaType::Producer => rd_kafka_type_t::RD_KAFKA_PRODUCER,
24            RDKafkaType::Consumer => rd_kafka_type_t::RD_KAFKA_CONSUMER,
25        }
26    }
27}
28
29impl From<rd_kafka_type_t> for RDKafkaType {
30    fn from(value: rd_kafka_type_t) -> Self {
31        match value {
32            rd_kafka_type_t::RD_KAFKA_PRODUCER => Self::Producer,
33            rd_kafka_type_t::RD_KAFKA_CONSUMER => Self::Consumer,
34        }
35    }
36}
37
38/// Native rdkafka error code.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
40#[repr(i32)]
41#[non_exhaustive]
42pub enum RDKafkaErrorCode {
43    #[doc(hidden)]
44    Begin = -200,
45    /// Received message is incorrect.
46    BadMessage = -199,
47    /// Bad/unknown compression.
48    BadCompression = -198,
49    /// Broker is going away.
50    BrokerDestroy = -197,
51    /// Generic failure.
52    Fail = -196,
53    /// Broker transport failure.
54    BrokerTransportFailure = -195,
55    /// Critical system resource.
56    CriticalSystemResource = -194,
57    /// Failed to resolve broker.
58    Resolve = -193,
59    /// Produced message timed out.
60    MessageTimedOut = -192,
61    /// Reached the end of the topic+partition queue on the broker. Not really an error.
62    PartitionEOF = -191,
63    /// Permanent: Partition does not exist in cluster.
64    UnknownPartition = -190,
65    /// File or filesystem error.
66    FileSystem = -189,
67    /// Permanent: Topic does not exist in cluster.
68    UnknownTopic = -188,
69    /// All broker connections are down.
70    AllBrokersDown = -187,
71    /// Invalid argument, or invalid configuration.
72    InvalidArgument = -186,
73    /// Operation timed out.
74    OperationTimedOut = -185,
75    /// Queue is full.
76    QueueFull = -184,
77    /// ISR count < required.acks.
78    ISRInsufficient = -183,
79    /// Broker node update.
80    NodeUpdate = -182,
81    /// SSL error.
82    SSL = -181,
83    /// Waiting for coordinator to become available.
84    WaitingForCoordinator = -180,
85    /// Unknown client group.
86    UnknownGroup = -179,
87    /// Operation in progress.
88    InProgress = -178,
89    /// Previous operation in progress, wait for it to finish.
90    PreviousInProgress = -177,
91    /// This operation would interfere with an existing subscription.
92    ExistingSubscription = -176,
93    /// Assigned partitions (rebalance_cb).
94    AssignPartitions = -175,
95    /// Revoked partitions (rebalance_cb).
96    RevokePartitions = -174,
97    /// Conflicting use.
98    Conflict = -173,
99    /// Wrong state.
100    State = -172,
101    /// Unknown protocol.
102    UnknownProtocol = -171,
103    /// Not implemented.
104    NotImplemented = -170,
105    /// Authentication failure.
106    Authentication = -169,
107    /// No stored offset.
108    NoOffset = -168,
109    /// Outdated.
110    Outdated = -167,
111    /// Timed out in queue.
112    TimedOutQueue = -166,
113    /// Feature not supported by broker.
114    UnsupportedFeature = -165,
115    /// Awaiting cache update.
116    WaitCache = -164,
117    /// Operation interrupted (e.g., due to yield).
118    Interrupted = -163,
119    /// Key serialization error.
120    KeySerialization = -162,
121    /// Value serialization error.
122    ValueSerialization = -161,
123    /// Key deserialization error.
124    KeyDeserialization = -160,
125    /// Value deserialization error.
126    ValueDeserialization = -159,
127    /// Partial response.
128    Partial = -158,
129    /// Modification attempted on read-only object.
130    ReadOnly = -157,
131    /// No such entry or item not found.
132    NoEnt = -156,
133    /// Read underflow.
134    Underflow = -155,
135    /// Invalid type.
136    InvalidType = -154,
137    /// Retry operation.
138    Retry = -153,
139    /// Purged in queue.
140    PurgeQueue = -152,
141    /// Purged in flight.
142    PurgeInflight = -151,
143    /// Fatal error: see rd_kafka_fatal_error().
144    Fatal = -150,
145    /// Inconsistent state.
146    Inconsistent = -149,
147    /// Gap-less ordering would not be guaranteed if proceeding.
148    GaplessGuarantee = -148,
149    /// Maximum poll interval exceeded.
150    PollExceeded = -147,
151    /// Unknown broker.
152    UnknownBroker = -146,
153    /// Functionality not configured.
154    NotConfigured = -145,
155    /// Instance has been fenced.
156    Fenced = -144,
157    /// Application generated error.
158    Application = -143,
159    /// Assignment lost.
160    AssignmentLost = -142,
161    /// No operation performed.
162    Noop = -141,
163    /// No offset to automatically reset to.
164    AutoOffsetReset = -140,
165    /// Partition log truncation detected
166    LogTruncation = -139,
167    /// A different record in the batch was invalid and this message failed persisting.
168    InvalidDifferentRecord = -138,
169    /// Broker is going away but client isn't terminating
170    DestroyBroker = -137,
171    #[doc(hidden)]
172    End = -100,
173    /// Unknown broker error.
174    Unknown = -1,
175    /// Success.
176    NoError = 0,
177    /// Offset out of range.
178    OffsetOutOfRange = 1,
179    /// Invalid message.
180    InvalidMessage = 2,
181    /// Unknown topic or partition.
182    UnknownTopicOrPartition = 3,
183    /// Invalid message size.
184    InvalidMessageSize = 4,
185    /// Leader not available.
186    LeaderNotAvailable = 5,
187    /// Not leader for partition.
188    NotLeaderForPartition = 6,
189    /// Request timed out.
190    RequestTimedOut = 7,
191    /// Broker not available.
192    BrokerNotAvailable = 8,
193    /// Replica not available.
194    ReplicaNotAvailable = 9,
195    /// Message size too large.
196    MessageSizeTooLarge = 10,
197    /// Stale controller epoch code.
198    StaleControllerEpoch = 11,
199    /// Offset metadata string too large.
200    OffsetMetadataTooLarge = 12,
201    /// Broker disconnected before response received.
202    NetworkException = 13,
203    /// Coordinator load in progress.
204    CoordinatorLoadInProgress = 14,
205    /// Coordinator not available.
206    CoordinatorNotAvailable = 15,
207    /// Not coordinator.
208    NotCoordinator = 16,
209    /// Invalid topic.
210    InvalidTopic = 17,
211    /// Message batch larger than configured server segment size.
212    MessageBatchTooLarge = 18,
213    /// Not enough in-sync replicas.
214    NotEnoughReplicas = 19,
215    /// Message(s) written to insufficient number of in-sync replicas.
216    NotEnoughReplicasAfterAppend = 20,
217    /// Invalid required acks value.
218    InvalidRequiredAcks = 21,
219    /// Specified group generation id is not valid.
220    IllegalGeneration = 22,
221    /// Inconsistent group protocol.
222    InconsistentGroupProtocol = 23,
223    /// Invalid group.id.
224    InvalidGroupId = 24,
225    /// Unknown member.
226    UnknownMemberId = 25,
227    /// Invalid session timeout.
228    InvalidSessionTimeout = 26,
229    /// Group rebalance in progress.
230    RebalanceInProgress = 27,
231    /// Commit offset data size is not valid.
232    InvalidCommitOffsetSize = 28,
233    /// Topic authorization failed.
234    TopicAuthorizationFailed = 29,
235    /// Group authorization failed.
236    GroupAuthorizationFailed = 30,
237    /// Cluster authorization failed.
238    ClusterAuthorizationFailed = 31,
239    /// Invalid timestamp.
240    InvalidTimestamp = 32,
241    /// Unsupported SASL mechanism.
242    UnsupportedSASLMechanism = 33,
243    /// Illegal SASL state.
244    IllegalSASLState = 34,
245    /// Unsupported version.
246    UnsupportedVersion = 35,
247    /// Topic already exists.
248    TopicAlreadyExists = 36,
249    /// Invalid number of partitions.
250    InvalidPartitions = 37,
251    /// Invalid replication factor.
252    InvalidReplicationFactor = 38,
253    /// Invalid replica assignment.
254    InvalidReplicaAssignment = 39,
255    /// Invalid config.
256    InvalidConfig = 40,
257    /// Not controller for cluster.
258    NotController = 41,
259    /// Invalid request.
260    InvalidRequest = 42,
261    /// Message format on broker does not support request.
262    UnsupportedForMessageFormat = 43,
263    /// Policy violation.
264    PolicyViolation = 44,
265    /// Broker received an out of order sequence number.
266    OutOfOrderSequenceNumber = 45,
267    /// Broker received a duplicate sequence number.
268    DuplicateSequenceNumber = 46,
269    /// Producer attempted an operation with an old epoch.
270    InvalidProducerEpoch = 47,
271    /// Producer attempted a transactional operation in an invalid state.
272    InvalidTransactionalState = 48,
273    /// Producer attempted to use a producer id which is currently assigned to
274    /// its transactional id.
275    InvalidProducerIdMapping = 49,
276    /// Transaction timeout is larger than the maxi value allowed by the
277    /// broker's max.transaction.timeout.ms.
278    InvalidTransactionTimeout = 50,
279    /// Producer attempted to update a transaction while another concurrent
280    /// operation on the same transaction was ongoing.
281    ConcurrentTransactions = 51,
282    /// Indicates that the transaction coordinator sending a WriteTxnMarker is
283    /// no longer the current coordinator for a given producer.
284    TransactionCoordinatorFenced = 52,
285    /// Transactional Id authorization failed.
286    TransactionalIdAuthorizationFailed = 53,
287    /// Security features are disabled.
288    SecurityDisabled = 54,
289    /// Operation not attempted.
290    OperationNotAttempted = 55,
291    /// Disk error when trying to access log file on the disk.
292    KafkaStorageError = 56,
293    /// The user-specified log directory is not found in the broker config.
294    LogDirNotFound = 57,
295    /// SASL Authentication failed.
296    SaslAuthenticationFailed = 58,
297    /// Unknown Producer Id.
298    UnknownProducerId = 59,
299    /// Partition reassignment is in progress.
300    ReassignmentInProgress = 60,
301    /// Delegation Token feature is not enabled.
302    DelegationTokenAuthDisabled = 61,
303    /// Delegation Token is not found on server.
304    DelegationTokenNotFound = 62,
305    /// Specified Principal is not valid Owner/Renewer.
306    DelegationTokenOwnerMismatch = 63,
307    /// Delegation Token requests are not allowed on this connection.
308    DelegationTokenRequestNotAllowed = 64,
309    /// Delegation Token authorization failed.
310    DelegationTokenAuthorizationFailed = 65,
311    /// Delegation Token is expired.
312    DelegationTokenExpired = 66,
313    /// Supplied principalType is not supported.
314    InvalidPrincipalType = 67,
315    /// The group is not empty.
316    NonEmptyGroup = 68,
317    /// The group id does not exist.
318    GroupIdNotFound = 69,
319    /// The fetch session ID was not found.
320    FetchSessionIdNotFound = 70,
321    /// The fetch session epoch is invalid.
322    InvalidFetchSessionEpoch = 71,
323    /// No matching listener.
324    ListenerNotFound = 72,
325    /// Topic deletion is disabled.
326    TopicDeletionDisabled = 73,
327    /// Leader epoch is older than broker epoch.
328    FencedLeaderEpoch = 74,
329    /// Leader epoch is newer than broker epoch.
330    UnknownLeaderEpoch = 75,
331    /// Unsupported compression type.
332    UnsupportedCompressionType = 76,
333    /// Broker epoch has changed.
334    StaleBrokerEpoch = 77,
335    /// Leader high watermark is not caught up.
336    OffsetNotAvailable = 78,
337    /// Group member needs a valid member ID.
338    MemberIdRequired = 79,
339    /// Preferred leader was not available.
340    PreferredLeaderNotAvailable = 80,
341    /// Consumer group has reached maximum size.
342    GroupMaxSizeReached = 81,
343    /// Static consumer fenced by other consumer with same group.instance.id.
344    FencedInstanceId = 82,
345    /// Eligible partition leaders are not available.
346    EligibleLeadersNotAvailable = 83,
347    /// Leader election not needed for topic partition.
348    ElectionNotNeeded = 84,
349    /// No partition reassignment is in progress.
350    NoReassignmentInProgress = 85,
351    /// Deleting offsets of a topic while the consumer group is subscribed to
352    /// it.
353    GroupSubscribedToTopic = 86,
354    /// Broker failed to validate record.
355    InvalidRecord = 87,
356    /// There are unstable offsets that need to be cleared.
357    UnstableOffsetCommit = 88,
358    /// Throttling quota has been exceeded.
359    ThrottlingQuotaExceeded = 89,
360    /// There is a newer producer with the same transactional ID which fences
361    /// the current one.
362    ProducerFenced = 90,
363    /// Request illegally referred to resource that does not exist.
364    ResourceNotFound = 91,
365    /// Request illegally referred to the same resource twice.
366    DuplicateResource = 92,
367    /// Requested credential would not meet criteria for acceptability.
368    UnacceptableCredential = 93,
369    /// Either the sender or recipient of a voter-only request is not one of the
370    /// expected voters.
371    InconsistentVoterSet = 94,
372    /// Invalid update version.
373    InvalidUpdateVersion = 95,
374    /// Unable to update finalized features due to server error.
375    FeatureUpdateFailed = 96,
376    /// Request principal deserialization failed during forwarding.
377    PrincipalDeserializationFailure = 97,
378    /// Unknown topic id
379    UnknownTopicId = 100,
380    /// The member epoch is fenced by the group coordinator
381    FencedMemberEpoch = 110,
382    /// The instance ID is still used by another member in the consumer group
383    UnreleasedInstanceId = 111,
384    /// The assignor or its version range is not supported by the consumer group
385    UnsupportedAssignor = 112,
386    /// The member epoch is stale
387    StaleMemberEpoch = 113,
388    /// Client sent a push telemetry request with an invalid or outdated subscription ID.
389    UnknownSubscriptionId = 117,
390    /// Client sent a push telemetry request larger than the maximum size the broker will accept.
391    TelemetryTooLarge = 118,
392    /// Client metadata is stale,
393    /// client should rebootstrap to obtain new metadata.
394    RebootstrapRequired = 129,
395    #[doc(hidden)]
396    EndAll = 130,
397}
398
399impl RDKafkaErrorCode {
400    /// Returns native err name only (no description)
401    pub fn name(&self) -> String {
402        let cstr = unsafe { rd_kafka_err2name((*self).into()) };
403        unsafe { CStr::from_ptr(cstr) }
404            .to_string_lossy()
405            .into_owned()
406    }
407
408    /// Returns an error if any
409    pub fn error(self) -> Option<Self> {
410        (!matches!(self, RDKafkaErrorCode::NoError)).then_some(self)
411    }
412}
413
414impl From<RDKafkaErrorCode> for rd_kafka_resp_err_t {
415    fn from(err: RDKafkaErrorCode) -> Self {
416        // UNWRAP: seemless conversion
417        Self::try_from(err as i32).unwrap()
418    }
419}
420
421impl From<rd_kafka_resp_err_t> for RDKafkaErrorCode {
422    fn from(err: rd_kafka_resp_err_t) -> RDKafkaErrorCode {
423        // UNWRAP: seemless conversion
424        Self::try_from(err as i32).unwrap()
425    }
426}
427
428impl fmt::Display for RDKafkaErrorCode {
429    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
430        let cstr = unsafe { rd_kafka_err2str((*self).into()) };
431        let description = unsafe { CStr::from_ptr(cstr) }
432            .to_string_lossy()
433            .into_owned();
434
435        write!(f, "{:?} ({})", self, description)
436    }
437}
438
439impl Error for RDKafkaErrorCode {}
440
441/// Errors for rdkafka configuration
442#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
443#[repr(i32)]
444pub enum RDKafkaConfErrorCode {
445    /// Unknown configuration name.
446    UnknownKey = -2,
447    /// Invalid configuration value or
448    /// property or value not supported in
449    /// this build.
450    InvalidValue = -1,
451    /// Ok variant
452    Ok = 0,
453}
454
455impl RDKafkaConfErrorCode {
456    /// Returns an error if any
457    pub fn error(&self) -> Option<&Self> {
458        (!matches!(self, RDKafkaConfErrorCode::Ok)).then_some(self)
459    }
460}
461
462impl From<rd_kafka_conf_res_t> for RDKafkaConfErrorCode {
463    fn from(err: rd_kafka_conf_res_t) -> RDKafkaConfErrorCode {
464        // UNWRAP: seemless conversion
465        Self::try_from(err as i32).unwrap()
466    }
467}
468
469impl fmt::Display for RDKafkaConfErrorCode {
470    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471        write!(
472            f,
473            "{}",
474            match self {
475                RDKafkaConfErrorCode::UnknownKey => "Unknown configuration key name",
476                RDKafkaConfErrorCode::InvalidValue =>
477                    "Invalid configuration value or not supported",
478                RDKafkaConfErrorCode::Ok => "Ok",
479            }
480        )
481    }
482}
483
484impl Error for RDKafkaConfErrorCode {}
485
486/// Events API event tags
487#[derive(Debug, Clone, Copy, PartialEq, Eq, ::num_enum::TryFromPrimitive)]
488#[repr(i32)]
489#[non_exhaustive]
490pub enum RDKafkaEventType {
491    None = bindings::RD_KAFKA_EVENT_NONE,
492    Dr = bindings::RD_KAFKA_EVENT_DR,
493    Fetch = bindings::RD_KAFKA_EVENT_FETCH,
494    Log = bindings::RD_KAFKA_EVENT_LOG,
495    Error = bindings::RD_KAFKA_EVENT_ERROR,
496    Rebalance = bindings::RD_KAFKA_EVENT_REBALANCE,
497    //
498    OffsetCommit = bindings::RD_KAFKA_EVENT_OFFSET_COMMIT,
499    Stats = bindings::RD_KAFKA_EVENT_STATS,
500    //
501    CreateTopicsResult = bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT,
502    DeleteTopicsResult = bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT,
503    CreatePartitionsResult = bindings::RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
504    AlterConfigsResult = bindings::RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
505    DescribeConfigsResult = bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
506    DeleteRecordsResult = bindings::RD_KAFKA_EVENT_DELETERECORDS_RESULT,
507    DeleteGroupsResult = bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
508    DeleteConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
509    //
510    OauthbearerTokenRefresh = bindings::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
511    Background = 0x200,
512    //
513    CreateAclsResult = bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT,
514    DescribeAclsResult = bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT,
515    DeleteAclsResult = bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT,
516    ListConsumerGroupsResult = bindings::RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT,
517    DescribeConsumerGroupsResult = bindings::RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT,
518    ListConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT,
519    AlterConsumerGroupOffsetsResult = bindings::RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT,
520    IncrementalAlterConfigsResult = bindings::RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT,
521    //
522    DescribeUserScramCredentialsResult =
523        bindings::RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT,
524    AlterUserScramCredentialsResult = bindings::RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT,
525    //
526    DescribeTopicsResult = bindings::RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT,
527    DescribeClusterResult = bindings::RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT,
528    ListOffsetsResult = bindings::RD_KAFKA_EVENT_LISTOFFSETS_RESULT,
529    ElectLeadersResult = 0x800000,
530}