1pub mod acl;
120pub mod de;
121pub mod primitive;
122pub mod record;
123pub mod resource;
124pub mod ser;
125
126use bytes::{Buf, BufMut, Bytes, BytesMut, TryGetError};
127pub use de::Decoder;
128use flate2::read::GzDecoder;
129use primitive::tagged::TagBuffer;
130use record::deflated::Frame as RecordBatch;
131pub use ser::Encoder;
132use serde::{Deserialize, Serialize};
133use std::{
134 array::TryFromSliceError,
135 collections::HashMap,
136 env::VarError,
137 fmt::{self, Display, Formatter},
138 io::{self, BufRead, Cursor, Read, Write},
139 num,
140 process::{ExitCode, Termination},
141 str::{self, FromStr},
142 string,
143 sync::{Arc, OnceLock},
144 time::{Duration, SystemTime, SystemTimeError},
145};
146use tansu_model::{MessageKind, MessageMeta};
147use tracing::{debug, error, instrument, warn};
148use tracing_subscriber::filter::ParseError;
149
/// A 16-byte topic identifier with every byte set to zero.
pub const NULL_TOPIC_ID: [u8; 16] = [0u8; 16];
152
153#[derive(Debug)]
154pub struct RootMessageMeta {
155 pub(crate) requests: HashMap<i16, &'static MessageMeta>,
156 pub(crate) responses: HashMap<i16, &'static MessageMeta>,
157}
158
159impl RootMessageMeta {
160 fn new() -> Self {
161 let (requests, responses) = MESSAGE_META.iter().fold(
162 (HashMap::new(), HashMap::new()),
163 |(mut requests, mut responses), (_, meta)| {
164 match meta.message_kind {
165 MessageKind::Request => {
166 _ = requests.insert(meta.api_key, *meta);
167 }
168
169 MessageKind::Response => {
170 _ = responses.insert(meta.api_key, *meta);
171 }
172 }
173
174 (requests, responses)
175 },
176 );
177
178 Self {
179 requests,
180 responses,
181 }
182 }
183
184 pub fn messages() -> &'static RootMessageMeta {
185 static MAPPING: OnceLock<RootMessageMeta> = OnceLock::new();
186 MAPPING.get_or_init(RootMessageMeta::new)
187 }
188
189 #[must_use]
190 pub const fn requests(&self) -> &HashMap<i16, &'static MessageMeta> {
191 &self.requests
192 }
193
194 #[must_use]
195 pub const fn responses(&self) -> &HashMap<i16, &'static MessageMeta> {
196 &self.responses
197 }
198}
199
/// Associates a type with its numeric API key.
pub trait ApiKey {
    /// The API key of the implementing message type.
    const KEY: i16;
}
203
/// Associates a type with its API name.
pub trait ApiName {
    /// The name of the implementing message type.
    const NAME: &'static str;
}
207
208pub trait Request:
210 ApiKey + ApiName + fmt::Debug + Default + Into<Body> + Send + Sync + TryFrom<Body> + 'static
211{
212 type Response: Response;
213}
214
215pub trait Response:
217 ApiKey + ApiName + fmt::Debug + Default + Into<Body> + Send + Sync + TryFrom<Body> + 'static
218{
219 type Request: Request;
220}
221
222#[derive(Clone, Debug, thiserror::Error)]
223pub enum Error {
224 ApiError(ErrorCode),
225 EnvVar(VarError),
226 FromUtf8(string::FromUtf8Error),
227 InvalidAckValue(i16),
228 InvalidCoordinatorType(i8),
229 InvalidIsolationLevel(i8),
230 InvalidOpType(i8),
231 InvalidScramMechanism(i8),
232 Io(Arc<io::Error>),
233 Message(String),
234 MessageMaxSizeExceeded(usize),
235 NoSuchField(&'static str),
236 NoSuchMessage(&'static str),
237 NoSuchRequest(i16),
238 NotAuthenticated,
239 Overflow,
240 ParseFilter(Arc<ParseError>),
241 ParseScram(String),
242 ResponseFrame,
243 Snap(#[from] snap::Error),
244 StringWithoutApiVersion,
245 StringWithoutLength,
246 SystemTime(SystemTimeError),
247 TansuModel(tansu_model::Error),
248 TryFromInt(#[from] num::TryFromIntError),
249 TryFromSlice(#[from] TryFromSliceError),
250 TryGet(Arc<TryGetError>),
251 UnexpectedType(String),
252 UnknownApiErrorCode(i16),
253 UnknownCompressionType(i16),
254 UnknownScramMechanism(i8),
255 UnknownContainer,
256 Utf8(str::Utf8Error),
257}
258
259pub type Result<T, E = Error> = std::result::Result<T, E>;
260
261impl Display for Error {
262 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
263 match self {
264 Error::Message(e) => f.write_str(e),
265 e => write!(f, "{e:?}"),
266 }
267 }
268}
269
270impl serde::ser::Error for Error {
271 fn custom<T: Display>(msg: T) -> Self {
272 Error::Message(msg.to_string())
273 }
274}
275
276impl serde::de::Error for Error {
277 fn custom<T: Display>(msg: T) -> Self {
278 Error::Message(msg.to_string())
279 }
280}
281
282impl From<io::Error> for Error {
283 fn from(value: io::Error) -> Self {
284 Self::Io(Arc::new(value))
285 }
286}
287
288impl From<TryGetError> for Error {
289 fn from(value: TryGetError) -> Self {
290 Self::TryGet(Arc::new(value))
291 }
292}
293
294impl From<ParseError> for Error {
295 fn from(value: ParseError) -> Self {
296 Self::ParseFilter(Arc::new(value))
297 }
298}
299
300impl From<str::Utf8Error> for Error {
301 fn from(value: str::Utf8Error) -> Self {
302 Self::Utf8(value)
303 }
304}
305
306impl From<string::FromUtf8Error> for Error {
307 fn from(value: string::FromUtf8Error) -> Self {
308 Self::FromUtf8(value)
309 }
310}
311
312impl From<tansu_model::Error> for Error {
313 fn from(value: tansu_model::Error) -> Self {
314 Self::TansuModel(value)
315 }
316}
317
318impl From<VarError> for Error {
319 fn from(value: VarError) -> Self {
320 Self::EnvVar(value)
321 }
322}
323
324impl From<SystemTimeError> for Error {
325 fn from(value: SystemTimeError) -> Self {
326 Self::SystemTime(value)
327 }
328}
329
330#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
405pub struct Frame {
406 pub size: i32,
408
409 pub header: Header,
411
412 pub body: Body,
414}
415
416impl Frame {
417 fn elapsed_millis(start: SystemTime) -> u64 {
418 start
419 .elapsed()
420 .map_or(0, |duration| duration.as_millis() as u64)
421 }
422
423 #[instrument(skip_all)]
425 pub fn request(header: Header, body: Body) -> Result<Bytes> {
426 let start = SystemTime::now();
427
428 let mut c = Cursor::new(vec![]);
429
430 let mut serializer = Encoder::request(&mut c);
431
432 let frame = Frame {
433 size: 0,
434 header,
435 body,
436 };
437
438 frame.serialize(&mut serializer)?;
439 let size = i32::try_from(c.position()).map(|position| position - 4)?;
440
441 c.set_position(0);
442 let buf = size.to_be_bytes();
443 c.write_all(&buf)?;
444
445 Ok(Bytes::from(c.into_inner())).inspect(|encoded| {
446 debug!(
447 len = encoded.len(),
448 elapsed_millis = Self::elapsed_millis(start)
449 )
450 })
451 }
452
453 #[instrument(skip_all)]
455 pub fn request_from_bytes(encoded: impl Buf) -> Result<Frame> {
456 let start = SystemTime::now();
457
458 let mut reader = encoded.reader();
459 let mut deserializer = Decoder::request(&mut reader);
460 Frame::deserialize(&mut deserializer)
461 .inspect(|_frame| debug!(elapsed_millis = Self::elapsed_millis(start)))
462 }
463
464 #[instrument(skip_all)]
466 pub fn response(header: Header, body: Body, api_key: i16, api_version: i16) -> Result<Bytes> {
467 let start = SystemTime::now();
468
469 let mut c = Cursor::new(vec![]);
470 let mut serializer = Encoder::response(&mut c, api_key, api_version);
471
472 let frame = Frame {
473 size: 0,
474 header,
475 body,
476 };
477
478 frame.serialize(&mut serializer)?;
479 let size = i32::try_from(c.position())
480 .map(|position| position - 4)
481 .inspect_err(|err| {
482 let position = c.position();
483 warn!(?err, ?position, ?frame);
484 })?;
485
486 c.set_position(0);
487 let buf = size.to_be_bytes();
488 c.write_all(&buf)?;
489
490 Ok(Bytes::from(c.into_inner())).inspect(|encoded| {
491 debug!(
492 len = encoded.len(),
493 elapsed_millis = Self::elapsed_millis(start)
494 )
495 })
496 }
497
498 #[instrument(skip_all)]
500 pub fn response_from_bytes(bytes: impl Buf, api_key: i16, api_version: i16) -> Result<Frame> {
501 let start = SystemTime::now();
502
503 let mut reader = bytes.reader();
504 let mut deserializer = Decoder::response(&mut reader, api_key, api_version);
505 Frame::deserialize(&mut deserializer)
506 .inspect(|encoded| debug!(elapsed_millis = Self::elapsed_millis(start)))
507 }
508
509 pub fn api_key(&self) -> Result<i16> {
511 if let Header::Request { api_key, .. } = self.header {
512 Ok(api_key)
513 } else {
514 Err(Error::ResponseFrame)
515 }
516 }
517
518 pub fn api_name(&self) -> &str {
520 self.body.api_name()
521 }
522
523 pub fn api_version(&self) -> Result<i16> {
525 if let Header::Request { api_version, .. } = self.header {
526 Ok(api_version)
527 } else {
528 Err(Error::ResponseFrame)
529 }
530 }
531
532 pub fn correlation_id(&self) -> Result<i32> {
534 match self.header {
535 Header::Request { correlation_id, .. } | Header::Response { correlation_id } => {
536 Ok(correlation_id)
537 }
538 }
539 }
540
541 pub fn client_id(&self) -> Result<Option<&str>> {
543 if let Header::Request { ref client_id, .. } = self.header {
544 Ok(client_id.as_deref())
545 } else {
546 Err(Error::ResponseFrame)
547 }
548 }
549}
550
551#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
553#[serde(try_from = "HeaderMezzanine")]
554#[serde(into = "HeaderMezzanine")]
555pub enum Header {
556 Request {
558 api_key: i16,
560
561 api_version: i16,
563
564 correlation_id: i32,
566
567 client_id: Option<String>,
569 },
570
571 Response {
573 correlation_id: i32,
575 },
576}
577
578#[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd, Serialize)]
579pub(crate) enum HeaderMezzanine {
580 Request {
581 api_key: i16,
582 api_version: i16,
583 correlation_id: i32,
584 client_id: Option<String>,
585 tag_buffer: Option<TagBuffer>,
586 },
587 Response {
588 correlation_id: i32,
589 tag_buffer: Option<TagBuffer>,
590 },
591}
592
593impl TryFrom<HeaderMezzanine> for Header {
594 type Error = Error;
595
596 fn try_from(value: HeaderMezzanine) -> Result<Self, Self::Error> {
597 debug!(?value);
598
599 match value {
600 HeaderMezzanine::Request {
601 api_key,
602 api_version,
603 correlation_id,
604 client_id,
605 ..
606 } => Ok(Self::Request {
607 api_key,
608 api_version,
609 correlation_id,
610 client_id,
611 }),
612
613 HeaderMezzanine::Response { correlation_id, .. } => {
614 Ok(Self::Response { correlation_id })
615 }
616 }
617 }
618}
619
620impl From<Header> for HeaderMezzanine {
621 fn from(value: Header) -> Self {
622 debug!("value: {value:?}");
623
624 match value {
625 Header::Request {
626 api_key,
627 api_version,
628 correlation_id,
629 client_id,
630 } => HeaderMezzanine::Request {
631 api_key,
632 api_version,
633 correlation_id,
634 client_id,
635 tag_buffer: Some(TagBuffer([].into())),
636 },
637
638 Header::Response { correlation_id } => HeaderMezzanine::Response {
639 correlation_id,
640 tag_buffer: Some(TagBuffer([].into())),
641 },
642 }
643 }
644}
645
646impl Termination for ErrorCode {
647 fn report(self) -> ExitCode {
648 if let Self::None = self {
649 ExitCode::SUCCESS
650 } else {
651 ExitCode::FAILURE
652 }
653 }
654}
655
656impl TryFrom<i16> for ErrorCode {
657 type Error = Error;
658
659 fn try_from(value: i16) -> Result<Self, Self::Error> {
660 Self::try_from(&value)
661 }
662}
663
664impl TryFrom<&i16> for ErrorCode {
665 type Error = Error;
666
667 #[allow(clippy::too_many_lines)]
668 fn try_from(value: &i16) -> Result<Self, Self::Error> {
669 match value {
670 -1 => Ok(Self::UnknownServerError),
671 0 => Ok(Self::None),
672 1 => Ok(Self::OffsetOutOfRange),
673 2 => Ok(Self::CorruptMessage),
674 3 => Ok(Self::UnknownTopicOrPartition),
675 4 => Ok(Self::InvalidFetchSize),
676 5 => Ok(Self::LeaderNotAvailable),
677 6 => Ok(Self::NotLeaderOrFollower),
678 7 => Ok(Self::RequestTimedOut),
679 8 => Ok(Self::BrokerNotAvailable),
680 9 => Ok(Self::ReplicaNotAvailable),
681 10 => Ok(Self::MessageTooLarge),
682 11 => Ok(Self::StaleControllerEpoch),
683 12 => Ok(Self::OffsetMetadataTooLarge),
684 13 => Ok(Self::NetworkException),
685 14 => Ok(Self::CoordinatorLoadInProgress),
686 15 => Ok(Self::CoordinatorNotAvailable),
687 16 => Ok(Self::NotCoordinator),
688 17 => Ok(Self::InvalidTopicException),
689 18 => Ok(Self::RecordListTooLarge),
690 19 => Ok(Self::NotEnoughReplicas),
691 20 => Ok(Self::NotEnoughReplicasAfterAppend),
692 21 => Ok(Self::InvalidRequiredAcks),
693 22 => Ok(Self::IllegalGeneration),
694 23 => Ok(Self::InconsistentGroupProtocol),
695 24 => Ok(Self::InvalidGroupId),
696 25 => Ok(Self::UnknownMemberId),
697 26 => Ok(Self::InvalidSessionTimeout),
698 27 => Ok(Self::RebalanceInProgress),
699 28 => Ok(Self::InvalidCommitOffsetSize),
700 29 => Ok(Self::TopicAuthorizationFailed),
701 30 => Ok(Self::GroupAuthorizationFailed),
702 31 => Ok(Self::ClusterAuthorizationFailed),
703 32 => Ok(Self::InvalidTimestamp),
704 33 => Ok(Self::UnsupportedSaslMechanism),
705 34 => Ok(Self::IllegalSaslState),
706 35 => Ok(Self::UnsupportedVersion),
707 36 => Ok(Self::TopicAlreadyExists),
708 37 => Ok(Self::InvalidPartitions),
709 38 => Ok(Self::InvalidReplicationFactor),
710 39 => Ok(Self::InvalidReplicaAssignment),
711 40 => Ok(Self::InvalidConfig),
712 41 => Ok(Self::NotController),
713 42 => Ok(Self::InvalidRequest),
714 43 => Ok(Self::UnsupportedForMessageFormat),
715 44 => Ok(Self::PolicyViolation),
716 45 => Ok(Self::OutOfOrderSequenceNumber),
717 46 => Ok(Self::DuplicateSequenceNumber),
718 47 => Ok(Self::InvalidProducerEpoch),
719 48 => Ok(Self::InvalidTxnState),
720 49 => Ok(Self::InvalidProducerIdMapping),
721 50 => Ok(Self::InvalidTransactionTimeout),
722 51 => Ok(Self::ConcurrentTransactions),
723 52 => Ok(Self::TransactionCoordinatorFenced),
724 53 => Ok(Self::TransactionalIdAuthorizationFailed),
725 54 => Ok(Self::SecurityDisabled),
726 55 => Ok(Self::OperationNotAttempted),
727 56 => Ok(Self::KafkaStorageError),
728 57 => Ok(Self::LogDirNotFound),
729 58 => Ok(Self::SaslAuthenticationFailed),
730 59 => Ok(Self::UnknownProducerId),
731 60 => Ok(Self::ReassignmentInProgress),
732 61 => Ok(Self::DelegationTokenAuthDisabled),
733 62 => Ok(Self::DelegationTokenNotFound),
734 63 => Ok(Self::DelegationTokenOwnerMismatch),
735 64 => Ok(Self::DelegationTokenRequestNotAllowed),
736 65 => Ok(Self::DelegationTokenAuthorizationFailed),
737 66 => Ok(Self::DelegationTokenExpired),
738 67 => Ok(Self::InvalidPrincipalType),
739 68 => Ok(Self::NonEmptyGroup),
740 69 => Ok(Self::GroupIdNotFound),
741 70 => Ok(Self::FetchSessionIdNotFound),
742 71 => Ok(Self::InvalidFetchSessionEpoch),
743 72 => Ok(Self::ListenerNotFound),
744 73 => Ok(Self::TopicDeletionDisabled),
745 74 => Ok(Self::FencedLeaderEpoch),
746 75 => Ok(Self::UnknownLeaderEpoch),
747 76 => Ok(Self::UnsupportedCompressionType),
748 77 => Ok(Self::StaleBrokerEpoch),
749 78 => Ok(Self::OffsetNotAvailable),
750 79 => Ok(Self::MemberIdRequired),
751 80 => Ok(Self::PreferredLeaderNotAvailable),
752 81 => Ok(Self::GroupMaxSizeReached),
753 82 => Ok(Self::FencedInstanceId),
754 83 => Ok(Self::EligibleLeadersNotAvailable),
755 84 => Ok(Self::ElectionNotNeeded),
756 85 => Ok(Self::NoReassignmentInProgress),
757 86 => Ok(Self::GroupSubscribedToTopic),
758 87 => Ok(Self::InvalidRecord),
759 88 => Ok(Self::UnstableOffsetCommit),
760 89 => Ok(Self::ThrottlingQuotaExceeded),
761 90 => Ok(Self::ProducerFenced),
762 91 => Ok(Self::ResourceNotFound),
763 92 => Ok(Self::DuplicateResource),
764 93 => Ok(Self::UnacceptableCredential),
765 94 => Ok(Self::InconsistentVoterSet),
766 95 => Ok(Self::InvalidUpdateVersion),
767 96 => Ok(Self::FeatureUpdateFailed),
768 97 => Ok(Self::PrincipalDeserializationFailure),
769 98 => Ok(Self::SnapshotNotFound),
770 99 => Ok(Self::PositionOutOfRange),
771 100 => Ok(Self::UnknownTopicId),
772 101 => Ok(Self::DuplicateBrokerRegistration),
773 102 => Ok(Self::BrokerIdNotRegistered),
774 103 => Ok(Self::InconsistentTopicId),
775 104 => Ok(Self::InconsistentClusterId),
776 105 => Ok(Self::TransactionalIdNotFound),
777 106 => Ok(Self::FetchSessionTopicIdError),
778 107 => Ok(Self::IneligibleReplica),
779 108 => Ok(Self::NewLeaderElected),
780 109 => Ok(Self::OffsetMovedToTieredStorage),
781 110 => Ok(Self::FencedMemberEpoch),
782 111 => Ok(Self::UnreleasedInstanceId),
783 112 => Ok(Self::UnsupportedAssignor),
784 113 => Ok(Self::StaleMemberEpoch),
785 114 => Ok(Self::MismatchedEndpointType),
786 115 => Ok(Self::UnsupportedEndpointType),
787 116 => Ok(Self::UnknownControllerId),
788 117 => Ok(Self::UnknownSubscriptionId),
789 118 => Ok(Self::TelemetryTooLarge),
790 119 => Ok(Self::InvalidRegistration),
791 otherwise => Err(Error::UnknownApiErrorCode(*otherwise)),
792 }
793 }
794}
795
796impl From<ErrorCode> for i16 {
797 fn from(value: ErrorCode) -> Self {
798 Self::from(&value)
799 }
800}
801
802impl From<&ErrorCode> for i16 {
803 #[allow(clippy::too_many_lines)]
804 fn from(value: &ErrorCode) -> Self {
805 match value {
806 ErrorCode::UnknownServerError => -1,
807 ErrorCode::None => 0,
808 ErrorCode::OffsetOutOfRange => 1,
809 ErrorCode::CorruptMessage => 2,
810 ErrorCode::UnknownTopicOrPartition => 3,
811 ErrorCode::InvalidFetchSize => 4,
812 ErrorCode::LeaderNotAvailable => 5,
813 ErrorCode::NotLeaderOrFollower => 6,
814 ErrorCode::RequestTimedOut => 7,
815 ErrorCode::BrokerNotAvailable => 8,
816 ErrorCode::ReplicaNotAvailable => 9,
817 ErrorCode::MessageTooLarge => 10,
818 ErrorCode::StaleControllerEpoch => 11,
819 ErrorCode::OffsetMetadataTooLarge => 12,
820 ErrorCode::NetworkException => 13,
821 ErrorCode::CoordinatorLoadInProgress => 14,
822 ErrorCode::CoordinatorNotAvailable => 15,
823 ErrorCode::NotCoordinator => 16,
824 ErrorCode::InvalidTopicException => 17,
825 ErrorCode::RecordListTooLarge => 18,
826 ErrorCode::NotEnoughReplicas => 19,
827 ErrorCode::NotEnoughReplicasAfterAppend => 20,
828 ErrorCode::InvalidRequiredAcks => 21,
829 ErrorCode::IllegalGeneration => 22,
830 ErrorCode::InconsistentGroupProtocol => 23,
831 ErrorCode::InvalidGroupId => 24,
832 ErrorCode::UnknownMemberId => 25,
833 ErrorCode::InvalidSessionTimeout => 26,
834 ErrorCode::RebalanceInProgress => 27,
835 ErrorCode::InvalidCommitOffsetSize => 28,
836 ErrorCode::TopicAuthorizationFailed => 29,
837 ErrorCode::GroupAuthorizationFailed => 30,
838 ErrorCode::ClusterAuthorizationFailed => 31,
839 ErrorCode::InvalidTimestamp => 32,
840 ErrorCode::UnsupportedSaslMechanism => 33,
841 ErrorCode::IllegalSaslState => 34,
842 ErrorCode::UnsupportedVersion => 35,
843 ErrorCode::TopicAlreadyExists => 36,
844 ErrorCode::InvalidPartitions => 37,
845 ErrorCode::InvalidReplicationFactor => 38,
846 ErrorCode::InvalidReplicaAssignment => 39,
847 ErrorCode::InvalidConfig => 40,
848 ErrorCode::NotController => 41,
849 ErrorCode::InvalidRequest => 42,
850 ErrorCode::UnsupportedForMessageFormat => 43,
851 ErrorCode::PolicyViolation => 44,
852 ErrorCode::OutOfOrderSequenceNumber => 45,
853 ErrorCode::DuplicateSequenceNumber => 46,
854 ErrorCode::InvalidProducerEpoch => 47,
855 ErrorCode::InvalidTxnState => 48,
856 ErrorCode::InvalidProducerIdMapping => 49,
857 ErrorCode::InvalidTransactionTimeout => 50,
858 ErrorCode::ConcurrentTransactions => 51,
859 ErrorCode::TransactionCoordinatorFenced => 52,
860 ErrorCode::TransactionalIdAuthorizationFailed => 53,
861 ErrorCode::SecurityDisabled => 54,
862 ErrorCode::OperationNotAttempted => 55,
863 ErrorCode::KafkaStorageError => 56,
864 ErrorCode::LogDirNotFound => 57,
865 ErrorCode::SaslAuthenticationFailed => 58,
866 ErrorCode::UnknownProducerId => 59,
867 ErrorCode::ReassignmentInProgress => 60,
868 ErrorCode::DelegationTokenAuthDisabled => 61,
869 ErrorCode::DelegationTokenNotFound => 62,
870 ErrorCode::DelegationTokenOwnerMismatch => 63,
871 ErrorCode::DelegationTokenRequestNotAllowed => 64,
872 ErrorCode::DelegationTokenAuthorizationFailed => 65,
873 ErrorCode::DelegationTokenExpired => 66,
874 ErrorCode::InvalidPrincipalType => 67,
875 ErrorCode::NonEmptyGroup => 68,
876 ErrorCode::GroupIdNotFound => 69,
877 ErrorCode::FetchSessionIdNotFound => 70,
878 ErrorCode::InvalidFetchSessionEpoch => 71,
879 ErrorCode::ListenerNotFound => 72,
880 ErrorCode::TopicDeletionDisabled => 73,
881 ErrorCode::FencedLeaderEpoch => 74,
882 ErrorCode::UnknownLeaderEpoch => 75,
883 ErrorCode::UnsupportedCompressionType => 76,
884 ErrorCode::StaleBrokerEpoch => 77,
885 ErrorCode::OffsetNotAvailable => 78,
886 ErrorCode::MemberIdRequired => 79,
887 ErrorCode::PreferredLeaderNotAvailable => 80,
888 ErrorCode::GroupMaxSizeReached => 81,
889 ErrorCode::FencedInstanceId => 82,
890 ErrorCode::EligibleLeadersNotAvailable => 83,
891 ErrorCode::ElectionNotNeeded => 84,
892 ErrorCode::NoReassignmentInProgress => 85,
893 ErrorCode::GroupSubscribedToTopic => 86,
894 ErrorCode::InvalidRecord => 87,
895 ErrorCode::UnstableOffsetCommit => 88,
896 ErrorCode::ThrottlingQuotaExceeded => 89,
897 ErrorCode::ProducerFenced => 90,
898 ErrorCode::ResourceNotFound => 91,
899 ErrorCode::DuplicateResource => 92,
900 ErrorCode::UnacceptableCredential => 93,
901 ErrorCode::InconsistentVoterSet => 94,
902 ErrorCode::InvalidUpdateVersion => 95,
903 ErrorCode::FeatureUpdateFailed => 96,
904 ErrorCode::PrincipalDeserializationFailure => 97,
905 ErrorCode::SnapshotNotFound => 98,
906 ErrorCode::PositionOutOfRange => 99,
907 ErrorCode::UnknownTopicId => 100,
908 ErrorCode::DuplicateBrokerRegistration => 101,
909 ErrorCode::BrokerIdNotRegistered => 102,
910 ErrorCode::InconsistentTopicId => 103,
911 ErrorCode::InconsistentClusterId => 104,
912 ErrorCode::TransactionalIdNotFound => 105,
913 ErrorCode::FetchSessionTopicIdError => 106,
914 ErrorCode::IneligibleReplica => 107,
915 ErrorCode::NewLeaderElected => 108,
916 ErrorCode::OffsetMovedToTieredStorage => 109,
917 ErrorCode::FencedMemberEpoch => 110,
918 ErrorCode::UnreleasedInstanceId => 111,
919 ErrorCode::UnsupportedAssignor => 112,
920 ErrorCode::StaleMemberEpoch => 113,
921 ErrorCode::MismatchedEndpointType => 114,
922 ErrorCode::UnsupportedEndpointType => 115,
923 ErrorCode::UnknownControllerId => 116,
924 ErrorCode::UnknownSubscriptionId => 117,
925 ErrorCode::TelemetryTooLarge => 118,
926 ErrorCode::InvalidRegistration => 119,
927 }
928 }
929}
930
931impl Display for ErrorCode {
932 #[allow(clippy::too_many_lines)]
933 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
934 match self {
935 ErrorCode::UnknownServerError => f.write_str(
936 "The server experienced an unexpected error when processing the request.",
937 ),
938 ErrorCode::None => f.write_str("No error."),
939 ErrorCode::OffsetOutOfRange => f.write_str(
940 "The requested offset is not within the range of offsets maintained by the server.",
941 ),
942 ErrorCode::CorruptMessage => f.write_str(
943 "This message has failed its CRC checksum, exceeds the valid size, has a null key \
944 for a compacted topic, or is otherwise corrupt.",
945 ),
946 ErrorCode::UnknownTopicOrPartition => {
947 f.write_str("This server does not host this topic-partition.")
948 }
949 ErrorCode::InvalidFetchSize => f.write_str("The requested fetch size is invalid."),
950 ErrorCode::LeaderNotAvailable => f.write_str(
951 "There is no leader for this topic-partition as we are in the middle of a \
952 leadership election.",
953 ),
954 ErrorCode::NotLeaderOrFollower => f.write_str(
955 "For requests intended only for the leader, this error indicates that the broker \
956 is not the current leader. For requests intended for any replica, this error \
957 indicates that the broker is not a replica of the topic partition.",
958 ),
959 ErrorCode::RequestTimedOut => f.write_str("The request timed out."),
960 ErrorCode::BrokerNotAvailable => f.write_str("The broker is not available."),
961 ErrorCode::ReplicaNotAvailable => f.write_str(
962 "The replica is not available for the requested topic-partition. Produce/Fetch \
963 requests and other requests intended only for the leader or follower return \
964 NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.",
965 ),
966 ErrorCode::MessageTooLarge => f.write_str(
967 "The request included a message larger than the max message size the server will \
968 accept.",
969 ),
970 ErrorCode::StaleControllerEpoch => {
971 f.write_str("The controller moved to another broker.")
972 }
973 ErrorCode::OffsetMetadataTooLarge => {
974 f.write_str("The metadata field of the offset request was too large.")
975 }
976 ErrorCode::NetworkException => {
977 f.write_str("The server disconnected before a response was received.")
978 }
979 ErrorCode::CoordinatorLoadInProgress => {
980 f.write_str("The coordinator is loading and hence can't process requests.")
981 }
982 ErrorCode::CoordinatorNotAvailable => f.write_str("The coordinator is not available."),
983 ErrorCode::NotCoordinator => f.write_str("This is not the correct coordinator."),
984 ErrorCode::InvalidTopicException => {
985 f.write_str("The request attempted to perform an operation on an invalid topic.")
986 }
987 ErrorCode::RecordListTooLarge => f.write_str(
988 "The request included message batch larger than the configured segment size on \
989 the server.",
990 ),
991 ErrorCode::NotEnoughReplicas => f.write_str(
992 "Messages are rejected since there are fewer in-sync replicas than required.",
993 ),
994 ErrorCode::NotEnoughReplicasAfterAppend => f.write_str(
995 "Messages are written to the log, but to fewer in-sync replicas than required.",
996 ),
997 ErrorCode::InvalidRequiredAcks => {
998 f.write_str("Produce request specified an invalid value for required acks.")
999 }
1000 ErrorCode::IllegalGeneration => {
1001 f.write_str("Specified group generation id is not valid.")
1002 }
1003 ErrorCode::InconsistentGroupProtocol => f.write_str(
1004 "The group member's supported protocols are incompatible with those of existing \
1005 members or first group member tried to join with empty protocol type or empty \
1006 protocol list.",
1007 ),
1008 ErrorCode::InvalidGroupId => f.write_str("The configured groupId is invalid."),
1009 ErrorCode::UnknownMemberId => {
1010 f.write_str("The coordinator is not aware of this member.")
1011 }
1012 ErrorCode::InvalidSessionTimeout => f.write_str(
1013 "The session timeout is not within the range allowed by the broker (as configured \
1014 by group.min.session.timeout.ms and group.max.session.timeout.ms).",
1015 ),
1016 ErrorCode::RebalanceInProgress => {
1017 f.write_str("The group is rebalancing, so a rejoin is needed.")
1018 }
1019 ErrorCode::InvalidCommitOffsetSize => {
1020 f.write_str("The committing offset data size is not valid.")
1021 }
1022 ErrorCode::TopicAuthorizationFailed => f.write_str("Topic authorization failed."),
1023 ErrorCode::GroupAuthorizationFailed => f.write_str("Group authorization failed."),
1024 ErrorCode::ClusterAuthorizationFailed => f.write_str("Cluster authorization failed."),
1025 ErrorCode::InvalidTimestamp => {
1026 f.write_str("The timestamp of the message is out of acceptable range.")
1027 }
1028 ErrorCode::UnsupportedSaslMechanism => {
1029 f.write_str("The broker does not support the requested SASL mechanism.")
1030 }
1031 ErrorCode::IllegalSaslState => {
1032 f.write_str("Request is not valid given the current SASL state.")
1033 }
1034 ErrorCode::UnsupportedVersion => f.write_str("The version of API is not supported."),
1035 ErrorCode::TopicAlreadyExists => f.write_str("Topic with this name already exists."),
1036 ErrorCode::InvalidPartitions => f.write_str("Number of partitions is below 1."),
1037 ErrorCode::InvalidReplicationFactor => f.write_str(
1038 "Replication factor is below 1 or larger than the number of available brokers.",
1039 ),
1040 ErrorCode::InvalidReplicaAssignment => f.write_str("Replica assignment is invalid."),
1041 ErrorCode::InvalidConfig => f.write_str("Configuration is invalid."),
1042 ErrorCode::NotController => {
1043 f.write_str("This is not the correct controller for this cluster.")
1044 }
1045 ErrorCode::InvalidRequest => f.write_str(
1046 "This most likely occurs because of a request being malformed by the client \
1047 library or the message was sent to an incompatible broker. See the broker logs \
1048 for more details.",
1049 ),
1050 ErrorCode::UnsupportedForMessageFormat => f.write_str(
1051 "The message format version on the broker does not support the request.",
1052 ),
1053 ErrorCode::PolicyViolation => {
1054 f.write_str("Request parameters do not satisfy the configured policy.")
1055 }
1056 ErrorCode::OutOfOrderSequenceNumber => {
1057 f.write_str("The broker received an out of order sequence number.")
1058 }
1059 ErrorCode::DuplicateSequenceNumber => {
1060 f.write_str("The broker received a duplicate sequence number.")
1061 }
1062 ErrorCode::InvalidProducerEpoch => {
1063 f.write_str("Producer attempted to produce with an old epoch.")
1064 }
1065 ErrorCode::InvalidTxnState => {
1066 f.write_str("The producer attempted a transactional operation in an invalid state.")
1067 }
1068 ErrorCode::InvalidProducerIdMapping => f.write_str(
1069 "The producer attempted to use a producer id which is not currently assigned to \
1070 its transactional id.",
1071 ),
1072 ErrorCode::InvalidTransactionTimeout => f.write_str(
1073 "The transaction timeout is larger than the maximum value allowed by the broker \
1074 (as configured by transaction.max.timeout.ms).",
1075 ),
1076 ErrorCode::ConcurrentTransactions => f.write_str(
1077 "The producer attempted to update a transaction while another concurrent \
1078 operation on the same transaction was ongoing.",
1079 ),
1080 ErrorCode::TransactionCoordinatorFenced => f.write_str(
1081 "Indicates that the transaction coordinator sending a WriteTxnMarker is no longer \
1082 the current coordinator for a given producer.",
1083 ),
1084 ErrorCode::TransactionalIdAuthorizationFailed => {
1085 f.write_str("Transactional Id authorization failed.")
1086 }
1087 ErrorCode::SecurityDisabled => f.write_str("Security features are disabled."),
1088 ErrorCode::OperationNotAttempted => f.write_str(
1089 "The broker did not attempt to execute this operation. This may happen for \
1090 batched RPCs where some operations in the batch failed, causing the broker to \
1091 respond without trying the rest.",
1092 ),
1093 ErrorCode::KafkaStorageError => {
1094 f.write_str("Disk error when trying to access log file on the disk.")
1095 }
1096 ErrorCode::LogDirNotFound => {
1097 f.write_str("The user-specified log directory is not found in the broker config.")
1098 }
1099 ErrorCode::SaslAuthenticationFailed => f.write_str("SASL Authentication failed."),
1100 ErrorCode::UnknownProducerId => f.write_str(
1101 "This exception is raised by the broker if it could not locate the producer \
1102 metadata associated with the producerId in question. This could happen if, for \
1103 instance, the producer's records were deleted because their retention time had \
1104 elapsed. Once the last records of the producerId are removed, the producer's \
1105 metadata is removed from the broker, and future appends by the producer will \
1106 return this exception.",
1107 ),
1108 ErrorCode::ReassignmentInProgress => {
1109 f.write_str("A partition reassignment is in progress.")
1110 }
1111 ErrorCode::DelegationTokenAuthDisabled => {
1112 f.write_str("Delegation Token feature is not enabled.")
1113 }
1114 ErrorCode::DelegationTokenNotFound => {
1115 f.write_str("Delegation Token is not found on server.")
1116 }
1117 ErrorCode::DelegationTokenOwnerMismatch => {
1118 f.write_str("Specified Principal is not valid Owner/Renewer.")
1119 }
1120 ErrorCode::DelegationTokenRequestNotAllowed => f.write_str(
1121 "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on \
1122 delegation token authenticated channels.",
1123 ),
1124 ErrorCode::DelegationTokenAuthorizationFailed => {
1125 f.write_str("Delegation Token authorization failed.")
1126 }
1127 ErrorCode::DelegationTokenExpired => f.write_str("Delegation Token is expired."),
1128 ErrorCode::InvalidPrincipalType => {
1129 f.write_str("Supplied principalType is not supported.")
1130 }
1131 ErrorCode::NonEmptyGroup => f.write_str("The group is not empty."),
1132 ErrorCode::GroupIdNotFound => f.write_str("The group id does not exist."),
1133 ErrorCode::FetchSessionIdNotFound => f.write_str("The fetch session ID was not found."),
1134 ErrorCode::InvalidFetchSessionEpoch => {
1135 f.write_str("The fetch session epoch is invalid.")
1136 }
1137 ErrorCode::ListenerNotFound => f.write_str(
1138 "There is no listener on the leader broker that matches the listener on which \
1139 metadata request was processed.",
1140 ),
1141 ErrorCode::TopicDeletionDisabled => f.write_str("Topic deletion is disabled."),
1142 ErrorCode::FencedLeaderEpoch => f.write_str(
1143 "The leader epoch in the request is older than the epoch on the broker.",
1144 ),
1145 ErrorCode::UnknownLeaderEpoch => f.write_str(
1146 "The leader epoch in the request is newer than the epoch on the broker.",
1147 ),
1148 ErrorCode::UnsupportedCompressionType => f.write_str(
1149 "The requesting client does not support the compression type of given partition.",
1150 ),
1151 ErrorCode::StaleBrokerEpoch => f.write_str("Broker epoch has changed."),
1152 ErrorCode::OffsetNotAvailable => f.write_str(
1153 "The leader high watermark has not caught up from a recent leader election so the \
1154 offsets cannot be guaranteed to be monotonically increasing.",
1155 ),
1156 ErrorCode::MemberIdRequired => f.write_str(
1157 "The group member needs to have a valid member id before actually entering a \
1158 consumer group.",
1159 ),
1160 ErrorCode::PreferredLeaderNotAvailable => {
1161 f.write_str("The preferred leader was not available.")
1162 }
1163 ErrorCode::GroupMaxSizeReached => {
1164 f.write_str("The consumer group has reached its max size.")
1165 }
1166 ErrorCode::FencedInstanceId => f.write_str(
1167 "The broker rejected this static consumer since another consumer with the same \
1168 group.instance.id has registered with a different member.id.",
1169 ),
1170 ErrorCode::EligibleLeadersNotAvailable => {
1171 f.write_str("Eligible topic partition leaders are not available.")
1172 }
1173 ErrorCode::ElectionNotNeeded => {
1174 f.write_str("Leader election not needed for topic partition.")
1175 }
1176 ErrorCode::NoReassignmentInProgress => {
1177 f.write_str("No partition reassignment is in progress.")
1178 }
1179 ErrorCode::GroupSubscribedToTopic => f.write_str(
1180 "Deleting offsets of a topic is forbidden while the consumer group is actively \
1181 subscribed to it.",
1182 ),
1183 ErrorCode::InvalidRecord => f.write_str(
1184 "This record has failed the validation on broker and hence will be rejected.",
1185 ),
1186 ErrorCode::UnstableOffsetCommit => {
1187 f.write_str("There are unstable offsets that need to be cleared.")
1188 }
1189 ErrorCode::ThrottlingQuotaExceeded => {
1190 f.write_str("The throttling quota has been exceeded.")
1191 }
1192 ErrorCode::ProducerFenced => f.write_str(
1193 "There is a newer producer with the same transactionalId which fences the current \
1194 one.",
1195 ),
1196 ErrorCode::ResourceNotFound => {
1197 f.write_str("A request illegally referred to a resource that does not exist.")
1198 }
1199 ErrorCode::DuplicateResource => {
1200 f.write_str("A request illegally referred to the same resource twice.")
1201 }
1202 ErrorCode::UnacceptableCredential => {
1203 f.write_str("Requested credential would not meet criteria for acceptability.")
1204 }
1205 ErrorCode::InconsistentVoterSet => f.write_str(
1206 "Indicates that the either the sender or recipient of a voter-only request is not \
1207 one of the expected voters",
1208 ),
1209 ErrorCode::InvalidUpdateVersion => f.write_str("The given update version was invalid."),
1210 ErrorCode::FeatureUpdateFailed => f.write_str(
1211 "Unable to update finalized features due to an unexpected server error.",
1212 ),
1213 ErrorCode::PrincipalDeserializationFailure => f.write_str(
1214 "Request principal deserialization failed during forwarding. This indicates an \
1215 internal error on the broker cluster security setup.",
1216 ),
1217 ErrorCode::SnapshotNotFound => f.write_str("Requested snapshot was not found"),
1218 ErrorCode::PositionOutOfRange => f.write_str(
1219 "Requested position is not greater than or equal to zero, and less than the size \
1220 of the snapshot.",
1221 ),
1222 ErrorCode::UnknownTopicId => f.write_str("This server does not host this topic ID."),
1223 ErrorCode::DuplicateBrokerRegistration => {
1224 f.write_str("This broker ID is already in use.")
1225 }
1226 ErrorCode::BrokerIdNotRegistered => {
1227 f.write_str("The given broker ID was not registered.")
1228 }
1229 ErrorCode::InconsistentTopicId => {
1230 f.write_str("The log's topic ID did not match the topic ID in the request")
1231 }
1232 ErrorCode::InconsistentClusterId => {
1233 f.write_str("The clusterId in the request does not match that found on the server")
1234 }
1235 ErrorCode::TransactionalIdNotFound => {
1236 f.write_str("The transactionalId could not be found")
1237 }
1238 ErrorCode::FetchSessionTopicIdError => {
1239 f.write_str("The fetch session encountered inconsistent topic ID usage")
1240 }
1241 ErrorCode::IneligibleReplica => {
1242 f.write_str("The new ISR contains at least one ineligible replica.")
1243 }
1244 ErrorCode::NewLeaderElected => f.write_str(
1245 "The AlterPartition request successfully updated the partition state but the \
1246 leader has changed.",
1247 ),
1248 ErrorCode::OffsetMovedToTieredStorage => {
1249 f.write_str("The requested offset is moved to tiered storage.")
1250 }
1251 ErrorCode::FencedMemberEpoch => f.write_str(
1252 "The member epoch is fenced by the group coordinator. The member must abandon all \
1253 its partitions and rejoin.",
1254 ),
1255 ErrorCode::UnreleasedInstanceId => f.write_str(
1256 "The instance ID is still used by another member in the consumer group. That \
1257 member must leave first.",
1258 ),
1259 ErrorCode::UnsupportedAssignor => f.write_str(
1260 "The assignor or its version range is not supported by the consumer group.",
1261 ),
1262 ErrorCode::StaleMemberEpoch => f.write_str(
1263 "The member epoch is stale. The member must retry after receiving its updated \
1264 member epoch via the ConsumerGroupHeartbeat API.",
1265 ),
1266 ErrorCode::MismatchedEndpointType => {
1267 f.write_str("The request was sent to an endpoint of the wrong type.")
1268 }
1269 ErrorCode::UnsupportedEndpointType => {
1270 f.write_str("This endpoint type is not supported yet.")
1271 }
1272 ErrorCode::UnknownControllerId => f.write_str("This controller ID is not known."),
1273 ErrorCode::UnknownSubscriptionId => f.write_str(
1274 "Client sent a push telemetry request with an invalid or outdated subscription ID.",
1275 ),
1276 ErrorCode::TelemetryTooLarge => f.write_str(
1277 "Client sent a push telemetry request larger than the maximum size the broker \
1278 will accept.",
1279 ),
1280 ErrorCode::InvalidRegistration => {
1281 f.write_str("The controller has considered the broker registration to be invalid.")
1282 }
1283 }
1284 }
1285}
1286
/// Error conditions known to this crate.
///
/// [`ErrorCode::None`] is the default value. The enum is `#[non_exhaustive]`,
/// so matches written outside this crate must include a wildcard arm. The
/// derived ordering follows the declaration order of the variants below, so
/// reordering them changes `Ord`/`PartialOrd` results.
///
/// NOTE(review): the variant names appear to mirror the Kafka protocol error
/// table — confirm against the protocol documentation before adding variants.
#[non_exhaustive]
#[derive(
    Clone, Copy, Default, Deserialize, Eq, Hash, Debug, Ord, PartialEq, PartialOrd, Serialize,
)]
pub enum ErrorCode {
    UnknownServerError,
    /// The default variant.
    #[default]
    None,
    OffsetOutOfRange,
    CorruptMessage,
    UnknownTopicOrPartition,
    InvalidFetchSize,
    LeaderNotAvailable,
    NotLeaderOrFollower,
    RequestTimedOut,
    BrokerNotAvailable,
    ReplicaNotAvailable,
    MessageTooLarge,
    StaleControllerEpoch,
    OffsetMetadataTooLarge,
    NetworkException,
    CoordinatorLoadInProgress,
    CoordinatorNotAvailable,
    NotCoordinator,
    InvalidTopicException,
    RecordListTooLarge,
    NotEnoughReplicas,
    NotEnoughReplicasAfterAppend,
    InvalidRequiredAcks,
    IllegalGeneration,
    InconsistentGroupProtocol,
    InvalidGroupId,
    UnknownMemberId,
    InvalidSessionTimeout,
    RebalanceInProgress,
    InvalidCommitOffsetSize,
    TopicAuthorizationFailed,
    GroupAuthorizationFailed,
    ClusterAuthorizationFailed,
    InvalidTimestamp,
    UnsupportedSaslMechanism,
    IllegalSaslState,
    UnsupportedVersion,
    TopicAlreadyExists,
    InvalidPartitions,
    InvalidReplicationFactor,
    InvalidReplicaAssignment,
    InvalidConfig,
    NotController,
    InvalidRequest,
    UnsupportedForMessageFormat,
    PolicyViolation,
    OutOfOrderSequenceNumber,
    DuplicateSequenceNumber,
    InvalidProducerEpoch,
    InvalidTxnState,
    InvalidProducerIdMapping,
    InvalidTransactionTimeout,
    ConcurrentTransactions,
    TransactionCoordinatorFenced,
    TransactionalIdAuthorizationFailed,
    SecurityDisabled,
    OperationNotAttempted,
    KafkaStorageError,
    LogDirNotFound,
    SaslAuthenticationFailed,
    UnknownProducerId,
    ReassignmentInProgress,
    DelegationTokenAuthDisabled,
    DelegationTokenNotFound,
    DelegationTokenOwnerMismatch,
    DelegationTokenRequestNotAllowed,
    DelegationTokenAuthorizationFailed,
    DelegationTokenExpired,
    InvalidPrincipalType,
    NonEmptyGroup,
    GroupIdNotFound,
    FetchSessionIdNotFound,
    InvalidFetchSessionEpoch,
    ListenerNotFound,
    TopicDeletionDisabled,
    FencedLeaderEpoch,
    UnknownLeaderEpoch,
    UnsupportedCompressionType,
    StaleBrokerEpoch,
    OffsetNotAvailable,
    MemberIdRequired,
    PreferredLeaderNotAvailable,
    GroupMaxSizeReached,
    FencedInstanceId,
    EligibleLeadersNotAvailable,
    ElectionNotNeeded,
    NoReassignmentInProgress,
    GroupSubscribedToTopic,
    InvalidRecord,
    UnstableOffsetCommit,
    ThrottlingQuotaExceeded,
    ProducerFenced,
    ResourceNotFound,
    DuplicateResource,
    UnacceptableCredential,
    InconsistentVoterSet,
    InvalidUpdateVersion,
    FeatureUpdateFailed,
    PrincipalDeserializationFailure,
    SnapshotNotFound,
    PositionOutOfRange,
    UnknownTopicId,
    DuplicateBrokerRegistration,
    BrokerIdNotRegistered,
    InconsistentTopicId,
    InconsistentClusterId,
    TransactionalIdNotFound,
    FetchSessionTopicIdError,
    IneligibleReplica,
    NewLeaderElected,
    OffsetMovedToTieredStorage,
    FencedMemberEpoch,
    UnreleasedInstanceId,
    UnsupportedAssignor,
    StaleMemberEpoch,
    MismatchedEndpointType,
    UnsupportedEndpointType,
    UnknownControllerId,
    UnknownSubscriptionId,
    TelemetryTooLarge,
    InvalidRegistration,
}
1416
/// Isolation level, convertible to and from an `i8`.
///
/// [`IsolationLevel::ReadUncommitted`] is the default.
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub enum IsolationLevel {
    /// Represented as `0` when converted to `i8`.
    #[default]
    ReadUncommitted,
    /// Represented as `1` when converted to `i8`.
    ReadCommitted,
}
1426
1427impl TryFrom<i8> for IsolationLevel {
1428 type Error = Error;
1429
1430 fn try_from(value: i8) -> Result<Self, Self::Error> {
1431 match value {
1432 0 => Ok(Self::ReadUncommitted),
1433 1 => Ok(Self::ReadCommitted),
1434 _ => Err(Error::InvalidIsolationLevel(value)),
1435 }
1436 }
1437}
1438
1439impl From<IsolationLevel> for i8 {
1440 fn from(value: IsolationLevel) -> Self {
1441 Self::from(&value)
1442 }
1443}
1444
1445impl From<&IsolationLevel> for i8 {
1446 fn from(value: &IsolationLevel) -> Self {
1447 match value {
1448 IsolationLevel::ReadUncommitted => 0,
1449 IsolationLevel::ReadCommitted => 1,
1450 }
1451 }
1452}
1453
/// Acknowledgement setting, convertible to and from an `i16`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum Ack {
    /// Represented as `0` when converted to `i16`.
    None,
    /// Represented as `1` when converted to `i16`.
    Leader,
    /// Represented as `-1` when converted to `i16`.
    FullIsr,
}
1461
/// `i16` values used by the conversions between [`Ack`] and `i16`.
impl Ack {
    /// Value of [`Ack::FullIsr`].
    const FULL_ISR: i16 = -1;
    /// Value of [`Ack::None`].
    const NONE: i16 = 0;
    /// Value of [`Ack::Leader`].
    const LEADER: i16 = 1;
}
1467
1468impl From<Ack> for i16 {
1469 fn from(value: Ack) -> Self {
1470 match value {
1471 Ack::FullIsr => Ack::FULL_ISR,
1472 Ack::None => Ack::NONE,
1473 Ack::Leader => Ack::LEADER,
1474 }
1475 }
1476}
1477
1478impl TryFrom<i16> for Ack {
1479 type Error = Error;
1480
1481 fn try_from(value: i16) -> Result<Self, Self::Error> {
1482 match value {
1483 Self::FULL_ISR => Ok(Self::FullIsr),
1484 Self::NONE => Ok(Self::None),
1485 Self::LEADER => Ok(Self::Leader),
1486 _ => Err(Error::InvalidAckValue(value)),
1487 }
1488 }
1489}
1490
/// Timestamp type, carried as a single bit of an `i16` attribute word.
///
/// [`TimestampType::CreateTime`] is the default.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TimestampType {
    /// Selected when the timestamp-type bit is clear.
    #[default]
    CreateTime,
    /// Selected when the timestamp-type bit is set.
    LogAppendTime,
}
1498
impl TimestampType {
    /// Bit of the `i16` attribute word that is set for
    /// [`TimestampType::LogAppendTime`] and clear for
    /// [`TimestampType::CreateTime`].
    const TIMESTAMP_TYPE_BITMASK: i16 = 8;
}
1502
1503impl From<i16> for TimestampType {
1504 fn from(value: i16) -> Self {
1505 if value & Self::TIMESTAMP_TYPE_BITMASK == Self::TIMESTAMP_TYPE_BITMASK {
1506 Self::LogAppendTime
1507 } else {
1508 Self::CreateTime
1509 }
1510 }
1511}
1512
1513impl From<TimestampType> for i16 {
1514 fn from(value: TimestampType) -> Self {
1515 match value {
1516 TimestampType::CreateTime => 0,
1517 TimestampType::LogAppendTime => TimestampType::TIMESTAMP_TYPE_BITMASK,
1518 }
1519 }
1520}
1521
/// Compression codec, carried in the low three bits of an `i16` attribute
/// word.
///
/// [`Compression::None`] is the default.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum Compression {
    /// Codec value `0`.
    #[default]
    None,
    /// Codec value `1`.
    Gzip,
    /// Codec value `2`.
    Snappy,
    /// Codec value `3`.
    Lz4,
    /// Codec value `4`.
    Zstd,
}
1532
1533impl TryFrom<i16> for Compression {
1534 type Error = Error;
1535
1536 fn try_from(value: i16) -> Result<Self, Self::Error> {
1537 match value & 0b111i16 {
1538 0 => Ok(Self::None),
1539 1 => Ok(Self::Gzip),
1540 2 => Ok(Self::Snappy),
1541 3 => Ok(Self::Lz4),
1542 4 => Ok(Self::Zstd),
1543 otherwise => Err(Error::UnknownCompressionType(otherwise)),
1544 }
1545 }
1546}
1547
1548impl From<Compression> for i16 {
1549 fn from(value: Compression) -> Self {
1550 match value {
1551 Compression::None => 0,
1552 Compression::Gzip => 1,
1553 Compression::Snappy => 2,
1554 Compression::Lz4 => 3,
1555 Compression::Zstd => 4,
1556 }
1557 }
1558}
1559
1560impl Compression {
1561 fn inflator(&self, mut deflated: impl BufRead + 'static) -> Result<Box<dyn Read>> {
1562 match self {
1563 Compression::None => Ok(Box::new(deflated)),
1564 Compression::Gzip => Ok(Box::new(GzDecoder::new(deflated))),
1565 Compression::Snappy => {
1566 let mut input = vec![];
1567 _ = deflated.read_to_end(&mut input)?;
1568 debug!(?input);
1569
1570 let mut decoder = snap::raw::Decoder::new();
1571
1572 decoder
1573 .decompress_vec(
1574 if input.starts_with(b"\x82SNAPPY\0") {
1576 if let (b"\x82SNAPPY\0", remainder) = input.split_at(8) {
1577 let (version, remainder) = remainder.split_at(4);
1578 let version: i32 = version.try_into().map(i32::from_be_bytes)?;
1579
1580 let (compatible_version, remainder) = remainder.split_at(4);
1581 let compatible_version: i32 =
1582 compatible_version.try_into().map(i32::from_be_bytes)?;
1583
1584 let (block_size, _) = remainder.split_at(4);
1585 let block_size: i32 =
1586 block_size.try_into().map(i32::from_be_bytes)?;
1587
1588 debug!(version, compatible_version, block_size);
1589 }
1590
1591 let skip_header = &input[20..];
1592 debug!(?skip_header);
1593 skip_header
1594 } else {
1595 &input[..]
1596 },
1597 )
1598 .map_err(Into::into)
1599 .map(Bytes::from)
1600 .map(|bytes| bytes.reader())
1601 .map(Box::new)
1602 .map(|boxed| boxed as Box<dyn Read>)
1603 .inspect_err(|err| error!(?err))
1604 }
1605 Compression::Lz4 => lz4::Decoder::new(deflated)
1606 .map(Box::new)
1607 .map(|boxed| boxed as Box<dyn Read>)
1608 .map_err(Into::into),
1609 Compression::Zstd => zstd::stream::read::Decoder::with_buffer(deflated)
1610 .map(Box::new)
1611 .map(|boxed| boxed as Box<dyn Read>)
1612 .map_err(Into::into),
1613 }
1614 }
1615}
1616
/// Decoded form of an `i16` batch attribute word.
///
/// The default value has no compression, a create-time timestamp and every
/// flag cleared, which encodes as `0`.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BatchAttribute {
    /// Compression codec, stored in the low three bits.
    pub compression: Compression,
    /// Timestamp type, stored in bit value `8`.
    pub timestamp: TimestampType,
    /// Transaction flag, stored in bit value `16`.
    pub transaction: bool,
    /// Control flag, stored in bit value `32`.
    pub control: bool,
    /// Delete-horizon flag, stored in bit value `64`.
    pub delete_horizon: bool,
}
1626
1627impl BatchAttribute {
1628 const TRANSACTION_BITMASK: i16 = 16;
1629 const CONTROL_BITMASK: i16 = 32;
1630 const DELETE_HORIZON_BITMASK: i16 = 64;
1631
1632 pub fn compression(self, compression: Compression) -> Self {
1633 Self {
1634 compression,
1635 ..self
1636 }
1637 }
1638
1639 pub fn timestamp(self, timestamp: TimestampType) -> Self {
1640 Self { timestamp, ..self }
1641 }
1642
1643 pub fn transaction(self, transaction: bool) -> Self {
1644 Self {
1645 transaction,
1646 ..self
1647 }
1648 }
1649
1650 pub fn control(self, control: bool) -> Self {
1651 Self { control, ..self }
1652 }
1653
1654 pub fn delete_horizon(self, delete_horizon: bool) -> Self {
1655 Self {
1656 delete_horizon,
1657 ..self
1658 }
1659 }
1660}
1661
1662impl From<BatchAttribute> for i16 {
1663 fn from(value: BatchAttribute) -> Self {
1664 let mut attributes = i16::from(value.compression);
1665 attributes |= i16::from(value.timestamp);
1666
1667 if value.transaction {
1668 attributes |= BatchAttribute::TRANSACTION_BITMASK;
1669 }
1670
1671 if value.control {
1672 attributes |= BatchAttribute::CONTROL_BITMASK;
1673 }
1674
1675 if value.delete_horizon {
1676 attributes |= BatchAttribute::DELETE_HORIZON_BITMASK;
1677 }
1678
1679 attributes
1680 }
1681}
1682
1683impl TryFrom<i16> for BatchAttribute {
1684 type Error = Error;
1685
1686 fn try_from(value: i16) -> Result<Self, Self::Error> {
1687 Compression::try_from(value).map(|compression| {
1688 Self::default()
1689 .compression(compression)
1690 .timestamp(TimestampType::from(value))
1691 .transaction(value & Self::TRANSACTION_BITMASK == Self::TRANSACTION_BITMASK)
1692 .control(value & Self::CONTROL_BITMASK == Self::CONTROL_BITMASK)
1693 .delete_horizon(
1694 value & Self::DELETE_HORIZON_BITMASK == Self::DELETE_HORIZON_BITMASK,
1695 )
1696 })
1697 }
1698}
1699
/// Control batch payload: a version and a type code.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct ControlBatch {
    /// Version of the control batch.
    pub version: i16,
    /// Type code; `0` is treated as abort and `1` as commit by the helper
    /// methods on this type.
    pub r#type: i16,
}
1706
1707impl ControlBatch {
1708 const ABORT: i16 = 0;
1709 const COMMIT: i16 = 1;
1710
1711 pub fn is_abort(&self) -> bool {
1712 self.r#type == Self::ABORT
1713 }
1714
1715 pub fn is_commit(&self) -> bool {
1716 self.r#type == Self::COMMIT
1717 }
1718
1719 pub fn version(self, version: i16) -> Self {
1720 Self { version, ..self }
1721 }
1722
1723 pub fn commit(self) -> Self {
1724 Self {
1725 r#type: Self::COMMIT,
1726 ..self
1727 }
1728 }
1729
1730 pub fn abort(self) -> Self {
1731 Self {
1732 r#type: Self::ABORT,
1733 ..self
1734 }
1735 }
1736}
1737
1738impl TryFrom<Bytes> for ControlBatch {
1739 type Error = Error;
1740
1741 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1742 let mut c = Cursor::new(value);
1743 let mut deserializer = Decoder::new(&mut c);
1744 Self::deserialize(&mut deserializer)
1745 }
1746}
1747
1748impl TryFrom<ControlBatch> for Bytes {
1749 type Error = Error;
1750
1751 fn try_from(value: ControlBatch) -> Result<Self, Self::Error> {
1752 let mut b = BytesMut::new().writer();
1753 let mut serializer = Encoder::new(&mut b);
1754 value.serialize(&mut serializer)?;
1755 Ok(Bytes::from(b.into_inner()))
1756 }
1757}
1758
/// End-of-transaction marker payload: a version and a coordinator epoch.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct EndTransactionMarker {
    /// Version of the marker.
    pub version: i16,
    /// Coordinator epoch recorded in the marker.
    pub coordinator_epoch: i32,
}
1765
1766impl TryFrom<Bytes> for EndTransactionMarker {
1767 type Error = Error;
1768
1769 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1770 let mut c = Cursor::new(value);
1771 let mut deserializer = Decoder::new(&mut c);
1772 Self::deserialize(&mut deserializer)
1773 }
1774}
1775
1776impl TryFrom<EndTransactionMarker> for Bytes {
1777 type Error = Error;
1778
1779 fn try_from(value: EndTransactionMarker) -> Result<Self, Self::Error> {
1780 let mut b = BytesMut::new().writer();
1781 let mut serializer = Encoder::new(&mut b);
1782 value.serialize(&mut serializer)?;
1783 Ok(Bytes::from(b.into_inner()))
1784 }
1785}
1786
/// Endpoint type, convertible to and from an `i8`.
///
/// [`EndpointType::Unknown`] is the default and also absorbs every `i8` that
/// does not name a known endpoint type.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum EndpointType {
    #[default]
    Unknown,
    Broker,
    Controller,
}

