mod api;
mod codec;
mod header;
mod messages;
mod primitives;
mod record;
pub use api::{
ApiKey, ApiVersionRange, ApiVersionsRequest, ApiVersionsResponse, FinalizedFeature,
SupportedFeature,
};
pub use codec::{Decoder, Encoder, MAX_MESSAGE_SIZE};
pub use header::{RequestHeader, ResponseHeader};
pub use messages::*;
pub use primitives::*;
pub use record::{
Compression, LazyRecordBatch, LazyRecordIterator, Record, RecordBatch, RecordBatchBuilder,
RecordHeader,
};
use crate::error::{KrafkaError, Result};
/// Largest element count the decode-side length checks in this module accept.
///
/// Lengths above this value are rejected with a protocol error by
/// `check_decode_array_len`, `check_compact_array_len`, and their nullable
/// variants.
pub const MAX_DECODE_ARRAY_LEN: usize = 100_000;
/// Upper bound on the number of headers in a record.
///
/// NOTE(review): not referenced in this file; presumably enforced by the
/// record decoding code — confirm against its users.
pub const MAX_RECORD_HEADERS: usize = 10_000;
/// Checks that `name` is acceptable as a topic name at the protocol level.
///
/// Only the byte length is inspected; the characters themselves are not
/// validated here.
///
/// # Errors
///
/// Returns a protocol error when `name` is empty or when its byte length is
/// greater than `i16::MAX`.
#[inline]
pub fn validate_topic_name(name: &str) -> Result<()> {
    let limit = i16::MAX;
    match name.len() {
        0 => Err(KrafkaError::protocol("topic name cannot be empty")),
        byte_len if byte_len > limit as usize => Err(KrafkaError::protocol(format!(
            "topic name length {} exceeds protocol limit of {}",
            byte_len, limit
        ))),
        _ => Ok(()),
    }
}
#[inline]
pub fn validate_topic_names<'a, I>(names: I) -> Result<()>
where
I: IntoIterator<Item = &'a str>,
{
for name in names {
validate_topic_name(name)?;
}
Ok(())
}
/// Converts an in-memory element count to an `i32`.
///
/// # Errors
///
/// Returns a protocol error when `len` does not fit in an `i32`.
#[inline]
pub(crate) fn array_len_i32(len: usize) -> Result<i32> {
    match i32::try_from(len) {
        Ok(wire_len) => Ok(wire_len),
        Err(_) => Err(KrafkaError::protocol(format!(
            "array length {len} exceeds i32::MAX"
        ))),
    }
}
/// Writes the compact-array length prefix for an array of `len` elements.
///
/// The value written is `len + 1`, encoded as an unsigned varint into `buf`.
///
/// # Errors
///
/// Returns a protocol error, without writing anything to `buf`, when
/// `len + 1` overflows `usize` or does not fit in a `u32`.
#[inline]
pub(crate) fn encode_compact_array_len(len: usize, buf: &mut impl bytes::BufMut) -> Result<()> {
    // Step 1: apply the +1 bias, guarding against `usize` overflow.
    let biased = match len.checked_add(1) {
        Some(value) => value,
        None => {
            return Err(KrafkaError::protocol(format!(
                "compact array length {len} overflows"
            )))
        }
    };

    // Step 2: the varint encoder takes a `u32`, so the biased value must fit.
    let wire = match u32::try_from(biased) {
        Ok(value) => value,
        Err(_) => {
            return Err(KrafkaError::protocol(format!(
                "compact array length {len} exceeds u32::MAX"
            )))
        }
    };

    crate::util::varint::encode_unsigned_varint(wire, buf);
    Ok(())
}
/// Validates a decoded `i32` array length for a non-nullable array and
/// returns it as a `usize`.
///
/// # Errors
///
/// Returns a protocol error when `len` is negative (including `-1`) or when
/// it is greater than [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_decode_array_len(len: i32) -> Result<usize> {
    if len.is_negative() {
        return Err(KrafkaError::protocol(format!(
            "negative array length {len} in decode (use check_decode_nullable_array_len for fields where -1 means null)"
        )));
    }

    // Non-negative from here on, so the cast cannot change the value's sign.
    let len = len as usize;
    if len <= MAX_DECODE_ARRAY_LEN {
        Ok(len)
    } else {
        Err(KrafkaError::protocol(format!(
            "array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"
        )))
    }
}
/// Validates a decoded `i32` array length for a nullable array.
///
/// A length of `-1` is treated as null and reported as `0` elements; every
/// other value is handled by [`check_decode_array_len`].
///
/// # Errors
///
/// Returns a protocol error for any negative length other than `-1`, or for
/// a length greater than [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_decode_nullable_array_len(len: i32) -> Result<usize> {
    match len {
        -1 => Ok(0),
        other => check_decode_array_len(other),
    }
}
/// Validates the raw length prefix of a non-nullable compact array and
/// returns the element count (`raw - 1`).
///
/// # Errors
///
/// Returns a protocol error when `raw` is `0` (the null marker), or when the
/// resulting element count is greater than [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_compact_array_len(raw: u32) -> Result<usize> {
    // `checked_sub` yields `None` exactly when `raw == 0`, i.e. the null marker.
    match raw.checked_sub(1) {
        None => Err(KrafkaError::protocol(
            "compact array raw value 0 (null) is invalid for a non-nullable field; \
             use check_compact_nullable_array_len for nullable arrays",
        )),
        Some(unbiased) => {
            let len = unbiased as usize;
            if len <= MAX_DECODE_ARRAY_LEN {
                Ok(len)
            } else {
                Err(KrafkaError::protocol(format!(
                    "compact array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"
                )))
            }
        }
    }
}
#[inline]
pub(crate) fn check_compact_nullable_array_len(raw: u32) -> Result<usize> {
if raw == 0 {
return Ok(0);
}
let len = (raw - 1) as usize;
if len > MAX_DECODE_ARRAY_LEN {
return Err(KrafkaError::protocol(format!(
"compact array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"
)));
}
Ok(len)
}
/// Per-API version bounds.
///
/// Each API contributes a pair of `i16` constants named `<API>_MIN` and
/// `<API>_MAX`.
///
/// NOTE(review): presumably the inclusive range of request versions this
/// crate is willing to use for that API — confirm against the call sites,
/// which are outside this module.
///
/// Some constants depend on Cargo features:
/// - `API_VERSIONS_MAX` is `4` by default and `5` with `unstable-protocol`.
/// - The `GET_TELEMETRY_SUBSCRIPTIONS_*` and `PUSH_TELEMETRY_*` constants
///   exist only with the `telemetry` feature.
/// - The `SHARE_*` constants exist only with the `unstable-protocol` feature.
pub mod versions {
pub const PRODUCE_MIN: i16 = 3;
pub const PRODUCE_MAX: i16 = 11;
pub const FETCH_MIN: i16 = 4;
pub const FETCH_MAX: i16 = 12;
pub const LIST_OFFSETS_MIN: i16 = 1;
pub const LIST_OFFSETS_MAX: i16 = 8;
pub const METADATA_MIN: i16 = 1;
pub const METADATA_MAX: i16 = 13;
pub const OFFSET_COMMIT_MIN: i16 = 2;
pub const OFFSET_COMMIT_MAX: i16 = 9;
pub const OFFSET_FETCH_MIN: i16 = 1;
pub const OFFSET_FETCH_MAX: i16 = 9;
pub const FIND_COORDINATOR_MIN: i16 = 1;
pub const FIND_COORDINATOR_MAX: i16 = 6;
pub const JOIN_GROUP_MIN: i16 = 4;
pub const JOIN_GROUP_MAX: i16 = 9;
pub const HEARTBEAT_MIN: i16 = 3;
pub const HEARTBEAT_MAX: i16 = 4;
pub const LEAVE_GROUP_MIN: i16 = 3;
pub const LEAVE_GROUP_MAX: i16 = 5;
pub const SYNC_GROUP_MIN: i16 = 3;
pub const SYNC_GROUP_MAX: i16 = 5;
pub const API_VERSIONS_MIN: i16 = 0;
// Exactly one of the two `API_VERSIONS_MAX` definitions below is compiled,
// selected by the `unstable-protocol` feature.
#[cfg(not(feature = "unstable-protocol"))]
pub const API_VERSIONS_MAX: i16 = 4;
#[cfg(feature = "unstable-protocol")]
pub const API_VERSIONS_MAX: i16 = 5;
pub const CREATE_TOPICS_MIN: i16 = 2;
pub const CREATE_TOPICS_MAX: i16 = 7;
pub const DELETE_TOPICS_MIN: i16 = 1;
pub const DELETE_TOPICS_MAX: i16 = 6;
pub const CREATE_PARTITIONS_MIN: i16 = 0;
pub const CREATE_PARTITIONS_MAX: i16 = 3;
pub const DESCRIBE_CONFIGS_MIN: i16 = 0;
pub const DESCRIBE_CONFIGS_MAX: i16 = 4;
pub const DESCRIBE_ACLS_MIN: i16 = 1;
pub const DESCRIBE_ACLS_MAX: i16 = 3;
pub const CREATE_ACLS_MIN: i16 = 1;
pub const CREATE_ACLS_MAX: i16 = 3;
pub const DELETE_ACLS_MIN: i16 = 1;
pub const DELETE_ACLS_MAX: i16 = 3;
pub const DESCRIBE_GROUPS_MIN: i16 = 1;
pub const DESCRIBE_GROUPS_MAX: i16 = 6;
pub const LIST_GROUPS_MIN: i16 = 1;
pub const LIST_GROUPS_MAX: i16 = 5;
pub const DELETE_RECORDS_MIN: i16 = 0;
pub const DELETE_RECORDS_MAX: i16 = 2;
pub const DESCRIBE_LOG_DIRS_MIN: i16 = 1;
pub const DESCRIBE_LOG_DIRS_MAX: i16 = 4;
pub const OFFSET_FOR_LEADER_EPOCH_MIN: i16 = 2;
pub const OFFSET_FOR_LEADER_EPOCH_MAX: i16 = 4;
pub const INIT_PRODUCER_ID_MIN: i16 = 0;
pub const INIT_PRODUCER_ID_MAX: i16 = 4;
pub const ADD_PARTITIONS_TO_TXN_MIN: i16 = 0;
pub const ADD_PARTITIONS_TO_TXN_MAX: i16 = 3;
pub const ADD_OFFSETS_TO_TXN_MIN: i16 = 0;
pub const ADD_OFFSETS_TO_TXN_MAX: i16 = 3;
pub const END_TXN_MIN: i16 = 0;
pub const END_TXN_MAX: i16 = 3;
pub const TXN_OFFSET_COMMIT_MIN: i16 = 0;
pub const TXN_OFFSET_COMMIT_MAX: i16 = 3;
pub const CREATE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const CREATE_DELEGATION_TOKEN_MAX: i16 = 3;
pub const RENEW_DELEGATION_TOKEN_MIN: i16 = 1;
pub const RENEW_DELEGATION_TOKEN_MAX: i16 = 2;
pub const EXPIRE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const EXPIRE_DELEGATION_TOKEN_MAX: i16 = 2;
pub const DESCRIBE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const DESCRIBE_DELEGATION_TOKEN_MAX: i16 = 3;
pub const DESCRIBE_CLIENT_QUOTAS_MIN: i16 = 0;
pub const DESCRIBE_CLIENT_QUOTAS_MAX: i16 = 1;
pub const ALTER_CLIENT_QUOTAS_MIN: i16 = 0;
pub const ALTER_CLIENT_QUOTAS_MAX: i16 = 1;
pub const CONSUMER_GROUP_HEARTBEAT_MIN: i16 = 0;
pub const CONSUMER_GROUP_HEARTBEAT_MAX: i16 = 1;
pub const INCREMENTAL_ALTER_CONFIGS_MIN: i16 = 0;
pub const INCREMENTAL_ALTER_CONFIGS_MAX: i16 = 1;
pub const DELETE_GROUPS_MIN: i16 = 0;
pub const DELETE_GROUPS_MAX: i16 = 2;
pub const DESCRIBE_CLUSTER_MIN: i16 = 0;
pub const DESCRIBE_CLUSTER_MAX: i16 = 2;
pub const UPDATE_FEATURES_MIN: i16 = 0;
pub const UPDATE_FEATURES_MAX: i16 = 1;
pub const ELECT_LEADERS_MIN: i16 = 0;
pub const ELECT_LEADERS_MAX: i16 = 2;
pub const ALTER_PARTITION_REASSIGNMENTS_MIN: i16 = 0;
pub const ALTER_PARTITION_REASSIGNMENTS_MAX: i16 = 0;
pub const LIST_PARTITION_REASSIGNMENTS_MIN: i16 = 0;
pub const LIST_PARTITION_REASSIGNMENTS_MAX: i16 = 0;
pub const ALTER_REPLICA_LOG_DIRS_MIN: i16 = 1;
pub const ALTER_REPLICA_LOG_DIRS_MAX: i16 = 2;
pub const OFFSET_DELETE_MIN: i16 = 0;
pub const OFFSET_DELETE_MAX: i16 = 0;
pub const DESCRIBE_USER_SCRAM_CREDENTIALS_MIN: i16 = 0;
pub const DESCRIBE_USER_SCRAM_CREDENTIALS_MAX: i16 = 0;
pub const ALTER_USER_SCRAM_CREDENTIALS_MIN: i16 = 0;
pub const ALTER_USER_SCRAM_CREDENTIALS_MAX: i16 = 0;
pub const DESCRIBE_PRODUCERS_MIN: i16 = 0;
pub const DESCRIBE_PRODUCERS_MAX: i16 = 0;
pub const DESCRIBE_TRANSACTIONS_MIN: i16 = 0;
pub const DESCRIBE_TRANSACTIONS_MAX: i16 = 0;
pub const LIST_TRANSACTIONS_MIN: i16 = 0;
pub const LIST_TRANSACTIONS_MAX: i16 = 1;
pub const CONSUMER_GROUP_DESCRIBE_MIN: i16 = 0;
pub const CONSUMER_GROUP_DESCRIBE_MAX: i16 = 1;
pub const DESCRIBE_TOPIC_PARTITIONS_MIN: i16 = 0;
pub const DESCRIBE_TOPIC_PARTITIONS_MAX: i16 = 0;
pub const LIST_CLIENT_METRICS_RESOURCES_MIN: i16 = 0;
pub const LIST_CLIENT_METRICS_RESOURCES_MAX: i16 = 0;
pub const WRITE_TXN_MARKERS_MIN: i16 = 1;
pub const WRITE_TXN_MARKERS_MAX: i16 = 1;
pub const DESCRIBE_QUORUM_MIN: i16 = 0;
pub const DESCRIBE_QUORUM_MAX: i16 = 0;
// The four telemetry constants below are compiled only with the `telemetry`
// feature; the `cfg_attr(docsrs, ...)` lines tag them as feature-gated in
// docs built with the `docsrs` cfg.
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const GET_TELEMETRY_SUBSCRIPTIONS_MIN: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const GET_TELEMETRY_SUBSCRIPTIONS_MAX: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const PUSH_TELEMETRY_MIN: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const PUSH_TELEMETRY_MAX: i16 = 0;
// The share-group constants below are compiled only with the
// `unstable-protocol` feature.
#[cfg(feature = "unstable-protocol")]
pub const SHARE_GROUP_HEARTBEAT_MIN: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_GROUP_HEARTBEAT_MAX: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_GROUP_DESCRIBE_MIN: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_GROUP_DESCRIBE_MAX: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_FETCH_MIN: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_FETCH_MAX: i16 = 2;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_ACKNOWLEDGE_MIN: i16 = 1;
#[cfg(feature = "unstable-protocol")]
pub const SHARE_ACKNOWLEDGE_MAX: i16 = 2;
}
/// Unit tests for the topic-name validators and the array-length helpers.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    // `MAX_DECODE_ARRAY_LEN` in the integer widths the decode helpers take.
    // The limit (100_000) fits in both types, so these casts are lossless.
    const LIMIT_I32: i32 = MAX_DECODE_ARRAY_LEN as i32;
    const LIMIT_U32: u32 = MAX_DECODE_ARRAY_LEN as u32;

    // --- validate_topic_name / validate_topic_names ---

    #[test]
    fn validate_topic_name_accepts_valid() {
        assert!(validate_topic_name("t").is_ok());
        assert!(validate_topic_name("my.topic-0_1").is_ok());
        let max_ok = "x".repeat(i16::MAX as usize);
        assert!(validate_topic_name(&max_ok).is_ok());
    }

    #[test]
    fn validate_topic_name_rejects_empty() {
        let err = validate_topic_name("").unwrap_err().to_string();
        assert!(err.contains("cannot be empty"), "got: {err}");
    }

    #[test]
    fn validate_topic_name_rejects_oversize() {
        let too_big = "x".repeat(i16::MAX as usize + 1);
        let err = validate_topic_name(&too_big).unwrap_err().to_string();
        assert!(err.contains("exceeds protocol limit"), "got: {err}");
    }

    #[test]
    fn validate_topic_names_short_circuits_on_first_error() {
        let names = ["ok", "", "also-ok"];
        let err = validate_topic_names(names.iter().copied())
            .unwrap_err()
            .to_string();
        assert!(err.contains("cannot be empty"), "got: {err}");
        assert!(validate_topic_names(["a", "b", "c"].iter().copied()).is_ok());
    }

    // --- array_len_i32 / encode_compact_array_len ---

    #[test]
    fn array_len_i32_accepts_up_to_i32_max() {
        assert_eq!(array_len_i32(0).unwrap(), 0);
        assert_eq!(array_len_i32(i32::MAX as usize).unwrap(), i32::MAX);
    }

    #[test]
    fn array_len_i32_rejects_above_i32_max() {
        let err = array_len_i32(i32::MAX as usize + 1)
            .unwrap_err()
            .to_string();
        assert!(err.contains("exceeds i32::MAX"), "got: {err}");
    }

    #[test]
    fn encode_compact_array_len_rejects_usize_overflow() {
        let mut buf: Vec<u8> = Vec::new();
        let err = encode_compact_array_len(usize::MAX, &mut buf)
            .unwrap_err()
            .to_string();
        assert!(err.contains("overflows"), "got: {err}");
        // The error is returned before the encoder runs, so nothing is written.
        assert!(buf.is_empty());
    }

    // --- check_decode_array_len ---

    #[test]
    fn check_decode_array_len_valid() {
        assert_eq!(check_decode_array_len(0).unwrap(), 0);
        assert_eq!(check_decode_array_len(1).unwrap(), 1);
        assert_eq!(
            check_decode_array_len(LIMIT_I32).unwrap(),
            MAX_DECODE_ARRAY_LEN
        );
    }

    #[test]
    fn check_decode_array_len_rejects_negative() {
        assert!(check_decode_array_len(-1).is_err());
        assert!(check_decode_array_len(i32::MIN).is_err());
    }

    #[test]
    fn check_decode_array_len_rejects_oversized() {
        assert!(check_decode_array_len(LIMIT_I32 + 1).is_err());
        assert!(check_decode_array_len(i32::MAX).is_err());
    }

    // --- check_decode_nullable_array_len ---

    #[test]
    fn check_decode_nullable_array_len_null() {
        assert_eq!(check_decode_nullable_array_len(-1).unwrap(), 0);
    }

    #[test]
    fn check_decode_nullable_array_len_valid() {
        assert_eq!(check_decode_nullable_array_len(0).unwrap(), 0);
        assert_eq!(check_decode_nullable_array_len(5).unwrap(), 5);
    }

    #[test]
    fn check_decode_nullable_array_len_rejects_other_negative() {
        assert!(check_decode_nullable_array_len(-2).is_err());
        assert!(check_decode_nullable_array_len(i32::MIN).is_err());
    }

    // --- check_compact_array_len (raw value = element count + 1) ---

    #[test]
    fn compact_array_len_rejects_null() {
        assert!(check_compact_array_len(0).is_err());
    }

    #[test]
    fn compact_array_len_empty() {
        assert_eq!(check_compact_array_len(1).unwrap(), 0);
    }

    #[test]
    fn compact_array_len_valid() {
        assert_eq!(check_compact_array_len(2).unwrap(), 1);
        assert_eq!(check_compact_array_len(101).unwrap(), 100);
    }

    #[test]
    fn compact_array_len_accepts_exact_limit() {
        // Raw `limit + 1` decodes to exactly `limit` elements, which is allowed.
        assert_eq!(
            check_compact_array_len(LIMIT_U32 + 1).unwrap(),
            MAX_DECODE_ARRAY_LEN
        );
    }

    #[test]
    fn compact_array_len_rejects_oversized() {
        let over = LIMIT_U32 + 2;
        assert!(check_compact_array_len(over).is_err());
    }

    // --- check_compact_nullable_array_len ---

    #[test]
    fn compact_nullable_array_len_null() {
        assert_eq!(check_compact_nullable_array_len(0).unwrap(), 0);
    }

    #[test]
    fn compact_nullable_array_len_empty() {
        assert_eq!(check_compact_nullable_array_len(1).unwrap(), 0);
    }

    #[test]
    fn compact_nullable_array_len_valid() {
        assert_eq!(check_compact_nullable_array_len(2).unwrap(), 1);
        assert_eq!(check_compact_nullable_array_len(101).unwrap(), 100);
    }

    #[test]
    fn compact_nullable_array_len_accepts_exact_limit() {
        assert_eq!(
            check_compact_nullable_array_len(LIMIT_U32 + 1).unwrap(),
            MAX_DECODE_ARRAY_LEN
        );
    }

    #[test]
    fn compact_nullable_array_len_rejects_oversized() {
        let over = LIMIT_U32 + 2;
        assert!(check_compact_nullable_array_len(over).is_err());
    }
}