use bytes::Bytes;
use crc32fast::Hasher;
use std::collections::HashMap;
use std::io::Read;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::debug;
use super::errors::KafkaErrorCode;
use super::messages::*;
use crate::consumer::ConsumerGroupMessage;
use crate::protocol::{
DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ListOffsetsRequest,
ListOffsetsResponse, Message, MetadataRequest, MetadataResponse, MultiFetchRequest,
MultiFetchResponse, ProduceRequest, ProduceResponse, Request, Response,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse,
};
pub struct ProtocolAdapter;
#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
#[error("Unsupported Kafka API: key={0}, version={1}")]
UnsupportedApi(i16, i16),
#[error("Unsupported operation: {0}")]
UnsupportedOperation(String),
#[error("Invalid message format: {0}")]
InvalidFormat(String),
#[error("Invalid message: {0}")]
InvalidMessage(String),
#[error("FluxMQ error: {0}")]
FluxMq(String),
#[error("Missing required field: {0}")]
MissingField(String),
}
pub type Result<T> = std::result::Result<T, AdapterError>;
impl ProtocolAdapter {
pub fn kafka_to_fluxmq(kafka_request: KafkaRequest) -> Result<Request> {
match kafka_request {
KafkaRequest::Produce(req) => {
let fluxmq_req = Self::convert_produce_request(req)?;
Ok(Request::Produce(fluxmq_req))
}
KafkaRequest::Fetch(req) => {
if req.topics.len() > 1
|| req.topics.get(0).map_or(false, |t| t.partitions.len() > 1)
{
let multi_fetch_req = Self::convert_multi_fetch_request(req)?;
Ok(Request::MultiFetch(multi_fetch_req))
} else {
let fluxmq_req = Self::convert_fetch_request(req)?;
Ok(Request::Fetch(fluxmq_req))
}
}
KafkaRequest::ListOffsets(req) => {
let fluxmq_req = Self::convert_list_offsets_request(req)?;
Ok(Request::ListOffsets(fluxmq_req))
}
KafkaRequest::Metadata(req) => {
let fluxmq_req = Self::convert_metadata_request(req)?;
Ok(Request::Metadata(fluxmq_req))
}
KafkaRequest::JoinGroup(_req) => {
Err(AdapterError::UnsupportedApi(11, 0))
}
KafkaRequest::OffsetCommit(_req) => {
Err(AdapterError::UnsupportedApi(8, 0))
}
KafkaRequest::OffsetFetch(_req) => {
Err(AdapterError::UnsupportedApi(9, 0))
}
KafkaRequest::FindCoordinator(_req) => {
Err(AdapterError::UnsupportedApi(10, 0))
}
KafkaRequest::ListGroups(_req) => {
Err(AdapterError::UnsupportedApi(16, 0))
}
KafkaRequest::Heartbeat(_req) => {
Err(AdapterError::UnsupportedApi(12, 0))
}
KafkaRequest::LeaveGroup(_req) => {
Err(AdapterError::UnsupportedApi(13, 0))
}
KafkaRequest::SyncGroup(_req) => {
Err(AdapterError::UnsupportedApi(14, 0))
}
KafkaRequest::DescribeGroups(_req) => {
Err(AdapterError::UnsupportedApi(15, 0))
}
KafkaRequest::ApiVersions(_req) => {
Err(AdapterError::UnsupportedApi(18, 0))
}
KafkaRequest::CreateTopics(req) => {
let fluxmq_req = Self::convert_create_topics_request(req)?;
Ok(Request::CreateTopics(fluxmq_req))
}
KafkaRequest::DeleteTopics(req) => {
let fluxmq_req = Self::convert_delete_topics_request(req)?;
Ok(Request::DeleteTopics(fluxmq_req))
}
KafkaRequest::DescribeConfigs(req) => {
let fluxmq_req = Self::convert_describe_configs_request(req)?;
Ok(Request::DescribeConfigs(fluxmq_req))
}
KafkaRequest::AlterConfigs(req) => {
let fluxmq_req = Self::convert_alter_configs_request(req)?;
Ok(Request::AlterConfigs(fluxmq_req))
}
KafkaRequest::SaslHandshake(req) => {
let fluxmq_req = Self::convert_sasl_handshake_request(req)?;
Ok(Request::SaslHandshake(fluxmq_req))
}
KafkaRequest::SaslAuthenticate(req) => {
let fluxmq_req = Self::convert_sasl_authenticate_request(req)?;
Ok(Request::SaslAuthenticate(fluxmq_req))
}
KafkaRequest::LeaderAndIsr(_req) => {
Err(AdapterError::UnsupportedOperation(
"LEADER_AND_ISR not implemented".to_string(),
))
}
KafkaRequest::StopReplica(_req) => {
Err(AdapterError::UnsupportedOperation(
"STOP_REPLICA not implemented".to_string(),
))
}
KafkaRequest::UpdateMetadata(_req) => {
Err(AdapterError::UnsupportedOperation(
"UPDATE_METADATA not implemented".to_string(),
))
}
KafkaRequest::ControlledShutdown(_req) => {
Err(AdapterError::UnsupportedOperation(
"CONTROLLED_SHUTDOWN not implemented".to_string(),
))
}
}
}
pub fn fluxmq_to_kafka(
fluxmq_response: Response,
correlation_id: i32,
) -> Result<KafkaResponse> {
match fluxmq_response {
Response::Produce(resp) => {
let kafka_resp = Self::convert_produce_response(resp, correlation_id)?;
Ok(KafkaResponse::Produce(kafka_resp))
}
Response::Fetch(resp) => {
let kafka_resp = Self::convert_fetch_response(resp, correlation_id)?;
Ok(KafkaResponse::Fetch(kafka_resp))
}
Response::MultiFetch(resp) => {
let kafka_resp = Self::convert_multi_fetch_response(resp, correlation_id)?;
Ok(KafkaResponse::Fetch(kafka_resp))
}
Response::ListOffsets(resp) => {
let kafka_resp = Self::convert_list_offsets_response(resp, correlation_id)?;
Ok(KafkaResponse::ListOffsets(kafka_resp))
}
Response::Metadata(resp) => {
let kafka_resp = Self::convert_metadata_response(resp, correlation_id)?;
Ok(KafkaResponse::Metadata(kafka_resp))
}
Response::CreateTopics(resp) => {
let kafka_resp = Self::convert_create_topics_response(resp, correlation_id)?;
Ok(KafkaResponse::CreateTopics(kafka_resp))
}
Response::DeleteTopics(resp) => {
let kafka_resp = Self::convert_delete_topics_response(resp, correlation_id)?;
Ok(KafkaResponse::DeleteTopics(kafka_resp))
}
Response::DescribeConfigs(resp) => {
let kafka_resp = Self::convert_describe_configs_response(resp, correlation_id)?;
Ok(KafkaResponse::DescribeConfigs(kafka_resp))
}
Response::AlterConfigs(resp) => {
let kafka_resp = Self::convert_alter_configs_response(resp, correlation_id)?;
Ok(KafkaResponse::AlterConfigs(kafka_resp))
}
Response::SaslHandshake(resp) => {
let kafka_resp = Self::convert_sasl_handshake_response(resp, correlation_id)?;
Ok(KafkaResponse::SaslHandshake(kafka_resp))
}
Response::SaslAuthenticate(resp) => {
let kafka_resp = Self::convert_sasl_authenticate_response(resp, correlation_id)?;
Ok(KafkaResponse::SaslAuthenticate(kafka_resp))
}
Response::NoResponse => {
Err(AdapterError::FluxMq(
"Fire-and-forget request - no response should be sent".to_string(),
))
}
}
}
pub fn handle_consumer_group_request(
kafka_request: KafkaRequest,
) -> Result<Option<ConsumerGroupMessage>> {
match kafka_request {
KafkaRequest::JoinGroup(req) => {
let fluxmq_msg = Self::convert_join_group_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::OffsetCommit(req) => {
let fluxmq_msg = Self::convert_offset_commit_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::OffsetFetch(req) => {
let fluxmq_msg = Self::convert_offset_fetch_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::Heartbeat(req) => {
let fluxmq_msg = Self::convert_heartbeat_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::LeaveGroup(req) => {
let fluxmq_msg = Self::convert_leave_group_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::SyncGroup(req) => {
let fluxmq_msg = Self::convert_sync_group_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::ListGroups(req) => {
let fluxmq_msg = Self::convert_list_groups_request(req)?;
Ok(Some(fluxmq_msg))
}
KafkaRequest::DescribeGroups(req) => {
let fluxmq_msg = Self::convert_describe_groups_request(req)?;
Ok(Some(fluxmq_msg))
}
_ => Ok(None), }
}
pub fn consumer_group_response_to_kafka(
fluxmq_response: ConsumerGroupMessage,
correlation_id: i32,
api_version: i16,
) -> Result<KafkaResponse> {
match fluxmq_response {
ConsumerGroupMessage::JoinGroupResponse {
error_code,
generation_id,
group_protocol,
leader_id,
consumer_id,
members,
} => {
let kafka_members = members
.into_iter()
.map(|member| KafkaJoinGroupMember {
member_id: member.consumer_id,
group_instance_id: None,
metadata: Self::serialize_consumer_metadata(&member.subscribed_topics),
})
.collect();
let resp = KafkaJoinGroupResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code: Self::map_error_code(error_code),
generation_id,
protocol_type: "consumer".to_string(),
protocol_name: group_protocol.clone(),
leader: leader_id,
member_id: consumer_id,
members: kafka_members,
};
debug!("JoinGroup response: protocol_type='consumer', protocol_name='{}', generation={}",
group_protocol, generation_id);
Ok(KafkaResponse::JoinGroup(resp))
}
ConsumerGroupMessage::OffsetCommitResponse {
error_code,
topic_partition_errors,
} => {
let resp = Self::convert_offset_commit_response(
error_code,
topic_partition_errors,
correlation_id,
)?;
Ok(KafkaResponse::OffsetCommit(resp))
}
ConsumerGroupMessage::OffsetFetchResponse {
error_code,
offsets,
} => {
let resp =
Self::convert_offset_fetch_response(error_code, offsets, correlation_id)?;
Ok(KafkaResponse::OffsetFetch(resp))
}
ConsumerGroupMessage::ListGroupsResponse { .. } => {
Self::convert_list_groups_response(fluxmq_response, correlation_id)
}
ConsumerGroupMessage::DescribeGroupsResponse { .. } => {
Self::convert_describe_groups_response(fluxmq_response, correlation_id)
}
ConsumerGroupMessage::HeartbeatResponse { error_code } => {
let resp = KafkaHeartbeatResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code,
};
Ok(KafkaResponse::Heartbeat(resp))
}
ConsumerGroupMessage::LeaveGroupResponse { error_code } => {
let resp = KafkaLeaveGroupResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code,
};
Ok(KafkaResponse::LeaveGroup(resp))
}
ConsumerGroupMessage::SyncGroupResponse { error_code, .. } => {
let resp = KafkaSyncGroupResponse {
header: KafkaResponseHeader { correlation_id },
api_version,
throttle_time_ms: 0,
error_code,
protocol_type: "consumer".to_string(),
protocol_name: "range".to_string(),
assignment: bytes::Bytes::new(),
};
Ok(KafkaResponse::SyncGroup(resp))
}
_ => Err(AdapterError::InvalidFormat(
"Unexpected consumer group message type".to_string(),
)),
}
}
fn convert_produce_request(kafka_req: KafkaProduceRequest) -> Result<ProduceRequest> {
let topic_data = kafka_req
.topic_data
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("topic_data".to_string()))?;
let partition_data = topic_data
.partition_data
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("partition_data".to_string()))?;
let messages = if let Some(records_bytes) = partition_data.records {
Self::parse_kafka_record_batch(&records_bytes)?
} else {
vec![]
};
Ok(ProduceRequest {
correlation_id: kafka_req.header.correlation_id,
topic: topic_data.topic,
partition: partition_data.partition as u32,
messages,
acks: kafka_req.acks,
timeout_ms: kafka_req.timeout_ms as u32,
})
}
fn convert_fetch_request(kafka_req: KafkaFetchRequest) -> Result<FetchRequest> {
let topic_data = kafka_req
.topics
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("topics".to_string()))?;
let partition_data = topic_data
.partitions
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("partitions".to_string()))?;
Ok(FetchRequest {
correlation_id: kafka_req.header.correlation_id,
topic: topic_data.topic,
partition: partition_data.partition as u32,
offset: partition_data.fetch_offset as u64,
max_bytes: partition_data.max_bytes as u32,
timeout_ms: kafka_req.max_wait_ms as u32,
})
}
fn convert_list_offsets_request(
kafka_req: KafkaListOffsetsRequest,
) -> Result<ListOffsetsRequest> {
let topic_data = kafka_req
.topics
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("topics".to_string()))?;
let partition_data = topic_data
.partitions
.into_iter()
.next()
.ok_or_else(|| AdapterError::MissingField("partitions".to_string()))?;
Ok(ListOffsetsRequest {
correlation_id: kafka_req.header.correlation_id,
topic: topic_data.topic,
partition: partition_data.partition as u32,
timestamp: partition_data.timestamp,
})
}
fn convert_metadata_request(kafka_req: KafkaMetadataRequest) -> Result<MetadataRequest> {
Ok(MetadataRequest {
correlation_id: kafka_req.header.correlation_id,
topics: kafka_req.topics.unwrap_or_default(),
api_version: kafka_req.header.api_version, allow_auto_topic_creation: kafka_req.allow_auto_topic_creation,
})
}
fn convert_produce_response(
fluxmq_resp: ProduceResponse,
correlation_id: i32,
) -> Result<KafkaProduceResponse> {
let partition_response = KafkaPartitionProduceResponse {
partition: fluxmq_resp.partition as i32,
error_code: Self::map_error_code(fluxmq_resp.error_code),
base_offset: fluxmq_resp.base_offset as i64,
log_append_time_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
log_start_offset: 0, };
let topic_response = KafkaTopicProduceResponse {
topic: fluxmq_resp.topic,
partition_responses: vec![partition_response],
};
Ok(KafkaProduceResponse {
header: KafkaResponseHeader { correlation_id },
responses: vec![topic_response],
throttle_time_ms: 0,
})
}
fn convert_fetch_response(
fluxmq_resp: FetchResponse,
correlation_id: i32,
) -> Result<KafkaFetchResponse> {
let records = if fluxmq_resp.messages.is_empty() {
tracing::debug!("No messages in fetch response, returning empty bytes");
Some(bytes::Bytes::new())
} else {
let record_batch = Self::encode_messages_as_record_batch(&fluxmq_resp.messages);
tracing::debug!(
"Generated record batch for {} messages: {} bytes",
fluxmq_resp.messages.len(),
record_batch.len()
);
Some(record_batch)
};
let partition_response = KafkaPartitionFetchResponse {
partition: fluxmq_resp.partition as i32,
error_code: Self::map_error_code(fluxmq_resp.error_code),
high_watermark: fluxmq_resp.messages.len() as i64, last_stable_offset: fluxmq_resp.messages.len() as i64,
log_start_offset: 0,
aborted_transactions: vec![],
preferred_read_replica: -1,
records,
};
let topic_response = KafkaTopicFetchResponse {
topic: fluxmq_resp.topic,
partitions: vec![partition_response],
};
Ok(KafkaFetchResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code: KafkaErrorCode::NoError.as_i16(),
session_id: 0,
responses: vec![topic_response],
})
}
fn convert_list_offsets_response(
fluxmq_resp: ListOffsetsResponse,
correlation_id: i32,
) -> Result<KafkaListOffsetsResponse> {
let partition_response = KafkaListOffsetsPartitionResponse {
partition: fluxmq_resp.partition as i32,
error_code: Self::map_error_code(fluxmq_resp.error_code),
timestamp: fluxmq_resp.timestamp,
offset: fluxmq_resp.offset,
leader_epoch: -1, };
let topic_response = KafkaListOffsetsTopicResponse {
topic: fluxmq_resp.topic,
partitions: vec![partition_response],
};
Ok(KafkaListOffsetsResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
topics: vec![topic_response],
})
}
fn convert_metadata_response(
fluxmq_resp: MetadataResponse,
correlation_id: i32,
) -> Result<KafkaMetadataResponse> {
let brokers = fluxmq_resp
.brokers
.into_iter()
.map(|b| KafkaBrokerMetadata {
node_id: b.node_id,
host: b.host,
port: b.port,
rack: None,
})
.collect();
let topics = fluxmq_resp
.topics
.into_iter()
.map(|t| {
let partitions = t
.partitions
.into_iter()
.map(|p| KafkaPartitionMetadata {
error_code: KafkaErrorCode::NoError.as_i16(),
partition: p.id as i32,
leader: p.leader.unwrap_or(-1),
leader_epoch: 0,
replica_nodes: p.replicas,
isr_nodes: p.isr,
offline_replicas: vec![],
})
.collect();
KafkaTopicMetadata {
error_code: t.error_code,
topic: t.name,
is_internal: false,
partitions,
topic_authorized_operations: -2147483648, }
})
.collect();
Ok(KafkaMetadataResponse {
header: KafkaResponseHeader { correlation_id },
api_version: fluxmq_resp.api_version, throttle_time_ms: 0,
brokers,
cluster_id: Some("fluxmq-cluster".to_string()),
controller_id: 0,
topics,
cluster_authorized_operations: -2147483648,
})
}
fn convert_join_group_request(
kafka_req: KafkaJoinGroupRequest,
) -> Result<ConsumerGroupMessage> {
use crate::consumer::{ConsumerGroupMessage, GroupProtocol};
let mut group_protocols = Vec::new();
let mut subscribed_topics = Vec::new();
for protocol in kafka_req.protocols {
if let Ok(topics) = Self::parse_consumer_protocol_metadata(&protocol.metadata) {
subscribed_topics.extend(topics);
}
group_protocols.push(GroupProtocol {
name: protocol.name,
metadata: protocol.metadata.to_vec(),
});
}
subscribed_topics.sort();
subscribed_topics.dedup();
debug!(
"JoinGroup: consumer {} subscribing to topics: {:?}",
kafka_req.member_id, subscribed_topics
);
Ok(ConsumerGroupMessage::JoinGroup {
group_id: kafka_req.group_id,
consumer_id: kafka_req.member_id,
client_id: kafka_req.header.client_id.unwrap_or("unknown".to_string()),
client_host: "unknown".to_string(), session_timeout_ms: kafka_req.session_timeout_ms as u64,
rebalance_timeout_ms: kafka_req.rebalance_timeout_ms as u64,
protocol_type: kafka_req.protocol_type,
group_protocols,
})
}
fn convert_offset_commit_request(
kafka_req: KafkaOffsetCommitRequest,
) -> Result<ConsumerGroupMessage> {
use crate::consumer::{ConsumerGroupMessage, TopicPartitionOffset};
let offsets = kafka_req
.topics
.into_iter()
.flat_map(|topic_data| {
topic_data
.partitions
.into_iter()
.map(move |partition_data| TopicPartitionOffset {
topic: topic_data.topic.clone(),
partition: partition_data.partition as u32,
offset: partition_data.offset,
metadata: partition_data.metadata,
})
})
.collect();
Ok(ConsumerGroupMessage::OffsetCommit {
group_id: kafka_req.group_id,
consumer_id: kafka_req.consumer_id,
generation_id: kafka_req.generation_id,
retention_time_ms: kafka_req.retention_time_ms,
offsets,
})
}
fn convert_offset_fetch_request(
kafka_req: KafkaOffsetFetchRequest,
) -> Result<ConsumerGroupMessage> {
use crate::consumer::{ConsumerGroupMessage, TopicPartition};
let topic_partitions = kafka_req.topics.map(|topics| {
topics
.into_iter()
.flat_map(|topic_data| {
topic_data.partitions.into_iter().map(move |partition| {
TopicPartition::new(topic_data.topic.clone(), partition as u32)
})
})
.collect()
});
Ok(ConsumerGroupMessage::OffsetFetch {
group_id: kafka_req.group_id,
topic_partitions,
})
}
fn convert_heartbeat_request(kafka_req: KafkaHeartbeatRequest) -> Result<ConsumerGroupMessage> {
Ok(ConsumerGroupMessage::Heartbeat {
group_id: kafka_req.group_id,
consumer_id: kafka_req.consumer_id,
generation_id: kafka_req.generation_id,
})
}
fn convert_leave_group_request(
kafka_req: KafkaLeaveGroupRequest,
) -> Result<ConsumerGroupMessage> {
Ok(ConsumerGroupMessage::LeaveGroup {
group_id: kafka_req.group_id,
consumer_id: kafka_req.consumer_id,
})
}
fn convert_sync_group_request(
kafka_req: KafkaSyncGroupRequest,
) -> Result<ConsumerGroupMessage> {
Ok(ConsumerGroupMessage::SyncGroup {
group_id: kafka_req.group_id,
consumer_id: kafka_req.consumer_id,
generation_id: kafka_req.generation_id,
group_assignments: std::collections::HashMap::new(),
})
}
fn convert_offset_commit_response(
_error_code: i16,
topic_partition_errors: Vec<crate::consumer::TopicPartitionError>,
correlation_id: i32,
) -> Result<KafkaOffsetCommitResponse> {
let mut topic_responses = std::collections::HashMap::new();
for error in topic_partition_errors {
let topic_response = topic_responses
.entry(error.topic.clone())
.or_insert_with(|| KafkaOffsetCommitTopicResponse {
topic: error.topic.clone(),
partitions: Vec::new(),
});
topic_response
.partitions
.push(KafkaOffsetCommitPartitionResponse {
partition: error.partition as i32,
error_code: error.error_code,
});
}
Ok(KafkaOffsetCommitResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
topics: topic_responses.into_values().collect(),
})
}
fn convert_offset_fetch_response(
global_error_code: i16,
offsets: Vec<crate::consumer::TopicPartitionOffsetResult>,
correlation_id: i32,
) -> Result<KafkaOffsetFetchResponse> {
let mut topic_responses = std::collections::HashMap::new();
for offset in offsets {
let topic_response = topic_responses
.entry(offset.topic.clone())
.or_insert_with(|| KafkaOffsetFetchTopicResponse {
topic: offset.topic.clone(),
partitions: Vec::new(),
});
topic_response
.partitions
.push(KafkaOffsetFetchPartitionResponse {
partition: offset.partition as i32,
offset: offset.offset,
leader_epoch: offset.leader_epoch,
metadata: offset.metadata,
error_code: offset.error_code,
});
}
Ok(KafkaOffsetFetchResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
topics: topic_responses.into_values().collect(),
error_code: global_error_code,
})
}
fn convert_create_topics_response(
fluxmq_resp: crate::protocol::CreateTopicsResponse,
correlation_id: i32,
) -> Result<KafkaCreateTopicsResponse> {
let kafka_topics = fluxmq_resp
.topics
.into_iter()
.map(|topic| {
KafkaCreatableTopicResult {
name: topic.name,
topic_id: None, error_code: topic.error_code,
error_message: topic.error_message,
topic_config_error_code: None,
num_partitions: Some(topic.num_partitions),
replication_factor: Some(topic.replication_factor),
configs: Some(
topic
.configs
.into_iter()
.map(|config| KafkaCreatableTopicConfigs {
name: config.name,
value: config.value,
read_only: false,
config_source: 0,
is_sensitive: false,
})
.collect(),
),
}
})
.collect();
Ok(KafkaCreateTopicsResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: fluxmq_resp.throttle_time_ms,
topics: kafka_topics,
})
}
fn convert_delete_topics_request(req: KafkaDeleteTopicsRequest) -> Result<DeleteTopicsRequest> {
Ok(DeleteTopicsRequest {
correlation_id: req.correlation_id,
topic_names: req.topic_names,
timeout_ms: req.timeout_ms,
})
}
fn convert_delete_topics_response(
resp: DeleteTopicsResponse,
correlation_id: i32,
) -> Result<KafkaDeleteTopicsResponse> {
let kafka_topics = resp
.responses
.into_iter()
.map(|result| super::messages::DeletableTopicResult {
name: result.name,
topic_id: result.topic_id,
error_code: result.error_code,
error_message: result.error_message,
})
.collect();
Ok(KafkaDeleteTopicsResponse {
correlation_id,
throttle_time_ms: resp.throttle_time_ms,
responses: kafka_topics,
})
}
fn convert_sasl_handshake_request(
req: KafkaSaslHandshakeRequest,
) -> Result<SaslHandshakeRequest> {
Ok(SaslHandshakeRequest {
correlation_id: req.correlation_id,
mechanism: req.mechanism,
})
}
fn convert_sasl_handshake_response(
resp: SaslHandshakeResponse,
correlation_id: i32,
) -> Result<KafkaSaslHandshakeResponse> {
Ok(KafkaSaslHandshakeResponse {
correlation_id,
error_code: resp.error_code,
mechanisms: resp.mechanisms,
})
}
fn convert_sasl_authenticate_request(
req: KafkaSaslAuthenticateRequest,
) -> Result<SaslAuthenticateRequest> {
Ok(SaslAuthenticateRequest {
correlation_id: req.correlation_id,
auth_bytes: req.auth_bytes,
})
}
fn convert_sasl_authenticate_response(
resp: SaslAuthenticateResponse,
correlation_id: i32,
) -> Result<KafkaSaslAuthenticateResponse> {
Ok(KafkaSaslAuthenticateResponse {
correlation_id,
error_code: resp.error_code,
error_message: resp.error_message,
auth_bytes: resp.auth_bytes,
session_lifetime_ms: resp.session_lifetime_ms,
})
}
fn parse_kafka_record_batch(records_bytes: &Bytes) -> Result<Vec<Message>> {
let message = Message {
key: None,
value: records_bytes.clone(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
headers: HashMap::new(),
};
Ok(vec![message])
}
fn convert_create_topics_request(
kafka_req: KafkaCreateTopicsRequest,
) -> Result<crate::protocol::CreateTopicsRequest> {
use crate::protocol::{CreatableTopic, CreateTopicsRequest, CreateableTopicConfig};
let fluxmq_topics: Vec<CreatableTopic> = kafka_req
.topics
.into_iter()
.map(|kafka_topic| CreatableTopic {
name: kafka_topic.name,
num_partitions: kafka_topic.num_partitions,
replication_factor: kafka_topic.replication_factor,
assignments: vec![], configs: kafka_topic
.configs
.unwrap_or_default()
.into_iter()
.map(|config| CreateableTopicConfig {
name: config.name,
value: config.value,
})
.collect(),
})
.collect();
Ok(CreateTopicsRequest {
correlation_id: kafka_req.header.correlation_id,
topics: fluxmq_topics,
timeout_ms: kafka_req.timeout_ms,
validate_only: kafka_req.validate_only,
})
}
fn map_error_code(fluxmq_error_code: i16) -> i16 {
match fluxmq_error_code {
0 => KafkaErrorCode::NoError.as_i16(),
1 => KafkaErrorCode::OffsetOutOfRange.as_i16(),
2 => KafkaErrorCode::CorruptMessage.as_i16(),
3 => KafkaErrorCode::UnknownTopicOrPartition.as_i16(),
4 => KafkaErrorCode::LeaderNotAvailable.as_i16(),
5 => KafkaErrorCode::NotLeaderForPartition.as_i16(),
_ => KafkaErrorCode::Unknown.as_i16(),
}
}
fn convert_list_groups_request(
_kafka_req: KafkaListGroupsRequest,
) -> Result<ConsumerGroupMessage> {
Ok(ConsumerGroupMessage::ListGroups)
}
fn convert_describe_groups_request(
kafka_req: KafkaDescribeGroupsRequest,
) -> Result<ConsumerGroupMessage> {
let group_ids = kafka_req.groups;
Ok(ConsumerGroupMessage::DescribeGroups { group_ids })
}
pub fn convert_list_groups_response(
response: ConsumerGroupMessage,
correlation_id: i32,
) -> Result<KafkaResponse> {
match response {
ConsumerGroupMessage::ListGroupsResponse { error_code, groups } => {
let kafka_groups = groups
.into_iter()
.map(|group| KafkaListedGroup {
group_id: group.group_id,
protocol_type: group.protocol_type,
group_state: "Stable".to_string(), })
.collect();
Ok(KafkaResponse::ListGroups(KafkaListGroupsResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code: Self::map_error_code(error_code),
groups: kafka_groups,
}))
}
_ => Err(AdapterError::InvalidMessage(
"Expected ListGroups response".to_string(),
)),
}
}
pub fn convert_describe_groups_response(
response: ConsumerGroupMessage,
correlation_id: i32,
) -> Result<KafkaResponse> {
match response {
ConsumerGroupMessage::DescribeGroupsResponse { groups } => {
let kafka_groups = groups
.into_iter()
.map(|group| {
let kafka_members = group
.members
.into_iter()
.map(|member| KafkaDescribedGroupMember {
member_id: member.consumer_id,
group_instance_id: None,
client_id: member.client_id,
client_host: member.client_host,
member_metadata: Bytes::from(member.member_metadata),
member_assignment: Bytes::from(member.member_assignment),
})
.collect();
KafkaDescribedGroup {
error_code: Self::map_error_code(group.error_code),
group_id: group.group_id,
group_state: format!("{:?}", group.state),
protocol_type: group.protocol_type,
protocol_data: group.protocol_data,
members: kafka_members,
authorized_operations: 0,
}
})
.collect();
Ok(KafkaResponse::DescribeGroups(KafkaDescribeGroupsResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
groups: kafka_groups,
}))
}
_ => Err(AdapterError::InvalidMessage(
"Expected DescribeGroups response".to_string(),
)),
}
}
fn parse_consumer_protocol_metadata(metadata: &bytes::Bytes) -> Result<Vec<String>> {
if metadata.len() < 6 {
return Ok(Vec::new());
}
let mut cursor = std::io::Cursor::new(metadata);
let mut version_bytes = [0u8; 2];
if cursor.read_exact(&mut version_bytes).is_err() {
return Ok(Vec::new());
}
let _version = u16::from_be_bytes(version_bytes);
let mut count_bytes = [0u8; 4];
if cursor.read_exact(&mut count_bytes).is_err() {
return Ok(Vec::new());
}
let topic_count = i32::from_be_bytes(count_bytes);
if topic_count < 0 || topic_count > 1000 {
return Ok(Vec::new());
}
let mut topics = Vec::new();
for _ in 0..topic_count {
let mut len_bytes = [0u8; 2];
if cursor.read_exact(&mut len_bytes).is_err() {
break;
}
let topic_len = u16::from_be_bytes(len_bytes) as usize;
if topic_len > 249 {
break;
}
let mut topic_bytes = vec![0u8; topic_len];
if cursor.read_exact(&mut topic_bytes).is_err() {
break;
}
if let Ok(topic_name) = String::from_utf8(topic_bytes) {
topics.push(topic_name);
}
}
Ok(topics)
}
fn serialize_consumer_metadata(subscribed_topics: &[String]) -> Bytes {
let mut metadata = Vec::new();
metadata.extend_from_slice(&0u16.to_be_bytes());
metadata.extend_from_slice(&(subscribed_topics.len() as i32).to_be_bytes());
for topic in subscribed_topics {
metadata.extend_from_slice(&(topic.len() as u16).to_be_bytes());
metadata.extend_from_slice(topic.as_bytes());
}
metadata.extend_from_slice(&0i32.to_be_bytes());
Bytes::from(metadata)
}
fn convert_multi_fetch_request(kafka_req: KafkaFetchRequest) -> Result<MultiFetchRequest> {
use crate::protocol::{PartitionFetchRequest, TopicFetchRequest};
let topics = kafka_req
.topics
.into_iter()
.map(|topic_data| {
let partitions = topic_data
.partitions
.into_iter()
.map(|partition_data| PartitionFetchRequest {
partition: partition_data.partition as u32,
offset: partition_data.fetch_offset as u64,
max_bytes: partition_data.max_bytes as u32,
})
.collect();
TopicFetchRequest {
topic: topic_data.topic,
partitions,
}
})
.collect();
Ok(MultiFetchRequest {
correlation_id: kafka_req.header.correlation_id,
topics,
max_wait_ms: kafka_req.max_wait_ms as u32,
min_bytes: kafka_req.min_bytes as u32,
max_bytes: kafka_req.max_bytes as u32,
})
}
fn convert_multi_fetch_response(
fluxmq_resp: MultiFetchResponse,
correlation_id: i32,
) -> Result<KafkaFetchResponse> {
let topic_responses = fluxmq_resp
.topics
.into_iter()
.map(|topic_resp| {
let partition_responses = topic_resp
.partitions
.into_iter()
.map(|partition_resp| {
let records =
Self::encode_messages_as_record_batch(&partition_resp.messages);
KafkaPartitionFetchResponse {
partition: partition_resp.partition as i32,
error_code: partition_resp.error_code,
high_watermark: partition_resp
.messages
.last()
.map(|(offset, _)| *offset as i64 + 1)
.unwrap_or(0),
last_stable_offset: -1,
log_start_offset: 0,
aborted_transactions: vec![],
preferred_read_replica: -1,
records: Some(records),
}
})
.collect();
KafkaTopicFetchResponse {
topic: topic_resp.topic,
partitions: partition_responses,
}
})
.collect();
Ok(KafkaFetchResponse {
header: KafkaResponseHeader { correlation_id },
throttle_time_ms: 0,
error_code: Self::map_error_code(fluxmq_resp.error_code),
session_id: 0,
responses: topic_responses,
})
}
fn encode_messages_as_record_batch(messages: &[(u64, Message)]) -> Bytes {
if messages.is_empty() {
return Bytes::new();
}
let mut records = Vec::new();
for (offset, message) in messages {
let value_bytes = &message.value;
records.extend_from_slice(&(*offset as i64).to_be_bytes());
let message_size = 4 + 1 + 1 + 4 + 4 + value_bytes.len();
records.extend_from_slice(&(message_size as i32).to_be_bytes());
let mut crc_hasher = Hasher::new();
crc_hasher.update(&[0]); crc_hasher.update(&[0]); crc_hasher.update(&(-1i32).to_be_bytes()); crc_hasher.update(&(value_bytes.len() as i32).to_be_bytes()); crc_hasher.update(value_bytes); let crc = crc_hasher.finalize();
records.extend_from_slice(&crc.to_be_bytes());
records.push(0);
records.push(0);
records.extend_from_slice(&(-1i32).to_be_bytes());
records.extend_from_slice(&(value_bytes.len() as i32).to_be_bytes());
records.extend_from_slice(value_bytes);
}
tracing::debug!(
"Created simple legacy records with {} bytes for {} messages",
records.len(),
messages.len()
);
Bytes::from(records)
}
fn convert_describe_configs_request(
kafka_req: KafkaDescribeConfigsRequest,
) -> Result<crate::protocol::DescribeConfigsRequest> {
use crate::protocol::{ConfigResource, DescribeConfigsRequest};
let fluxmq_resources: Vec<ConfigResource> = kafka_req
.resources
.into_iter()
.map(|kafka_resource| ConfigResource {
resource_type: kafka_resource.resource_type,
resource_name: kafka_resource.resource_name,
configuration_keys: kafka_resource.configuration_keys.unwrap_or_default(),
})
.collect();
Ok(DescribeConfigsRequest {
correlation_id: kafka_req.correlation_id,
resources: fluxmq_resources,
include_synonyms: kafka_req.include_synonyms,
include_documentation: kafka_req.include_documentation,
})
}
fn convert_describe_configs_response(
fluxmq_resp: crate::protocol::DescribeConfigsResponse,
correlation_id: i32,
) -> Result<KafkaDescribeConfigsResponse> {
let kafka_results: Vec<KafkaConfigResourceResult> = fluxmq_resp
.results
.into_iter()
.map(|fluxmq_result| {
let kafka_configs: Vec<KafkaConfigEntry> = fluxmq_result
.configs
.into_iter()
.map(|fluxmq_config| KafkaConfigEntry {
name: fluxmq_config.name,
value: Some(fluxmq_config.value),
read_only: fluxmq_config.read_only,
is_default: fluxmq_config.is_default,
config_source: fluxmq_config.config_source,
is_sensitive: fluxmq_config.is_sensitive,
synonyms: fluxmq_config
.synonyms
.into_iter()
.map(|synonym| KafkaConfigSynonym {
name: synonym.name,
value: synonym.value,
source: synonym.source,
})
.collect(),
config_type: fluxmq_config.config_type,
documentation: fluxmq_config.documentation,
})
.collect();
KafkaConfigResourceResult {
error_code: fluxmq_result.error_code,
error_message: fluxmq_result.error_message,
resource_type: fluxmq_result.resource_type,
resource_name: fluxmq_result.resource_name,
configs: kafka_configs,
}
})
.collect();
Ok(KafkaDescribeConfigsResponse {
correlation_id,
throttle_time_ms: fluxmq_resp.throttle_time_ms,
results: kafka_results,
})
}
fn convert_alter_configs_request(
kafka_req: KafkaAlterConfigsRequest,
) -> Result<crate::protocol::AlterConfigsRequest> {
use crate::protocol::{AlterConfigsRequest, AlterConfigsResource, AlterableConfig};
let fluxmq_resources: Vec<AlterConfigsResource> = kafka_req
.resources
.into_iter()
.map(|kafka_resource| {
let fluxmq_configs: Vec<AlterableConfig> = kafka_resource
.configs
.into_iter()
.map(|kafka_config| AlterableConfig {
name: kafka_config.name,
value: kafka_config.value,
})
.collect();
AlterConfigsResource {
resource_type: kafka_resource.resource_type,
resource_name: kafka_resource.resource_name,
configs: fluxmq_configs,
}
})
.collect();
Ok(AlterConfigsRequest {
correlation_id: kafka_req.correlation_id,
resources: fluxmq_resources,
validate_only: kafka_req.validate_only,
})
}
fn convert_alter_configs_response(
fluxmq_resp: crate::protocol::AlterConfigsResponse,
correlation_id: i32,
) -> Result<KafkaAlterConfigsResponse> {
let kafka_responses: Vec<KafkaAlterConfigsResourceResponse> = fluxmq_resp
.responses
.into_iter()
.map(|fluxmq_response| KafkaAlterConfigsResourceResponse {
error_code: fluxmq_response.error_code,
error_message: fluxmq_response.error_message,
resource_type: fluxmq_response.resource_type,
resource_name: fluxmq_response.resource_name,
})
.collect();
Ok(KafkaAlterConfigsResponse {
correlation_id,
throttle_time_ms: fluxmq_resp.throttle_time_ms,
responses: kafka_responses,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::{BrokerMetadata, PartitionMetadata, TopicMetadata};
#[test]
fn test_produce_request_conversion() {
let kafka_req = KafkaProduceRequest {
header: KafkaRequestHeader {
api_key: 0,
api_version: 1,
correlation_id: 123,
client_id: Some("test-client".to_string()),
},
transactional_id: None,
acks: 1,
timeout_ms: 1000,
topic_data: vec![KafkaTopicProduceData {
topic: "test-topic".to_string(),
partition_data: vec![KafkaPartitionProduceData {
partition: 0,
records: Some(Bytes::from("test-data")),
}],
}],
};
let fluxmq_req = ProtocolAdapter::convert_produce_request(kafka_req).unwrap();
assert_eq!(fluxmq_req.correlation_id, 123);
assert_eq!(fluxmq_req.topic, "test-topic");
assert_eq!(fluxmq_req.partition, 0);
assert_eq!(fluxmq_req.acks, 1);
assert_eq!(fluxmq_req.timeout_ms, 1000);
assert!(!fluxmq_req.messages.is_empty());
}
#[test]
fn test_metadata_response_conversion() {
let fluxmq_resp = MetadataResponse {
correlation_id: 456,
brokers: vec![BrokerMetadata {
node_id: 0,
host: "localhost".to_string(),
port: 9092,
}],
topics: vec![TopicMetadata {
name: "test-topic".to_string(),
error_code: 0,
partitions: vec![PartitionMetadata {
id: 0,
leader: Some(0),
replicas: vec![0],
isr: vec![0],
leader_epoch: 0,
}],
}],
api_version: 7,
};
let kafka_resp = ProtocolAdapter::convert_metadata_response(fluxmq_resp, 456).unwrap();
assert_eq!(kafka_resp.header.correlation_id, 456);
assert_eq!(kafka_resp.brokers.len(), 1);
assert_eq!(kafka_resp.topics.len(), 1);
assert_eq!(kafka_resp.topics[0].topic, "test-topic");
assert_eq!(kafka_resp.topics[0].partitions.len(), 1);
}
#[test]
fn test_error_code_mapping() {
assert_eq!(
ProtocolAdapter::map_error_code(0),
KafkaErrorCode::NoError.as_i16()
);
assert_eq!(
ProtocolAdapter::map_error_code(1),
KafkaErrorCode::OffsetOutOfRange.as_i16()
);
assert_eq!(
ProtocolAdapter::map_error_code(3),
KafkaErrorCode::UnknownTopicOrPartition.as_i16()
);
assert_eq!(
ProtocolAdapter::map_error_code(999),
KafkaErrorCode::Unknown.as_i16()
);
}
}