use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io::{self, Cursor, Read};
use thiserror::Error;
use tokio_util::codec::{Decoder, Encoder};
use tracing::{debug, warn, info, error};
use super::messages::*;
use crate::protocol::kafka::{
API_KEY_API_VERSIONS, API_KEY_CREATE_TOPICS, API_KEY_DELETE_TOPICS,
API_KEY_DESCRIBE_GROUPS, API_KEY_FETCH, API_KEY_FIND_COORDINATOR,
API_KEY_HEARTBEAT, API_KEY_JOIN_GROUP, API_KEY_LEAVE_GROUP, API_KEY_LIST_GROUPS,
API_KEY_LIST_OFFSETS, API_KEY_METADATA, API_KEY_OFFSET_COMMIT, API_KEY_OFFSET_FETCH,
API_KEY_PRODUCE, API_KEY_SYNC_GROUP,
API_KEY_LEADER_AND_ISR, API_KEY_STOP_REPLICA, API_KEY_UPDATE_METADATA, API_KEY_CONTROLLED_SHUTDOWN,
};
/// Errors returned by the Kafka codec while decoding requests or encoding responses.
#[derive(Debug, Error)]
pub enum KafkaCodecError {
/// An underlying I/O error; `#[from]` lets `?` convert `io::Error` into this variant.
#[error("IO error: {0}")]
Io(#[from] io::Error),
/// The message could not be interpreted; the string carries a description.
#[error("Invalid message format: {0}")]
InvalidFormat(String),
/// Carries `(api_key, api_version)`; `decode_request` also returns it for API keys it does not recognise.
#[error("Unsupported API version: key={0}, version={1}")]
UnsupportedVersion(i16, i16),
/// The API key is known but deliberately not handled (e.g. the inter-broker APIs in `decode_request`).
#[error("Unsupported API key: {0}")]
UnsupportedApiKey(i16),
/// The buffer holds fewer bytes (`available`) than a read requires (`needed`).
#[error("Buffer underrun: needed {needed}, available {available}")]
BufferUnderrun { needed: usize, available: usize },
}
/// Result alias whose error type is fixed to [`KafkaCodecError`].
pub type Result<T> = std::result::Result<T, KafkaCodecError>;
/// Field-less type that namespaces the Kafka request decoders and response encoders.
pub struct KafkaCodec;
impl KafkaCodec {
pub fn new() -> Self {
Self
}
pub fn decode_request(data: &mut Bytes) -> Result<KafkaRequest> {
if data.len() < 8 {
return Err(KafkaCodecError::BufferUnderrun {
needed: 8,
available: data.len(),
});
}
let mut cursor = Cursor::new(data.as_ref());
let api_key = cursor.get_i16();
let api_version = cursor.get_i16();
let correlation_id = cursor.get_i32();
let client_id = if api_key == API_KEY_API_VERSIONS && api_version >= 3 {
debug!("Decoding flexible header for ApiVersions v{}", api_version);
match Self::decode_varint(&mut cursor) {
Ok(varint_len) => {
if varint_len == 0 {
None
} else {
let actual_len = (varint_len - 1) as usize;
if cursor.remaining() >= actual_len {
let mut buf = vec![0u8; actual_len];
cursor.copy_to_slice(&mut buf);
String::from_utf8(buf).ok()
} else {
None
}
}
}
Err(_) => None,
}
} else {
Self::decode_nullable_string(&mut cursor)?
};
if api_key == API_KEY_API_VERSIONS && api_version >= 3 {
if cursor.remaining() > 0 {
match Self::decode_varint(&mut cursor) {
Ok(num_tags) => {
debug!("Skipping {} tagged fields in header", num_tags);
for _ in 0..num_tags {
let _ = Self::decode_varint(&mut cursor);
if let Ok(tag_len) = Self::decode_varint(&mut cursor) {
let advance_len = tag_len as usize;
if advance_len <= cursor.remaining() {
cursor.advance(advance_len);
} else {
debug!("Tagged field length {} exceeds remaining buffer {}, skipping to end",
advance_len, cursor.remaining());
let remaining = cursor.remaining();
if remaining > 0 {
cursor.advance(remaining);
}
break; }
}
}
}
Err(_) => {}
}
}
}
let header = KafkaRequestHeader {
api_key,
api_version,
correlation_id,
client_id,
};
debug!("Decoding request for API key {}", api_key);
match api_key {
API_KEY_PRODUCE => {
let request = Self::decode_produce_request(header, &mut cursor)?;
Ok(KafkaRequest::Produce(request))
}
API_KEY_FETCH => {
let request = Self::decode_fetch_request(header, &mut cursor)?;
Ok(KafkaRequest::Fetch(request))
}
API_KEY_LIST_OFFSETS => {
let request = Self::decode_list_offsets_request(header, &mut cursor)?;
Ok(KafkaRequest::ListOffsets(request))
}
API_KEY_METADATA => {
let request = Self::decode_metadata_request(header, &mut cursor)?;
Ok(KafkaRequest::Metadata(request))
}
API_KEY_LEADER_AND_ISR => {
return Err(KafkaCodecError::UnsupportedApiKey(api_key));
}
API_KEY_STOP_REPLICA => {
return Err(KafkaCodecError::UnsupportedApiKey(api_key));
}
API_KEY_UPDATE_METADATA => {
return Err(KafkaCodecError::UnsupportedApiKey(api_key));
}
API_KEY_CONTROLLED_SHUTDOWN => {
return Err(KafkaCodecError::UnsupportedApiKey(api_key));
}
API_KEY_OFFSET_COMMIT => {
let request = Self::decode_offset_commit_request(header, &mut cursor)?;
Ok(KafkaRequest::OffsetCommit(request))
}
API_KEY_OFFSET_FETCH => {
let request = Self::decode_offset_fetch_request(header, &mut cursor)?;
Ok(KafkaRequest::OffsetFetch(request))
}
API_KEY_FIND_COORDINATOR => {
let request = Self::decode_find_coordinator_request(header, &mut cursor)?;
Ok(KafkaRequest::FindCoordinator(request))
}
API_KEY_LIST_GROUPS => {
let request = Self::decode_list_groups_request(header, &mut cursor)?;
Ok(KafkaRequest::ListGroups(request))
}
API_KEY_JOIN_GROUP => {
let request = Self::decode_join_group_request(header, &mut cursor)?;
Ok(KafkaRequest::JoinGroup(request))
}
API_KEY_HEARTBEAT => {
let request = Self::decode_heartbeat_request(header, &mut cursor)?;
Ok(KafkaRequest::Heartbeat(request))
}
API_KEY_LEAVE_GROUP => {
let request = Self::decode_leave_group_request(header, &mut cursor)?;
Ok(KafkaRequest::LeaveGroup(request))
}
API_KEY_SYNC_GROUP => {
let request = Self::decode_sync_group_request(header, &mut cursor)?;
Ok(KafkaRequest::SyncGroup(request))
}
API_KEY_DESCRIBE_GROUPS => {
let request = Self::decode_describe_groups_request(header, &mut cursor)?;
Ok(KafkaRequest::DescribeGroups(request))
}
API_KEY_API_VERSIONS => {
let request = Self::decode_api_versions_request(header, &mut cursor)?;
Ok(KafkaRequest::ApiVersions(request))
}
API_KEY_CREATE_TOPICS => {
let request = Self::decode_create_topics_request(header, &mut cursor)?;
Ok(KafkaRequest::CreateTopics(request))
}
API_KEY_DELETE_TOPICS => {
let request = Self::decode_delete_topics_request(header, &mut cursor)?;
Ok(KafkaRequest::DeleteTopics(request))
}
17 => { let request = Self::decode_sasl_handshake_request(header, &mut cursor)?;
Ok(KafkaRequest::SaslHandshake(request))
}
32 => { let request = Self::decode_describe_configs_request(header, &mut cursor)?;
Ok(KafkaRequest::DescribeConfigs(request))
}
33 => { let request = Self::decode_alter_configs_request(header, &mut cursor)?;
Ok(KafkaRequest::AlterConfigs(request))
}
36 => { let request = Self::decode_sasl_authenticate_request(header, &mut cursor)?;
Ok(KafkaRequest::SaslAuthenticate(request))
}
_ => Err(KafkaCodecError::UnsupportedVersion(api_key, api_version)),
}
}
/// Serialises `response` into a new buffer and returns it frozen.
///
/// The work is delegated to the `encode_*_response` helper that matches the
/// response variant; the bytes that helper produced are returned unchanged.
///
/// # Errors
/// * [`KafkaCodecError::InvalidFormat`] for LeaderAndIsr, StopReplica,
///   UpdateMetadata and ControlledShutdown, whose encoders do not exist.
/// * Whatever error the selected encoder returns.
pub fn encode_response(response: &KafkaResponse) -> Result<Bytes> {
    let mut out = BytesMut::new();
    info!("🎯 encode_response: Starting with empty buffer");

    match response {
        KafkaResponse::Produce(r) => Self::encode_produce_response(r, &mut out)?,
        KafkaResponse::Fetch(r) => Self::encode_fetch_response(r, &mut out)?,
        KafkaResponse::Metadata(r) => {
            info!("📝 encode_response: About to encode Metadata response");
            Self::encode_metadata_response(r, &mut out)?;
            info!("✅ encode_response: Metadata response encoded, buffer size: {}", out.len());
            // Diagnostic dump of the leading bytes, only when enough were written.
            if out.len() >= 12 {
                let head = &out[..12];
                info!("📊 encode_response: First 12 bytes after Metadata encoding: [{:02x} {:02x} {:02x} {:02x}] [{:02x} {:02x} {:02x} {:02x}] [{:02x} {:02x} {:02x} {:02x}]",
                    head[0], head[1], head[2], head[3],
                    head[4], head[5], head[6], head[7],
                    head[8], head[9], head[10], head[11]);
            }
        }
        KafkaResponse::LeaderAndIsr(_) => {
            return Err(KafkaCodecError::InvalidFormat(String::from(
                "LEADER_AND_ISR response encoding not implemented",
            )));
        }
        KafkaResponse::StopReplica(_) => {
            return Err(KafkaCodecError::InvalidFormat(String::from(
                "STOP_REPLICA response encoding not implemented",
            )));
        }
        KafkaResponse::UpdateMetadata(_) => {
            return Err(KafkaCodecError::InvalidFormat(String::from(
                "UPDATE_METADATA response encoding not implemented",
            )));
        }
        KafkaResponse::ControlledShutdown(_) => {
            return Err(KafkaCodecError::InvalidFormat(String::from(
                "CONTROLLED_SHUTDOWN response encoding not implemented",
            )));
        }
        KafkaResponse::OffsetCommit(r) => Self::encode_offset_commit_response(r, &mut out)?,
        KafkaResponse::OffsetFetch(r) => Self::encode_offset_fetch_response(r, &mut out)?,
        KafkaResponse::FindCoordinator(r) => {
            Self::encode_find_coordinator_response(r, &mut out)?
        }
        KafkaResponse::ListGroups(r) => Self::encode_list_groups_response(r, &mut out)?,
        KafkaResponse::JoinGroup(r) => Self::encode_join_group_response(r, &mut out)?,
        KafkaResponse::Heartbeat(r) => Self::encode_heartbeat_response(r, &mut out)?,
        KafkaResponse::LeaveGroup(r) => Self::encode_leave_group_response(r, &mut out)?,
        KafkaResponse::SyncGroup(r) => Self::encode_sync_group_response(r, &mut out)?,
        KafkaResponse::DescribeGroups(r) => Self::encode_describe_groups_response(r, &mut out)?,
        KafkaResponse::ListOffsets(r) => Self::encode_list_offsets_response(r, &mut out)?,
        KafkaResponse::ApiVersions(r) => Self::encode_api_versions_response(r, &mut out)?,
        KafkaResponse::CreateTopics(r) => Self::encode_create_topics_response(r, &mut out)?,
        KafkaResponse::DeleteTopics(r) => Self::encode_delete_topics_response(r, &mut out)?,
        KafkaResponse::DescribeConfigs(r) => {
            Self::encode_describe_configs_response(r, &mut out)?
        }
        KafkaResponse::AlterConfigs(r) => Self::encode_alter_configs_response(r, &mut out)?,
        KafkaResponse::SaslHandshake(r) => Self::encode_sasl_handshake_response(r, &mut out)?,
        KafkaResponse::SaslAuthenticate(r) => {
            Self::encode_sasl_authenticate_response(r, &mut out)?
        }
    }

    info!("🔧 FIXED: Removed double length encoding, returning buffer directly");
    info!("🔧 Buffer size: {}, first 8 bytes should be correlation_id + throttle_time", out.len());
    if out.len() >= 8 {
        let head = &out[..8];
        info!("🔧 First 8 bytes: [{:02x} {:02x} {:02x} {:02x}] [{:02x} {:02x} {:02x} {:02x}]",
            head[0], head[1], head[2], head[3],
            head[4], head[5], head[6], head[7]);
    }

    Ok(out.freeze())
}
fn decode_produce_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaProduceRequest> {
let transactional_id = if header.api_version >= 3 {
Self::decode_nullable_string(cursor)?
} else {
None
};
let acks = cursor.get_i16();
let timeout_ms = cursor.get_i32();
let topic_count = cursor.get_i32();
debug!("Decoding produce request: topic_count={}", topic_count);
let mut topic_data = Vec::with_capacity(topic_count as usize);
for i in 0..topic_count {
let topic = Self::decode_string(cursor)?;
debug!("Decoded topic {}: '{}'", i, topic);
let partition_count = cursor.get_i32();
debug!("Topic '{}' has {} partitions", topic, partition_count);
let mut partition_data = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
let partition = cursor.get_i32();
let records = Self::decode_nullable_bytes(cursor)?;
partition_data.push(KafkaPartitionProduceData { partition, records });
}
topic_data.push(KafkaTopicProduceData {
topic,
partition_data,
});
}
Ok(KafkaProduceRequest {
header,
transactional_id,
acks,
timeout_ms,
topic_data,
})
}
/// Writes a Produce response into `buf`.
///
/// Layout as emitted here: correlation id, the topic array (each topic with
/// its per-partition results), then the throttle time as the final field.
fn encode_produce_response(response: &KafkaProduceResponse, buf: &mut BytesMut) -> Result<()> {
    buf.put_i32(response.header.correlation_id);

    let topics = &response.responses;
    buf.put_i32(topics.len() as i32);
    for per_topic in topics.iter() {
        Self::encode_string(&per_topic.topic, buf);

        let results = &per_topic.partition_responses;
        buf.put_i32(results.len() as i32);
        for result in results.iter() {
            buf.put_i32(result.partition);
            buf.put_i16(result.error_code);
            buf.put_i64(result.base_offset);
            buf.put_i64(result.log_append_time_ms);
            buf.put_i64(result.log_start_offset);
        }
    }

    // The throttle time trails the topic array in this response.
    buf.put_i32(response.throttle_time_ms);
    Ok(())
}
fn decode_fetch_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaFetchRequest> {
tracing::debug!("Decoding Fetch request: api_version={}", header.api_version);
let replica_id = cursor.get_i32();
let max_wait_ms = cursor.get_i32();
let min_bytes = cursor.get_i32();
let max_bytes = cursor.get_i32();
let isolation_level = cursor.get_i8();
let session_id = cursor.get_i32();
let session_epoch = cursor.get_i32();
let topic_count = cursor.get_i32();
let mut topics = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let topic = Self::decode_string(cursor)?;
let partition_count = cursor.get_i32();
let mut partitions = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
let partition = cursor.get_i32();
let current_leader_epoch = cursor.get_i32();
let fetch_offset = cursor.get_i64();
let log_start_offset = cursor.get_i64();
let max_bytes = cursor.get_i32();
partitions.push(KafkaPartitionFetchData {
partition,
current_leader_epoch,
fetch_offset,
log_start_offset,
max_bytes,
});
}
topics.push(KafkaTopicFetchData { topic, partitions });
}
let forgotten_topic_count = cursor.get_i32();
let mut forgotten_topics_data = Vec::with_capacity(forgotten_topic_count as usize);
for _ in 0..forgotten_topic_count {
let topic = Self::decode_string(cursor)?;
let partition_count = cursor.get_i32();
let mut partitions = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
partitions.push(cursor.get_i32());
}
forgotten_topics_data.push(KafkaForgottenTopic { topic, partitions });
}
Ok(KafkaFetchRequest {
header,
replica_id,
max_wait_ms,
min_bytes,
max_bytes,
isolation_level,
session_id,
session_epoch,
topics,
forgotten_topics_data,
})
}
/// Writes a Fetch response into `buf`.
///
/// The encoder is pinned to the Fetch v10 layout (`api_version` below), so the
/// version checks document which field belongs to which protocol revision
/// rather than switching at run time.
///
/// Fix: from v7 on the layout carries a response-level `error_code` (i16)
/// between `throttle_time_ms` and `session_id`. It was previously omitted,
/// which shifted every following field by two bytes.
fn encode_fetch_response(response: &KafkaFetchResponse, buf: &mut BytesMut) -> Result<()> {
    let api_version = 10i16;

    buf.put_i32(response.header.correlation_id);
    if api_version >= 1 {
        buf.put_i32(response.throttle_time_ms);
    }
    if api_version >= 7 {
        // Response-level error code: 0 (no error).
        // NOTE(review): if KafkaFetchResponse carries its own response-level
        // error code, write that here instead of the constant.
        buf.put_i16(0);
        buf.put_i32(response.session_id);
    }

    buf.put_i32(response.responses.len() as i32);
    for topic_response in &response.responses {
        Self::encode_string(&topic_response.topic, buf);
        buf.put_i32(topic_response.partitions.len() as i32);

        for partition_response in &topic_response.partitions {
            buf.put_i32(partition_response.partition);
            buf.put_i16(partition_response.error_code);
            buf.put_i64(partition_response.high_watermark);
            if api_version >= 4 {
                buf.put_i64(partition_response.last_stable_offset);
            }
            if api_version >= 5 {
                buf.put_i64(partition_response.log_start_offset);
            }
            if api_version >= 4 {
                // Array length 0 in the slot that follows the offsets in the
                // v4+ layout (presumably aborted transactions — confirm).
                buf.put_i32(0);
            }
            if api_version >= 11 {
                buf.put_i32(partition_response.preferred_read_replica);
            }

            if let Some(records) = &partition_response.records {
                tracing::debug!("Encoding partition {} records: {} bytes (first 32 bytes: {:?}) for API v{}",
                    partition_response.partition, records.len(),
                    &records[..std::cmp::min(32, records.len())], api_version);
            } else {
                tracing::debug!("Encoding partition {} records: None for API v{}", partition_response.partition, api_version);
            }
            Self::encode_nullable_bytes(&partition_response.records, buf);
        }
    }
    Ok(())
}
fn decode_list_offsets_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaListOffsetsRequest> {
let replica_id = cursor.get_i32();
let isolation_level = cursor.get_i8();
let topic_count = cursor.get_i32();
let mut topics = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let topic = Self::decode_string(cursor)?;
let partition_count = cursor.get_i32();
let mut partitions = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
let partition = cursor.get_i32();
let current_leader_epoch = cursor.get_i32();
let timestamp = cursor.get_i64();
partitions.push(KafkaListOffsetsPartition {
partition,
current_leader_epoch,
timestamp,
});
}
topics.push(KafkaListOffsetsTopic { topic, partitions });
}
Ok(KafkaListOffsetsRequest {
header,
replica_id,
isolation_level,
topics,
})
}
fn decode_metadata_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaMetadataRequest> {
debug!("Decoding Metadata request: api_version={}", header.api_version);
debug!(" Cursor position: {}, remaining: {}", cursor.position(), cursor.remaining());
let start_pos = cursor.position() as usize;
let slice = cursor.get_ref();
let debug_bytes = &slice[start_pos..std::cmp::min(start_pos + 20, slice.len())];
debug!(" Raw bytes at cursor: {:02x?}", debug_bytes);
let is_flexible = header.api_version >= 9; debug!(" Using flexible format: {}", is_flexible);
let topic_count = if is_flexible {
let varint_value = Self::decode_varint(cursor)? as i32;
if varint_value == 0 {
-1 } else {
varint_value - 1 }
} else {
cursor.get_i32()
};
debug!(" Topic count read: {} (flexible={})", topic_count, is_flexible);
debug!(" Cursor after topic_count: pos={}, remaining={}", cursor.position(), cursor.remaining());
let topics = if topic_count == -1 {
debug!(" Topics: null (topic_count = -1)");
None
} else if topic_count == 0 {
debug!(" Topics: empty array (topic_count = 0)");
Some(Vec::new())
} else {
debug!(" Parsing {} topics...", topic_count);
let mut topics = Vec::with_capacity(topic_count as usize);
for i in 0..topic_count {
if is_flexible {
if header.api_version >= 10 {
cursor.set_position(cursor.position() + 16);
}
let topic_name = Self::decode_compact_string(cursor)?
.unwrap_or_else(|| "".to_string());
debug!(" Topic {} (flexible): '{}'", i, topic_name);
topics.push(topic_name);
let _tagged_fields_count = Self::decode_varint(cursor)?;
} else {
let topic_name = Self::decode_string(cursor)?;
debug!(" Topic {} (non-flexible): '{}'", i, topic_name);
topics.push(topic_name);
}
}
debug!(" Final topics parsed: {:?}", topics);
Some(topics)
};
let allow_auto_topic_creation = if header.api_version >= 4 && cursor.remaining() > 0 {
if is_flexible {
cursor.get_u8() != 0 } else {
cursor.get_i8() != 0 }
} else {
true };
let include_cluster_authorized_operations =
if header.api_version >= 8 && header.api_version <= 10 && cursor.remaining() > 0 {
if is_flexible {
cursor.get_u8() != 0
} else {
cursor.get_i8() != 0
}
} else {
false };
let include_topic_authorized_operations =
if header.api_version >= 8 && cursor.remaining() > 0 {
if is_flexible {
cursor.get_u8() != 0
} else {
cursor.get_i8() != 0
}
} else {
false };
if is_flexible && cursor.remaining() > 0 {
let _tagged_fields_count = Self::decode_varint(cursor)?;
debug!(" Skipped {} top-level tagged fields", _tagged_fields_count);
}
Ok(KafkaMetadataRequest {
header,
topics,
allow_auto_topic_creation,
include_cluster_authorized_operations,
include_topic_authorized_operations,
})
}
/// Writes a Metadata response into `buf`.
///
/// Versions 9 and above use the flexible encodings (compact strings, varint
/// array lengths stored as length + 1, empty tagged-field sections); earlier
/// versions use i16/i32 length prefixes. Version gates applied here:
/// throttle time from v3, leader epoch from v7, offline replicas from v5,
/// authorized-operations fields from v8.
///
/// Node-id lists (replicas, ISR, offline replicas) are sanitised: a list with
/// more than 10000 entries is written as empty, longer-than-101 lists are cut
/// to their first 101 entries, and ids outside `0..=1000000` are written as 0.
///
/// Fix: the declared array length is now computed *after* truncation. The
/// previous code announced the full length but stopped writing after index
/// 100, producing a frame whose element count disagreed with its length.
/// The per-partition `Vec` clones were dropped in favour of slices.
///
/// NOTE(review): for flexible versions no tagged-field byte is written after
/// the correlation id, no topic id is written for v10+, and the cluster
/// authorized operations are written for every version >= 8 — confirm these
/// against the Kafka protocol guide for the versions actually advertised.
fn encode_metadata_response(
    response: &KafkaMetadataResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    // Lists longer than this are replaced by an empty list.
    const MAX_ACCEPTED_NODE_IDS: usize = 10000;
    // The original loop emitted indices 0..=100 before stopping, i.e. 101 ids.
    const MAX_WRITTEN_NODE_IDS: usize = 101;

    let is_flexible = response.api_version >= 9;
    info!("🔧 MetadataResponse encoding: version={}, flexible={}", response.api_version, is_flexible);
    debug!("Encoding Metadata response v{}, flexible={}", response.api_version, is_flexible);
    debug!(" - correlation_id: {}", response.header.correlation_id);
    debug!(" - throttle_time_ms: {}", response.throttle_time_ms);
    debug!(" - brokers count: {}", response.brokers.len());
    debug!(" - topics count: {}", response.topics.len());

    // Reads a big-endian i32 from the first four bytes of `b` (diagnostics only).
    let be_i32 = |b: &[u8]| i32::from_be_bytes([b[0], b[1], b[2], b[3]]);

    // Writes one sanitised node-id array. The three labels reproduce the
    // wording of the per-list log messages.
    let write_node_ids = |buf: &mut BytesMut,
                          partition_idx: usize,
                          ids: &[i32],
                          plural: &str,
                          singular: &str,
                          field: &str| {
        let ids: &[i32] = if ids.len() > MAX_ACCEPTED_NODE_IDS {
            warn!(" partition[{}]: Too many {} ({}), limiting to empty",
                partition_idx, plural, ids.len());
            &[]
        } else if ids.len() > MAX_WRITTEN_NODE_IDS {
            warn!(" partition[{}]: Too many {}, truncating at 100", partition_idx, plural);
            &ids[..MAX_WRITTEN_NODE_IDS]
        } else {
            ids
        };

        // The length written here always matches the elements written below.
        if is_flexible {
            Self::encode_varint(&mut *buf, (ids.len() + 1) as u64);
            debug!(" partition[{}]: {} compact array len={}",
                partition_idx, field, ids.len());
        } else {
            buf.put_i32(ids.len() as i32);
        }

        for &id in ids {
            if id < 0 || id > 1000000 {
                warn!(" partition[{}]: Invalid {} {}, using 0",
                    partition_idx, singular, id);
                buf.put_i32(0);
            } else {
                buf.put_i32(id);
            }
        }
    };

    // --- header -----------------------------------------------------------
    info!("🔍 BEFORE correlation_id encoding: buffer is empty, size={}", buf.len());
    buf.put_i32(response.header.correlation_id);
    info!("✅ AFTER correlation_id encoding: buffer size={}, correlation_id={}", buf.len(), response.header.correlation_id);
    if buf.len() >= 4 {
        let first_4_bytes = &buf[0..4];
        info!(" - CORRELATION_ID BYTES: [{:02x}, {:02x}, {:02x}, {:02x}] = {}",
            first_4_bytes[0], first_4_bytes[1], first_4_bytes[2], first_4_bytes[3],
            be_i32(first_4_bytes));
    }

    if response.api_version >= 3 {
        buf.put_i32(response.throttle_time_ms);
        debug!(" - Wrote throttle_time_ms after correlation_id (v3+), buffer size: {}", buf.len());
    } else {
        debug!(" - Skipped throttle_time_ms (v0-v2 don't have it), buffer size: {}", buf.len());
    }

    // --- brokers ----------------------------------------------------------
    info!(" - CRITICAL: Encoding brokers array, count: {}", response.brokers.len());
    if response.brokers.is_empty() {
        error!(" - ERROR: brokers array is empty! This will cause Java client parsing failure");
    }
    if is_flexible {
        Self::encode_varint(buf, (response.brokers.len() + 1) as u64);
        info!(" - KAFKA v9+ FIX: Using compact array encoding for brokers, varint length: {}", response.brokers.len() + 1);
    } else {
        buf.put_i32(response.brokers.len() as i32);
        info!(" - KAFKA v0-8: Using standard i32 array encoding for brokers, length: {}", response.brokers.len());
    }
    info!(" - Wrote brokers array length, buffer size: {}", buf.len());

    for (i, broker) in response.brokers.iter().enumerate() {
        debug!(" broker[{}]: node_id={}, host='{}', port={}", i, broker.node_id, broker.host, broker.port);
        buf.put_i32(broker.node_id);
        if is_flexible {
            Self::encode_compact_string(&broker.host, buf);
        } else {
            Self::encode_string(&broker.host, buf);
        }
        buf.put_i32(broker.port);
        if is_flexible {
            Self::encode_compact_nullable_string(&broker.rack, buf);
        } else {
            Self::encode_nullable_string(&broker.rack, buf);
        }
        debug!(" broker[{}] encoded, buffer size: {}", i, buf.len());
        if is_flexible {
            Self::encode_empty_tagged_fields(buf);
            debug!(" broker[{}] tagged fields encoded, buffer size: {}", i, buf.len());
        }
    }

    // --- cluster id / controller -------------------------------------------
    if is_flexible {
        Self::encode_compact_nullable_string(&response.cluster_id, buf);
    } else {
        Self::encode_nullable_string(&response.cluster_id, buf);
    }
    buf.put_i32(response.controller_id);

    // --- topics -------------------------------------------------------------
    debug!(" - Encoding topics array, actual count: {}", response.topics.len());
    debug!(" - Topics in response: {:?}", response.topics.iter().map(|t| &t.topic).collect::<Vec<_>>());
    if is_flexible {
        let array_len = response.topics.len();
        let encoded_len = array_len + 1;
        Self::encode_varint(buf, encoded_len as u64);
        info!(" - FIXED topics compact array: length={}, encoded as varint {} (Java will read {})",
            array_len, encoded_len, array_len);
    } else {
        buf.put_i32(response.topics.len() as i32);
    }
    debug!(" - Wrote topics array length, buffer size: {}", buf.len());

    for (i, topic) in response.topics.iter().enumerate() {
        debug!(" topic[{}]: name='{}', partitions={}", i, topic.topic, topic.partitions.len());
        buf.put_i16(topic.error_code);
        if is_flexible {
            Self::encode_compact_string(&topic.topic, buf);
        } else {
            Self::encode_string(&topic.topic, buf);
        }
        buf.put_i8(if topic.is_internal { 1 } else { 0 });

        if is_flexible {
            let array_len = topic.partitions.len();
            let encoded_len = array_len + 1;
            Self::encode_varint(buf, encoded_len as u64);
            debug!(" FIXED partitions compact array: length={}, encoded as varint {} (Java will read {})",
                array_len, encoded_len, array_len);
        } else {
            buf.put_i32(topic.partitions.len() as i32);
        }

        for (partition_idx, partition) in topic.partitions.iter().enumerate() {
            debug!(" partition[{}]: id={}, leader={}",
                partition_idx, partition.partition, partition.leader);

            buf.put_i16(partition.error_code);
            if partition.partition < 0 || partition.partition > 1000000 {
                // An out-of-range partition id zeroes both the id and the leader.
                warn!(" partition[{}]: Invalid partition ID {}, using 0",
                    partition_idx, partition.partition);
                buf.put_i32(0);
                buf.put_i32(0);
            } else {
                buf.put_i32(partition.partition);
                buf.put_i32(partition.leader);
            }
            if response.api_version >= 7 {
                buf.put_i32(partition.leader_epoch);
            }

            write_node_ids(
                &mut *buf,
                partition_idx,
                &partition.replica_nodes[..],
                "replica nodes",
                "replica node",
                "replica_nodes",
            );
            write_node_ids(
                &mut *buf,
                partition_idx,
                &partition.isr_nodes[..],
                "ISR nodes",
                "ISR node",
                "isr_nodes",
            );
            if response.api_version >= 5 {
                write_node_ids(
                    &mut *buf,
                    partition_idx,
                    &partition.offline_replicas[..],
                    "offline replicas",
                    "offline replica",
                    "offline_replicas",
                );
            }

            if is_flexible {
                Self::encode_empty_tagged_fields(buf);
            }
            debug!(" partition[{}]: encoding complete", partition_idx);
        }

        if response.api_version >= 8 {
            buf.put_i32(topic.topic_authorized_operations);
        }
        if is_flexible {
            Self::encode_empty_tagged_fields(buf);
        }
    }

    // --- trailer ------------------------------------------------------------
    if response.api_version >= 8 {
        buf.put_i32(response.cluster_authorized_operations);
    }
    if is_flexible {
        Self::encode_empty_tagged_fields(buf);
        info!(" - Added response-level tagged fields, final buffer size: {}", buf.len());
    }

    // --- diagnostics (log output only; the wire bytes are complete) ---------
    info!("Completed Metadata response encoding, total bytes: {}", buf.len());
    if buf.len() >= 10 {
        // A slice prints exactly like the Vec it used to be copied into.
        let preview = &buf[0..std::cmp::min(100, buf.len())];
        info!("Response bytes (first 100): {:?}", preview);
    }
    if buf.len() >= 20 {
        let first_20_bytes = &buf[0..20];
        info!(" - FULL METADATA STRUCTURE (20 bytes):");
        info!(" Bytes 00-03: [{:02x} {:02x} {:02x} {:02x}] = correlation_id: {}",
            first_20_bytes[0], first_20_bytes[1], first_20_bytes[2], first_20_bytes[3],
            be_i32(&first_20_bytes[0..4]));
        info!(" Bytes 04-07: [{:02x} {:02x} {:02x} {:02x}] = throttle_time: {}",
            first_20_bytes[4], first_20_bytes[5], first_20_bytes[6], first_20_bytes[7],
            be_i32(&first_20_bytes[4..8]));
        info!(" Byte 08: [{:02x}] = broker_count_varint: {}", first_20_bytes[8], first_20_bytes[8]);
        info!(" Bytes 09-12: [{:02x} {:02x} {:02x} {:02x}] = broker_node_id: {}",
            first_20_bytes[9], first_20_bytes[10], first_20_bytes[11], first_20_bytes[12],
            be_i32(&first_20_bytes[9..13]));
        info!(" Byte 13: [{:02x}] = host_length_varint: {}", first_20_bytes[13], first_20_bytes[13]);
        info!(" Bytes 14-19: [{:02x} {:02x} {:02x} {:02x} {:02x} {:02x}] = host_prefix: '{}'",
            first_20_bytes[14], first_20_bytes[15], first_20_bytes[16], first_20_bytes[17], first_20_bytes[18], first_20_bytes[19],
            String::from_utf8_lossy(&first_20_bytes[14..20]));

        info!(" - CHECKING FOR 680 (0x02A8) IN RESPONSE:");
        // Every 4-byte window is interpreted as a big-endian i32.
        for (i, window) in buf.windows(4).enumerate() {
            if be_i32(window) == 680 {
                info!(" Found 680 at byte offset {}: [{:02x} {:02x} {:02x} {:02x}]", i, window[0], window[1], window[2], window[3]);
            }
        }
    }
    Ok(())
}
/// Writes a ListOffsets response into `buf`.
///
/// Layout as emitted here: correlation id, throttle time, then the topic
/// array; each partition entry carries its index, error code, timestamp,
/// offset and leader epoch.
fn encode_list_offsets_response(
    response: &KafkaListOffsetsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let topics = &response.topics;
    buf.put_i32(topics.len() as i32);
    for entry in topics.iter() {
        Self::encode_string(&entry.topic, buf);

        let parts = &entry.partitions;
        buf.put_i32(parts.len() as i32);
        for p in parts.iter() {
            buf.put_i32(p.partition);
            buf.put_i16(p.error_code);
            buf.put_i64(p.timestamp);
            buf.put_i64(p.offset);
            buf.put_i32(p.leader_epoch);
        }
    }
    Ok(())
}
fn decode_offset_commit_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaOffsetCommitRequest> {
let group_id = Self::decode_string(cursor)?;
let generation_id = cursor.get_i32();
let consumer_id = Self::decode_string(cursor)?;
let retention_time_ms = cursor.get_i64();
let topic_count = cursor.get_i32();
let mut topics = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let topic = Self::decode_string(cursor)?;
let partition_count = cursor.get_i32();
let mut partitions = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
let partition = cursor.get_i32();
let offset = cursor.get_i64();
let timestamp = cursor.get_i64();
let metadata = Self::decode_nullable_string(cursor)?;
partitions.push(KafkaOffsetCommitPartition {
partition,
offset,
timestamp,
metadata,
});
}
topics.push(KafkaOffsetCommitTopic { topic, partitions });
}
Ok(KafkaOffsetCommitRequest {
header,
group_id,
generation_id,
consumer_id,
retention_time_ms,
topics,
})
}
/// Writes an OffsetCommit response into `buf`.
///
/// Layout as emitted here: correlation id, throttle time, then the topic
/// array; each partition entry carries its index and error code.
fn encode_offset_commit_response(
    response: &KafkaOffsetCommitResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let topics = &response.topics;
    buf.put_i32(topics.len() as i32);
    for entry in topics.iter() {
        Self::encode_string(&entry.topic, buf);

        let parts = &entry.partitions;
        buf.put_i32(parts.len() as i32);
        for p in parts.iter() {
            buf.put_i32(p.partition);
            buf.put_i16(p.error_code);
        }
    }
    Ok(())
}
fn decode_offset_fetch_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaOffsetFetchRequest> {
let group_id = Self::decode_string(cursor)?;
let topic_count = cursor.get_i32();
let topics = if topic_count == -1 {
None } else {
let mut topics = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let topic = Self::decode_string(cursor)?;
let partition_count = cursor.get_i32();
let mut partitions = Vec::with_capacity(partition_count as usize);
for _ in 0..partition_count {
partitions.push(cursor.get_i32());
}
topics.push(KafkaOffsetFetchTopic { topic, partitions });
}
Some(topics)
};
Ok(KafkaOffsetFetchRequest {
header,
group_id,
topics,
})
}
/// Writes an OffsetFetch response into `buf`.
///
/// Layout as emitted here: correlation id, throttle time, the topic array
/// (per partition: index, offset, leader epoch, nullable metadata, error
/// code), and finally the response-level error code.
fn encode_offset_fetch_response(
    response: &KafkaOffsetFetchResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let topics = &response.topics;
    buf.put_i32(topics.len() as i32);
    for entry in topics.iter() {
        Self::encode_string(&entry.topic, buf);

        let parts = &entry.partitions;
        buf.put_i32(parts.len() as i32);
        for p in parts.iter() {
            buf.put_i32(p.partition);
            buf.put_i64(p.offset);
            buf.put_i32(p.leader_epoch);
            Self::encode_nullable_string(&p.metadata, buf);
            buf.put_i16(p.error_code);
        }
    }

    // The response-level error code comes after the topic array.
    buf.put_i16(response.error_code);
    Ok(())
}
/// Decodes the body of a FindCoordinator request; `cursor` must sit just past the header.
///
/// The coordinator key is always read. The coordinator type byte is read only
/// for `api_version >= 1` and only when at least one byte remains; otherwise
/// it defaults to 0.
fn decode_find_coordinator_request(
    header: KafkaRequestHeader,
    cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaFindCoordinatorRequest> {
    let coordinator_key = Self::decode_string(cursor)?;

    let has_type_byte = header.api_version >= 1 && cursor.remaining() > 0;
    let coordinator_type = match has_type_byte {
        true => cursor.get_i8(),
        false => 0,
    };

    Ok(KafkaFindCoordinatorRequest {
        header,
        coordinator_key,
        coordinator_type,
    })
}
/// Writes a FindCoordinator response into `buf`.
///
/// Fields are emitted in this order: correlation id, throttle time, error
/// code, nullable error message, then the coordinator's node id, host and port.
fn encode_find_coordinator_response(
    response: &KafkaFindCoordinatorResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    // Header and status.
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);
    buf.put_i16(response.error_code);
    Self::encode_nullable_string(&response.error_message, buf);

    // Coordinator endpoint.
    buf.put_i32(response.node_id);
    Self::encode_string(&response.host, buf);
    buf.put_i32(response.port);

    Ok(())
}
fn decode_list_groups_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaListGroupsRequest> {
let states_filter_count = cursor.get_i32();
let mut states_filter = Vec::with_capacity(states_filter_count as usize);
for _ in 0..states_filter_count {
states_filter.push(Self::decode_string(cursor)?);
}
Ok(KafkaListGroupsRequest {
header,
states_filter,
})
}
/// Serializes a ListGroups response body into `buf`.
///
/// Emits the correlation id, throttle time and error code, followed by the
/// group array; each group is written as three length-prefixed strings
/// (group id, protocol type, group state).
fn encode_list_groups_response(
    response: &KafkaListGroupsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);
    buf.put_i16(response.error_code);

    let group_total = response.groups.len() as i32;
    buf.put_i32(group_total);
    response.groups.iter().for_each(|entry| {
        Self::encode_string(&entry.group_id, buf);
        Self::encode_string(&entry.protocol_type, buf);
        Self::encode_string(&entry.group_state, buf);
    });

    Ok(())
}
fn decode_heartbeat_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaHeartbeatRequest> {
let group_id = Self::decode_string(cursor)?;
let generation_id = cursor.get_i32();
let consumer_id = Self::decode_string(cursor)?;
let group_instance_id = if header.api_version >= 4 {
Self::decode_nullable_string(cursor)?
} else {
None
};
Ok(KafkaHeartbeatRequest {
header,
group_id,
generation_id,
consumer_id,
group_instance_id,
})
}
/// Writes a Heartbeat response body: correlation id, throttle time and
/// error code, in that order. All three are always emitted.
fn encode_heartbeat_response(
    response: &KafkaHeartbeatResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    let correlation_id = response.header.correlation_id;
    let throttle_ms = response.throttle_time_ms;
    let error = response.error_code;

    buf.put_i32(correlation_id);
    buf.put_i32(throttle_ms);
    buf.put_i16(error);
    Ok(())
}
/// Parses a LeaveGroup request body.
///
/// Reads the group id and a single member id; for API version 4 and above a
/// nullable group instance id is read as well.
///
/// NOTE(review): the Kafka protocol guide describes LeaveGroup v3+ as carrying
/// a batch of members rather than a single member id — confirm that only the
/// single-member layout can reach this decoder.
///
/// # Errors
/// Propagates any string decoding error.
fn decode_leave_group_request(
    header: KafkaRequestHeader,
    cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaLeaveGroupRequest> {
    let group_id = Self::decode_string(cursor)?;
    let consumer_id = Self::decode_string(cursor)?;

    let mut group_instance_id = None;
    if header.api_version >= 4 {
        group_instance_id = Self::decode_nullable_string(cursor)?;
    }

    Ok(KafkaLeaveGroupRequest {
        header,
        group_id,
        consumer_id,
        group_instance_id,
    })
}
/// Writes a LeaveGroup response body: correlation id, throttle time and
/// error code, in that order. All three are always emitted.
fn encode_leave_group_response(
    response: &KafkaLeaveGroupResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    let correlation_id = response.header.correlation_id;
    let throttle_ms = response.throttle_time_ms;
    let error = response.error_code;

    buf.put_i32(correlation_id);
    buf.put_i32(throttle_ms);
    buf.put_i16(error);
    Ok(())
}
fn decode_sync_group_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaSyncGroupRequest> {
let group_id = Self::decode_string(cursor)?;
let generation_id = cursor.get_i32();
let consumer_id = Self::decode_string(cursor)?;
let group_instance_id = if header.api_version >= 3 {
Self::decode_nullable_string(cursor)?
} else {
None
};
let protocol_type = if header.api_version >= 3 {
Self::decode_string(cursor)?
} else {
String::new()
};
let protocol_name = if header.api_version >= 3 {
Self::decode_string(cursor)?
} else {
String::new()
};
let assignment_count = cursor.get_i32();
let mut assignments = Vec::with_capacity(assignment_count as usize);
for _ in 0..assignment_count {
let consumer_id = Self::decode_string(cursor)?;
let assignment = Self::decode_bytes(cursor)?;
assignments.push(KafkaSyncGroupAssignment {
consumer_id,
assignment,
});
}
Ok(KafkaSyncGroupRequest {
header,
group_id,
generation_id,
consumer_id,
group_instance_id,
protocol_type,
protocol_name,
assignments,
})
}
/// Serializes a SyncGroup response body into `buf`.
///
/// Version gating follows the Kafka protocol guide: the throttle time is
/// written from API version 1, and the protocol type/name from API version 5.
/// The previous gate wrote them at v3 and v4 as well, which put two
/// unexpected strings ahead of the assignment and broke those responses.
///
/// NOTE(review): v4+ is a "flexible" version in the Kafka protocol; the
/// classic (non-compact) encodings are still used here — confirm which
/// versions the broker advertises.
fn encode_sync_group_response(
    response: &KafkaSyncGroupResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    if response.api_version >= 1 {
        buf.put_i32(response.throttle_time_ms);
    }
    buf.put_i16(response.error_code);
    if response.api_version >= 5 {
        Self::encode_string(&response.protocol_type, buf);
        Self::encode_string(&response.protocol_name, buf);
    }
    // The member's assignment blob is always the last field.
    Self::encode_bytes(&response.assignment, buf);
    Ok(())
}
fn decode_join_group_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaJoinGroupRequest> {
let group_id = Self::decode_string(cursor)?;
let session_timeout_ms = cursor.get_i32();
let rebalance_timeout_ms = if header.api_version >= 1 {
cursor.get_i32()
} else {
session_timeout_ms };
let member_id = Self::decode_string(cursor)?;
let group_instance_id = if header.api_version >= 5 {
Self::decode_nullable_string(cursor)?
} else {
None
};
let protocol_type = Self::decode_string(cursor)?;
let protocol_count = cursor.get_i32();
let mut protocols = Vec::with_capacity(protocol_count as usize);
for _ in 0..protocol_count {
let name = Self::decode_string(cursor)?;
let metadata = Self::decode_bytes(cursor)?;
protocols.push(KafkaJoinGroupProtocol { name, metadata });
}
Ok(KafkaJoinGroupRequest {
header,
group_id,
session_timeout_ms,
rebalance_timeout_ms,
member_id,
group_instance_id,
protocol_type,
protocols,
})
}
/// Serializes a JoinGroup response body into `buf`.
///
/// Emits, unconditionally and in this order: correlation id, throttle time,
/// error code, generation id, protocol name, protocol type, leader id, this
/// member's id, then the member array (member id, nullable group instance id
/// and metadata bytes per entry).
///
/// NOTE(review): field presence and ordering are not version-gated here —
/// confirm against the Kafka protocol guide for the versions advertised.
fn encode_join_group_response(
    response: &KafkaJoinGroupResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);
    buf.put_i16(response.error_code);
    buf.put_i32(response.generation_id);

    // Group-level identifiers.
    Self::encode_string(&response.protocol_name, buf);
    Self::encode_string(&response.protocol_type, buf);
    Self::encode_string(&response.leader, buf);
    Self::encode_string(&response.member_id, buf);

    let member_total = response.members.len() as i32;
    buf.put_i32(member_total);
    response.members.iter().for_each(|entry| {
        Self::encode_string(&entry.member_id, buf);
        Self::encode_nullable_string(&entry.group_instance_id, buf);
        Self::encode_bytes(&entry.metadata, buf);
    });

    Ok(())
}
fn decode_describe_groups_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaDescribeGroupsRequest> {
let group_count = cursor.get_i32();
let mut groups = Vec::with_capacity(group_count as usize);
for _ in 0..group_count {
groups.push(Self::decode_string(cursor)?);
}
let include_authorized_operations = cursor.get_i8() != 0;
Ok(KafkaDescribeGroupsRequest {
header,
groups,
include_authorized_operations,
})
}
/// Serializes a DescribeGroups response body into `buf`.
///
/// After the correlation id and throttle time comes the group array. Each
/// group is written as: error code, group id, state, protocol type, protocol
/// data, its member array, and finally the authorized-operations value.
/// Each member is four strings (one nullable) followed by two byte blobs.
fn encode_describe_groups_response(
    response: &KafkaDescribeGroupsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let group_total = response.groups.len() as i32;
    buf.put_i32(group_total);
    for described in response.groups.iter() {
        buf.put_i16(described.error_code);
        Self::encode_string(&described.group_id, buf);
        Self::encode_string(&described.group_state, buf);
        Self::encode_string(&described.protocol_type, buf);
        Self::encode_string(&described.protocol_data, buf);

        let member_total = described.members.len() as i32;
        buf.put_i32(member_total);
        for participant in described.members.iter() {
            Self::encode_string(&participant.member_id, buf);
            Self::encode_nullable_string(&participant.group_instance_id, buf);
            Self::encode_string(&participant.client_id, buf);
            Self::encode_string(&participant.client_host, buf);
            Self::encode_bytes(&participant.member_metadata, buf);
            Self::encode_bytes(&participant.member_assignment, buf);
        }

        // Trails the member array for every group.
        buf.put_i32(described.authorized_operations);
    }

    Ok(())
}
/// Parses an ApiVersions request body, best-effort.
///
/// For API version 3 and above the body is read as two varint-length-prefixed
/// strings (client software name, then client software version) followed by a
/// tagged-field section that is skipped. For older versions nothing is read
/// and both values are `None`.
///
/// This function never returns `Err`: every decoding problem (bad varint,
/// length exceeding the remaining bytes, invalid UTF-8) is logged and the
/// affected field becomes `None`.
fn decode_api_versions_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaApiVersionsRequest> {
debug!(
"Decoding ApiVersions request: api_version={}, remaining_bytes={}, correlation_id={}",
header.api_version,
cursor.remaining(),
header.correlation_id
);
let (client_software_name, client_software_version) = if header.api_version >= 3 {
debug!("Decoding ApiVersions v3+ with KIP-482 flexible versions");
debug!("Remaining bytes before field parsing: {}", cursor.remaining());
// Diagnostic only: hex-dump up to the next 32 unread bytes without consuming them.
let remaining_bytes = cursor.remaining();
if remaining_bytes > 0 {
let pos = cursor.position() as usize;
let slice = cursor.get_ref();
let hex_bytes = &slice[pos..pos+std::cmp::min(remaining_bytes, 32)];
debug!("Next {} hex bytes: {}", hex_bytes.len(), hex_bytes.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join(""));
}
// First string: client software name. The varint holds length + 1; a value of 0 means null.
let name = if cursor.remaining() > 0 {
match Self::decode_varint(cursor) {
Ok(varint_len) => {
debug!("First varint length: {}", varint_len);
if varint_len == 0 {
debug!("Null string for client_software_name");
None
} else {
let actual_len = (varint_len - 1) as usize; if cursor.remaining() >= actual_len {
let mut buf = vec![0u8; actual_len];
cursor.copy_to_slice(&mut buf);
match String::from_utf8(buf) {
Ok(name_str) => {
debug!("Decoded client_software_name (len={}): {:?}", actual_len, name_str);
Some(name_str)
}
Err(e) => {
warn!("Invalid UTF-8 in client_software_name: {}", e);
None
}
}
} else {
// Declared length exceeds what is left; nothing further is consumed for this field.
warn!("Invalid string length: {} (remaining: {})", actual_len, cursor.remaining());
None
}
}
}
Err(e) => {
warn!("Failed to decode varint for client_software_name: {}", e);
None
}
}
} else {
None
};
// Second string: client software version, decoded exactly like the name above.
let version = if cursor.remaining() > 0 {
match Self::decode_varint(cursor) {
Ok(varint_len) => {
debug!("Second varint length: {}", varint_len);
if varint_len == 0 {
debug!("Null string for client_software_version");
None
} else {
let actual_len = (varint_len - 1) as usize; if cursor.remaining() >= actual_len {
let mut buf = vec![0u8; actual_len];
cursor.copy_to_slice(&mut buf);
match String::from_utf8(buf) {
Ok(version_str) => {
debug!("Decoded client_software_version (len={}): {:?}", actual_len, version_str);
Some(version_str)
}
Err(e) => {
warn!("Invalid UTF-8 in client_software_version: {}", e);
None
}
}
} else {
warn!("Invalid string length: {} (remaining: {})", actual_len, cursor.remaining());
None
}
}
}
Err(e) => {
warn!("Failed to decode varint for client_software_version: {}", e);
None
}
}
} else {
None
};
// Tagged-field section: a varint count, then (tag, size, payload) per field.
// Payloads are skipped, never interpreted.
if cursor.remaining() > 0 {
match Self::decode_varint(cursor) {
Ok(num_tagged_fields) => {
debug!("Tagged fields count: {}", num_tagged_fields);
for i in 0..num_tagged_fields {
if let (Ok(tag), Ok(size)) = (Self::decode_varint(cursor), Self::decode_varint(cursor)) {
debug!(" Tagged field {}: tag={}, size={}", i, tag, size);
let advance_size = size as usize;
if cursor.remaining() >= advance_size {
cursor.advance(advance_size);
debug!(" Advanced by {} bytes, remaining: {}", advance_size, cursor.remaining());
} else {
// Declared payload is larger than the buffer: drain what is left and stop.
warn!(" Cannot advance by {} bytes, only {} remaining. Consuming all remaining bytes.",
advance_size, cursor.remaining());
cursor.advance(cursor.remaining()); break; }
} else {
warn!("Failed to decode tagged field {}", i);
break;
}
}
}
Err(e) => {
warn!("Failed to decode tagged fields count: {}", e);
let remaining = cursor.remaining();
if remaining > 0 {
debug!("Consuming {} remaining bytes", remaining);
cursor.advance(remaining);
}
}
}
}
// Logged unconditionally on this branch, even if individual fields failed to decode above.
debug!("Successfully parsed Java 4.1 ApiVersions format");
(name, version)
} else {
debug!("Decoding ApiVersions v0-v2 (no client software fields)");
(None, None)
};
debug!(
"Decoded ApiVersions: name={:?}, version={:?}",
client_software_name, client_software_version
);
Ok(KafkaApiVersionsRequest {
header,
client_software_name,
client_software_version,
})
}
/// Serializes an ApiVersions response body into `buf`.
///
/// The correlation id is always written first. After that there are two layouts:
///
/// * `api_version >= 3`: error code, a varint-prefixed API key array (count + 1;
///   each entry is key/min/max as i16 plus one zero byte), throttle time,
///   cluster id, controller id (`-1` when `None`), the supported-features
///   array, and a final zero byte.
/// * older versions: an i32-prefixed API key array (key/min/max as i16) and,
///   for `api_version >= 1`, the throttle time at the end.
///
/// NOTE(review): in the older-version layout `response.error_code` is not
/// written at all — confirm whether that is intentional.
/// NOTE(review): cluster id, controller id and supported features are written
/// here as ordinary body fields — confirm this matches the ApiVersions
/// response schema in the Kafka protocol guide for the clients being served.
fn encode_api_versions_response(
response: &KafkaApiVersionsResponse,
buf: &mut BytesMut,
) -> Result<()> {
buf.put_i32(response.header.correlation_id);
if response.api_version >= 3 {
debug!("Encoding ApiVersions v{} with flexible versions (FIXED ORDER)", response.api_version);
debug!("ApiVersions v{} response structure:", response.api_version);
debug!(" correlation_id: {}", response.header.correlation_id);
debug!(" error_code: {}", response.error_code);
debug!(" api_keys count: {}", response.api_keys.len());
debug!(" throttle_time_ms: {}", response.throttle_time_ms);
debug!(" Adding error_code: {}", response.error_code);
buf.put_i16(response.error_code);
// Array length is written as a varint holding count + 1.
let compact_length = (response.api_keys.len() as u64) + 1;
debug!(" compact_length (raw): {}", response.api_keys.len());
debug!(" compact_length (encoded): {}", compact_length);
Self::encode_varint(buf, compact_length);
for (i, api_key) in response.api_keys.iter().enumerate() {
// Only the first five entries are logged, to keep the output short.
if i < 5 { debug!(" api_key[{}]: key={}, min={}, max={}", i, api_key.api_key, api_key.min_version, api_key.max_version);
}
buf.put_i16(api_key.api_key);
buf.put_i16(api_key.min_version);
buf.put_i16(api_key.max_version);
// Presumably the per-entry empty tagged-field count — TODO confirm.
buf.put_u8(0); }
debug!(" Adding throttle_time_ms: {} at CORRECT position", response.throttle_time_ms);
buf.put_i32(response.throttle_time_ms);
// This condition is always true here: we are already inside the `api_version >= 3` branch.
if response.api_version >= 3 {
debug!(" Adding cluster_id: {:?}", response.cluster_id);
Self::encode_compact_nullable_string(&response.cluster_id, buf);
if let Some(controller_id) = response.controller_id {
debug!(" Adding controller_id: {}", controller_id);
buf.put_i32(controller_id);
} else {
debug!(" Adding controller_id: -1 (no controller)");
buf.put_i32(-1); }
debug!(" Adding supported_features count: {}", response.supported_features.len());
Self::encode_compact_array_len(response.supported_features.len(), buf);
for feature in &response.supported_features {
Self::encode_compact_string(feature, buf);
}
}
debug!(" Adding final tagged fields marker: 0x00");
buf.put_u8(0); } else {
debug!("Encoding ApiVersions v{} with standard format", response.api_version);
buf.put_i32(response.api_keys.len() as i32);
for api_key in &response.api_keys {
buf.put_i16(api_key.api_key);
buf.put_i16(api_key.min_version);
buf.put_i16(api_key.max_version);
}
if response.api_version >= 1 {
debug!(" Adding throttle_time_ms: {} at end of v0-v2 response", response.throttle_time_ms);
buf.put_i32(response.throttle_time_ms);
}
}
Ok(())
}
fn decode_string(cursor: &mut Cursor<&[u8]>) -> Result<String> {
let len = cursor.get_i16();
debug!("decode_string: length = {}", len);
if len == -1 {
return Err(KafkaCodecError::InvalidFormat(
"Expected non-null string".to_string(),
));
}
if len == 0 {
debug!("decode_string: empty string");
return Ok(String::new());
}
let remaining = cursor.get_ref().len() - cursor.position() as usize;
if len as usize > remaining {
warn!("Buffer underrun in decode_string: requested {} bytes, only {} available at position {}. Buffer total length: {}",
len, remaining, cursor.position(), cursor.get_ref().len());
return Err(KafkaCodecError::InvalidFormat(format!(
"String length {} exceeds remaining buffer size {}. This may indicate a framing error or corrupt message.",
len, remaining
)));
}
let mut buf = vec![0u8; len as usize];
cursor.copy_to_slice(&mut buf);
let result = String::from_utf8(buf)
.map_err(|e| KafkaCodecError::InvalidFormat(format!("Invalid UTF-8: {}", e)))?;
debug!("decode_string: result = '{}'", result);
Ok(result)
}
fn decode_nullable_string(cursor: &mut Cursor<&[u8]>) -> Result<Option<String>> {
let len = cursor.get_i16();
if len == -1 {
return Ok(None);
}
if len == 0 {
return Ok(Some(String::new()));
}
let remaining = cursor.get_ref().len() - cursor.position() as usize;
if len as usize > remaining {
warn!("Buffer underrun in decode_nullable_string: requested {} bytes, only {} available at position {}. Buffer total length: {}",
len, remaining, cursor.position(), cursor.get_ref().len());
return Err(KafkaCodecError::InvalidFormat(format!(
"Nullable string length {} exceeds remaining buffer size {}. This may indicate a framing error or corrupt message.",
len, remaining
)));
}
let mut buf = vec![0u8; len as usize];
cursor.copy_to_slice(&mut buf);
let s = String::from_utf8(buf)
.map_err(|e| KafkaCodecError::InvalidFormat(format!("Invalid UTF-8: {}", e)))?;
Ok(Some(s))
}
fn decode_bytes(cursor: &mut Cursor<&[u8]>) -> Result<Bytes> {
let len = cursor.get_i32();
if len == -1 {
return Err(KafkaCodecError::InvalidFormat(
"Expected non-null bytes".to_string(),
));
}
if len == 0 {
return Ok(Bytes::new());
}
let remaining = cursor.get_ref().len() - cursor.position() as usize;
if len as usize > remaining {
return Err(KafkaCodecError::InvalidFormat(format!(
"Bytes length {} exceeds remaining buffer size {}", len, remaining
)));
}
let mut buf = vec![0u8; len as usize];
cursor.copy_to_slice(&mut buf);
Ok(Bytes::from(buf))
}
fn decode_nullable_bytes(cursor: &mut Cursor<&[u8]>) -> Result<Option<Bytes>> {
let len = cursor.get_i32();
if len == -1 {
return Ok(None);
}
if len == 0 {
return Ok(Some(Bytes::new()));
}
let remaining = cursor.get_ref().len() - cursor.position() as usize;
if len as usize > remaining {
return Err(KafkaCodecError::InvalidFormat(format!(
"Nullable bytes length {} exceeds remaining buffer size {}", len, remaining
)));
}
let mut buf = vec![0u8; len as usize];
cursor.copy_to_slice(&mut buf);
Ok(Some(Bytes::from(buf)))
}
/// Appends `s` to `buf` as an i16 byte length followed by the UTF-8 bytes.
///
/// The length is produced with an `as i16` cast, so inputs longer than
/// `i16::MAX` bytes get a wrapped length prefix.
fn encode_string(s: &str, buf: &mut BytesMut) {
    let payload = s.as_bytes();
    let prefix = payload.len() as i16;
    buf.put_i16(prefix);
    buf.put_slice(payload);
}
/// Appends an optional string to `buf`.
///
/// `Some` is written as an i16 byte length followed by the UTF-8 bytes;
/// `None` is written as the length `-1` with no payload.
fn encode_nullable_string(s: &Option<String>, buf: &mut BytesMut) {
    if let Some(text) = s {
        let payload = text.as_bytes();
        buf.put_i16(payload.len() as i16);
        buf.put_slice(payload);
    } else {
        buf.put_i16(-1);
    }
}
/// Appends a byte blob to `buf` as an i32 length followed by the raw bytes.
fn encode_bytes(bytes: &Bytes, buf: &mut BytesMut) {
    let prefix = bytes.len() as i32;
    buf.put_i32(prefix);
    buf.put_slice(bytes);
}
/// Appends an optional byte blob to `buf`.
///
/// `Some` is written as an i32 length followed by the raw bytes; `None` is
/// written as the length `-1` with no payload.
fn encode_nullable_bytes(bytes: &Option<Bytes>, buf: &mut BytesMut) {
    if let Some(payload) = bytes {
        buf.put_i32(payload.len() as i32);
        buf.put_slice(payload);
    } else {
        buf.put_i32(-1);
    }
}
fn decode_create_topics_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaCreateTopicsRequest> {
let topic_count = cursor.get_i32();
let mut topics = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let name = Self::decode_string(cursor)?;
let num_partitions = cursor.get_i32();
let replication_factor = cursor.get_i16();
let assignment_count = cursor.get_i32();
for _ in 0..assignment_count {
let _partition_id = cursor.get_i32();
let replica_count = cursor.get_i32();
for _ in 0..replica_count {
let _broker_id = cursor.get_i32();
}
}
let config_count = cursor.get_i32();
let mut configs = Vec::new();
for _ in 0..config_count {
let config_name = Self::decode_string(cursor)?;
let config_value = Self::decode_nullable_string(cursor)?;
configs.push(KafkaCreatableTopicConfigs {
name: config_name,
value: config_value,
read_only: false,
config_source: 0,
is_sensitive: false,
});
}
topics.push(KafkaCreatableTopic {
name,
num_partitions,
replication_factor,
assignments: vec![],
configs: Some(configs),
});
}
let timeout_ms = cursor.get_i32();
let validate_only = if cursor.remaining() > 0 {
cursor.get_u8() != 0
} else {
false
};
Ok(KafkaCreateTopicsRequest {
header,
topics,
timeout_ms,
validate_only,
})
}
/// Serializes a CreateTopics response body into `buf`.
///
/// After the correlation id and throttle time comes the topic array; each
/// topic is written as its name, error code, nullable error message and a
/// trailing i32 zero.
fn encode_create_topics_response(
    response: &KafkaCreateTopicsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.header.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let topic_total = response.topics.len() as i32;
    buf.put_i32(topic_total);
    response.topics.iter().for_each(|created| {
        Self::encode_string(&created.name, buf);
        buf.put_i16(created.error_code);
        Self::encode_nullable_string(&created.error_message, buf);
        // Presumably an empty per-topic array (e.g. configs) — TODO confirm.
        buf.put_i32(0);
    });

    Ok(())
}
fn decode_delete_topics_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaDeleteTopicsRequest> {
let topic_count = cursor.get_i32();
let mut topic_names = Vec::with_capacity(topic_count as usize);
for _ in 0..topic_count {
let topic_name = Self::decode_string(cursor)?;
topic_names.push(topic_name);
}
let timeout_ms = if header.api_version >= 1 {
cursor.get_i32()
} else {
5000 };
Ok(KafkaDeleteTopicsRequest {
correlation_id: header.correlation_id,
client_id: header.client_id,
topic_names,
timeout_ms,
})
}
/// Serializes a DeleteTopics response body into `buf`.
///
/// Emits the correlation id and throttle time, then one entry per topic:
/// name, error code and nullable error message.
fn encode_delete_topics_response(
    response: &KafkaDeleteTopicsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let result_total = response.responses.len() as i32;
    buf.put_i32(result_total);
    response.responses.iter().for_each(|outcome| {
        Self::encode_string(&outcome.name, buf);
        buf.put_i16(outcome.error_code);
        Self::encode_nullable_string(&outcome.error_message, buf);
    });

    Ok(())
}
/// Parses a SaslHandshake request body, which consists of a single
/// mechanism string. The correlation id and client id are taken from the
/// already-decoded header.
///
/// # Errors
/// Propagates any error from decoding the mechanism string.
fn decode_sasl_handshake_request(
    header: KafkaRequestHeader,
    cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaSaslHandshakeRequest> {
    Self::decode_string(cursor).map(|mechanism| KafkaSaslHandshakeRequest {
        correlation_id: header.correlation_id,
        client_id: header.client_id,
        mechanism,
    })
}
/// Serializes a SaslHandshake response body into `buf`: correlation id,
/// error code, then the array of mechanism names.
fn encode_sasl_handshake_response(
    response: &KafkaSaslHandshakeResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.correlation_id);
    buf.put_i16(response.error_code);

    let mechanism_total = response.mechanisms.len() as i32;
    buf.put_i32(mechanism_total);
    response
        .mechanisms
        .iter()
        .for_each(|name| Self::encode_string(name, buf));

    Ok(())
}
fn decode_sasl_authenticate_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaSaslAuthenticateRequest> {
let auth_bytes_length = cursor.get_i32();
let mut auth_bytes = vec![0u8; auth_bytes_length as usize];
cursor.read_exact(&mut auth_bytes).map_err(|_| {
KafkaCodecError::InvalidFormat("Failed to read SASL auth bytes".to_string())
})?;
Ok(KafkaSaslAuthenticateRequest {
correlation_id: header.correlation_id,
client_id: header.client_id,
auth_bytes,
})
}
/// Serializes a SaslAuthenticate response body into `buf`.
///
/// Emits, in order: correlation id, error code, nullable error message, the
/// i32-length-prefixed auth bytes, and the session lifetime in milliseconds.
fn encode_sasl_authenticate_response(
    response: &KafkaSaslAuthenticateResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.correlation_id);
    buf.put_i16(response.error_code);
    Self::encode_nullable_string(&response.error_message, buf);

    // Opaque server token: length prefix, then the raw bytes.
    let token_len = response.auth_bytes.len() as i32;
    buf.put_i32(token_len);
    buf.put_slice(&response.auth_bytes);

    buf.put_i64(response.session_lifetime_ms);
    Ok(())
}
fn decode_describe_configs_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaDescribeConfigsRequest> {
let resources_length = cursor.get_i32();
let mut resources = Vec::with_capacity(resources_length as usize);
for _ in 0..resources_length {
let resource_type = cursor.get_i8();
let resource_name = Self::decode_string(cursor)?;
let config_keys_length = cursor.get_i32();
let configuration_keys = if config_keys_length == -1 {
None
} else {
let mut keys = Vec::with_capacity(config_keys_length as usize);
for _ in 0..config_keys_length {
let key = Self::decode_string(cursor)?;
keys.push(key);
}
Some(keys)
};
resources.push(KafkaConfigResource {
resource_type,
resource_name,
configuration_keys,
});
}
let include_synonyms = if header.api_version >= 1 {
cursor.get_u8() != 0
} else {
false
};
let include_documentation = if header.api_version >= 3 {
cursor.get_u8() != 0
} else {
false
};
Ok(KafkaDescribeConfigsRequest {
correlation_id: header.correlation_id,
client_id: header.client_id,
api_version: header.api_version,
resources,
include_synonyms,
include_documentation,
})
}
/// Serializes a DescribeConfigs response body into `buf`.
///
/// After the correlation id and throttle time comes one result per resource:
/// error code, nullable error message, resource type, resource name and the
/// config entry array. Each config entry carries its name, nullable value,
/// three boolean flags encoded as single bytes (read-only, is-default,
/// is-sensitive, with the config source between the last two), its synonyms,
/// the config type and nullable documentation. All fields are always written.
fn encode_describe_configs_response(
    response: &KafkaDescribeConfigsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let result_total = response.results.len() as i32;
    buf.put_i32(result_total);
    for outcome in response.results.iter() {
        buf.put_i16(outcome.error_code);
        Self::encode_nullable_string(&outcome.error_message, buf);
        buf.put_i8(outcome.resource_type);
        Self::encode_string(&outcome.resource_name, buf);

        let entry_total = outcome.configs.len() as i32;
        buf.put_i32(entry_total);
        for entry in outcome.configs.iter() {
            Self::encode_string(&entry.name, buf);
            Self::encode_nullable_string(&entry.value, buf);
            // Booleans travel as a single 0/1 byte.
            buf.put_u8(u8::from(entry.read_only));
            buf.put_u8(u8::from(entry.is_default));
            buf.put_i8(entry.config_source);
            buf.put_u8(u8::from(entry.is_sensitive));

            let synonym_total = entry.synonyms.len() as i32;
            buf.put_i32(synonym_total);
            for alias in entry.synonyms.iter() {
                Self::encode_string(&alias.name, buf);
                Self::encode_nullable_string(&alias.value, buf);
                buf.put_i8(alias.source);
            }

            buf.put_i8(entry.config_type);
            Self::encode_nullable_string(&entry.documentation, buf);
        }
    }

    Ok(())
}
fn decode_alter_configs_request(
header: KafkaRequestHeader,
cursor: &mut Cursor<&[u8]>,
) -> Result<KafkaAlterConfigsRequest> {
let resources_length = cursor.get_i32();
let mut resources = Vec::with_capacity(resources_length as usize);
for _ in 0..resources_length {
let resource_type = cursor.get_i8();
let resource_name = Self::decode_string(cursor)?;
let configs_length = cursor.get_i32();
let mut configs = Vec::with_capacity(configs_length as usize);
for _ in 0..configs_length {
let name = Self::decode_string(cursor)?;
let value = Self::decode_nullable_string(cursor)?;
configs.push(KafkaAlterableConfig { name, value });
}
resources.push(KafkaAlterConfigsResource {
resource_type,
resource_name,
configs,
});
}
let validate_only = cursor.get_u8() != 0;
Ok(KafkaAlterConfigsRequest {
correlation_id: header.correlation_id,
client_id: header.client_id,
api_version: header.api_version,
resources,
validate_only,
})
}
/// Serializes an AlterConfigs response body into `buf`.
///
/// Emits the correlation id and throttle time, then one entry per resource:
/// error code, nullable error message, resource type and resource name.
fn encode_alter_configs_response(
    response: &KafkaAlterConfigsResponse,
    buf: &mut BytesMut,
) -> Result<()> {
    buf.put_i32(response.correlation_id);
    buf.put_i32(response.throttle_time_ms);

    let result_total = response.responses.len() as i32;
    buf.put_i32(result_total);
    response.responses.iter().for_each(|outcome| {
        buf.put_i16(outcome.error_code);
        Self::encode_nullable_string(&outcome.error_message, buf);
        buf.put_i8(outcome.resource_type);
        Self::encode_string(&outcome.resource_name, buf);
    });

    Ok(())
}
/// Appends `value` to `buf` as an unsigned base-128 varint.
///
/// Seven bits are emitted per byte, least-significant group first; the high
/// bit of a byte is set when more bytes follow. Zero encodes as a single
/// `0x00` byte.
#[allow(dead_code)]
fn encode_varint(buf: &mut BytesMut, value: u64) {
    let mut rest = value;
    // Anything that does not fit in seven bits needs a continuation byte.
    while rest >= 0x80 {
        buf.put_u8((rest & 0x7F) as u8 | 0x80);
        rest >>= 7;
    }
    buf.put_u8(rest as u8);
}
/// Reads an unsigned base-128 varint from `cursor`.
///
/// Seven payload bits are taken from each byte, least-significant group
/// first, until a byte without the continuation (high) bit is seen. At most
/// ten bytes are consumed.
///
/// # Errors
/// * `InvalidFormat("Incomplete varint")` when the buffer ends mid-value.
/// * `InvalidFormat("Varint too large")` when the tenth byte still has its
///   continuation bit set.
fn decode_varint(cursor: &mut Cursor<&[u8]>) -> Result<u64> {
    let mut value = 0u64;
    // Shifts 0, 7, ..., 63: one iteration per permitted byte.
    for shift in (0..64).step_by(7) {
        if !cursor.has_remaining() {
            return Err(KafkaCodecError::InvalidFormat("Incomplete varint".to_string()));
        }
        let byte = cursor.get_u8();
        value |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err(KafkaCodecError::InvalidFormat("Varint too large".to_string()))
}
/// Decodes a varint-length-prefixed string.
///
/// The varint holds the byte length plus one: `0` decodes to `None`, `1` to
/// an empty string, and any other value `n` to the next `n - 1` bytes
/// interpreted as UTF-8.
///
/// # Errors
/// Propagates varint errors, and returns `InvalidFormat` when fewer bytes
/// remain than declared or the bytes are not valid UTF-8.
fn decode_compact_string(cursor: &mut Cursor<&[u8]>) -> Result<Option<String>> {
    match Self::decode_varint(cursor)? {
        0 => Ok(None),
        1 => Ok(Some(String::new())),
        encoded => {
            let byte_len = (encoded - 1) as usize;
            if byte_len > cursor.remaining() {
                return Err(KafkaCodecError::InvalidFormat(
                    "Not enough bytes for compact string".to_string()
                ));
            }
            let mut raw = vec![0u8; byte_len];
            cursor.copy_to_slice(&mut raw);
            String::from_utf8(raw)
                .map(Some)
                .map_err(|e| KafkaCodecError::InvalidFormat(format!("Invalid UTF-8: {}", e)))
        }
    }
}
/// Appends a single zero byte to `buf`, i.e. a tagged-field section with a
/// field count of zero.
#[allow(dead_code)]
fn encode_empty_tagged_fields(buf: &mut BytesMut) {
    const NO_TAGGED_FIELDS: u8 = 0;
    buf.put_u8(NO_TAGGED_FIELDS);
}
#[allow(dead_code)]
fn decode_tagged_fields(cursor: &mut Cursor<&[u8]>) -> Result<()> {
if cursor.remaining() == 0 {
debug!("No remaining bytes for tagged fields, treating as empty");
return Ok(());
}
let num_fields = match Self::decode_varint(cursor) {
Ok(n) => n,
Err(e) => {
debug!("Failed to decode tagged field count, treating as empty: {}", e);
return Ok(());
}
};
debug!("Decoding {} tagged fields", num_fields);
for i in 0..num_fields {
debug!("Processing tagged field {}/{}", i + 1, num_fields);
let tag = match Self::decode_varint(cursor) {
Ok(tag) => tag,
Err(e) => {
debug!("Failed to decode tag {}: {}, stopping tagged field parsing", i, e);
break;
}
};
let data_len = match Self::decode_varint(cursor) {
Ok(len) => len,
Err(e) => {
debug!("Failed to decode data length for tag {}: {}, stopping tagged field parsing", tag, e);
break;
}
};
debug!("Tagged field: tag={}, data_len={}", tag, data_len);
if cursor.remaining() < data_len as usize {
debug!(
"Not enough bytes for tagged field data: needed={}, available={}, treating as end of fields",
data_len, cursor.remaining()
);
break;
}
cursor.advance(data_len as usize);
debug!("Successfully skipped tagged field data ({} bytes)", data_len);
}
debug!("Completed tagged fields parsing");
Ok(())
}
/// Appends `s` to `buf` as a varint-length-prefixed string.
///
/// The varint holds the byte length plus one, followed by the UTF-8 bytes.
/// An `info!` line describing the encoding is logged on every call.
#[allow(dead_code)]
fn encode_compact_string(s: &str, buf: &mut BytesMut) {
    let payload = s.as_bytes();
    let len = payload.len();
    let varint_len = len as u64 + 1;

    Self::encode_varint(buf, varint_len);
    info!(" - FIXED compact string '{}': length={}, encoded as varint {} (Java will read {})",
        s, len, varint_len, len);
    buf.put_slice(payload);
}
/// Appends an optional string to `buf` in varint-length-prefixed form.
///
/// `Some` is written as a varint holding the byte length plus one, followed
/// by the UTF-8 bytes; `None` is written as a single zero byte. An `info!`
/// line describing the encoding is logged on every call.
#[allow(dead_code)]
fn encode_compact_nullable_string(s: &Option<String>, buf: &mut BytesMut) {
    if let Some(text) = s {
        let payload = text.as_bytes();
        let len = payload.len();
        let varint_len = len as u64 + 1;

        Self::encode_varint(buf, varint_len);
        info!(" - FIXED compact nullable string '{}': length={}, encoded as varint {} (Java will read {})",
            text, len, varint_len, len);
        buf.put_slice(payload);
    } else {
        buf.put_u8(0);
        info!(" - FIXED compact nullable string: null, encoded as 0x00");
    }
}
/// Appends an array length to `buf` as a varint holding `len + 1`.
#[allow(dead_code)]
fn encode_compact_array_len(len: usize, buf: &mut BytesMut) {
    let encoded = len as u64 + 1;
    Self::encode_varint(buf, encoded);
}
}
/// Stateless length-prefixed framing codec.
///
/// Its `Decoder` impl yields each frame's payload with the 4-byte big-endian
/// length prefix removed; its `Encoder<Bytes>` impl prepends that prefix to
/// outgoing payloads.
pub struct KafkaFrameCodec;
/// Splits the inbound byte stream into frames.
///
/// Each frame is a 4-byte big-endian signed length followed by that many
/// payload bytes. The yielded item is the payload only.
impl Decoder for KafkaFrameCodec {
    type Item = Bytes;
    type Error = KafkaCodecError;

