use bytes::Bytes;
use nom::{
number::complete::{be_i16, be_i32},
IResult,
};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser, protocol,
};
/// Decoded form of a SyncGroup response, as produced by
/// `parse_sync_group_response`.
#[derive(Debug, PartialEq)]
pub struct SyncGroupResponse {
/// Response header, decoded by `protocol::parse_header_response`.
pub header: protocol::HeaderResponse,
/// Throttle time read from the wire as a big-endian `i32`.
/// Presumably milliseconds, going by the field name (confirm against the
/// Kafka protocol guide).
pub throttle_time_ms: i32,
/// Error code reported for the request, decoded by
/// `parser::parse_kafka_code`.
pub error_code: KafkaCode,
/// Assignment payload carried by the response.
pub assignment: MemberAssignment,
}
/// Assignment payload embedded in a `SyncGroupResponse`.
#[derive(Debug, Clone, PartialEq)]
pub struct MemberAssignment {
/// Version number of the assignment payload, read as a big-endian `i16`.
pub version: i16,
/// One entry per topic, each listing the partitions assigned for it.
pub partition_assignments: Vec<PartitionAssignment>,
/// Optional opaque bytes trailing the assignment; `None` when the wire
/// value is null (decoded by `parser::parse_nullable_bytes`).
pub user_data: Option<Bytes>,
}
/// Partitions assigned for a single topic.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionAssignment {
/// Topic name, kept as the raw bytes returned by `parser::parse_string`.
pub topic_name: Bytes,
/// Partition ids for this topic, each read as a big-endian `i32`.
pub partitions: Vec<i32>,
}
impl TryFrom<Bytes> for SyncGroupResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing SyncGroupResponse {:?}", s);
let (_, sync_group) =
parse_sync_group_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing SyncGroupResponse {:?}", err);
tracing::error!("ERROR: SyncGroupResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed SyncGroupResponse {:?}", sync_group);
Ok(sync_group)
}
}
pub fn parse_sync_group_response(s: NomBytes) -> IResult<NomBytes, SyncGroupResponse> {
let (s, header) = protocol::parse_header_response(s)?;
let (s, throttle_time_ms) = be_i32(s)?;
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, _metadata_length) = be_i32(s)?;
let (s, assignment) = parse_member_assignment(s)?;
Ok((
s,
SyncGroupResponse {
header,
throttle_time_ms,
error_code,
assignment,
},
))
}
/// Parses a [`MemberAssignment`]: a big-endian `i16` version, an array of
/// per-topic partition assignments, and a nullable byte blob of user data.
fn parse_member_assignment(s: NomBytes) -> IResult<NomBytes, MemberAssignment> {
    let (rest, version) = be_i16(s)?;
    let (rest, topics) = parser::parse_array(parse_partition_assignment)(rest)?;
    let (rest, trailing_data) = parser::parse_nullable_bytes(rest)?;

    let assignment = MemberAssignment {
        version,
        partition_assignments: topics,
        user_data: trailing_data,
    };
    Ok((rest, assignment))
}
/// Parses a single [`PartitionAssignment`]: a topic name followed by an
/// array of big-endian `i32` partition ids.
fn parse_partition_assignment(s: NomBytes) -> IResult<NomBytes, PartitionAssignment> {
    let (rest, name) = parser::parse_string(s)?;
    let (rest, partition_ids) = parser::parse_array(be_i32)(rest)?;

    let entry = PartitionAssignment {
        topic_name: name,
        partitions: partition_ids,
    };
    Ok((rest, entry))
}