/// Lossy decode: unrecognised values become [`EndpointType::Unknown`].
impl From<i8> for EndpointType {
    fn from(value: i8) -> Self {
        match value {
            2 => EndpointType::Controller,
            1 => EndpointType::Broker,
            _ => EndpointType::Unknown,
        }
    }
}

/// Encodes an [`EndpointType`] as its `i8` representation.
impl From<EndpointType> for i8 {
    fn from(value: EndpointType) -> Self {
        match value {
            EndpointType::Controller => 2,
            EndpointType::Broker => 1,
            EndpointType::Unknown => 0,
        }
    }
}
1815
/// Coordinator type, convertible to and from an `i8`.
///
/// Derives the same standard traits as the other plain enums in this file so
/// values can be logged, compared and copied.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CoordinatorType {
    /// Represented as `0` when converted to `i8`.
    Group,
    /// Represented as `1` when converted to `i8`.
    Transaction,
    /// Represented as `2` when converted to `i8`.
    Share,
}
1822
1823impl TryFrom<i8> for CoordinatorType {
1824 type Error = Error;
1825
1826 fn try_from(value: i8) -> Result<Self, Self::Error> {
1827 match value {
1828 0 => Ok(Self::Group),
1829 1 => Ok(Self::Transaction),
1830 2 => Ok(Self::Share),
1831 otherwise => Err(Error::InvalidCoordinatorType(otherwise)),
1832 }
1833 }
1834}
1835
/// Configuration resource kind, convertible to and from small integers.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigResource {
    Group,
    ClientMetric,
    BrokerLogger,
    Broker,
    Topic,
    Unknown,
}

