Skip to main content

tansu_sans_io/
lib.rs

1// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//! A Kafka protocol implementation that performs no I/O (it operates only on bytes)
16//!
17//! ## Design
18//!
19//! Apache Kafka defines each API message with a JSON message descriptor. Each descriptor
20//! contains a list of fields together with their associated type. Each field can
21//! include a range of versions for which it is valid, its encoding and whether it
22//! includes tagged fields. Further background on the protocol and implementation
23//! used are in the
24//! [Apache Kafka protocol with serde, quote, syn and proc_macro2](https://blog.tansu.io/articles/serde-kafka-protocol)
25//! article.
26//!
27//! Some useful starting points:
28//!
29//! - **Data Structures** - [`Frame`], [`Request`], [`Response`], [`Header`] and [`Body`].
30//! - **Producing or fetching messages** - [`record`], [`ProduceRequest`] and [`FetchRequest`]
31//!
32//! ## Examples
33//!
34//! Encoding a [`CreateTopicsRequest`] request:
35//!
36//! ```
37//! # use tansu_sans_io::Error;
38//! # fn main() -> Result<(), Error> {
39//! use tansu_sans_io::{
40//!     ApiKey as _, CreateTopicsRequest, Frame, Header,
41//!     create_topics_request::{CreatableTopic, CreatableTopicConfig},
42//! };
43//!
44//! let header = Header::Request {
45//!     api_key: CreateTopicsRequest::KEY,
46//!     api_version: 7,
47//!     correlation_id: 298,
48//!     client_id: Some("adminclient-1".into()),
49//! };
50//!
51//! let body = CreateTopicsRequest::default()
52//!     .topics(Some(
53//!         [CreatableTopic::default()
54//!             .name("balances".into())
55//!             .num_partitions(-1)
56//!             .replication_factor(-1)
57//!             .assignments(Some([].into()))
58//!             .configs(Some(
59//!                 [CreatableTopicConfig::default()
60//!                     .name("cleanup.policy".into())
61//!                     .value(Some("compact".into()))]
62//!                 .into(),
63//!             ))]
64//!         .into(),
65//!     ))
66//!     .timeout_ms(30_000)
67//!     .validate_only(Some(false))
68//!     .into();
69//!
70//! let encoded = Frame::request(header, body)?;
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! Decoding a [`FindCoordinatorRequest`]:
76//!
77//! ```
78//! # use tansu_sans_io::Error;
79//! # fn main() -> Result<(), Error> {
80//! use tansu_sans_io::{ApiKey as _, FindCoordinatorRequest, Frame, Header};
81//!
82//! let encoded = vec![
83//!     0, 0, 0, 50, 0, 10, 0, 4, 0, 0, 0, 0, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 99, 111,
84//!     110, 115, 117, 109, 101, 114, 0, 0, 2, 20, 116, 101, 115, 116, 45, 99, 111, 110, 115, 117,
85//!     109, 101, 114, 45, 103, 114, 111, 117, 112, 0,
86//! ];
87//!
88//! assert_eq!(
89//!     Frame {
90//!         size: 50,
91//!         header: Header::Request {
92//!             api_key: FindCoordinatorRequest::KEY,
93//!             api_version: 4,
94//!             correlation_id: 0,
95//!             client_id: Some("console-consumer".into())
96//!         },
97//!         body: FindCoordinatorRequest::default()
98//!             .key(None)
99//!             .key_type(Some(0))
100//!             .coordinator_keys(Some(["test-consumer-group".into()].into()))
101//!             .into()
102//!     },
103//!     Frame::request_from_bytes(&encoded[..])?
104//! );
105//! # Ok(())
106//! # }
107//! ```
108//!
109//! This crate includes a build time proc macro that generates simple Rust structures
//! containing all the fields present in the Kafka message descriptor. Each generated
111//! type implements [`serde::Serialize`] and [`serde::Deserialize`] traits. As part of
112//! the generation phase [`MESSAGE_META`] is created, which is used by the actual message serializers.
113//!
114//! The Kafka protocol is implemented by [`ser::Encoder`] and [`de::Decoder`],
115//! using [`MESSAGE_META`] to determine which fields are present, their serialization type
116//! and whether any tagged fields can be present for a particular message version. Serializers
117//! map from the [Serde Data Model](https://serde.rs/data-model.html) to the Kafka protocol or vice versa.
118
119pub mod acl;
120pub mod de;
121pub mod primitive;
122pub mod record;
123pub mod resource;
124pub mod ser;
125
126use bytes::{Buf, BufMut, Bytes, BytesMut, TryGetError};
127pub use de::Decoder;
128use flate2::read::GzDecoder;
129use primitive::tagged::TagBuffer;
130use record::deflated::Frame as RecordBatch;
131pub use ser::Encoder;
132use serde::{Deserialize, Serialize};
133use std::{
134    array::TryFromSliceError,
135    collections::HashMap,
136    env::VarError,
137    fmt::{self, Display, Formatter},
138    io::{self, BufRead, Cursor, Read, Write},
139    num,
140    process::{ExitCode, Termination},
141    str::{self, FromStr},
142    string,
143    sync::{Arc, OnceLock},
144    time::{Duration, SystemTime, SystemTimeError},
145};
146use tansu_model::{MessageKind, MessageMeta};
147use tracing::{debug, error, instrument, warn};
148use tracing_subscriber::filter::ParseError;
149
/// The null topic identifier: an all-zero 16 byte (UUID sized) value.
pub const NULL_TOPIC_ID: [u8; 16] = [0; 16];
152
/// Message metadata indexed by API key, split into requests and responses.
///
/// Built once from [`MESSAGE_META`]; obtain the shared instance via
/// [`RootMessageMeta::messages`].
#[derive(Debug)]
pub struct RootMessageMeta {
    /// Request message metadata, keyed by API key.
    pub(crate) requests: HashMap<i16, &'static MessageMeta>,
    /// Response message metadata, keyed by API key.
    pub(crate) responses: HashMap<i16, &'static MessageMeta>,
}
158
159impl RootMessageMeta {
160    fn new() -> Self {
161        let (requests, responses) = MESSAGE_META.iter().fold(
162            (HashMap::new(), HashMap::new()),
163            |(mut requests, mut responses), (_, meta)| {
164                match meta.message_kind {
165                    MessageKind::Request => {
166                        _ = requests.insert(meta.api_key, *meta);
167                    }
168
169                    MessageKind::Response => {
170                        _ = responses.insert(meta.api_key, *meta);
171                    }
172                }
173
174                (requests, responses)
175            },
176        );
177
178        Self {
179            requests,
180            responses,
181        }
182    }
183
184    pub fn messages() -> &'static RootMessageMeta {
185        static MAPPING: OnceLock<RootMessageMeta> = OnceLock::new();
186        MAPPING.get_or_init(RootMessageMeta::new)
187    }
188
189    #[must_use]
190    pub const fn requests(&self) -> &HashMap<i16, &'static MessageMeta> {
191        &self.requests
192    }
193
194    #[must_use]
195    pub const fn responses(&self) -> &HashMap<i16, &'static MessageMeta> {
196        &self.responses
197    }
198}
199
/// The numeric Kafka API key identifying a message type.
pub trait ApiKey {
    /// The API key for this message type.
    const KEY: i16;
}
203
/// The Kafka API name of a message type.
pub trait ApiName {
    /// The API name for this message type.
    const NAME: &'static str;
}
207
/// All Kafka API requests implement this trait.
///
/// A request is convertible to and (fallibly) from [`Body`], and is linked
/// to its corresponding [`Response`] type via the associated type.
pub trait Request:
    ApiKey + ApiName + fmt::Debug + Default + Into<Body> + Send + Sync + TryFrom<Body> + 'static
{
    /// The response type paired with this request.
    type Response: Response;
}
214
/// All Kafka API responses implement this trait.
///
/// A response is convertible to and (fallibly) from [`Body`], and is linked
/// to its corresponding [`Request`] type via the associated type.
pub trait Response:
    ApiKey + ApiName + fmt::Debug + Default + Into<Body> + Send + Sync + TryFrom<Body> + 'static
{
    /// The request type paired with this response.
    type Request: Request;
}
221
/// Errors that can occur while encoding or decoding Kafka protocol messages.
///
/// Non-`Clone` error sources (`io::Error`, `TryGetError`, `ParseError`) are
/// wrapped in [`Arc`] so that this enum can derive [`Clone`].
/// [`Display`] is implemented manually below.
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
    ApiError(ErrorCode),
    EnvVar(VarError),
    FromUtf8(string::FromUtf8Error),
    InvalidAckValue(i16),
    InvalidCoordinatorType(i8),
    InvalidIsolationLevel(i8),
    InvalidOpType(i8),
    InvalidScramMechanism(i8),
    // Arc keeps the enum Clone: io::Error itself is not Clone.
    Io(Arc<io::Error>),
    // Free-form message used by the serde ser/de `custom` constructors.
    Message(String),
    MessageMaxSizeExceeded(usize),
    NoSuchField(&'static str),
    NoSuchMessage(&'static str),
    NoSuchRequest(i16),
    NotAuthenticated,
    Overflow,
    ParseFilter(Arc<ParseError>),
    ParseScram(String),
    // Returned when a request-only accessor is used on a response header.
    ResponseFrame,
    Snap(#[from] snap::Error),
    StringWithoutApiVersion,
    StringWithoutLength,
    SystemTime(SystemTimeError),
    TansuModel(tansu_model::Error),
    TryFromInt(#[from] num::TryFromIntError),
    TryFromSlice(#[from] TryFromSliceError),
    TryGet(Arc<TryGetError>),
    UnexpectedType(String),
    UnknownApiErrorCode(i16),
    UnknownCompressionType(i16),
    UnknownScramMechanism(i8),
    UnknownContainer,
    Utf8(str::Utf8Error),
}
258
/// A specialized result type whose error defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
260
261impl Display for Error {
262    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
263        match self {
264            Error::Message(e) => f.write_str(e),
265            e => write!(f, "{e:?}"),
266        }
267    }
268}
269
// Allow serializers to surface ad-hoc failures as Error::Message.
impl serde::ser::Error for Error {
    fn custom<T: Display>(msg: T) -> Self {
        Error::Message(msg.to_string())
    }
}

// Allow deserializers to surface ad-hoc failures as Error::Message.
impl serde::de::Error for Error {
    fn custom<T: Display>(msg: T) -> Self {
        Error::Message(msg.to_string())
    }
}
281
// Conversions from external error types, enabling use of `?` throughout the
// crate. Sources that are not Clone are wrapped in Arc (see Error).

impl From<io::Error> for Error {
    fn from(value: io::Error) -> Self {
        Self::Io(Arc::new(value))
    }
}

impl From<TryGetError> for Error {
    fn from(value: TryGetError) -> Self {
        Self::TryGet(Arc::new(value))
    }
}

impl From<ParseError> for Error {
    fn from(value: ParseError) -> Self {
        Self::ParseFilter(Arc::new(value))
    }
}

impl From<str::Utf8Error> for Error {
    fn from(value: str::Utf8Error) -> Self {
        Self::Utf8(value)
    }
}

impl From<string::FromUtf8Error> for Error {
    fn from(value: string::FromUtf8Error) -> Self {
        Self::FromUtf8(value)
    }
}

impl From<tansu_model::Error> for Error {
    fn from(value: tansu_model::Error) -> Self {
        Self::TansuModel(value)
    }
}

impl From<VarError> for Error {
    fn from(value: VarError) -> Self {
        Self::EnvVar(value)
    }
}

impl From<SystemTimeError> for Error {
    fn from(value: SystemTimeError) -> Self {
        Self::SystemTime(value)
    }
}
329
330/// A Kafka API frame prefixed with its length, followed by a header and the message body.
331///
332/// # Examples
333///
334/// ## Encoding
335///
336/// Encoding a [`CreateTopicsRequest`] request:
337///
338/// ```
339/// use tansu_sans_io::{
340///     ApiKey as _, CreateTopicsRequest, Frame, Header,
341///     create_topics_request::{CreatableTopic, CreatableTopicConfig},
342/// };
343///
344/// let header = Header::Request {
345///     api_key: CreateTopicsRequest::KEY,
346///     api_version: 7,
347///     correlation_id: 298,
348///     client_id: Some("adminclient-1".into()),
349/// };
350///
351/// let body = CreateTopicsRequest::default()
352///     .topics(Some(
353///         [CreatableTopic::default()
354///             .name("balances".into())
355///             .num_partitions(-1)
356///             .replication_factor(-1)
357///             .assignments(Some([].into()))
358///             .configs(Some(
359///                 [CreatableTopicConfig::default()
360///                     .name("cleanup.policy".into())
361///                     .value(Some("compact".into()))]
362///                 .into(),
363///             ))]
364///         .into(),
365///     ))
366///     .timeout_ms(30_000)
367///     .validate_only(Some(false))
368///     .into();
369///
370/// let encoded = Frame::request(header, body).unwrap();
371/// ```
372///
373/// ## Decoding
374///
375/// Decoding a [`FindCoordinatorRequest`]:
376///
377/// ```
378/// use tansu_sans_io::{ApiKey as _, FindCoordinatorRequest, Frame, Header};
379///
380/// let encoded = vec![
381///     0, 0, 0, 50, 0, 10, 0, 4, 0, 0, 0, 0, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 99, 111,
382///     110, 115, 117, 109, 101, 114, 0, 0, 2, 20, 116, 101, 115, 116, 45, 99, 111, 110, 115, 117,
383///     109, 101, 114, 45, 103, 114, 111, 117, 112, 0,
384/// ];
385///
386/// assert_eq!(
387///     Frame {
388///         size: 50,
389///         header: Header::Request {
390///             api_key: FindCoordinatorRequest::KEY,
391///             api_version: 4,
392///             correlation_id: 0,
393///             client_id: Some("console-consumer".into())
394///         },
395///         body: FindCoordinatorRequest::default()
396///             .key(None)
397///             .key_type(Some(0))
398///             .coordinator_keys(Some(["test-consumer-group".into()].into()))
399///             .into()
400///     },
401///     Frame::request_from_bytes(&encoded[..]).unwrap()
402/// );
403/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
pub struct Frame {
    /// The size of this frame — the encoded length excluding the 4 byte
    /// size prefix itself (see [`Frame::request`]).
    pub size: i32,

    /// The frame header: either a request or a response header.
    pub header: Header,

    /// The frame body: the API message payload.
    pub body: Body,
}
415
416impl Frame {
417    fn elapsed_millis(start: SystemTime) -> u64 {
418        start
419            .elapsed()
420            .map_or(0, |duration| duration.as_millis() as u64)
421    }
422
423    /// serialize an API request into a frame of bytes
424    #[instrument(skip_all)]
425    pub fn request(header: Header, body: Body) -> Result<Bytes> {
426        let start = SystemTime::now();
427
428        let mut c = Cursor::new(vec![]);
429
430        let mut serializer = Encoder::request(&mut c);
431
432        let frame = Frame {
433            size: 0,
434            header,
435            body,
436        };
437
438        frame.serialize(&mut serializer)?;
439        let size = i32::try_from(c.position()).map(|position| position - 4)?;
440
441        c.set_position(0);
442        let buf = size.to_be_bytes();
443        c.write_all(&buf)?;
444
445        Ok(Bytes::from(c.into_inner())).inspect(|encoded| {
446            debug!(
447                len = encoded.len(),
448                elapsed_millis = Self::elapsed_millis(start)
449            )
450        })
451    }
452
453    /// deserialize bytes into an API request frame
454    #[instrument(skip_all)]
455    pub fn request_from_bytes(encoded: impl Buf) -> Result<Frame> {
456        let start = SystemTime::now();
457
458        let mut reader = encoded.reader();
459        let mut deserializer = Decoder::request(&mut reader);
460        Frame::deserialize(&mut deserializer)
461            .inspect(|_frame| debug!(elapsed_millis = Self::elapsed_millis(start)))
462    }
463
464    /// serialize an API response into a frame of bytes
465    #[instrument(skip_all)]
466    pub fn response(header: Header, body: Body, api_key: i16, api_version: i16) -> Result<Bytes> {
467        let start = SystemTime::now();
468
469        let mut c = Cursor::new(vec![]);
470        let mut serializer = Encoder::response(&mut c, api_key, api_version);
471
472        let frame = Frame {
473            size: 0,
474            header,
475            body,
476        };
477
478        frame.serialize(&mut serializer)?;
479        let size = i32::try_from(c.position())
480            .map(|position| position - 4)
481            .inspect_err(|err| {
482                let position = c.position();
483                warn!(?err, ?position, ?frame);
484            })?;
485
486        c.set_position(0);
487        let buf = size.to_be_bytes();
488        c.write_all(&buf)?;
489
490        Ok(Bytes::from(c.into_inner())).inspect(|encoded| {
491            debug!(
492                len = encoded.len(),
493                elapsed_millis = Self::elapsed_millis(start)
494            )
495        })
496    }
497
498    /// deserialize bytes into an API response frame
499    #[instrument(skip_all)]
500    pub fn response_from_bytes(bytes: impl Buf, api_key: i16, api_version: i16) -> Result<Frame> {
501        let start = SystemTime::now();
502
503        let mut reader = bytes.reader();
504        let mut deserializer = Decoder::response(&mut reader, api_key, api_version);
505        Frame::deserialize(&mut deserializer)
506            .inspect(|encoded| debug!(elapsed_millis = Self::elapsed_millis(start)))
507    }
508
509    /// API request key
510    pub fn api_key(&self) -> Result<i16> {
511        if let Header::Request { api_key, .. } = self.header {
512            Ok(api_key)
513        } else {
514            Err(Error::ResponseFrame)
515        }
516    }
517
518    /// API name
519    pub fn api_name(&self) -> &str {
520        self.body.api_name()
521    }
522
523    /// API request version
524    pub fn api_version(&self) -> Result<i16> {
525        if let Header::Request { api_version, .. } = self.header {
526            Ok(api_version)
527        } else {
528            Err(Error::ResponseFrame)
529        }
530    }
531
532    /// API request/response correlation ID
533    pub fn correlation_id(&self) -> Result<i32> {
534        match self.header {
535            Header::Request { correlation_id, .. } | Header::Response { correlation_id } => {
536                Ok(correlation_id)
537            }
538        }
539    }
540
541    /// API request client ID
542    pub fn client_id(&self) -> Result<Option<&str>> {
543        if let Header::Request { ref client_id, .. } = self.header {
544            Ok(client_id.as_deref())
545        } else {
546            Err(Error::ResponseFrame)
547        }
548    }
549}
550
/// A Kafka API request or response header.
///
/// Serialized via the crate-private [`HeaderMezzanine`], which additionally
/// carries an optional tag buffer on the wire.
#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
#[serde(try_from = "HeaderMezzanine")]
#[serde(into = "HeaderMezzanine")]
pub enum Header {
    /// An API request header.
    Request {
        /// The API key being used for this request.
        api_key: i16,

        /// The API version being used for this request.
        api_version: i16,

        /// The correlation ID that should be used by the response to this request.
        correlation_id: i32,

        /// An optional client ID.
        client_id: Option<String>,
    },

    /// An API Response header.
    Response {
        /// The correlation ID for the corresponding request.
        correlation_id: i32,
    },
}
577
/// Wire-level form of [`Header`] with an optional tag buffer.
///
/// Used via `#[serde(try_from)]`/`#[serde(into)]` on [`Header`] so the
/// public type does not expose the tag buffer (presumably only present in
/// flexible header versions — confirm against the encoder).
#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
pub(crate) enum HeaderMezzanine {
    Request {
        api_key: i16,
        api_version: i16,
        correlation_id: i32,
        client_id: Option<String>,
        tag_buffer: Option<TagBuffer>,
    },
    Response {
        correlation_id: i32,
        tag_buffer: Option<TagBuffer>,
    },
}
592
593impl TryFrom<HeaderMezzanine> for Header {
594    type Error = Error;
595
596    fn try_from(value: HeaderMezzanine) -> Result<Self, Self::Error> {
597        debug!(?value);
598
599        match value {
600            HeaderMezzanine::Request {
601                api_key,
602                api_version,
603                correlation_id,
604                client_id,
605                ..
606            } => Ok(Self::Request {
607                api_key,
608                api_version,
609                correlation_id,
610                client_id,
611            }),
612
613            HeaderMezzanine::Response { correlation_id, .. } => {
614                Ok(Self::Response { correlation_id })
615            }
616        }
617    }
618}
619
620impl From<Header> for HeaderMezzanine {
621    fn from(value: Header) -> Self {
622        debug!("value: {value:?}");
623
624        match value {
625            Header::Request {
626                api_key,
627                api_version,
628                correlation_id,
629                client_id,
630            } => HeaderMezzanine::Request {
631                api_key,
632                api_version,
633                correlation_id,
634                client_id,
635                tag_buffer: Some(TagBuffer([].into())),
636            },
637
638            Header::Response { correlation_id } => HeaderMezzanine::Response {
639                correlation_id,
640                tag_buffer: Some(TagBuffer([].into())),
641            },
642        }
643    }
644}
645
646impl Termination for ErrorCode {
647    fn report(self) -> ExitCode {
648        if let Self::None = self {
649            ExitCode::SUCCESS
650        } else {
651            ExitCode::FAILURE
652        }
653    }
654}
655
// Owned-value convenience wrapper delegating to the &i16 implementation.
impl TryFrom<i16> for ErrorCode {
    type Error = Error;

    fn try_from(value: i16) -> Result<Self, Self::Error> {
        Self::try_from(&value)
    }
}
663
impl TryFrom<&i16> for ErrorCode {
    type Error = Error;

    /// Map a wire-level Kafka error code to its [`ErrorCode`] variant.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnknownApiErrorCode`] for any code outside the
    /// known range.
    #[allow(clippy::too_many_lines)]
    fn try_from(value: &i16) -> Result<Self, Self::Error> {
        match value {
            -1 => Ok(Self::UnknownServerError),
            0 => Ok(Self::None),
            1 => Ok(Self::OffsetOutOfRange),
            2 => Ok(Self::CorruptMessage),
            3 => Ok(Self::UnknownTopicOrPartition),
            4 => Ok(Self::InvalidFetchSize),
            5 => Ok(Self::LeaderNotAvailable),
            6 => Ok(Self::NotLeaderOrFollower),
            7 => Ok(Self::RequestTimedOut),
            8 => Ok(Self::BrokerNotAvailable),
            9 => Ok(Self::ReplicaNotAvailable),
            10 => Ok(Self::MessageTooLarge),
            11 => Ok(Self::StaleControllerEpoch),
            12 => Ok(Self::OffsetMetadataTooLarge),
            13 => Ok(Self::NetworkException),
            14 => Ok(Self::CoordinatorLoadInProgress),
            15 => Ok(Self::CoordinatorNotAvailable),
            16 => Ok(Self::NotCoordinator),
            17 => Ok(Self::InvalidTopicException),
            18 => Ok(Self::RecordListTooLarge),
            19 => Ok(Self::NotEnoughReplicas),
            20 => Ok(Self::NotEnoughReplicasAfterAppend),
            21 => Ok(Self::InvalidRequiredAcks),
            22 => Ok(Self::IllegalGeneration),
            23 => Ok(Self::InconsistentGroupProtocol),
            24 => Ok(Self::InvalidGroupId),
            25 => Ok(Self::UnknownMemberId),
            26 => Ok(Self::InvalidSessionTimeout),
            27 => Ok(Self::RebalanceInProgress),
            28 => Ok(Self::InvalidCommitOffsetSize),
            29 => Ok(Self::TopicAuthorizationFailed),
            30 => Ok(Self::GroupAuthorizationFailed),
            31 => Ok(Self::ClusterAuthorizationFailed),
            32 => Ok(Self::InvalidTimestamp),
            33 => Ok(Self::UnsupportedSaslMechanism),
            34 => Ok(Self::IllegalSaslState),
            35 => Ok(Self::UnsupportedVersion),
            36 => Ok(Self::TopicAlreadyExists),
            37 => Ok(Self::InvalidPartitions),
            38 => Ok(Self::InvalidReplicationFactor),
            39 => Ok(Self::InvalidReplicaAssignment),
            40 => Ok(Self::InvalidConfig),
            41 => Ok(Self::NotController),
            42 => Ok(Self::InvalidRequest),
            43 => Ok(Self::UnsupportedForMessageFormat),
            44 => Ok(Self::PolicyViolation),
            45 => Ok(Self::OutOfOrderSequenceNumber),
            46 => Ok(Self::DuplicateSequenceNumber),
            47 => Ok(Self::InvalidProducerEpoch),
            48 => Ok(Self::InvalidTxnState),
            49 => Ok(Self::InvalidProducerIdMapping),
            50 => Ok(Self::InvalidTransactionTimeout),
            51 => Ok(Self::ConcurrentTransactions),
            52 => Ok(Self::TransactionCoordinatorFenced),
            53 => Ok(Self::TransactionalIdAuthorizationFailed),
            54 => Ok(Self::SecurityDisabled),
            55 => Ok(Self::OperationNotAttempted),
            56 => Ok(Self::KafkaStorageError),
            57 => Ok(Self::LogDirNotFound),
            58 => Ok(Self::SaslAuthenticationFailed),
            59 => Ok(Self::UnknownProducerId),
            60 => Ok(Self::ReassignmentInProgress),
            61 => Ok(Self::DelegationTokenAuthDisabled),
            62 => Ok(Self::DelegationTokenNotFound),
            63 => Ok(Self::DelegationTokenOwnerMismatch),
            64 => Ok(Self::DelegationTokenRequestNotAllowed),
            65 => Ok(Self::DelegationTokenAuthorizationFailed),
            66 => Ok(Self::DelegationTokenExpired),
            67 => Ok(Self::InvalidPrincipalType),
            68 => Ok(Self::NonEmptyGroup),
            69 => Ok(Self::GroupIdNotFound),
            70 => Ok(Self::FetchSessionIdNotFound),
            71 => Ok(Self::InvalidFetchSessionEpoch),
            72 => Ok(Self::ListenerNotFound),
            73 => Ok(Self::TopicDeletionDisabled),
            74 => Ok(Self::FencedLeaderEpoch),
            75 => Ok(Self::UnknownLeaderEpoch),
            76 => Ok(Self::UnsupportedCompressionType),
            77 => Ok(Self::StaleBrokerEpoch),
            78 => Ok(Self::OffsetNotAvailable),
            79 => Ok(Self::MemberIdRequired),
            80 => Ok(Self::PreferredLeaderNotAvailable),
            81 => Ok(Self::GroupMaxSizeReached),
            82 => Ok(Self::FencedInstanceId),
            83 => Ok(Self::EligibleLeadersNotAvailable),
            84 => Ok(Self::ElectionNotNeeded),
            85 => Ok(Self::NoReassignmentInProgress),
            86 => Ok(Self::GroupSubscribedToTopic),
            87 => Ok(Self::InvalidRecord),
            88 => Ok(Self::UnstableOffsetCommit),
            89 => Ok(Self::ThrottlingQuotaExceeded),
            90 => Ok(Self::ProducerFenced),
            91 => Ok(Self::ResourceNotFound),
            92 => Ok(Self::DuplicateResource),
            93 => Ok(Self::UnacceptableCredential),
            94 => Ok(Self::InconsistentVoterSet),
            95 => Ok(Self::InvalidUpdateVersion),
            96 => Ok(Self::FeatureUpdateFailed),
            97 => Ok(Self::PrincipalDeserializationFailure),
            98 => Ok(Self::SnapshotNotFound),
            99 => Ok(Self::PositionOutOfRange),
            100 => Ok(Self::UnknownTopicId),
            101 => Ok(Self::DuplicateBrokerRegistration),
            102 => Ok(Self::BrokerIdNotRegistered),
            103 => Ok(Self::InconsistentTopicId),
            104 => Ok(Self::InconsistentClusterId),
            105 => Ok(Self::TransactionalIdNotFound),
            106 => Ok(Self::FetchSessionTopicIdError),
            107 => Ok(Self::IneligibleReplica),
            108 => Ok(Self::NewLeaderElected),
            109 => Ok(Self::OffsetMovedToTieredStorage),
            110 => Ok(Self::FencedMemberEpoch),
            111 => Ok(Self::UnreleasedInstanceId),
            112 => Ok(Self::UnsupportedAssignor),
            113 => Ok(Self::StaleMemberEpoch),
            114 => Ok(Self::MismatchedEndpointType),
            115 => Ok(Self::UnsupportedEndpointType),
            116 => Ok(Self::UnknownControllerId),
            117 => Ok(Self::UnknownSubscriptionId),
            118 => Ok(Self::TelemetryTooLarge),
            119 => Ok(Self::InvalidRegistration),
            otherwise => Err(Error::UnknownApiErrorCode(*otherwise)),
        }
    }
}
795
// Owned-value convenience wrapper delegating to the &ErrorCode implementation.
impl From<ErrorCode> for i16 {
    fn from(value: ErrorCode) -> Self {
        Self::from(&value)
    }
}
801
802impl From<&ErrorCode> for i16 {
803    #[allow(clippy::too_many_lines)]
804    fn from(value: &ErrorCode) -> Self {
805        match value {
806            ErrorCode::UnknownServerError => -1,
807            ErrorCode::None => 0,
808            ErrorCode::OffsetOutOfRange => 1,
809            ErrorCode::CorruptMessage => 2,
810            ErrorCode::UnknownTopicOrPartition => 3,
811            ErrorCode::InvalidFetchSize => 4,
812            ErrorCode::LeaderNotAvailable => 5,
813            ErrorCode::NotLeaderOrFollower => 6,
814            ErrorCode::RequestTimedOut => 7,
815            ErrorCode::BrokerNotAvailable => 8,
816            ErrorCode::ReplicaNotAvailable => 9,
817            ErrorCode::MessageTooLarge => 10,
818            ErrorCode::StaleControllerEpoch => 11,
819            ErrorCode::OffsetMetadataTooLarge => 12,
820            ErrorCode::NetworkException => 13,
821            ErrorCode::CoordinatorLoadInProgress => 14,
822            ErrorCode::CoordinatorNotAvailable => 15,
823            ErrorCode::NotCoordinator => 16,
824            ErrorCode::InvalidTopicException => 17,
825            ErrorCode::RecordListTooLarge => 18,
826            ErrorCode::NotEnoughReplicas => 19,
827            ErrorCode::NotEnoughReplicasAfterAppend => 20,
828            ErrorCode::InvalidRequiredAcks => 21,
829            ErrorCode::IllegalGeneration => 22,
830            ErrorCode::InconsistentGroupProtocol => 23,
831            ErrorCode::InvalidGroupId => 24,
832            ErrorCode::UnknownMemberId => 25,
833            ErrorCode::InvalidSessionTimeout => 26,
834            ErrorCode::RebalanceInProgress => 27,
835            ErrorCode::InvalidCommitOffsetSize => 28,
836            ErrorCode::TopicAuthorizationFailed => 29,
837            ErrorCode::GroupAuthorizationFailed => 30,
838            ErrorCode::ClusterAuthorizationFailed => 31,
839            ErrorCode::InvalidTimestamp => 32,
840            ErrorCode::UnsupportedSaslMechanism => 33,
841            ErrorCode::IllegalSaslState => 34,
842            ErrorCode::UnsupportedVersion => 35,
843            ErrorCode::TopicAlreadyExists => 36,
844            ErrorCode::InvalidPartitions => 37,
845            ErrorCode::InvalidReplicationFactor => 38,
846            ErrorCode::InvalidReplicaAssignment => 39,
847            ErrorCode::InvalidConfig => 40,
848            ErrorCode::NotController => 41,
849            ErrorCode::InvalidRequest => 42,
850            ErrorCode::UnsupportedForMessageFormat => 43,
851            ErrorCode::PolicyViolation => 44,
852            ErrorCode::OutOfOrderSequenceNumber => 45,
853            ErrorCode::DuplicateSequenceNumber => 46,
854            ErrorCode::InvalidProducerEpoch => 47,
855            ErrorCode::InvalidTxnState => 48,
856            ErrorCode::InvalidProducerIdMapping => 49,
857            ErrorCode::InvalidTransactionTimeout => 50,
858            ErrorCode::ConcurrentTransactions => 51,
859            ErrorCode::TransactionCoordinatorFenced => 52,
860            ErrorCode::TransactionalIdAuthorizationFailed => 53,
861            ErrorCode::SecurityDisabled => 54,
862            ErrorCode::OperationNotAttempted => 55,
863            ErrorCode::KafkaStorageError => 56,
864            ErrorCode::LogDirNotFound => 57,
865            ErrorCode::SaslAuthenticationFailed => 58,
866            ErrorCode::UnknownProducerId => 59,
867            ErrorCode::ReassignmentInProgress => 60,
868            ErrorCode::DelegationTokenAuthDisabled => 61,
869            ErrorCode::DelegationTokenNotFound => 62,
870            ErrorCode::DelegationTokenOwnerMismatch => 63,
871            ErrorCode::DelegationTokenRequestNotAllowed => 64,
872            ErrorCode::DelegationTokenAuthorizationFailed => 65,
873            ErrorCode::DelegationTokenExpired => 66,
874            ErrorCode::InvalidPrincipalType => 67,
875            ErrorCode::NonEmptyGroup => 68,
876            ErrorCode::GroupIdNotFound => 69,
877            ErrorCode::FetchSessionIdNotFound => 70,
878            ErrorCode::InvalidFetchSessionEpoch => 71,
879            ErrorCode::ListenerNotFound => 72,
880            ErrorCode::TopicDeletionDisabled => 73,
881            ErrorCode::FencedLeaderEpoch => 74,
882            ErrorCode::UnknownLeaderEpoch => 75,
883            ErrorCode::UnsupportedCompressionType => 76,
884            ErrorCode::StaleBrokerEpoch => 77,
885            ErrorCode::OffsetNotAvailable => 78,
886            ErrorCode::MemberIdRequired => 79,
887            ErrorCode::PreferredLeaderNotAvailable => 80,
888            ErrorCode::GroupMaxSizeReached => 81,
889            ErrorCode::FencedInstanceId => 82,
890            ErrorCode::EligibleLeadersNotAvailable => 83,
891            ErrorCode::ElectionNotNeeded => 84,
892            ErrorCode::NoReassignmentInProgress => 85,
893            ErrorCode::GroupSubscribedToTopic => 86,
894            ErrorCode::InvalidRecord => 87,
895            ErrorCode::UnstableOffsetCommit => 88,
896            ErrorCode::ThrottlingQuotaExceeded => 89,
897            ErrorCode::ProducerFenced => 90,
898            ErrorCode::ResourceNotFound => 91,
899            ErrorCode::DuplicateResource => 92,
900            ErrorCode::UnacceptableCredential => 93,
901            ErrorCode::InconsistentVoterSet => 94,
902            ErrorCode::InvalidUpdateVersion => 95,
903            ErrorCode::FeatureUpdateFailed => 96,
904            ErrorCode::PrincipalDeserializationFailure => 97,
905            ErrorCode::SnapshotNotFound => 98,
906            ErrorCode::PositionOutOfRange => 99,
907            ErrorCode::UnknownTopicId => 100,
908            ErrorCode::DuplicateBrokerRegistration => 101,
909            ErrorCode::BrokerIdNotRegistered => 102,
910            ErrorCode::InconsistentTopicId => 103,
911            ErrorCode::InconsistentClusterId => 104,
912            ErrorCode::TransactionalIdNotFound => 105,
913            ErrorCode::FetchSessionTopicIdError => 106,
914            ErrorCode::IneligibleReplica => 107,
915            ErrorCode::NewLeaderElected => 108,
916            ErrorCode::OffsetMovedToTieredStorage => 109,
917            ErrorCode::FencedMemberEpoch => 110,
918            ErrorCode::UnreleasedInstanceId => 111,
919            ErrorCode::UnsupportedAssignor => 112,
920            ErrorCode::StaleMemberEpoch => 113,
921            ErrorCode::MismatchedEndpointType => 114,
922            ErrorCode::UnsupportedEndpointType => 115,
923            ErrorCode::UnknownControllerId => 116,
924            ErrorCode::UnknownSubscriptionId => 117,
925            ErrorCode::TelemetryTooLarge => 118,
926            ErrorCode::InvalidRegistration => 119,
927        }
928    }
929}
930
// Human-readable description for each Kafka error code, suitable for logs and
// user-facing error messages.
//
// NOTE(review): the texts appear to mirror the descriptions published in the
// Apache Kafka protocol error table — keep them in sync when codes are added.
impl Display for ErrorCode {
    // One match arm per error code necessarily exceeds clippy's line budget.
    #[allow(clippy::too_many_lines)]
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            ErrorCode::UnknownServerError => f.write_str(
                "The server experienced an unexpected error when processing the request.",
            ),
            ErrorCode::None => f.write_str("No error."),
            ErrorCode::OffsetOutOfRange => f.write_str(
                "The requested offset is not within the range of offsets maintained by the server.",
            ),
            ErrorCode::CorruptMessage => f.write_str(
                "This message has failed its CRC checksum, exceeds the valid size, has a null key \
                 for a compacted topic, or is otherwise corrupt.",
            ),
            ErrorCode::UnknownTopicOrPartition => {
                f.write_str("This server does not host this topic-partition.")
            }
            ErrorCode::InvalidFetchSize => f.write_str("The requested fetch size is invalid."),
            ErrorCode::LeaderNotAvailable => f.write_str(
                "There is no leader for this topic-partition as we are in the middle of a \
                 leadership election.",
            ),
            ErrorCode::NotLeaderOrFollower => f.write_str(
                "For requests intended only for the leader, this error indicates that the broker \
                 is not the current leader. For requests intended for any replica, this error \
                 indicates that the broker is not a replica of the topic partition.",
            ),
            ErrorCode::RequestTimedOut => f.write_str("The request timed out."),
            ErrorCode::BrokerNotAvailable => f.write_str("The broker is not available."),
            ErrorCode::ReplicaNotAvailable => f.write_str(
                "The replica is not available for the requested topic-partition. Produce/Fetch \
                 requests and other requests intended only for the leader or follower return \
                 NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.",
            ),
            ErrorCode::MessageTooLarge => f.write_str(
                "The request included a message larger than the max message size the server will \
                 accept.",
            ),
            ErrorCode::StaleControllerEpoch => {
                f.write_str("The controller moved to another broker.")
            }
            ErrorCode::OffsetMetadataTooLarge => {
                f.write_str("The metadata field of the offset request was too large.")
            }
            ErrorCode::NetworkException => {
                f.write_str("The server disconnected before a response was received.")
            }
            ErrorCode::CoordinatorLoadInProgress => {
                f.write_str("The coordinator is loading and hence can't process requests.")
            }
            ErrorCode::CoordinatorNotAvailable => f.write_str("The coordinator is not available."),
            ErrorCode::NotCoordinator => f.write_str("This is not the correct coordinator."),
            ErrorCode::InvalidTopicException => {
                f.write_str("The request attempted to perform an operation on an invalid topic.")
            }
            ErrorCode::RecordListTooLarge => f.write_str(
                "The request included message batch larger than the configured segment size on \
                 the server.",
            ),
            ErrorCode::NotEnoughReplicas => f.write_str(
                "Messages are rejected since there are fewer in-sync replicas than required.",
            ),
            ErrorCode::NotEnoughReplicasAfterAppend => f.write_str(
                "Messages are written to the log, but to fewer in-sync replicas than required.",
            ),
            ErrorCode::InvalidRequiredAcks => {
                f.write_str("Produce request specified an invalid value for required acks.")
            }
            ErrorCode::IllegalGeneration => {
                f.write_str("Specified group generation id is not valid.")
            }
            ErrorCode::InconsistentGroupProtocol => f.write_str(
                "The group member's supported protocols are incompatible with those of existing \
                 members or first group member tried to join with empty protocol type or empty \
                 protocol list.",
            ),
            ErrorCode::InvalidGroupId => f.write_str("The configured groupId is invalid."),
            ErrorCode::UnknownMemberId => {
                f.write_str("The coordinator is not aware of this member.")
            }
            ErrorCode::InvalidSessionTimeout => f.write_str(
                "The session timeout is not within the range allowed by the broker (as configured \
                 by group.min.session.timeout.ms and group.max.session.timeout.ms).",
            ),
            ErrorCode::RebalanceInProgress => {
                f.write_str("The group is rebalancing, so a rejoin is needed.")
            }
            ErrorCode::InvalidCommitOffsetSize => {
                f.write_str("The committing offset data size is not valid.")
            }
            ErrorCode::TopicAuthorizationFailed => f.write_str("Topic authorization failed."),
            ErrorCode::GroupAuthorizationFailed => f.write_str("Group authorization failed."),
            ErrorCode::ClusterAuthorizationFailed => f.write_str("Cluster authorization failed."),
            ErrorCode::InvalidTimestamp => {
                f.write_str("The timestamp of the message is out of acceptable range.")
            }
            ErrorCode::UnsupportedSaslMechanism => {
                f.write_str("The broker does not support the requested SASL mechanism.")
            }
            ErrorCode::IllegalSaslState => {
                f.write_str("Request is not valid given the current SASL state.")
            }
            ErrorCode::UnsupportedVersion => f.write_str("The version of API is not supported."),
            ErrorCode::TopicAlreadyExists => f.write_str("Topic with this name already exists."),
            ErrorCode::InvalidPartitions => f.write_str("Number of partitions is below 1."),
            ErrorCode::InvalidReplicationFactor => f.write_str(
                "Replication factor is below 1 or larger than the number of available brokers.",
            ),
            ErrorCode::InvalidReplicaAssignment => f.write_str("Replica assignment is invalid."),
            ErrorCode::InvalidConfig => f.write_str("Configuration is invalid."),
            ErrorCode::NotController => {
                f.write_str("This is not the correct controller for this cluster.")
            }
            ErrorCode::InvalidRequest => f.write_str(
                "This most likely occurs because of a request being malformed by the client \
                 library or the message was sent to an incompatible broker. See the broker logs \
                 for more details.",
            ),
            ErrorCode::UnsupportedForMessageFormat => f.write_str(
                "The message format version on the broker does not support the request.",
            ),
            ErrorCode::PolicyViolation => {
                f.write_str("Request parameters do not satisfy the configured policy.")
            }
            ErrorCode::OutOfOrderSequenceNumber => {
                f.write_str("The broker received an out of order sequence number.")
            }
            ErrorCode::DuplicateSequenceNumber => {
                f.write_str("The broker received a duplicate sequence number.")
            }
            ErrorCode::InvalidProducerEpoch => {
                f.write_str("Producer attempted to produce with an old epoch.")
            }
            ErrorCode::InvalidTxnState => {
                f.write_str("The producer attempted a transactional operation in an invalid state.")
            }
            ErrorCode::InvalidProducerIdMapping => f.write_str(
                "The producer attempted to use a producer id which is not currently assigned to \
                 its transactional id.",
            ),
            ErrorCode::InvalidTransactionTimeout => f.write_str(
                "The transaction timeout is larger than the maximum value allowed by the broker \
                 (as configured by transaction.max.timeout.ms).",
            ),
            ErrorCode::ConcurrentTransactions => f.write_str(
                "The producer attempted to update a transaction while another concurrent \
                 operation on the same transaction was ongoing.",
            ),
            ErrorCode::TransactionCoordinatorFenced => f.write_str(
                "Indicates that the transaction coordinator sending a WriteTxnMarker is no longer \
                 the current coordinator for a given producer.",
            ),
            ErrorCode::TransactionalIdAuthorizationFailed => {
                f.write_str("Transactional Id authorization failed.")
            }
            ErrorCode::SecurityDisabled => f.write_str("Security features are disabled."),
            ErrorCode::OperationNotAttempted => f.write_str(
                "The broker did not attempt to execute this operation. This may happen for \
                 batched RPCs where some operations in the batch failed, causing the broker to \
                 respond without trying the rest.",
            ),
            ErrorCode::KafkaStorageError => {
                f.write_str("Disk error when trying to access log file on the disk.")
            }
            ErrorCode::LogDirNotFound => {
                f.write_str("The user-specified log directory is not found in the broker config.")
            }
            ErrorCode::SaslAuthenticationFailed => f.write_str("SASL Authentication failed."),
            ErrorCode::UnknownProducerId => f.write_str(
                "This exception is raised by the broker if it could not locate the producer \
                 metadata associated with the producerId in question. This could happen if, for \
                 instance, the producer's records were deleted because their retention time had \
                 elapsed. Once the last records of the producerId are removed, the producer's \
                 metadata is removed from the broker, and future appends by the producer will \
                 return this exception.",
            ),
            ErrorCode::ReassignmentInProgress => {
                f.write_str("A partition reassignment is in progress.")
            }
            ErrorCode::DelegationTokenAuthDisabled => {
                f.write_str("Delegation Token feature is not enabled.")
            }
            ErrorCode::DelegationTokenNotFound => {
                f.write_str("Delegation Token is not found on server.")
            }
            ErrorCode::DelegationTokenOwnerMismatch => {
                f.write_str("Specified Principal is not valid Owner/Renewer.")
            }
            ErrorCode::DelegationTokenRequestNotAllowed => f.write_str(
                "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on \
                 delegation token authenticated channels.",
            ),
            ErrorCode::DelegationTokenAuthorizationFailed => {
                f.write_str("Delegation Token authorization failed.")
            }
            ErrorCode::DelegationTokenExpired => f.write_str("Delegation Token is expired."),
            ErrorCode::InvalidPrincipalType => {
                f.write_str("Supplied principalType is not supported.")
            }
            ErrorCode::NonEmptyGroup => f.write_str("The group is not empty."),
            ErrorCode::GroupIdNotFound => f.write_str("The group id does not exist."),
            ErrorCode::FetchSessionIdNotFound => f.write_str("The fetch session ID was not found."),
            ErrorCode::InvalidFetchSessionEpoch => {
                f.write_str("The fetch session epoch is invalid.")
            }
            ErrorCode::ListenerNotFound => f.write_str(
                "There is no listener on the leader broker that matches the listener on which \
                 metadata request was processed.",
            ),
            ErrorCode::TopicDeletionDisabled => f.write_str("Topic deletion is disabled."),
            ErrorCode::FencedLeaderEpoch => f.write_str(
                "The leader epoch in the request is older than the epoch on the broker.",
            ),
            ErrorCode::UnknownLeaderEpoch => f.write_str(
                "The leader epoch in the request is newer than the epoch on the broker.",
            ),
            ErrorCode::UnsupportedCompressionType => f.write_str(
                "The requesting client does not support the compression type of given partition.",
            ),
            ErrorCode::StaleBrokerEpoch => f.write_str("Broker epoch has changed."),
            ErrorCode::OffsetNotAvailable => f.write_str(
                "The leader high watermark has not caught up from a recent leader election so the \
                 offsets cannot be guaranteed to be monotonically increasing.",
            ),
            ErrorCode::MemberIdRequired => f.write_str(
                "The group member needs to have a valid member id before actually entering a \
                 consumer group.",
            ),
            ErrorCode::PreferredLeaderNotAvailable => {
                f.write_str("The preferred leader was not available.")
            }
            ErrorCode::GroupMaxSizeReached => {
                f.write_str("The consumer group has reached its max size.")
            }
            ErrorCode::FencedInstanceId => f.write_str(
                "The broker rejected this static consumer since another consumer with the same \
                 group.instance.id has registered with a different member.id.",
            ),
            ErrorCode::EligibleLeadersNotAvailable => {
                f.write_str("Eligible topic partition leaders are not available.")
            }
            ErrorCode::ElectionNotNeeded => {
                f.write_str("Leader election not needed for topic partition.")
            }
            ErrorCode::NoReassignmentInProgress => {
                f.write_str("No partition reassignment is in progress.")
            }
            ErrorCode::GroupSubscribedToTopic => f.write_str(
                "Deleting offsets of a topic is forbidden while the consumer group is actively \
                 subscribed to it.",
            ),
            ErrorCode::InvalidRecord => f.write_str(
                "This record has failed the validation on broker and hence will be rejected.",
            ),
            ErrorCode::UnstableOffsetCommit => {
                f.write_str("There are unstable offsets that need to be cleared.")
            }
            ErrorCode::ThrottlingQuotaExceeded => {
                f.write_str("The throttling quota has been exceeded.")
            }
            ErrorCode::ProducerFenced => f.write_str(
                "There is a newer producer with the same transactionalId which fences the current \
                 one.",
            ),
            ErrorCode::ResourceNotFound => {
                f.write_str("A request illegally referred to a resource that does not exist.")
            }
            ErrorCode::DuplicateResource => {
                f.write_str("A request illegally referred to the same resource twice.")
            }
            ErrorCode::UnacceptableCredential => {
                f.write_str("Requested credential would not meet criteria for acceptability.")
            }
            ErrorCode::InconsistentVoterSet => f.write_str(
                "Indicates that the either the sender or recipient of a voter-only request is not \
                 one of the expected voters",
            ),
            ErrorCode::InvalidUpdateVersion => f.write_str("The given update version was invalid."),
            ErrorCode::FeatureUpdateFailed => f.write_str(
                "Unable to update finalized features due to an unexpected server error.",
            ),
            ErrorCode::PrincipalDeserializationFailure => f.write_str(
                "Request principal deserialization failed during forwarding. This indicates an \
                 internal error on the broker cluster security setup.",
            ),
            ErrorCode::SnapshotNotFound => f.write_str("Requested snapshot was not found"),
            ErrorCode::PositionOutOfRange => f.write_str(
                "Requested position is not greater than or equal to zero, and less than the size \
                 of the snapshot.",
            ),
            ErrorCode::UnknownTopicId => f.write_str("This server does not host this topic ID."),
            ErrorCode::DuplicateBrokerRegistration => {
                f.write_str("This broker ID is already in use.")
            }
            ErrorCode::BrokerIdNotRegistered => {
                f.write_str("The given broker ID was not registered.")
            }
            ErrorCode::InconsistentTopicId => {
                f.write_str("The log's topic ID did not match the topic ID in the request")
            }
            ErrorCode::InconsistentClusterId => {
                f.write_str("The clusterId in the request does not match that found on the server")
            }
            ErrorCode::TransactionalIdNotFound => {
                f.write_str("The transactionalId could not be found")
            }
            ErrorCode::FetchSessionTopicIdError => {
                f.write_str("The fetch session encountered inconsistent topic ID usage")
            }
            ErrorCode::IneligibleReplica => {
                f.write_str("The new ISR contains at least one ineligible replica.")
            }
            ErrorCode::NewLeaderElected => f.write_str(
                "The AlterPartition request successfully updated the partition state but the \
                 leader has changed.",
            ),
            ErrorCode::OffsetMovedToTieredStorage => {
                f.write_str("The requested offset is moved to tiered storage.")
            }
            ErrorCode::FencedMemberEpoch => f.write_str(
                "The member epoch is fenced by the group coordinator. The member must abandon all \
                 its partitions and rejoin.",
            ),
            ErrorCode::UnreleasedInstanceId => f.write_str(
                "The instance ID is still used by another member in the consumer group. That \
                 member must leave first.",
            ),
            ErrorCode::UnsupportedAssignor => f.write_str(
                "The assignor or its version range is not supported by the consumer group.",
            ),
            ErrorCode::StaleMemberEpoch => f.write_str(
                "The member epoch is stale. The member must retry after receiving its updated \
                 member epoch via the ConsumerGroupHeartbeat API.",
            ),
            ErrorCode::MismatchedEndpointType => {
                f.write_str("The request was sent to an endpoint of the wrong type.")
            }
            ErrorCode::UnsupportedEndpointType => {
                f.write_str("This endpoint type is not supported yet.")
            }
            ErrorCode::UnknownControllerId => f.write_str("This controller ID is not known."),
            ErrorCode::UnknownSubscriptionId => f.write_str(
                "Client sent a push telemetry request with an invalid or outdated subscription ID.",
            ),
            ErrorCode::TelemetryTooLarge => f.write_str(
                "Client sent a push telemetry request larger than the maximum size the broker \
                 will accept.",
            ),
            ErrorCode::InvalidRegistration => {
                f.write_str("The controller has considered the broker registration to be invalid.")
            }
        }
    }
}
1286
#[non_exhaustive]
#[derive(
    Clone, Copy, Default, Deserialize, Eq, Hash, Debug, Ord, PartialEq, PartialOrd, Serialize,
)]
/// Kafka API response error codes.
///
/// Declaration order is significant: it mirrors the Kafka protocol's numeric
/// error codes, which the `i16` conversion in this module relies on
/// (`UnknownServerError` maps to -1, `None` to 0, up to `InvalidRegistration`
/// at 119). New variants must be appended in protocol order.
pub enum ErrorCode {
    UnknownServerError,
    // The success code (0) is also the default.
    #[default]
    None,
    OffsetOutOfRange,
    CorruptMessage,
    UnknownTopicOrPartition,
    InvalidFetchSize,
    LeaderNotAvailable,
    NotLeaderOrFollower,
    RequestTimedOut,
    BrokerNotAvailable,
    ReplicaNotAvailable,
    MessageTooLarge,
    StaleControllerEpoch,
    OffsetMetadataTooLarge,
    NetworkException,
    CoordinatorLoadInProgress,
    CoordinatorNotAvailable,
    NotCoordinator,
    InvalidTopicException,
    RecordListTooLarge,
    NotEnoughReplicas,
    NotEnoughReplicasAfterAppend,
    InvalidRequiredAcks,
    IllegalGeneration,
    InconsistentGroupProtocol,
    InvalidGroupId,
    UnknownMemberId,
    InvalidSessionTimeout,
    RebalanceInProgress,
    InvalidCommitOffsetSize,
    TopicAuthorizationFailed,
    GroupAuthorizationFailed,
    ClusterAuthorizationFailed,
    InvalidTimestamp,
    UnsupportedSaslMechanism,
    IllegalSaslState,
    UnsupportedVersion,
    TopicAlreadyExists,
    InvalidPartitions,
    InvalidReplicationFactor,
    InvalidReplicaAssignment,
    InvalidConfig,
    NotController,
    InvalidRequest,
    UnsupportedForMessageFormat,
    PolicyViolation,
    OutOfOrderSequenceNumber,
    DuplicateSequenceNumber,
    InvalidProducerEpoch,
    InvalidTxnState,
    InvalidProducerIdMapping,
    InvalidTransactionTimeout,
    ConcurrentTransactions,
    TransactionCoordinatorFenced,
    TransactionalIdAuthorizationFailed,
    SecurityDisabled,
    OperationNotAttempted,
    KafkaStorageError,
    LogDirNotFound,
    SaslAuthenticationFailed,
    UnknownProducerId,
    ReassignmentInProgress,
    DelegationTokenAuthDisabled,
    DelegationTokenNotFound,
    DelegationTokenOwnerMismatch,
    DelegationTokenRequestNotAllowed,
    DelegationTokenAuthorizationFailed,
    DelegationTokenExpired,
    InvalidPrincipalType,
    NonEmptyGroup,
    GroupIdNotFound,
    FetchSessionIdNotFound,
    InvalidFetchSessionEpoch,
    ListenerNotFound,
    TopicDeletionDisabled,
    FencedLeaderEpoch,
    UnknownLeaderEpoch,
    UnsupportedCompressionType,
    StaleBrokerEpoch,
    OffsetNotAvailable,
    MemberIdRequired,
    PreferredLeaderNotAvailable,
    GroupMaxSizeReached,
    FencedInstanceId,
    EligibleLeadersNotAvailable,
    ElectionNotNeeded,
    NoReassignmentInProgress,
    GroupSubscribedToTopic,
    InvalidRecord,
    UnstableOffsetCommit,
    ThrottlingQuotaExceeded,
    ProducerFenced,
    ResourceNotFound,
    DuplicateResource,
    UnacceptableCredential,
    InconsistentVoterSet,
    InvalidUpdateVersion,
    FeatureUpdateFailed,
    PrincipalDeserializationFailure,
    SnapshotNotFound,
    PositionOutOfRange,
    UnknownTopicId,
    DuplicateBrokerRegistration,
    BrokerIdNotRegistered,
    InconsistentTopicId,
    InconsistentClusterId,
    TransactionalIdNotFound,
    FetchSessionTopicIdError,
    IneligibleReplica,
    NewLeaderElected,
    OffsetMovedToTieredStorage,
    FencedMemberEpoch,
    UnreleasedInstanceId,
    UnsupportedAssignor,
    StaleMemberEpoch,
    MismatchedEndpointType,
    UnsupportedEndpointType,
    UnknownControllerId,
    UnknownSubscriptionId,
    TelemetryTooLarge,
    InvalidRegistration,
}
1416
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
/// The fetch isolation level.
///
/// Encoded on the wire as an `i8`: 0 for `ReadUncommitted` (the default)
/// and 1 for `ReadCommitted`, per the conversions in this module.
pub enum IsolationLevel {
    #[default]
    ReadUncommitted,
    ReadCommitted,
}
1426
1427impl TryFrom<i8> for IsolationLevel {
1428    type Error = Error;
1429
1430    fn try_from(value: i8) -> Result<Self, Self::Error> {
1431        match value {
1432            0 => Ok(Self::ReadUncommitted),
1433            1 => Ok(Self::ReadCommitted),
1434            _ => Err(Error::InvalidIsolationLevel(value)),
1435        }
1436    }
1437}
1438
1439impl From<IsolationLevel> for i8 {
1440    fn from(value: IsolationLevel) -> Self {
1441        Self::from(&value)
1442    }
1443}
1444
1445impl From<&IsolationLevel> for i8 {
1446    fn from(value: &IsolationLevel) -> Self {
1447        match value {
1448            IsolationLevel::ReadUncommitted => 0,
1449            IsolationLevel::ReadCommitted => 1,
1450        }
1451    }
1452}
1453
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
/// Produce message acknowledgement.
///
/// Wire values (see the associated constants and `i16` conversions):
/// `None` is 0, `Leader` is 1 and `FullIsr` is -1.
pub enum Ack {
    None,
    Leader,
    FullIsr,
}
1461
impl Ack {
    // Wire encoding of each acknowledgement mode, shared by the i16
    // conversions below.
    const FULL_ISR: i16 = -1;
    const NONE: i16 = 0;
    const LEADER: i16 = 1;
}
1467
1468impl From<Ack> for i16 {
1469    fn from(value: Ack) -> Self {
1470        match value {
1471            Ack::FullIsr => Ack::FULL_ISR,
1472            Ack::None => Ack::NONE,
1473            Ack::Leader => Ack::LEADER,
1474        }
1475    }
1476}
1477
1478impl TryFrom<i16> for Ack {
1479    type Error = Error;
1480
1481    fn try_from(value: i16) -> Result<Self, Self::Error> {
1482        match value {
1483            Self::FULL_ISR => Ok(Self::FullIsr),
1484            Self::NONE => Ok(Self::None),
1485            Self::LEADER => Ok(Self::Leader),
1486            _ => Err(Error::InvalidAckValue(value)),
1487        }
1488    }
1489}
1490
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
/// The timestamp type.
///
/// Encoded as a single flag bit within an `i16` attributes word; see
/// `TIMESTAMP_TYPE_BITMASK` and the `i16` conversions. `CreateTime` is the
/// default (flag clear).
pub enum TimestampType {
    #[default]
    CreateTime,
    LogAppendTime,
}
1498
impl TimestampType {
    // Bit 3 of the attributes word: set means log-append time, clear means
    // create time (see the i16 conversions below).
    const TIMESTAMP_TYPE_BITMASK: i16 = 8;
}
1502
1503impl From<i16> for TimestampType {
1504    fn from(value: i16) -> Self {
1505        if value & Self::TIMESTAMP_TYPE_BITMASK == Self::TIMESTAMP_TYPE_BITMASK {
1506            Self::LogAppendTime
1507        } else {
1508            Self::CreateTime
1509        }
1510    }
1511}
1512
1513impl From<TimestampType> for i16 {
1514    fn from(value: TimestampType) -> Self {
1515        match value {
1516            TimestampType::CreateTime => 0,
1517            TimestampType::LogAppendTime => TimestampType::TIMESTAMP_TYPE_BITMASK,
1518        }
1519    }
1520}
1521
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
/// Kafka message compression types.
///
/// Encoded in the low three bits of the batch attributes word (see the
/// `i16` conversions): `None` is 0, `Gzip` 1, `Snappy` 2, `Lz4` 3 and
/// `Zstd` 4.
pub enum Compression {
    #[default]
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
1532
1533impl TryFrom<i16> for Compression {
1534    type Error = Error;
1535
1536    fn try_from(value: i16) -> Result<Self, Self::Error> {
1537        match value & 0b111i16 {
1538            0 => Ok(Self::None),
1539            1 => Ok(Self::Gzip),
1540            2 => Ok(Self::Snappy),
1541            3 => Ok(Self::Lz4),
1542            4 => Ok(Self::Zstd),
1543            otherwise => Err(Error::UnknownCompressionType(otherwise)),
1544        }
1545    }
1546}
1547
1548impl From<Compression> for i16 {
1549    fn from(value: Compression) -> Self {
1550        match value {
1551            Compression::None => 0,
1552            Compression::Gzip => 1,
1553            Compression::Snappy => 2,
1554            Compression::Lz4 => 3,
1555            Compression::Zstd => 4,
1556        }
1557    }
1558}
1559
impl Compression {
    /// Wrap `deflated` in a reader that decompresses it with this codec.
    ///
    /// Gzip, LZ4 and zstd decompress lazily through streaming decoders;
    /// snappy reads the whole input up front and decompresses it in one
    /// shot. `None` hands back the input reader unchanged.
    fn inflator(&self, mut deflated: impl BufRead + 'static) -> Result<Box<dyn Read>> {
        match self {
            Compression::None => Ok(Box::new(deflated)),
            Compression::Gzip => Ok(Box::new(GzDecoder::new(deflated))),
            Compression::Snappy => {
                // no streaming snappy decoder here: slurp the input first
                let mut input = vec![];
                _ = deflated.read_to_end(&mut input)?;
                debug!(?input);

                let mut decoder = snap::raw::Decoder::new();

                decoder
                    .decompress_vec(
                        // https://github.com/xerial/snappy-java/tree/master?tab=readme-ov-file#compatibility-notes
                        if input.starts_with(b"\x82SNAPPY\0") {
                            // xerial framing: 8 byte magic, then big-endian
                            // i32 version, compatible version and first block
                            // length — decoded for diagnostics only
                            if let (b"\x82SNAPPY\0", remainder) = input.split_at(8) {
                                let (version, remainder) = remainder.split_at(4);
                                let version: i32 = version.try_into().map(i32::from_be_bytes)?;

                                let (compatible_version, remainder) = remainder.split_at(4);
                                let compatible_version: i32 =
                                    compatible_version.try_into().map(i32::from_be_bytes)?;

                                let (block_size, _) = remainder.split_at(4);
                                let block_size: i32 =
                                    block_size.try_into().map(i32::from_be_bytes)?;

                                debug!(version, compatible_version, block_size);
                            }

                            // skip the 16 byte header plus the first block's
                            // 4 byte length prefix.
                            // NOTE(review): assumes a single-block stream and
                            // at least 20 bytes of input — a shorter input
                            // carrying this magic would panic here; confirm
                            let skip_header = &input[20..];
                            debug!(?skip_header);
                            skip_header
                        } else {
                            // raw snappy block with no framing header
                            &input[..]
                        },
                    )
                    .map_err(Into::into)
                    .map(Bytes::from)
                    .map(|bytes| bytes.reader())
                    .map(Box::new)
                    .map(|boxed| boxed as Box<dyn Read>)
                    .inspect_err(|err| error!(?err))
            }
            Compression::Lz4 => lz4::Decoder::new(deflated)
                .map(Box::new)
                .map(|boxed| boxed as Box<dyn Read>)
                .map_err(Into::into),
            Compression::Zstd => zstd::stream::read::Decoder::with_buffer(deflated)
                .map(Box::new)
                .map(|boxed| boxed as Box<dyn Read>)
                .map_err(Into::into),
        }
    }
}
1616
/// The produce batch attributes.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BatchAttribute {
    /// Compression codec applied to the records in this batch.
    pub compression: Compression,
    /// Whether record timestamps are create time or log append time.
    pub timestamp: TimestampType,
    /// Whether the batch is part of a transaction.
    pub transaction: bool,
    /// Whether the batch is a control batch.
    pub control: bool,
    /// Whether the delete horizon flag is set.
    pub delete_horizon: bool,
}
1626
1627impl BatchAttribute {
1628    const TRANSACTION_BITMASK: i16 = 16;
1629    const CONTROL_BITMASK: i16 = 32;
1630    const DELETE_HORIZON_BITMASK: i16 = 64;
1631
1632    pub fn compression(self, compression: Compression) -> Self {
1633        Self {
1634            compression,
1635            ..self
1636        }
1637    }
1638
1639    pub fn timestamp(self, timestamp: TimestampType) -> Self {
1640        Self { timestamp, ..self }
1641    }
1642
1643    pub fn transaction(self, transaction: bool) -> Self {
1644        Self {
1645            transaction,
1646            ..self
1647        }
1648    }
1649
1650    pub fn control(self, control: bool) -> Self {
1651        Self { control, ..self }
1652    }
1653
1654    pub fn delete_horizon(self, delete_horizon: bool) -> Self {
1655        Self {
1656            delete_horizon,
1657            ..self
1658        }
1659    }
1660}
1661
1662impl From<BatchAttribute> for i16 {
1663    fn from(value: BatchAttribute) -> Self {
1664        let mut attributes = i16::from(value.compression);
1665        attributes |= i16::from(value.timestamp);
1666
1667        if value.transaction {
1668            attributes |= BatchAttribute::TRANSACTION_BITMASK;
1669        }
1670
1671        if value.control {
1672            attributes |= BatchAttribute::CONTROL_BITMASK;
1673        }
1674
1675        if value.delete_horizon {
1676            attributes |= BatchAttribute::DELETE_HORIZON_BITMASK;
1677        }
1678
1679        attributes
1680    }
1681}
1682
1683impl TryFrom<i16> for BatchAttribute {
1684    type Error = Error;
1685
1686    fn try_from(value: i16) -> Result<Self, Self::Error> {
1687        Compression::try_from(value).map(|compression| {
1688            Self::default()
1689                .compression(compression)
1690                .timestamp(TimestampType::from(value))
1691                .transaction(value & Self::TRANSACTION_BITMASK == Self::TRANSACTION_BITMASK)
1692                .control(value & Self::CONTROL_BITMASK == Self::CONTROL_BITMASK)
1693                .delete_horizon(
1694                    value & Self::DELETE_HORIZON_BITMASK == Self::DELETE_HORIZON_BITMASK,
1695                )
1696        })
1697    }
1698}
1699
/// The control batch marker.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct ControlBatch {
    /// Marker schema version.
    pub version: i16,
    /// Marker type: 0 aborts a transaction, 1 commits it.
    pub r#type: i16,
}
1706
1707impl ControlBatch {
1708    const ABORT: i16 = 0;
1709    const COMMIT: i16 = 1;
1710
1711    pub fn is_abort(&self) -> bool {
1712        self.r#type == Self::ABORT
1713    }
1714
1715    pub fn is_commit(&self) -> bool {
1716        self.r#type == Self::COMMIT
1717    }
1718
1719    pub fn version(self, version: i16) -> Self {
1720        Self { version, ..self }
1721    }
1722
1723    pub fn commit(self) -> Self {
1724        Self {
1725            r#type: Self::COMMIT,
1726            ..self
1727        }
1728    }
1729
1730    pub fn abort(self) -> Self {
1731        Self {
1732            r#type: Self::ABORT,
1733            ..self
1734        }
1735    }
1736}
1737
1738impl TryFrom<Bytes> for ControlBatch {
1739    type Error = Error;
1740
1741    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1742        let mut c = Cursor::new(value);
1743        let mut deserializer = Decoder::new(&mut c);
1744        Self::deserialize(&mut deserializer)
1745    }
1746}
1747
1748impl TryFrom<ControlBatch> for Bytes {
1749    type Error = Error;
1750
1751    fn try_from(value: ControlBatch) -> Result<Self, Self::Error> {
1752        let mut b = BytesMut::new().writer();
1753        let mut serializer = Encoder::new(&mut b);
1754        value.serialize(&mut serializer)?;
1755        Ok(Bytes::from(b.into_inner()))
1756    }
1757}
1758
/// An end transaction marker.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct EndTransactionMarker {
    /// Marker schema version.
    pub version: i16,
    /// The coordinator epoch.
    pub coordinator_epoch: i32,
}
1765
1766impl TryFrom<Bytes> for EndTransactionMarker {
1767    type Error = Error;
1768
1769    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1770        let mut c = Cursor::new(value);
1771        let mut deserializer = Decoder::new(&mut c);
1772        Self::deserialize(&mut deserializer)
1773    }
1774}
1775
1776impl TryFrom<EndTransactionMarker> for Bytes {
1777    type Error = Error;
1778
1779    fn try_from(value: EndTransactionMarker) -> Result<Self, Self::Error> {
1780        let mut b = BytesMut::new().writer();
1781        let mut serializer = Encoder::new(&mut b);
1782        value.serialize(&mut serializer)?;
1783        Ok(Bytes::from(b.into_inner()))
1784    }
1785}
1786
/// The endpoint type.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum EndpointType {
    /// Unrecognised endpoint (the default).
    #[default]
    Unknown,
    Broker,
    Controller,
}
1795
1796impl From<i8> for EndpointType {
1797    fn from(value: i8) -> Self {
1798        match value {
1799            1 => Self::Broker,
1800            2 => Self::Controller,
1801            _ => Self::Unknown,
1802        }
1803    }
1804}
1805
1806impl From<EndpointType> for i8 {
1807    fn from(value: EndpointType) -> Self {
1808        match value {
1809            EndpointType::Unknown => 0,
1810            EndpointType::Broker => 1,
1811            EndpointType::Controller => 2,
1812        }
1813    }
1814}
1815
/// The coordinator type.
///
/// Derives match the sibling protocol enums ([`EndpointType`],
/// [`ConfigResource`]) so the type is debuggable, copyable and usable as a
/// map key.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CoordinatorType {
    /// Consumer group coordinator.
    Group,
    /// Transaction coordinator.
    Transaction,
    /// Share group coordinator.
    Share,
}
1822
1823impl TryFrom<i8> for CoordinatorType {
1824    type Error = Error;
1825
1826    fn try_from(value: i8) -> Result<Self, Self::Error> {
1827        match value {
1828            0 => Ok(Self::Group),
1829            1 => Ok(Self::Transaction),
1830            2 => Ok(Self::Share),
1831            otherwise => Err(Error::InvalidCoordinatorType(otherwise)),
1832        }
1833    }
1834}
1835
/// What type of resource is the configuration describing.
///
/// Wire ids are defined by the `From` conversions below; unrecognised ids
/// decode as [`ConfigResource::Unknown`].
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigResource {
    Group,
    ClientMetric,
    BrokerLogger,
    Broker,
    Topic,
    Unknown,
}
1846
1847impl From<i8> for ConfigResource {
1848    fn from(value: i8) -> Self {
1849        match value {
1850            2 => Self::Topic,
1851            4 => Self::Broker,
1852            8 => Self::BrokerLogger,
1853            16 => Self::ClientMetric,
1854            32 => Self::Group,
1855            _ => Self::Unknown,
1856        }
1857    }
1858}
1859
1860impl From<CoordinatorType> for i8 {
1861    fn from(value: CoordinatorType) -> Self {
1862        match value {
1863            CoordinatorType::Group => 0,
1864            CoordinatorType::Transaction => 1,
1865            CoordinatorType::Share => 2,
1866        }
1867    }
1868}
1869
1870impl From<ConfigResource> for i8 {
1871    fn from(value: ConfigResource) -> Self {
1872        match value {
1873            ConfigResource::Unknown => 0,
1874            ConfigResource::Topic => 2,
1875            ConfigResource::Broker => 4,
1876            ConfigResource::BrokerLogger => 8,
1877            ConfigResource::ClientMetric => 16,
1878            ConfigResource::Group => 32,
1879        }
1880    }
1881}
1882
1883impl From<ConfigResource> for i32 {
1884    fn from(value: ConfigResource) -> Self {
1885        match value {
1886            ConfigResource::Unknown => 0,
1887            ConfigResource::Topic => 2,
1888            ConfigResource::Broker => 4,
1889            ConfigResource::BrokerLogger => 8,
1890            ConfigResource::ClientMetric => 16,
1891            ConfigResource::Group => 32,
1892        }
1893    }
1894}
1895
/// The type of configuration.
///
/// Wire ids 1 through 9 are defined by the `From` conversions below;
/// anything else decodes as [`ConfigType::Unknown`].
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigType {
    /// Unrecognised configuration value type (the default).
    #[default]
    Unknown,
    Boolean,
    String,
    Int,
    Short,
    Long,
    Double,
    List,
    Class,
    Password,
}
1911
1912impl From<i8> for ConfigType {
1913    fn from(value: i8) -> Self {
1914        match value {
1915            1 => Self::Boolean,
1916            2 => Self::String,
1917            3 => Self::Int,
1918            4 => Self::Short,
1919            5 => Self::Long,
1920            6 => Self::Double,
1921            7 => Self::List,
1922            8 => Self::Class,
1923            9 => Self::Password,
1924            _ => Self::Unknown,
1925        }
1926    }
1927}
1928
1929impl From<ConfigType> for i8 {
1930    fn from(value: ConfigType) -> i8 {
1931        match value {
1932            ConfigType::Boolean => 1,
1933            ConfigType::String => 2,
1934            ConfigType::Int => 3,
1935            ConfigType::Short => 4,
1936            ConfigType::Long => 5,
1937            ConfigType::Double => 6,
1938            ConfigType::List => 7,
1939            ConfigType::Class => 8,
1940            ConfigType::Password => 9,
1941            ConfigType::Unknown => 0,
1942        }
1943    }
1944}
1945
/// From which source was the configuration provided.
///
/// Wire ids 1 through 8 are defined by the `From` conversions below;
/// anything else decodes as [`ConfigSource::Unknown`].
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigSource {
    DynamicTopicConfig,
    DynamicBrokerLoggerConfig,
    DynamicBrokerConfig,
    DynamicDefaultBrokerConfig,
    DynamicClientMetricsConfig,
    DynamicGroupConfig,
    StaticBrokerConfig,
    DefaultConfig,
    Unknown,
}
1959
1960impl From<i8> for ConfigSource {
1961    fn from(value: i8) -> Self {
1962        match value {
1963            1 => Self::DynamicTopicConfig,
1964            2 => Self::DynamicBrokerConfig,
1965            3 => Self::DynamicDefaultBrokerConfig,
1966            4 => Self::StaticBrokerConfig,
1967            5 => Self::DefaultConfig,
1968            6 => Self::DynamicBrokerLoggerConfig,
1969            7 => Self::DynamicClientMetricsConfig,
1970            8 => Self::DynamicGroupConfig,
1971            _ => Self::Unknown,
1972        }
1973    }
1974}
1975
1976impl From<ConfigSource> for i8 {
1977    fn from(value: ConfigSource) -> i8 {
1978        match value {
1979            ConfigSource::DynamicTopicConfig => 1,
1980            ConfigSource::DynamicBrokerConfig => 2,
1981            ConfigSource::DynamicDefaultBrokerConfig => 3,
1982            ConfigSource::StaticBrokerConfig => 4,
1983            ConfigSource::DefaultConfig => 5,
1984            ConfigSource::DynamicBrokerLoggerConfig => 6,
1985            ConfigSource::DynamicClientMetricsConfig => 7,
1986            ConfigSource::DynamicGroupConfig => 8,
1987            ConfigSource::Unknown => 0,
1988        }
1989    }
1990}
1991
/// The configuration operation type.
///
/// Derives match the sibling protocol enums (e.g. [`ConfigResource`]) so
/// the type is debuggable, copyable and usable as a map key.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum OpType {
    /// Set the configuration value.
    Set,
    /// Delete the configuration value.
    Delete,
    /// Append to a list-valued configuration.
    Append,
    /// Subtract from a list-valued configuration.
    Subtract,
}
1999
2000impl TryFrom<i8> for OpType {
2001    type Error = Error;
2002
2003    fn try_from(value: i8) -> Result<Self, Self::Error> {
2004        match value {
2005            0 => Ok(Self::Set),
2006            1 => Ok(Self::Delete),
2007            2 => Ok(Self::Append),
2008            3 => Ok(Self::Subtract),
2009            otherwise => Err(Error::InvalidOpType(otherwise)),
2010        }
2011    }
2012}
2013
2014impl From<OpType> for i8 {
2015    fn from(value: OpType) -> Self {
2016        match value {
2017            OpType::Set => 0,
2018            OpType::Delete => 1,
2019            OpType::Append => 2,
2020            OpType::Subtract => 3,
2021        }
2022    }
2023}
2024
2025/// convert a Kafka timestamp into system time
2026pub fn to_system_time(timestamp: i64) -> Result<SystemTime> {
2027    u64::try_from(timestamp)
2028        .map(|timestamp| SystemTime::UNIX_EPOCH + Duration::from_millis(timestamp))
2029        .map_err(Into::into)
2030}
2031
2032/// convert system time into a kafka timestamp
2033pub fn to_timestamp(system_time: &SystemTime) -> Result<i64> {
2034    system_time
2035        .duration_since(SystemTime::UNIX_EPOCH)
2036        .map_err(Into::into)
2037        .map(|since_epoch| since_epoch.as_millis())
2038        .and_then(|since_epoch| i64::try_from(since_epoch).map_err(Into::into))
2039}
2040
/// List Offset
///
/// An enumeration of offset request types, with conversion from/to an i64
/// protocol representation: earliest and latest are sentinel values, while
/// a timestamp is encoded as milliseconds since the UNIX epoch.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ListOffset {
    /// The earliest available offset (wire sentinel -2).
    Earliest,
    /// The latest available offset (wire sentinel -1).
    Latest,
    /// An offset identified by a point in time (encoded as epoch millis).
    Timestamp(SystemTime),
}
2051
impl ListOffset {
    /// Wire sentinel for the earliest available offset.
    const EARLIEST_OFFSET: i64 = -2;
    /// Wire sentinel for the latest available offset.
    const LATEST_OFFSET: i64 = -1;
}
2056
2057impl TryFrom<ListOffset> for i64 {
2058    type Error = Error;
2059
2060    fn try_from(value: ListOffset) -> Result<Self, Self::Error> {
2061        match value {
2062            ListOffset::Earliest => Ok(ListOffset::EARLIEST_OFFSET),
2063            ListOffset::Latest => Ok(ListOffset::LATEST_OFFSET),
2064            ListOffset::Timestamp(timestamp) => to_timestamp(&timestamp),
2065        }
2066    }
2067}
2068
2069impl TryFrom<i64> for ListOffset {
2070    type Error = Error;
2071
2072    fn try_from(value: i64) -> Result<Self, Self::Error> {
2073        match value {
2074            Self::EARLIEST_OFFSET => Ok(Self::Earliest),
2075            Self::LATEST_OFFSET => Ok(Self::Latest),
2076            timestamp => to_system_time(timestamp).map(Self::Timestamp),
2077        }
2078    }
2079}
2080
/// A SCRAM (Salted Challenge Response Authentication Mechanism) variant.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ScramMechanism {
    /// SCRAM-SHA-256
    Scram256,
    /// SCRAM-SHA-512
    Scram512,
}
2086
2087impl FromStr for ScramMechanism {
2088    type Err = Error;
2089
2090    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
2091        match s {
2092            "SCRAM-SHA-256" => Ok(ScramMechanism::Scram256),
2093            "SCRAM-SHA-512" => Ok(ScramMechanism::Scram512),
2094            otherwise => Err(Error::ParseScram(otherwise.to_string())),
2095        }
2096    }
2097}
2098
2099impl TryFrom<i8> for ScramMechanism {
2100    type Error = Error;
2101
2102    fn try_from(value: i8) -> std::result::Result<Self, Self::Error> {
2103        match value {
2104            1 => Ok(ScramMechanism::Scram256),
2105            2 => Ok(ScramMechanism::Scram512),
2106            otherwise => Err(Error::UnknownScramMechanism(value)),
2107        }
2108    }
2109}
2110
2111impl From<ScramMechanism> for i32 {
2112    fn from(value: ScramMechanism) -> Self {
2113        match value {
2114            ScramMechanism::Scram256 => 1,
2115            ScramMechanism::Scram512 => 2,
2116        }
2117    }
2118}
2119
2120impl From<ScramMechanism> for i8 {
2121    fn from(value: ScramMechanism) -> Self {
2122        match value {
2123            ScramMechanism::Scram256 => 1,
2124            ScramMechanism::Scram512 => 2,
2125        }
2126    }
2127}
2128
/// Types that can be encoded into their Kafka wire representation.
pub trait Encode {
    /// Encode `self` into bytes.
    fn encode(&self) -> Result<Bytes>;
}
2132
/// Types that can be decoded from their Kafka wire representation.
pub trait Decode: Sized {
    /// Decode an instance from `encoded`.
    fn decode(encoded: &mut Bytes) -> Result<Self>;
}
2136
#[cfg(test)]
mod tests {
    use std::thread::sleep;

