/// Maps a Kafka `api_key` from a request header to its protocol name.
///
/// The table follows the API key numbering of the Apache Kafka protocol
/// (keys 0 through 75). Any key outside the table yields `"Unknown"`, which
/// callers use as a sentinel to fall back to printing the raw number.
fn api_key_name(key: i16) -> &'static str {
    match key {
        0 => "Produce",
        1 => "Fetch",
        2 => "ListOffsets",
        3 => "Metadata",
        4 => "LeaderAndIsr",
        5 => "StopReplica",
        6 => "UpdateMetadata",
        7 => "ControlledShutdown",
        8 => "OffsetCommit",
        9 => "OffsetFetch",
        10 => "FindCoordinator",
        11 => "JoinGroup",
        12 => "Heartbeat",
        13 => "LeaveGroup",
        14 => "SyncGroup",
        15 => "DescribeGroups",
        16 => "ListGroups",
        17 => "SaslHandshake",
        18 => "ApiVersions",
        19 => "CreateTopics",
        20 => "DeleteTopics",
        21 => "DeleteRecords",
        22 => "InitProducerId",
        23 => "OffsetForLeaderEpoch",
        24 => "AddPartitionsToTxn",
        25 => "AddOffsetsToTxn",
        26 => "EndTxn",
        27 => "WriteTxnMarkers",
        28 => "TxnOffsetCommit",
        29 => "DescribeAcls",
        30 => "CreateAcls",
        31 => "DeleteAcls",
        32 => "DescribeConfigs",
        33 => "AlterConfigs",
        34 => "AlterReplicaLogDirs",
        35 => "DescribeLogDirs",
        36 => "SaslAuthenticate",
        37 => "CreatePartitions",
        38 => "CreateDelegationToken",
        39 => "RenewDelegationToken",
        40 => "ExpireDelegationToken",
        41 => "DescribeDelegationToken",
        42 => "DeleteGroups",
        43 => "ElectLeaders",
        44 => "IncrementalAlterConfigs",
        45 => "AlterPartitionReassignments",
        46 => "ListPartitionReassignments",
        47 => "OffsetDelete",
        48 => "DescribeClientQuotas",
        49 => "AlterClientQuotas",
        50 => "DescribeUserScramCredentials",
        51 => "AlterUserScramCredentials",
        52 => "Vote",
        53 => "BeginQuorumEpoch",
        54 => "EndQuorumEpoch",
        55 => "DescribeQuorum",
        56 => "AlterPartition",
        57 => "UpdateFeatures",
        58 => "Envelope",
        59 => "FetchSnapshot",
        60 => "DescribeCluster",
        61 => "DescribeProducers",
        62 => "BrokerRegistration",
        63 => "BrokerHeartbeat",
        64 => "UnregisterBroker",
        65 => "DescribeTransactions",
        66 => "ListTransactions",
        67 => "AllocateProducerIds",
        68 => "ConsumerGroupHeartbeat",
        69 => "ConsumerGroupDescribe",
        70 => "ControllerRegistration",
        71 => "GetTelemetrySubscriptions",
        72 => "PushTelemetry",
        73 => "AssignReplicasToDirs",
        // Named ListClientMetricsResources in older protocol revisions.
        74 => "ListConfigResources",
        75 => "DescribeTopicPartitions",
        _ => "Unknown",
    }
}
/// Produces a one-line summary of a size-prefixed Kafka request frame.
///
/// Layout read here: bytes 0..4 are the big-endian frame size (excluding the
/// prefix itself), bytes 4..6 the API key and bytes 6..8 the API version.
///
/// Returns `None` when fewer than 12 bytes are available, when the size
/// prefix is negative, or when the buffer does not yet hold the whole frame.
/// Otherwise returns `"<Name> v<version>"`, with ` topic=<topic>` appended
/// for Produce, Fetch, CreateTopics and DeleteTopics when a topic name can be
/// located, or `"ApiKey(<n>) v<version>"` for keys without a known name.
pub fn parse_kafka_request(buf: &[u8]) -> Option<String> {
    if buf.len() < 12 {
        return None;
    }
    let length = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // A negative size is malformed input. Casting it to usize would make
    // `4 + length` overflow (panic in debug, wrap-around in release).
    if length < 0 {
        return None;
    }
    let frame_len = (length as usize).checked_add(4)?;
    if buf.len() < frame_len {
        return None;
    }

    let api_key = i16::from_be_bytes([buf[4], buf[5]]);
    let api_version = i16::from_be_bytes([buf[6], buf[7]]);
    let name = api_key_name(api_key);

    // Only these request types get a best-effort topic lookup.
    let topic = match api_key {
        0 => extract_produce_topic(buf),
        1 => extract_fetch_topic(buf),
        19 | 20 => extract_topic_after_client_id(buf),
        _ => None,
    };

    Some(match topic {
        Some(t) => format!("{} v{} topic={}", name, api_version, t),
        None if name == "Unknown" => format!("ApiKey({}) v{}", api_key, api_version),
        None => format!("{} v{}", name, api_version),
    })
}
/// Produces a short summary of a size-prefixed Kafka response frame.
///
/// The first four bytes are read as the big-endian frame size. Four bytes
/// are subtracted from it (saturating at zero) and the result is reported as
/// the payload size.
///
/// Returns `None` when fewer than 8 bytes are available or when the size
/// prefix is negative, which cannot be a valid frame length.
pub fn parse_kafka_response(buf: &[u8]) -> Option<String> {
    if buf.len() < 8 {
        return None;
    }
    let length = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // Without this guard a negative size would be cast to a huge usize and
    // reported as an absurd byte count.
    if length < 0 {
        return None;
    }
    let payload_size = (length as usize).saturating_sub(4);
    Some(format!("OK ({} bytes)", payload_size))
}
/// Returns the detail text for a Kafka response frame.
///
/// Currently a direct alias for [`parse_kafka_response`]: same input, same
/// result, including `None` when that function rejects the buffer.
pub fn format_kafka_response_detail(buf: &[u8]) -> Option<String> {
parse_kafka_response(buf)
}
/// Builds the full, possibly multi-line description of a Kafka request.
///
/// The first line is always the summary from [`parse_kafka_request`]. For
/// Produce requests (API key 0) the decoded record values are appended, one
/// per line; if no records could be decoded, the readable payload text found
/// by `extract_readable_payload` is appended instead. Every other request
/// type returns the summary alone.
///
/// Returns `None` when the buffer is shorter than 12 bytes or the summary
/// cannot be produced.
pub fn extract_kafka_full_command(buf: &[u8]) -> Option<String> {
    if buf.len() < 12 {
        return None;
    }
    let is_produce = i16::from_be_bytes([buf[4], buf[5]]) == 0;
    let headline = parse_kafka_request(buf)?;
    if !is_produce {
        return Some(headline);
    }

    // Prefer structured record values; fall back to a printable-text scan.
    let body = match extract_produce_records(buf) {
        Some(values) if !values.is_empty() => Some(values.join("\n")),
        _ => extract_readable_payload(buf),
    };

    Some(match body {
        Some(text) => format!("{}\n{}", headline, text),
        None => headline,
    })
}
/// Heuristically pulls human-readable text out of a raw frame.
///
/// Scanning starts at byte 40 (or the end of the buffer if it is shorter)
/// and looks at maximal runs of printable ASCII (0x20..=0x7E) that are at
/// least 10 bytes long:
///
/// * the first such run containing `{` or `[` with at least 5 bytes from
///   that character to the end of the run is returned immediately, starting
///   at that character;
/// * otherwise the longest qualifying run is returned, the earliest one
///   winning ties.
///
/// Returns `None` when no run of 10 or more printable bytes exists.
fn extract_readable_payload(buf: &[u8]) -> Option<String> {
    let tail = &buf[buf.len().min(40)..];
    let mut longest: Option<&[u8]> = None;

    // Splitting on non-printable bytes yields exactly the maximal printable runs.
    for run in tail.split(|b| !(0x20..0x7F).contains(b)) {
        if run.len() < 10 {
            continue;
        }
        if let Some(at) = run.iter().position(|&b| matches!(b, b'{' | b'[')) {
            let json_like = &run[at..];
            if json_like.len() >= 5 {
                return Some(String::from_utf8_lossy(json_like).to_string());
            }
        }
        let is_longer = match longest {
            Some(previous) => run.len() > previous.len(),
            None => true,
        };
        if is_longer {
            longest = Some(run);
        }
    }

    longest.map(|run| String::from_utf8_lossy(run).to_string())
}
/// Scans a Produce request for the first plausible v2 record batch and
/// returns the values of its records.
///
/// The batch is located heuristically: every offset from byte 12 onward is
/// tried as the start of a batch header and accepted when the magic byte is
/// 2, the batch length is between 49 and 10,000,000 and fits in the buffer,
/// the base offset is 0 and the record count is between 0 and 100,000.
///
/// Header offsets used (relative to the batch start), per the Kafka
/// RecordBatch format: base offset 0..8, batch length 8..12, magic 16,
/// attributes 21..23, record count 57..61, records from byte 61.
///
/// Returns:
/// * `Some(vec!["(<n> record(s), compressed)"])` when the low three
///   attribute bits name a compression codec;
/// * `Some(values)` with the lossily UTF-8 decoded value of each record that
///   has one;
/// * `None` when no batch matches or the first matching batch yields no
///   values.
fn extract_produce_records(buf: &[u8]) -> Option<Vec<String>> {
    // Fixed-size part of a v2 record batch, up to and including recordsCount.
    const HEADER_LEN: usize = 61;
    // baseOffset (8) + batchLength (4); batchLength counts what follows them.
    const LENGTH_PREFIX: usize = 12;

    let last_candidate = buf.len().checked_sub(HEADER_LEN)?;
    for pos in 12..=last_candidate {
        let header = &buf[pos..pos + HEADER_LEN];
        if header[16] != 0x02 {
            continue;
        }
        let batch_len = i32::from_be_bytes([header[8], header[9], header[10], header[11]]);
        if !(49..=10_000_000).contains(&batch_len) {
            continue;
        }
        let batch_end = pos + LENGTH_PREFIX + batch_len as usize;
        if batch_end > buf.len() {
            continue;
        }
        // Only a base offset of 0 is accepted.
        if header[..8].iter().any(|&b| b != 0) {
            continue;
        }
        let attributes = i16::from_be_bytes([header[21], header[22]]);
        // recordsCount follows baseSequence (53..57); reading 53..57 here
        // would pick up the sequence number instead of the count.
        let record_count =
            i32::from_be_bytes([header[57], header[58], header[59], header[60]]);
        if !(0..=100_000).contains(&record_count) {
            continue;
        }
        if attributes & 0x07 != 0 {
            // Compressed payloads are not decoded; report the count only.
            return Some(vec![format!("({} record(s), compressed)", record_count)]);
        }

        let mut values = Vec::new();
        let mut rpos = pos + HEADER_LEN;
        for _ in 0..record_count {
            if rpos >= batch_end {
                break;
            }
            match parse_record(buf, rpos, batch_end) {
                Some((value, consumed)) => {
                    values.extend(value);
                    rpos += consumed;
                }
                None => break,
            }
        }
        // Only the first batch that passes the checks is considered.
        return if values.is_empty() { None } else { Some(values) };
    }
    None
}
/// Decodes one record starting at `start` and returns its value together
/// with the number of bytes the record occupies.
///
/// Fields are read in this order: record length (varint), one attribute
/// byte, two varints that are skipped, key length (varint) and key bytes,
/// then value length (varint) and value bytes. `end` is the exclusive upper
/// bound of the enclosing batch; it is clamped to the buffer length.
///
/// Returns:
/// * `None` when `start` is not before `end`, a varint cannot be decoded,
///   the record length is negative, or the record would extend past `end`;
/// * `Some((None, consumed))` for an empty record or one whose value is
///   absent, empty, or does not fit inside the record;
/// * `Some((Some(value), consumed))` with the value decoded as lossy UTF-8.
///
/// `consumed` always covers the length prefix plus the declared record
/// length, so callers can step to the next record.
fn parse_record(buf: &[u8], start: usize, end: usize) -> Option<(Option<String>, usize)> {
    let end = end.min(buf.len());
    if start >= end {
        return None;
    }
    let mut pos = start;

    let (record_len, n) = decode_varint(buf, pos)?;
    pos += n;
    // Lengths come from untrusted bytes: a negative one would turn into a
    // huge usize and overflow the position arithmetic below.
    if record_len < 0 {
        return None;
    }
    let record_end = pos.checked_add(record_len as usize)?;
    if record_end > end {
        return None;
    }
    let consumed = record_end - start;
    if pos >= record_end {
        return Some((None, consumed));
    }

    pos += 1; // attributes byte
    let (_, n) = decode_varint(buf, pos)?; // timestamp delta
    pos += n;
    let (_, n) = decode_varint(buf, pos)?; // offset delta
    pos += n;

    let (key_len, n) = decode_varint(buf, pos)?;
    pos += n;
    // A non-positive key length means there are no key bytes to skip.
    if key_len > 0 {
        pos = pos.checked_add(key_len as usize)?;
    }

    let (value_len, n) = decode_varint(buf, pos)?;
    pos += n;
    let value = if value_len > 0 {
        pos.checked_add(value_len as usize)
            .filter(|&value_end| value_end <= record_end)
            .map(|value_end| String::from_utf8_lossy(&buf[pos..value_end]).to_string())
    } else {
        None
    };
    Some((value, consumed))
}
/// Decodes a zigzag-encoded variable-length integer starting at `start`.
///
/// Each byte contributes its low seven bits, least-significant group first;
/// a set high bit means another byte follows. At most ten bytes are read.
///
/// Returns the decoded value and the number of bytes consumed, or `None`
/// when the buffer ends before the terminating byte or the tenth byte still
/// has its continuation bit set.
fn decode_varint(buf: &[u8], start: usize) -> Option<(i64, usize)> {
    let tail = buf.get(start..)?;
    let mut raw: u64 = 0;
    for (index, shift) in (0u32..=63).step_by(7).enumerate() {
        let byte = *tail.get(index)?;
        raw |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            // Zigzag: the low bit carries the sign, the rest the magnitude.
            let magnitude = (raw >> 1) as i64;
            let sign_mask = -((raw & 1) as i64);
            return Some((magnitude ^ sign_mask, index + 1));
        }
    }
    None
}
/// Reports whether `buf` holds at least one complete size-prefixed frame.
///
/// The first four bytes are the big-endian frame size, not counting the
/// prefix itself. Returns `false` when fewer than four bytes are available,
/// when the declared body has not fully arrived, or when the declared size
/// is negative (such a frame can never be satisfied).
pub fn kafka_frame_complete(buf: &[u8]) -> bool {
    if buf.len() < 4 {
        return false;
    }
    let declared = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // Casting a negative size to usize and adding 4 overflows for -4..=-1,
    // so reject it before doing any arithmetic.
    if declared < 0 {
        return false;
    }
    // Cannot underflow: the length was checked to be at least 4 above.
    buf.len() - 4 >= declared as usize
}
/// Best-effort topic lookup for a Produce request.
///
/// Delegates to `extract_topic_after_client_id`; no Produce-specific parsing
/// is performed here.
fn extract_produce_topic(buf: &[u8]) -> Option<String> {
extract_topic_after_client_id(buf)
}
/// Best-effort topic lookup for a Fetch request.
///
/// Delegates to `extract_topic_after_client_id`; no Fetch-specific parsing
/// is performed here.
fn extract_fetch_topic(buf: &[u8]) -> Option<String> {
extract_topic_after_client_id(buf)
}
/// Looks for a topic-like string right after the client id of a request.
///
/// The two bytes at offset 12 are read as the big-endian client id length.
/// A negative length is treated as "no client id bytes". The search for the
/// first plausible string then starts immediately after the client id, via
/// `find_first_string`.
///
/// Returns `None` when the buffer is shorter than 14 bytes or no candidate
/// string is found.
fn extract_topic_after_client_id(buf: &[u8]) -> Option<String> {
    const CLIENT_ID_LEN_AT: usize = 12;
    if buf.len() < CLIENT_ID_LEN_AT + 2 {
        return None;
    }
    let declared = i16::from_be_bytes([buf[CLIENT_ID_LEN_AT], buf[CLIENT_ID_LEN_AT + 1]]);
    let client_id_bytes = declared.max(0) as usize;
    find_first_string(buf, CLIENT_ID_LEN_AT + 2 + client_id_bytes)
}
/// Searches a small window for the first length-prefixed, topic-like string.
///
/// Each of the 20 byte offsets starting at `pos` is tried as a big-endian
/// `i16` length followed by that many bytes. A candidate is accepted when:
///
/// * the length is between 1 and 255 and the bytes fit in the buffer,
/// * the bytes are valid UTF-8 and more than one byte long,
/// * every character is alphanumeric or one of `-`, `_`, `.`.
///
/// Returns the first accepted candidate, or `None` if there is none or the
/// buffer ends before two bytes can be read at `pos`.
fn find_first_string(buf: &[u8], pos: usize) -> Option<String> {
    if pos + 2 > buf.len() {
        return None;
    }
    (pos..pos + 20)
        .take_while(|&p| p + 2 <= buf.len())
        .find_map(|p| {
            let declared = i16::from_be_bytes([buf[p], buf[p + 1]]);
            if declared <= 0 || declared >= 256 {
                return None;
            }
            let bytes = buf.get(p + 2..p + 2 + declared as usize)?;
            let text = std::str::from_utf8(bytes).ok()?;
            let topic_like = text.len() > 1
                && text
                    .chars()
                    .all(|c| c.is_alphanumeric() || matches!(c, '-' | '_' | '.'));
            topic_like.then(|| text.to_string())
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a size-prefixed request frame made of the API key, API version,
    /// a fixed correlation id of 1 and a length-prefixed client id.
    fn make_request(api_key: i16, api_version: i16, client_id: &str) -> Vec<u8> {
        let id = client_id.as_bytes();
        // api_key + api_version + correlation id + client id length + client id
        let size = (2 + 2 + 4 + 2 + id.len()) as i32;
        let frame = [
            &size.to_be_bytes()[..],
            &api_key.to_be_bytes()[..],
            &api_version.to_be_bytes()[..],
            &1i32.to_be_bytes()[..],
            &(id.len() as i16).to_be_bytes()[..],
            id,
        ]
        .concat();
        frame
    }

    #[test]
    fn test_parse_metadata_request() {
        let frame = make_request(3, 12, "my-app");
        assert_eq!(parse_kafka_request(&frame), Some("Metadata v12".into()));
    }

    #[test]
    fn test_parse_api_versions() {
        let frame = make_request(18, 3, "kafka-client");
        assert_eq!(parse_kafka_request(&frame), Some("ApiVersions v3".into()));
    }

    #[test]
    fn test_parse_response() {
        // Size prefix of 20, a correlation id, then 16 bytes of body.
        let mut frame = vec![0, 0, 0, 20];
        frame.extend_from_slice(&1i32.to_be_bytes());
        frame.extend_from_slice(&[0u8; 16]);
        assert_eq!(parse_kafka_response(&frame), Some("OK (16 bytes)".into()));
    }

    #[test]
    fn test_frame_complete() {
        let mut frame = vec![0, 0, 0, 4];
        frame.extend_from_slice(&[0u8; 4]);
        assert!(kafka_frame_complete(&frame));
        // Dropping the last two body bytes makes the frame incomplete.
        assert!(!kafka_frame_complete(&frame[..6]));
    }

    #[test]
    fn test_frame_incomplete() {
        assert!(!kafka_frame_complete(&[0, 0, 0, 10, 0, 0]));
    }
}