/// Lossy decode: unrecognised values become [`ConfigResource::Unknown`].
impl From<i8> for ConfigResource {
    fn from(value: i8) -> Self {
        match value {
            32 => ConfigResource::Group,
            16 => ConfigResource::ClientMetric,
            8 => ConfigResource::BrokerLogger,
            4 => ConfigResource::Broker,
            2 => ConfigResource::Topic,
            _ => ConfigResource::Unknown,
        }
    }
}
1859
1860impl From<CoordinatorType> for i8 {
1861 fn from(value: CoordinatorType) -> Self {
1862 match value {
1863 CoordinatorType::Group => 0,
1864 CoordinatorType::Transaction => 1,
1865 CoordinatorType::Share => 2,
1866 }
1867 }
1868}
1869
1870impl From<ConfigResource> for i8 {
1871 fn from(value: ConfigResource) -> Self {
1872 match value {
1873 ConfigResource::Unknown => 0,
1874 ConfigResource::Topic => 2,
1875 ConfigResource::Broker => 4,
1876 ConfigResource::BrokerLogger => 8,
1877 ConfigResource::ClientMetric => 16,
1878 ConfigResource::Group => 32,
1879 }
1880 }
1881}
1882
1883impl From<ConfigResource> for i32 {
1884 fn from(value: ConfigResource) -> Self {
1885 match value {
1886 ConfigResource::Unknown => 0,
1887 ConfigResource::Topic => 2,
1888 ConfigResource::Broker => 4,
1889 ConfigResource::BrokerLogger => 8,
1890 ConfigResource::ClientMetric => 16,
1891 ConfigResource::Group => 32,
1892 }
1893 }
1894}
1895
/// Configuration value type, convertible to and from an `i8`.
///
/// [`ConfigType::Unknown`] is the default and also absorbs every `i8` that
/// does not name a known type.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigType {
    #[default]
    Unknown,
    Boolean,
    String,
    Int,
    Short,
    Long,
    Double,
    List,
    Class,
    Password,
}

