mod api;
mod codec;
mod header;
mod messages;
mod primitives;
mod record;
pub use api::{
ApiKey, ApiVersionRange, ApiVersionsRequest, ApiVersionsResponse, FinalizedFeature,
SupportedFeature,
};
pub use codec::{Decoder, Encoder, MAX_MESSAGE_SIZE};
pub use header::{RequestHeader, ResponseHeader};
pub use messages::*;
pub use primitives::*;
pub use record::{
Compression, LazyRecordBatch, LazyRecordIterator, Record, RecordBatch, RecordBatchBuilder,
RecordHeader,
};
use crate::error::{KrafkaError, ProtocolErrorKind, Result};
/// Largest array length the decode-side length checks in this module accept;
/// anything above it is rejected as exceeding the "safety limit".
pub const MAX_DECODE_ARRAY_LEN: usize = 100_000;
/// Limit of 10,000 associated with record headers.
// NOTE(review): presumably the maximum number of headers accepted per record while
// decoding; it is not referenced in this part of the file — confirm at its use sites.
pub const MAX_RECORD_HEADERS: usize = 10_000;
/// Validates a topic name against Kafka's topic naming rules.
///
/// A name is accepted when it
/// - is not empty,
/// - is not one of the reserved names `.` or `..`,
/// - is at most 249 bytes long, and
/// - contains only ASCII letters, ASCII digits, `.`, `_` and `-`.
///
/// # Errors
///
/// Returns a protocol error of kind
/// - [`ProtocolErrorKind::InvalidValue`] for an empty name, for `.` / `..`, or when
///   the name contains a byte outside `[a-zA-Z0-9._-]` (the first offending byte is
///   reported in hexadecimal), and
/// - [`ProtocolErrorKind::InvalidLength`] when the name is longer than 249 bytes.
#[inline]
pub fn validate_topic_name(name: &str) -> Result<()> {
    const MAX_TOPIC_NAME_LEN: usize = 249;

    if name.is_empty() {
        return Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidValue,
            "topic name cannot be empty",
        ));
    }

    // Both names pass the character-set check below, yet Kafka reserves them and
    // rejects them outright, so refuse them here as well.
    if name == "." || name == ".." {
        return Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidValue,
            "topic name cannot be \".\" or \"..\"",
        ));
    }

    if name.len() > MAX_TOPIC_NAME_LEN {
        return Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidLength,
            format!(
                "topic name length {} exceeds maximum of {MAX_TOPIC_NAME_LEN}",
                name.len(),
            ),
        ));
    }

    // Scan bytes rather than chars: every legal character is single-byte ASCII, so
    // any byte of a multi-byte UTF-8 sequence is reported as illegal.
    let first_illegal = name
        .bytes()
        .find(|b| !(b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-')));
    if let Some(bad) = first_illegal {
        return Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidValue,
            format!(
                "topic name contains illegal character 0x{bad:02X}; only [a-zA-Z0-9._-] is allowed"
            ),
        ));
    }

    Ok(())
}
#[inline]
pub fn validate_topic_names<'a, I>(names: I) -> Result<()>
where
I: IntoIterator<Item = &'a str>,
{
for name in names {
validate_topic_name(name)?;
}
Ok(())
}
/// Converts an in-memory array length to `i32`.
///
/// # Errors
///
/// Returns a [`ProtocolErrorKind::InvalidLength`] protocol error when `len` does not
/// fit in an `i32`.
#[inline]
pub(crate) fn array_len_i32(len: usize) -> Result<i32> {
    match i32::try_from(len) {
        Ok(converted) => Ok(converted),
        Err(_) => Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidLength,
            format!("array length {len} exceeds i32::MAX"),
        )),
    }
}
/// Writes the compact-array length prefix for an array of `len` elements to `buf`.
///
/// The value written is `len + 1`, emitted as an unsigned varint.
///
/// # Errors
///
/// Returns a [`ProtocolErrorKind::InvalidLength`] protocol error, without writing to
/// `buf`, when `len + 1` overflows `usize` or does not fit in a `u32`.
#[inline]
pub(crate) fn encode_compact_array_len(len: usize, buf: &mut impl bytes::BufMut) -> Result<()> {
    let wire = match len.checked_add(1).map(u32::try_from) {
        Some(Ok(value)) => value,
        // `len + 1` is representable as `usize` but too large for the u32 prefix.
        Some(Err(_)) => {
            return Err(KrafkaError::protocol_kind(
                ProtocolErrorKind::InvalidLength,
                format!("compact array length {len} exceeds u32::MAX"),
            ));
        }
        // `len + 1` itself overflowed `usize`.
        None => {
            return Err(KrafkaError::protocol_kind(
                ProtocolErrorKind::InvalidLength,
                format!("compact array length {len} overflows"),
            ));
        }
    };

    crate::util::varint::encode_unsigned_varint(wire, buf);
    Ok(())
}
/// Validates a decoded, non-nullable `i32` array length and converts it to `usize`.
///
/// # Errors
///
/// - [`ProtocolErrorKind::Malformed`] when `len` is negative (including `-1`; use
///   [`check_decode_nullable_array_len`] where `-1` means null).
/// - [`ProtocolErrorKind::InvalidLength`] when `len` exceeds [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_decode_array_len(len: i32) -> Result<usize> {
    if len.is_negative() {
        return Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::Malformed,
            format!(
                "negative array length {len} in decode (use check_decode_nullable_array_len for fields where -1 means null)"
            ),
        ));
    }

    // Non-negative from here on, so the cast cannot change the sign.
    match len as usize {
        len if len > MAX_DECODE_ARRAY_LEN => Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::InvalidLength,
            format!("array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"),
        )),
        len => Ok(len),
    }
}
/// Validates a decoded `i32` array length for a field where `-1` means null.
///
/// A null array (`-1`) is reported as length `0`; every other value is handled by
/// [`check_decode_array_len`].
///
/// # Errors
///
/// Propagates the error from [`check_decode_array_len`] for any value other than `-1`
/// that it rejects.
#[inline]
pub(crate) fn check_decode_nullable_array_len(len: i32) -> Result<usize> {
    match len {
        -1 => Ok(0),
        other => check_decode_array_len(other),
    }
}
/// Validates the raw length prefix of a non-nullable compact array and returns the
/// element count, which is `raw - 1`.
///
/// # Errors
///
/// - [`ProtocolErrorKind::Malformed`] when `raw` is `0` (the null marker), which is not
///   valid for a non-nullable field.
/// - [`ProtocolErrorKind::InvalidLength`] when the element count exceeds
///   [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_compact_array_len(raw: u32) -> Result<usize> {
    // `checked_sub` yields `None` exactly when `raw` is 0, i.e. the null marker.
    match raw.checked_sub(1) {
        None => Err(KrafkaError::protocol_kind(
            ProtocolErrorKind::Malformed,
            "compact array raw value 0 (null) is invalid for a non-nullable field; \
             use check_compact_nullable_array_len for nullable arrays",
        )),
        Some(count) => {
            let len = count as usize;
            if len > MAX_DECODE_ARRAY_LEN {
                Err(KrafkaError::protocol_kind(
                    ProtocolErrorKind::InvalidLength,
                    format!(
                        "compact array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"
                    ),
                ))
            } else {
                Ok(len)
            }
        }
    }
}
/// Validates the raw length prefix of a nullable compact array and returns the
/// element count.
///
/// A raw value of `0` (null) is reported as length `0`; any other raw value maps to
/// `raw - 1`.
///
/// # Errors
///
/// Returns a [`ProtocolErrorKind::InvalidLength`] protocol error when the element
/// count exceeds [`MAX_DECODE_ARRAY_LEN`].
#[inline]
pub(crate) fn check_compact_nullable_array_len(raw: u32) -> Result<usize> {
    // Saturating at zero folds the null marker (raw 0) into the same result as an
    // empty array (raw 1): both yield a length of 0.
    let len = raw.saturating_sub(1) as usize;
    if len <= MAX_DECODE_ARRAY_LEN {
        return Ok(len);
    }
    Err(KrafkaError::protocol_kind(
        ProtocolErrorKind::InvalidLength,
        format!("compact array length {len} exceeds safety limit {MAX_DECODE_ARRAY_LEN}"),
    ))
}
/// Per-API version constants.
///
/// Every API has a pair of `i16` constants named `<API>_MIN` and `<API>_MAX`.
// NOTE(review): presumably each pair is the inclusive range of protocol versions this
// crate can speak for that API — confirm against the code that negotiates versions.
pub mod versions {
pub const PRODUCE_MIN: i16 = 3;
pub const PRODUCE_MAX: i16 = 13;
pub const FETCH_MIN: i16 = 4;
pub const FETCH_MAX: i16 = 16;
pub const LIST_OFFSETS_MIN: i16 = 1;
pub const LIST_OFFSETS_MAX: i16 = 11;
pub const METADATA_MIN: i16 = 1;
pub const METADATA_MAX: i16 = 13;
pub const OFFSET_COMMIT_MIN: i16 = 2;
pub const OFFSET_COMMIT_MAX: i16 = 10;
pub const OFFSET_FETCH_MIN: i16 = 1;
pub const OFFSET_FETCH_MAX: i16 = 10;
pub const FIND_COORDINATOR_MIN: i16 = 1;
pub const FIND_COORDINATOR_MAX: i16 = 6;
pub const JOIN_GROUP_MIN: i16 = 4;
pub const JOIN_GROUP_MAX: i16 = 9;
pub const HEARTBEAT_MIN: i16 = 3;
pub const HEARTBEAT_MAX: i16 = 4;
pub const LEAVE_GROUP_MIN: i16 = 3;
pub const LEAVE_GROUP_MAX: i16 = 5;
pub const SYNC_GROUP_MIN: i16 = 3;
pub const SYNC_GROUP_MAX: i16 = 5;
pub const API_VERSIONS_MIN: i16 = 0;
pub const API_VERSIONS_MAX: i16 = 5;
pub const CREATE_TOPICS_MIN: i16 = 2;
pub const CREATE_TOPICS_MAX: i16 = 7;
pub const DELETE_TOPICS_MIN: i16 = 1;
pub const DELETE_TOPICS_MAX: i16 = 6;
pub const CREATE_PARTITIONS_MIN: i16 = 0;
pub const CREATE_PARTITIONS_MAX: i16 = 3;
pub const DESCRIBE_CONFIGS_MIN: i16 = 0;
pub const DESCRIBE_CONFIGS_MAX: i16 = 4;
pub const DESCRIBE_ACLS_MIN: i16 = 1;
pub const DESCRIBE_ACLS_MAX: i16 = 3;
pub const CREATE_ACLS_MIN: i16 = 1;
pub const CREATE_ACLS_MAX: i16 = 3;
pub const DELETE_ACLS_MIN: i16 = 1;
pub const DELETE_ACLS_MAX: i16 = 3;
pub const DESCRIBE_GROUPS_MIN: i16 = 1;
pub const DESCRIBE_GROUPS_MAX: i16 = 6;
pub const LIST_GROUPS_MIN: i16 = 1;
pub const LIST_GROUPS_MAX: i16 = 5;
pub const DELETE_RECORDS_MIN: i16 = 0;
pub const DELETE_RECORDS_MAX: i16 = 2;
pub const DESCRIBE_LOG_DIRS_MIN: i16 = 1;
pub const DESCRIBE_LOG_DIRS_MAX: i16 = 4;
pub const OFFSET_FOR_LEADER_EPOCH_MIN: i16 = 2;
pub const OFFSET_FOR_LEADER_EPOCH_MAX: i16 = 4;
pub const INIT_PRODUCER_ID_MIN: i16 = 0;
pub const INIT_PRODUCER_ID_MAX: i16 = 5;
pub const ADD_PARTITIONS_TO_TXN_MIN: i16 = 0;
pub const ADD_PARTITIONS_TO_TXN_MAX: i16 = 5;
pub const ADD_OFFSETS_TO_TXN_MIN: i16 = 0;
pub const ADD_OFFSETS_TO_TXN_MAX: i16 = 4;
pub const END_TXN_MIN: i16 = 0;
pub const END_TXN_MAX: i16 = 5;
pub const TXN_OFFSET_COMMIT_MIN: i16 = 0;
pub const TXN_OFFSET_COMMIT_MAX: i16 = 5;
pub const CREATE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const CREATE_DELEGATION_TOKEN_MAX: i16 = 3;
pub const RENEW_DELEGATION_TOKEN_MIN: i16 = 1;
pub const RENEW_DELEGATION_TOKEN_MAX: i16 = 2;
pub const EXPIRE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const EXPIRE_DELEGATION_TOKEN_MAX: i16 = 2;
pub const DESCRIBE_DELEGATION_TOKEN_MIN: i16 = 1;
pub const DESCRIBE_DELEGATION_TOKEN_MAX: i16 = 3;
pub const DESCRIBE_CLIENT_QUOTAS_MIN: i16 = 0;
pub const DESCRIBE_CLIENT_QUOTAS_MAX: i16 = 1;
pub const ALTER_CLIENT_QUOTAS_MIN: i16 = 0;
pub const ALTER_CLIENT_QUOTAS_MAX: i16 = 1;
pub const CONSUMER_GROUP_HEARTBEAT_MIN: i16 = 0;
pub const CONSUMER_GROUP_HEARTBEAT_MAX: i16 = 1;
pub const INCREMENTAL_ALTER_CONFIGS_MIN: i16 = 0;
pub const INCREMENTAL_ALTER_CONFIGS_MAX: i16 = 1;
pub const DELETE_GROUPS_MIN: i16 = 0;
pub const DELETE_GROUPS_MAX: i16 = 2;
pub const DESCRIBE_CLUSTER_MIN: i16 = 0;
pub const DESCRIBE_CLUSTER_MAX: i16 = 2;
pub const UPDATE_FEATURES_MIN: i16 = 0;
pub const UPDATE_FEATURES_MAX: i16 = 1;
pub const ELECT_LEADERS_MIN: i16 = 0;
pub const ELECT_LEADERS_MAX: i16 = 2;
pub const ALTER_PARTITION_REASSIGNMENTS_MIN: i16 = 0;
pub const ALTER_PARTITION_REASSIGNMENTS_MAX: i16 = 0;
pub const LIST_PARTITION_REASSIGNMENTS_MIN: i16 = 0;
pub const LIST_PARTITION_REASSIGNMENTS_MAX: i16 = 0;
pub const ALTER_REPLICA_LOG_DIRS_MIN: i16 = 1;
pub const ALTER_REPLICA_LOG_DIRS_MAX: i16 = 2;
pub const OFFSET_DELETE_MIN: i16 = 0;
pub const OFFSET_DELETE_MAX: i16 = 0;
pub const DESCRIBE_USER_SCRAM_CREDENTIALS_MIN: i16 = 0;
pub const DESCRIBE_USER_SCRAM_CREDENTIALS_MAX: i16 = 0;
pub const ALTER_USER_SCRAM_CREDENTIALS_MIN: i16 = 0;
pub const ALTER_USER_SCRAM_CREDENTIALS_MAX: i16 = 0;
pub const DESCRIBE_PRODUCERS_MIN: i16 = 0;
pub const DESCRIBE_PRODUCERS_MAX: i16 = 0;
pub const DESCRIBE_TRANSACTIONS_MIN: i16 = 0;
pub const DESCRIBE_TRANSACTIONS_MAX: i16 = 0;
pub const LIST_TRANSACTIONS_MIN: i16 = 0;
pub const LIST_TRANSACTIONS_MAX: i16 = 1;
pub const CONSUMER_GROUP_DESCRIBE_MIN: i16 = 0;
pub const CONSUMER_GROUP_DESCRIBE_MAX: i16 = 1;
pub const DESCRIBE_TOPIC_PARTITIONS_MIN: i16 = 0;
pub const DESCRIBE_TOPIC_PARTITIONS_MAX: i16 = 0;
pub const LIST_CLIENT_METRICS_RESOURCES_MIN: i16 = 0;
pub const LIST_CLIENT_METRICS_RESOURCES_MAX: i16 = 0;
pub const WRITE_TXN_MARKERS_MIN: i16 = 1;
pub const WRITE_TXN_MARKERS_MAX: i16 = 1;
pub const DESCRIBE_QUORUM_MIN: i16 = 0;
pub const DESCRIBE_QUORUM_MAX: i16 = 0;
// The four telemetry constants below are compiled only when the `telemetry` cargo
// feature is enabled; the `cfg_attr(docsrs, ...)` lines annotate them with that
// feature requirement when the docs are built with the `docsrs` cfg set.
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const GET_TELEMETRY_SUBSCRIPTIONS_MIN: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const GET_TELEMETRY_SUBSCRIPTIONS_MAX: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const PUSH_TELEMETRY_MIN: i16 = 0;
#[cfg(feature = "telemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "telemetry")))]
pub const PUSH_TELEMETRY_MAX: i16 = 0;
pub const SHARE_GROUP_HEARTBEAT_MIN: i16 = 1;
pub const SHARE_GROUP_HEARTBEAT_MAX: i16 = 1;
pub const SHARE_GROUP_DESCRIBE_MIN: i16 = 1;
pub const SHARE_GROUP_DESCRIBE_MAX: i16 = 1;
pub const SHARE_FETCH_MIN: i16 = 1;
pub const SHARE_FETCH_MAX: i16 = 2;
pub const SHARE_ACKNOWLEDGE_MIN: i16 = 1;
pub const SHARE_ACKNOWLEDGE_MAX: i16 = 2;
}
/// Unit tests for the topic-name validators and the array-length helpers defined in
/// the parent module.
#[cfg(test)]
// Tests may unwrap/expect/panic freely: a failure there is the test failing.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    // ---- topic name validation -------------------------------------------------

    #[test]
    fn validate_topic_name_accepts_valid() {
        assert!(validate_topic_name("t").is_ok());
        assert!(validate_topic_name("my.topic-0_1").is_ok());
        assert!(validate_topic_name("UPPER_lower-123").is_ok());
        // Exactly at the 249-byte limit.
        let max_ok = "x".repeat(249);
        assert!(validate_topic_name(&max_ok).is_ok());
    }

    #[test]
    fn validate_topic_name_rejects_empty() {
        let err = validate_topic_name("").unwrap_err().to_string();
        assert!(err.contains("cannot be empty"), "got: {err}");
    }

    #[test]
    fn validate_topic_name_rejects_oversize() {
        // One byte past the limit.
        let too_big = "x".repeat(250);
        let err = validate_topic_name(&too_big).unwrap_err().to_string();
        assert!(err.contains("exceeds maximum"), "got: {err}");
    }

    #[test]
    fn validate_topic_name_rejects_illegal_chars() {
        for bad in ["/", "\0", "topic/name", "topic name", "tópic", "topic!"] {
            let err = validate_topic_name(bad).unwrap_err().to_string();
            assert!(
                err.contains("illegal character"),
                "expected rejection for {bad:?}, got: {err}"
            );
        }
    }

    #[test]
    fn validate_topic_names_short_circuits_on_first_error() {
        let names = ["ok", "", "also-ok"];
        let err = validate_topic_names(names.iter().copied())
            .unwrap_err()
            .to_string();
        assert!(err.contains("cannot be empty"), "got: {err}");
        assert!(validate_topic_names(["a", "b", "c"].iter().copied()).is_ok());
    }

    // ---- encode-side length helpers --------------------------------------------

    #[test]
    fn array_len_i32_bounds() {
        assert_eq!(array_len_i32(0).unwrap(), 0);
        assert_eq!(array_len_i32(i32::MAX as usize).unwrap(), i32::MAX);
        assert!(array_len_i32(i32::MAX as usize + 1).is_err());
    }

    #[test]
    fn encode_compact_array_len_empty_array() {
        // An empty compact array is written as the single varint byte 1 (len + 1).
        let mut buf: Vec<u8> = Vec::new();
        encode_compact_array_len(0, &mut buf).unwrap();
        assert_eq!(buf, vec![0x01]);
    }

    #[test]
    fn encode_compact_array_len_rejects_unrepresentable() {
        // `u32::MAX + 1` does not fit the u32 prefix (and overflows a 32-bit usize).
        let mut buf: Vec<u8> = Vec::new();
        assert!(encode_compact_array_len(u32::MAX as usize, &mut buf).is_err());
        assert!(buf.is_empty(), "nothing may be written on error");
    }

    // ---- fixed-width (i32) array lengths ---------------------------------------

    #[test]
    fn check_decode_array_len_valid() {
        assert_eq!(check_decode_array_len(0).unwrap(), 0);
        assert_eq!(check_decode_array_len(1).unwrap(), 1);
        assert_eq!(check_decode_array_len(100_000).unwrap(), 100_000);
    }

    #[test]
    fn check_decode_array_len_rejects_negative() {
        assert!(check_decode_array_len(-1).is_err());
        assert!(check_decode_array_len(i32::MIN).is_err());
    }

    #[test]
    fn check_decode_array_len_rejects_oversized() {
        assert!(check_decode_array_len(100_001).is_err());
        assert!(check_decode_array_len(i32::MAX).is_err());
    }

    #[test]
    fn check_decode_nullable_array_len_null() {
        assert_eq!(check_decode_nullable_array_len(-1).unwrap(), 0);
    }

    #[test]
    fn check_decode_nullable_array_len_valid() {
        assert_eq!(check_decode_nullable_array_len(0).unwrap(), 0);
        assert_eq!(check_decode_nullable_array_len(5).unwrap(), 5);
    }

    #[test]
    fn check_decode_nullable_array_len_rejects_other_negative() {
        assert!(check_decode_nullable_array_len(-2).is_err());
        assert!(check_decode_nullable_array_len(i32::MIN).is_err());
    }

    // ---- compact (varint) array lengths ----------------------------------------

    #[test]
    fn compact_array_len_rejects_null() {
        assert!(check_compact_array_len(0).is_err());
    }

    #[test]
    fn compact_array_len_empty() {
        assert_eq!(check_compact_array_len(1).unwrap(), 0);
    }

    #[test]
    fn compact_array_len_valid() {
        assert_eq!(check_compact_array_len(2).unwrap(), 1);
        assert_eq!(check_compact_array_len(101).unwrap(), 100);
    }

    #[test]
    fn compact_array_len_accepts_limit() {
        // Raw value is length + 1, so this decodes to exactly the limit.
        let at_limit = (MAX_DECODE_ARRAY_LEN as u32) + 1;
        assert_eq!(
            check_compact_array_len(at_limit).unwrap(),
            MAX_DECODE_ARRAY_LEN
        );
    }

    #[test]
    fn compact_array_len_rejects_oversized() {
        // Raw value is length + 1, so +2 is the first length past the limit.
        let over = (MAX_DECODE_ARRAY_LEN as u32) + 2;
        assert!(check_compact_array_len(over).is_err());
    }

    #[test]
    fn compact_nullable_array_len_null() {
        assert_eq!(check_compact_nullable_array_len(0).unwrap(), 0);
    }

    #[test]
    fn compact_nullable_array_len_empty() {
        assert_eq!(check_compact_nullable_array_len(1).unwrap(), 0);
    }

    #[test]
    fn compact_nullable_array_len_valid() {
        assert_eq!(check_compact_nullable_array_len(2).unwrap(), 1);
        assert_eq!(check_compact_nullable_array_len(101).unwrap(), 100);
    }

    #[test]
    fn compact_nullable_array_len_accepts_limit() {
        let at_limit = (MAX_DECODE_ARRAY_LEN as u32) + 1;
        assert_eq!(
            check_compact_nullable_array_len(at_limit).unwrap(),
            MAX_DECODE_ARRAY_LEN
        );
    }

    #[test]
    fn compact_nullable_array_len_rejects_oversized() {
        let over = (MAX_DECODE_ARRAY_LEN as u32) + 2;
        assert!(check_compact_nullable_array_len(over).is_err());
    }
}