pub mod adapter;
pub mod api_versions;
pub mod codec;
pub mod errors;
pub mod messages;
pub use adapter::*;
pub use api_versions::*;
pub use codec::{KafkaCodec, KafkaCodecError, KafkaFrameCodec};
pub use errors::*;
pub use messages::*;
use tracing::info;
// Numeric Kafka API keys. `is_kafka_request` compares these against the
// big-endian `i16` read from bytes 4..6 of an incoming frame.
// NOTE(review): the keys named here are not contiguous (nothing is declared
// for 21..=31 or 34..=35 in this section) — confirm whether that is intended.

/// API key for `Produce` requests.
pub const API_KEY_PRODUCE: i16 = 0;
/// API key for `Fetch` requests.
pub const API_KEY_FETCH: i16 = 1;
/// API key for `ListOffsets` requests.
pub const API_KEY_LIST_OFFSETS: i16 = 2;
/// API key for `Metadata` requests.
pub const API_KEY_METADATA: i16 = 3;
/// API key for `LeaderAndIsr` requests.
pub const API_KEY_LEADER_AND_ISR: i16 = 4;
/// API key for `StopReplica` requests.
pub const API_KEY_STOP_REPLICA: i16 = 5;
/// API key for `UpdateMetadata` requests.
pub const API_KEY_UPDATE_METADATA: i16 = 6;
/// API key for `ControlledShutdown` requests.
pub const API_KEY_CONTROLLED_SHUTDOWN: i16 = 7;
/// API key for `OffsetCommit` requests.
pub const API_KEY_OFFSET_COMMIT: i16 = 8;
/// API key for `OffsetFetch` requests.
pub const API_KEY_OFFSET_FETCH: i16 = 9;
/// API key for `FindCoordinator` requests.
pub const API_KEY_FIND_COORDINATOR: i16 = 10;
/// API key for `JoinGroup` requests.
pub const API_KEY_JOIN_GROUP: i16 = 11;
/// API key for `Heartbeat` requests.
pub const API_KEY_HEARTBEAT: i16 = 12;
/// API key for `LeaveGroup` requests.
pub const API_KEY_LEAVE_GROUP: i16 = 13;
/// API key for `SyncGroup` requests.
pub const API_KEY_SYNC_GROUP: i16 = 14;
/// API key for `DescribeGroups` requests.
pub const API_KEY_DESCRIBE_GROUPS: i16 = 15;
/// API key for `ListGroups` requests.
pub const API_KEY_LIST_GROUPS: i16 = 16;
/// API key for `SaslHandshake` requests.
pub const API_KEY_SASL_HANDSHAKE: i16 = 17;
/// API key for `ApiVersions` requests.
pub const API_KEY_API_VERSIONS: i16 = 18;
/// API key for `CreateTopics` requests.
pub const API_KEY_CREATE_TOPICS: i16 = 19;
/// API key for `DeleteTopics` requests.
pub const API_KEY_DELETE_TOPICS: i16 = 20;
/// API key for `DescribeConfigs` requests.
pub const API_KEY_DESCRIBE_CONFIGS: i16 = 32;
/// API key for `AlterConfigs` requests.
pub const API_KEY_ALTER_CONFIGS: i16 = 33;
/// API key for `SaslAuthenticate` requests.
pub const API_KEY_SASL_AUTHENTICATE: i16 = 36;
/// Heuristically decides whether `data` starts with a Kafka request frame.
///
/// The check reads the big-endian `i32` message length from bytes `0..4` and
/// the big-endian `i16` API key from bytes `4..6`. It returns `true` only when
/// all of the following hold:
///
/// * at least 8 bytes are available,
/// * the declared message length lies in `4..=100_000_000`, and
/// * the API key is one of the explicitly listed `API_KEY_*` constants or
///   falls in the range `15..=50`.
///
/// Each step of the decision is logged at `info` level.
pub fn is_kafka_request(data: &[u8]) -> bool {
    /// Number of leading bytes that must be present before sniffing.
    const MIN_HEADER_LEN: usize = 8;
    /// Smallest declared message length that is accepted.
    const MIN_MESSAGE_LEN: i32 = 4;
    /// Largest declared message length that is accepted.
    const MAX_MESSAGE_LEN: i32 = 100_000_000;

    info!(
        "Detecting protocol: data_len={}, first_8_bytes={:?}",
        data.len(),
        // Preview at most the first 8 bytes; shorter inputs are shown whole.
        &data[..data.len().min(MIN_HEADER_LEN)]
    );

    if data.len() < MIN_HEADER_LEN {
        info!("Protocol detection: data too short ({})", data.len());
        return false;
    }

    // Indexing below is in bounds: the guard above ensures >= 8 bytes.
    let length = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
    info!("Protocol detection: message_length={}", length);
    if !(MIN_MESSAGE_LEN..=MAX_MESSAGE_LEN).contains(&length) {
        info!("Protocol detection: invalid message length {}", length);
        return false;
    }

    let api_key = i16::from_be_bytes([data[4], data[5]]);
    info!("Protocol detection: api_key={}", api_key);

    let is_kafka = matches!(
        api_key,
        API_KEY_PRODUCE
            | API_KEY_FETCH
            | API_KEY_LIST_OFFSETS
            | API_KEY_METADATA
            | API_KEY_LEADER_AND_ISR
            | API_KEY_STOP_REPLICA
            | API_KEY_UPDATE_METADATA
            | API_KEY_CONTROLLED_SHUTDOWN
            | API_KEY_OFFSET_COMMIT
            | API_KEY_OFFSET_FETCH
            // Previously absent from this list, which made FindCoordinator
            // (key 10) the only key in 0..=50 that was rejected.
            | API_KEY_FIND_COORDINATOR
            | API_KEY_JOIN_GROUP
            | API_KEY_HEARTBEAT
            | API_KEY_LEAVE_GROUP
            | API_KEY_SYNC_GROUP
            | API_KEY_DESCRIBE_GROUPS
            | API_KEY_LIST_GROUPS
            | API_KEY_SASL_HANDSHAKE
            | API_KEY_API_VERSIONS
            | API_KEY_CREATE_TOPICS
            | API_KEY_DELETE_TOPICS
            // Broad range that also admits keys without a dedicated arm above,
            // e.g. API_KEY_DESCRIBE_CONFIGS (32) and API_KEY_SASL_AUTHENTICATE (36).
            | 15..=50
    );
    info!("Protocol detection result: is_kafka={}", is_kafka);
    is_kafka
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an 8-byte frame prefix: big-endian `length`, big-endian
    /// `api_key`, then two filler bytes.
    fn header(length: i32, api_key: i16) -> [u8; 8] {
        let mut bytes = [0u8; 8];
        bytes[..4].copy_from_slice(&length.to_be_bytes());
        bytes[4..6].copy_from_slice(&api_key.to_be_bytes());
        bytes[7] = 1;
        bytes
    }

    #[test]
    fn test_kafka_request_detection() {
        // Length 20, api_key 0 (Produce).
        let kafka_data = [0, 0, 0, 20, 0, 0, 0, 1, 0, 0, 0, 1];
        assert!(is_kafka_request(&kafka_data));

        // Fewer than 8 bytes is rejected before any field is parsed.
        let short_data = [0, 0, 0];
        assert!(!is_kafka_request(&short_data));

        // Must be a full 8 bytes so the length validation (not the
        // "too short" guard) is what rejects it; 0xFFFFFFFF decodes to -1.
        let bad_length = [255, 255, 255, 255, 0, 0, 0, 1];
        assert!(!is_kafka_request(&bad_length));

        // api_key 0x6363 is outside every accepted key.
        let unknown_api = [0, 0, 0, 20, 99, 99, 0, 1];
        assert!(!is_kafka_request(&unknown_api));
    }

    #[test]
    fn test_message_length_bounds() {
        // Lower bound: 4 is accepted, 3 is not.
        assert!(!is_kafka_request(&header(3, API_KEY_PRODUCE)));
        assert!(is_kafka_request(&header(4, API_KEY_PRODUCE)));

        // Upper bound: 100_000_000 is accepted, one more is not.
        assert!(is_kafka_request(&header(100_000_000, API_KEY_PRODUCE)));
        assert!(!is_kafka_request(&header(100_000_001, API_KEY_PRODUCE)));
    }

    #[test]
    fn test_valid_prefix_but_too_short() {
        // Length and api_key are plausible, but only 7 bytes are present.
        let frame = header(20, API_KEY_PRODUCE);
        assert!(!is_kafka_request(&frame[..7]));
    }
}