/// Lossy decode: unrecognised values become [`ConfigType::Unknown`].
impl From<i8> for ConfigType {
    fn from(value: i8) -> Self {
        match value {
            9 => ConfigType::Password,
            8 => ConfigType::Class,
            7 => ConfigType::List,
            6 => ConfigType::Double,
            5 => ConfigType::Long,
            4 => ConfigType::Short,
            3 => ConfigType::Int,
            2 => ConfigType::String,
            1 => ConfigType::Boolean,
            _ => ConfigType::Unknown,
        }
    }
}

/// Encodes a [`ConfigType`] as its `i8` representation.
impl From<ConfigType> for i8 {
    fn from(value: ConfigType) -> i8 {
        match value {
            ConfigType::Unknown => 0,
            ConfigType::Boolean => 1,
            ConfigType::String => 2,
            ConfigType::Int => 3,
            ConfigType::Short => 4,
            ConfigType::Long => 5,
            ConfigType::Double => 6,
            ConfigType::List => 7,
            ConfigType::Class => 8,
            ConfigType::Password => 9,
        }
    }
}
1945
/// Origin of a configuration value, convertible to and from an `i8`.
///
/// The numeric codes do not follow the declaration order of the variants; the
/// derived ordering does.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigSource {
    DynamicTopicConfig,
    DynamicBrokerLoggerConfig,
    DynamicBrokerConfig,
    DynamicDefaultBrokerConfig,
    DynamicClientMetricsConfig,
    DynamicGroupConfig,
    StaticBrokerConfig,
    DefaultConfig,
    Unknown,
}

