pub mod commit_offset;
pub mod create_topics;
pub mod delete_topics;
pub mod fetch;
pub mod find_coordinator;
pub mod heartbeat;
pub mod join_group;
pub mod leave_group;
pub mod list_offsets;
pub mod metadata;
pub mod offset_fetch;
pub mod produce;
pub mod sasl_authenticate;
pub mod sasl_handshake;
pub mod sync_group;
use bytes::BufMut;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
pub use self::{
commit_offset::{request::OffsetCommitRequest, response::OffsetCommitResponse},
create_topics::{request::CreateTopicsRequest, response::CreateTopicsResponse},
delete_topics::{request::DeleteTopicsRequest, response::DeleteTopicsResponse},
fetch::{request::FetchRequest, response::FetchResponse},
find_coordinator::{request::FindCoordinatorRequest, response::FindCoordinatorResponse},
heartbeat::{request::HeartbeatRequest, response::HeartbeatResponse},
join_group::{request::JoinGroupRequest, response::JoinGroupResponse},
leave_group::{request::LeaveGroupRequest, response::LeaveGroupResponse},
list_offsets::{request::ListOffsetsRequest, response::ListOffsetsResponse},
metadata::{request::MetadataRequest, response::MetadataResponse},
offset_fetch::{request::OffsetFetchRequest, response::OffsetFetchResponse},
produce::{
request::{Header, ProduceRequest},
response::ProduceResponse,
},
sasl_authenticate::{request::SaslAuthenticationRequest, response::SaslAuthenticationResponse},
sasl_handshake::{request::SaslHandshakeRequest, response::SaslHandshakeResponse},
sync_group::{
request::{Assignment, MemberAssignment, PartitionAssignment, SyncGroupRequest},
response::SyncGroupResponse,
},
};
use crate::{encode::ToByte, error::Result};
#[derive(Debug, Clone)]
pub struct HeaderRequest<'a> {
pub api_key: i16,
pub api_version: i16,
pub correlation_id: i32,
pub client_id: &'a str,
}
impl<'a> HeaderRequest<'a> {
pub fn new(
api_key: i16,
api_version: i16,
correlation_id: i32,
client_id: &'a str,
) -> HeaderRequest<'a> {
HeaderRequest {
api_key,
api_version,
correlation_id,
client_id,
}
}
}
impl ToByte for HeaderRequest<'_> {
fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
self.api_key.encode(buffer)?;
self.api_version.encode(buffer)?;
self.correlation_id.encode(buffer)?;
self.client_id.encode(buffer)?;
Ok(())
}
}
#[derive(Debug, Default, PartialEq)]
pub struct HeaderResponse {
pub correlation_id: i32,
}
pub fn parse_header_response(s: NomBytes) -> IResult<NomBytes, HeaderResponse> {
let (s, correlation_id) = be_i32(s)?;
Ok((s, HeaderResponse { correlation_id }))
}