    use super::*;

    #[test]
    fn frame_elapsed_millis() {
        // elapsed_millis must report at least the wall-clock time slept
        let pause = 6;
        let now = SystemTime::now();
        sleep(Duration::from_millis(pause));

        assert!(Frame::elapsed_millis(now) >= pause);
    }

    #[test]
    fn batch_attribute() {
        // codec id occupies the low bits; each flag is a dedicated bit
        assert_eq!(0, i16::from(BatchAttribute::default()));
        assert_eq!(
            0,
            i16::from(BatchAttribute::default().compression(Compression::None))
        );
        assert_eq!(
            1,
            i16::from(BatchAttribute::default().compression(Compression::Gzip))
        );
        assert_eq!(
            2,
            i16::from(BatchAttribute::default().compression(Compression::Snappy))
        );
        assert_eq!(
            3,
            i16::from(BatchAttribute::default().compression(Compression::Lz4))
        );
        assert_eq!(
            4,
            i16::from(BatchAttribute::default().compression(Compression::Zstd))
        );
        assert_eq!(
            8,
            i16::from(BatchAttribute::default().timestamp(TimestampType::LogAppendTime))
        );
        assert_eq!(16, i16::from(BatchAttribute::default().transaction(true)));
        assert_eq!(32, i16::from(BatchAttribute::default().control(true)));
        assert_eq!(
            64,
            i16::from(BatchAttribute::default().delete_horizon(true))
        );
    }
}
2187
// Pull in the message types generated at build time (in OUT_DIR) from the
// Kafka JSON message descriptors.
include!(concat!(env!("OUT_DIR"), "/generate.rs"));