/// Lossy decode: unrecognised values become [`ConfigSource::Unknown`].
impl From<i8> for ConfigSource {
    fn from(value: i8) -> Self {
        match value {
            8 => ConfigSource::DynamicGroupConfig,
            7 => ConfigSource::DynamicClientMetricsConfig,
            6 => ConfigSource::DynamicBrokerLoggerConfig,
            5 => ConfigSource::DefaultConfig,
            4 => ConfigSource::StaticBrokerConfig,
            3 => ConfigSource::DynamicDefaultBrokerConfig,
            2 => ConfigSource::DynamicBrokerConfig,
            1 => ConfigSource::DynamicTopicConfig,
            _ => ConfigSource::Unknown,
        }
    }
}

/// Encodes a [`ConfigSource`] as its `i8` representation.
impl From<ConfigSource> for i8 {
    fn from(value: ConfigSource) -> i8 {
        match value {
            ConfigSource::Unknown => 0,
            ConfigSource::DynamicTopicConfig => 1,
            ConfigSource::DynamicBrokerConfig => 2,
            ConfigSource::DynamicDefaultBrokerConfig => 3,
            ConfigSource::StaticBrokerConfig => 4,
            ConfigSource::DefaultConfig => 5,
            ConfigSource::DynamicBrokerLoggerConfig => 6,
            ConfigSource::DynamicClientMetricsConfig => 7,
            ConfigSource::DynamicGroupConfig => 8,
        }
    }
}
1991
/// Operation type, convertible to and from an `i8`.
///
/// Derives the same standard traits as the other plain enums in this file so
/// values can be logged, compared and copied.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum OpType {
    /// Represented as `0` when converted to `i8`.
    Set,
    /// Represented as `1` when converted to `i8`.
    Delete,
    /// Represented as `2` when converted to `i8`.
    Append,
    /// Represented as `3` when converted to `i8`.
    Subtract,
}
1999
2000impl TryFrom<i8> for OpType {
2001 type Error = Error;
2002
2003 fn try_from(value: i8) -> Result<Self, Self::Error> {
2004 match value {
2005 0 => Ok(Self::Set),
2006 1 => Ok(Self::Delete),
2007 2 => Ok(Self::Append),
2008 3 => Ok(Self::Subtract),
2009 otherwise => Err(Error::InvalidOpType(otherwise)),
2010 }
2011 }
2012}
2013
2014impl From<OpType> for i8 {
2015 fn from(value: OpType) -> Self {
2016 match value {
2017 OpType::Set => 0,
2018 OpType::Delete => 1,
2019 OpType::Append => 2,
2020 OpType::Subtract => 3,
2021 }
2022 }
2023}
2024
2025pub fn to_system_time(timestamp: i64) -> Result<SystemTime> {
2027 u64::try_from(timestamp)
2028 .map(|timestamp| SystemTime::UNIX_EPOCH + Duration::from_millis(timestamp))
2029 .map_err(Into::into)
2030}
2031
2032pub fn to_timestamp(system_time: &SystemTime) -> Result<i64> {
2034 system_time
2035 .duration_since(SystemTime::UNIX_EPOCH)
2036 .map_err(Into::into)
2037 .map(|since_epoch| since_epoch.as_millis())
2038 .and_then(|since_epoch| i64::try_from(since_epoch).map_err(Into::into))
2039}
2040
/// Offset selector, convertible to and from an `i64`.
///
/// Two negative sentinel values select the earliest and latest offsets; any
/// other value is interpreted as a timestamp in milliseconds since the Unix
/// epoch.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ListOffset {
    /// Represented as `-2` when converted to `i64`.
    Earliest,
    /// Represented as `-1` when converted to `i64`.
    Latest,
    /// A point in time, represented as milliseconds since the Unix epoch.
    Timestamp(SystemTime),
}
2051
/// Sentinel `i64` values used by the conversions between [`ListOffset`] and
/// `i64`.
impl ListOffset {
    /// Value of [`ListOffset::Earliest`].
    const EARLIEST_OFFSET: i64 = -2;
    /// Value of [`ListOffset::Latest`].
    const LATEST_OFFSET: i64 = -1;
}
2056
2057impl TryFrom<ListOffset> for i64 {
2058 type Error = Error;
2059
2060 fn try_from(value: ListOffset) -> Result<Self, Self::Error> {
2061 match value {
2062 ListOffset::Earliest => Ok(ListOffset::EARLIEST_OFFSET),
2063 ListOffset::Latest => Ok(ListOffset::LATEST_OFFSET),
2064 ListOffset::Timestamp(timestamp) => to_timestamp(×tamp),
2065 }
2066 }
2067}
2068
2069impl TryFrom<i64> for ListOffset {
2070 type Error = Error;
2071
2072 fn try_from(value: i64) -> Result<Self, Self::Error> {
2073 match value {
2074 Self::EARLIEST_OFFSET => Ok(Self::Earliest),
2075 Self::LATEST_OFFSET => Ok(Self::Latest),
2076 timestamp => to_system_time(timestamp).map(Self::Timestamp),
2077 }
2078 }
2079}
2080
/// SCRAM mechanism, parseable from its name and convertible to and from small
/// integers.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ScramMechanism {
    /// Parsed from `"SCRAM-SHA-256"`; numeric value `1`.
    Scram256,
    /// Parsed from `"SCRAM-SHA-512"`; numeric value `2`.
    Scram512,
}
2086
2087impl FromStr for ScramMechanism {
2088 type Err = Error;
2089
2090 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
2091 match s {
2092 "SCRAM-SHA-256" => Ok(ScramMechanism::Scram256),
2093 "SCRAM-SHA-512" => Ok(ScramMechanism::Scram512),
2094 otherwise => Err(Error::ParseScram(otherwise.to_string())),
2095 }
2096 }
2097}
2098
2099impl TryFrom<i8> for ScramMechanism {
2100 type Error = Error;
2101
2102 fn try_from(value: i8) -> std::result::Result<Self, Self::Error> {
2103 match value {
2104 1 => Ok(ScramMechanism::Scram256),
2105 2 => Ok(ScramMechanism::Scram512),
2106 otherwise => Err(Error::UnknownScramMechanism(value)),
2107 }
2108 }
2109}
2110
2111impl From<ScramMechanism> for i32 {
2112 fn from(value: ScramMechanism) -> Self {
2113 match value {
2114 ScramMechanism::Scram256 => 1,
2115 ScramMechanism::Scram512 => 2,
2116 }
2117 }
2118}
2119
2120impl From<ScramMechanism> for i8 {
2121 fn from(value: ScramMechanism) -> Self {
2122 match value {
2123 ScramMechanism::Scram256 => 1,
2124 ScramMechanism::Scram512 => 2,
2125 }
2126 }
2127}
2128
/// Types that can produce an encoded byte representation of themselves.
pub trait Encode {
    /// Encodes `self` into a [`Bytes`] buffer, or returns an error when
    /// encoding fails.
    fn encode(&self) -> Result<Bytes>;
}
2132
/// Types that can be reconstructed from an encoded byte representation.
pub trait Decode: Sized {
    /// Decodes a value from `encoded`, or returns an error when decoding
    /// fails.
    ///
    /// `encoded` is taken by mutable reference.
    /// NOTE(review): presumably implementations advance the buffer past the
    /// bytes they consume — confirm against the implementors.
    fn decode(encoded: &mut Bytes) -> Result<Self>;
}
2136
/// Unit tests for items defined in this file.
#[cfg(test)]
mod tests {
    use std::thread::sleep;