    /// Tries to extract one complete frame from `src`.
    ///
    /// Returns `Ok(None)` while the length prefix or the payload is still
    /// incomplete (nothing is consumed in that case), and `InvalidFormat`
    /// when the declared length is negative or above 100,000,000 bytes.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
        const PREFIX_LEN: usize = 4;
        const MAX_MESSAGE_LEN: i32 = 100_000_000;

        if src.len() < PREFIX_LEN {
            return Ok(None);
        }

        // Peek at the prefix without consuming it.
        let message_length = i32::from_be_bytes([src[0], src[1], src[2], src[3]]);
        if !(0..=MAX_MESSAGE_LEN).contains(&message_length) {
            return Err(KafkaCodecError::InvalidFormat(format!(
                "Invalid message length: {}",
                message_length
            )));
        }

        let payload_len = message_length as usize;
        if src.len() < PREFIX_LEN + payload_len {
            return Ok(None);
        }

        // Drop the prefix, then carve the payload off the front of the buffer.
        src.advance(PREFIX_LEN);
        let message = src.split_to(payload_len).freeze();
        info!("KafkaFrameCodec: Decoded request without length prefix: {} bytes", message.len());
        Ok(Some(message))
    }
}
impl Encoder<Bytes> for KafkaFrameCodec {
type Error = KafkaCodecError;
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<()> {
let message_len = item.len() as u32;
dst.put_u32(message_len); dst.extend_from_slice(&item);
info!("KafkaFrameCodec: Encoded response with length prefix: {} bytes total", message_len + 4);
if dst.len() >= 12 {
let first_12_bytes = &dst[0..12];
info!(" - FRAMED RESPONSE BYTES: [{:02x} {:02x} {:02x} {:02x}] [{:02x} {:02x} {:02x} {:02x}] [{:02x} {:02x} {:02x} {:02x}]",
first_12_bytes[0], first_12_bytes[1], first_12_bytes[2], first_12_bytes[3],
first_12_bytes[4], first_12_bytes[5], first_12_bytes[6], first_12_bytes[7],
first_12_bytes[8], first_12_bytes[9], first_12_bytes[10], first_12_bytes[11]);
info!(" - LENGTH_PREFIX: {} = 0x{:08x}", message_len, message_len);
info!(" - ACTUAL_CORRELATION_ID_AT_BYTES_4-7: {} (this should be correlation_id=1)",
i32::from_be_bytes([first_12_bytes[4], first_12_bytes[5], first_12_bytes[6], first_12_bytes[7]]));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A non-null string survives an encode/decode round trip unchanged.
    #[test]
    fn test_string_encoding_decoding() {
        let original = "hello world";
        let mut encoded = BytesMut::new();
        KafkaCodec::encode_string(original, &mut encoded);

        let mut reader = Cursor::new(encoded.as_ref());
        let round_tripped = KafkaCodec::decode_string(&mut reader).unwrap();

        assert_eq!(round_tripped, original);
    }

    /// Both a present and an absent nullable string round-trip, read back in
    /// the order they were written.
    #[test]
    fn test_nullable_string_encoding_decoding() {
        let present = Some("test".to_string());
        let absent: Option<String> = None;

        let mut encoded = BytesMut::new();
        KafkaCodec::encode_nullable_string(&present, &mut encoded);
        KafkaCodec::encode_nullable_string(&absent, &mut encoded);

        let mut reader = Cursor::new(encoded.as_ref());
        let first = KafkaCodec::decode_nullable_string(&mut reader).unwrap();
        let second = KafkaCodec::decode_nullable_string(&mut reader).unwrap();

        assert_eq!(first, Some("test".to_string()));
        assert_eq!(second, None);
    }

    /// A non-null byte blob survives an encode/decode round trip unchanged.
    #[test]
    fn test_bytes_encoding_decoding() {
        let original = Bytes::from("test data");
        let mut encoded = BytesMut::new();
        KafkaCodec::encode_bytes(&original, &mut encoded);

        let mut reader = Cursor::new(encoded.as_ref());
        let round_tripped = KafkaCodec::decode_bytes(&mut reader).unwrap();

        assert_eq!(round_tripped, original);
    }
}