use std::{io, io::Cursor};
use bytes::{BufMut, BytesMut};
use crate::{
apikey::ApiMessageType, codec::*, request_header::RequestHeader,
response_header::ResponseHeader,
};
pub mod api_versions_request;
pub mod api_versions_response;
pub mod create_topic_request;
pub mod create_topic_response;
pub mod fetch_request;
pub mod fetch_response;
pub mod find_coordinator_request;
pub mod find_coordinator_response;
pub mod init_producer_id_request;
pub mod init_producer_id_response;
pub mod join_group_request;
pub mod join_group_response;
pub mod metadata_request;
pub mod metadata_response;
pub mod offset_fetch_request;
pub mod offset_fetch_response;
pub mod produce_request;
pub mod produce_response;
pub mod request_header;
pub mod response_header;
pub mod sync_group_request;
pub mod sync_group_response;
#[derive(Debug)]
pub enum Request {
ApiVersionsRequest(api_versions_request::ApiVersionsRequest),
CreateTopicRequest(create_topic_request::CreateTopicsRequest),
FetchRequest(fetch_request::FetchRequest),
FindCoordinatorRequest(find_coordinator_request::FindCoordinatorRequest),
InitProducerIdRequest(init_producer_id_request::InitProducerIdRequest),
JoinGroupRequest(join_group_request::JoinGroupRequest),
MetadataRequest(metadata_request::MetadataRequest),
OffsetFetchRequest(offset_fetch_request::OffsetFetchRequest),
ProduceRequest(produce_request::ProduceRequest),
SyncGroupRequest(sync_group_request::SyncGroupRequest),
}
impl Request {
pub fn decode(buf: &mut BytesMut) -> io::Result<(RequestHeader, Request)> {
let header_version = {
let mut cursor = Cursor::new(&buf[..]);
let api_key = Int16.decode(&mut cursor)?;
let api_version = Int16.decode(&mut cursor)?;
ApiMessageType::try_from(api_key)?.request_header_version(api_version)
};
let header = RequestHeader::decode(buf, header_version)?;
let api_type = ApiMessageType::try_from(header.request_api_key)?;
let api_version = header.request_api_version;
let request = match api_type {
ApiMessageType::API_VERSIONS => {
api_versions_request::ApiVersionsRequest::decode(buf, api_version)
.map(Request::ApiVersionsRequest)
}
ApiMessageType::CREATE_TOPICS => {
create_topic_request::CreateTopicsRequest::decode(buf, api_version)
.map(Request::CreateTopicRequest)
}
ApiMessageType::FETCH => {
fetch_request::FetchRequest::decode(buf, api_version).map(Request::FetchRequest)
}
ApiMessageType::FIND_COORDINATOR => {
find_coordinator_request::FindCoordinatorRequest::decode(buf, api_version)
.map(Request::FindCoordinatorRequest)
}
ApiMessageType::INIT_PRODUCER_ID => {
init_producer_id_request::InitProducerIdRequest::decode(buf, api_version)
.map(Request::InitProducerIdRequest)
}
ApiMessageType::JOIN_GROUP => {
join_group_request::JoinGroupRequest::decode(buf, api_version)
.map(Request::JoinGroupRequest)
}
ApiMessageType::METADATA => metadata_request::MetadataRequest::decode(buf, api_version)
.map(Request::MetadataRequest),
ApiMessageType::OFFSET_FETCH => {
offset_fetch_request::OffsetFetchRequest::decode(buf, api_version)
.map(Request::OffsetFetchRequest)
}
ApiMessageType::PRODUCE => produce_request::ProduceRequest::decode(buf, api_version)
.map(Request::ProduceRequest),
ApiMessageType::SYNC_GROUP => {
sync_group_request::SyncGroupRequest::decode(buf, api_version)
.map(Request::SyncGroupRequest)
}
_ => unimplemented!("{}", api_type.api_key),
}?;
Ok((header, request))
}
}
#[derive(Debug)]
pub enum Response {
ApiVersionsResponse(api_versions_response::ApiVersionsResponse),
CreateTopicsResponse(create_topic_response::CreateTopicsResponse),
FindCoordinatorResponse(find_coordinator_response::FindCoordinatorResponse),
FetchResponse(fetch_response::FetchResponse),
InitProducerIdResponse(init_producer_id_response::InitProducerIdResponse),
JoinGroupResponse(join_group_response::JoinGroupResponse),
MetadataResponse(metadata_response::MetadataResponse),
OffsetFetchResponse(offset_fetch_response::OffsetFetchResponse),
ProduceResponse(produce_response::ProduceResponse),
SyncGroupResponse(sync_group_response::SyncGroupResponse),
}
impl Response {
pub fn encode_alloc(&self, header: RequestHeader) -> io::Result<bytes::Bytes> {
let api_type = ApiMessageType::try_from(header.request_api_key)?;
let api_version = header.request_api_version;
let correlation_id = header.correlation_id;
let mut buf = vec![];
let response_header_version = api_type.response_header_version(api_version);
let response_header = ResponseHeader {
correlation_id,
unknown_tagged_fields: vec![],
};
response_header.encode(&mut buf, response_header_version)?;
match self {
Response::ApiVersionsResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::CreateTopicsResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::FindCoordinatorResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::FetchResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::InitProducerIdResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::JoinGroupResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::MetadataResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::OffsetFetchResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::ProduceResponse(resp) => resp.encode(&mut buf, api_version)?,
Response::SyncGroupResponse(resp) => resp.encode(&mut buf, api_version)?,
}
let mut bs = BytesMut::new();
Int32.encode(&mut bs, buf.len() as i32)?;
bs.put_slice(buf.as_slice());
Ok(bs.freeze())
}
}