    use super::*;

    /// After sleeping for `pause` milliseconds, the elapsed time reported for
    /// an instant captured before the sleep must be at least `pause`.
    #[test]
    fn frame_elapsed_millis() {
        let pause = 6;
        let now = SystemTime::now();
        sleep(Duration::from_millis(pause));

        assert!(Frame::elapsed_millis(now) >= pause);
    }

    /// Each compression codec, the log-append timestamp type and each flag
    /// must encode to its own value in the `i16` attribute word.
    #[test]
    fn batch_attribute() {
        assert_eq!(0, i16::from(BatchAttribute::default()));
        assert_eq!(
            0,
            i16::from(BatchAttribute::default().compression(Compression::None))
        );
        assert_eq!(
            1,
            i16::from(BatchAttribute::default().compression(Compression::Gzip))
        );
        assert_eq!(
            2,
            i16::from(BatchAttribute::default().compression(Compression::Snappy))
        );
        assert_eq!(
            3,
            i16::from(BatchAttribute::default().compression(Compression::Lz4))
        );
        assert_eq!(
            4,
            i16::from(BatchAttribute::default().compression(Compression::Zstd))
        );
        assert_eq!(
            8,
            i16::from(BatchAttribute::default().timestamp(TimestampType::LogAppendTime))
        );
        assert_eq!(16, i16::from(BatchAttribute::default().transaction(true)));
        assert_eq!(32, i16::from(BatchAttribute::default().control(true)));
        assert_eq!(
            64,
            i16::from(BatchAttribute::default().delete_horizon(true))
        );
    }
}
2187
2188include!(concat!(env!("OUT_DIR"), "/generate.rs"));