use bytes::Bytes;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser::{self, parse_array},
protocol::{parse_header_response, HeaderResponse},
};
#[derive(Debug, PartialEq)]
pub struct JoinGroupResponse {
pub header: HeaderResponse,
pub throttle_time_ms: i32,
pub error_code: KafkaCode,
pub generation_id: i32,
pub protocol_name: Bytes,
pub leader: Bytes,
pub member_id: Bytes,
pub members: Vec<Member>,
}
#[derive(Debug, PartialEq)]
pub struct Member {
pub member_id: Bytes,
pub metadata: Bytes,
}
impl TryFrom<Bytes> for JoinGroupResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing JoinGroupResponse {:?}", s);
let (_, join_group) =
parse_join_group_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing JoinGroupResponse {:?}", err);
tracing::error!("ERROR: JoinGroupResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed JoinGroupResponse {:?}", join_group);
Ok(join_group)
}
}
pub fn parse_join_group_response(s: NomBytes) -> IResult<NomBytes, JoinGroupResponse> {
let (s, header) = parse_header_response(s)?;
let (s, throttle_time_ms) = be_i32(s)?;
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, generation_id) = be_i32(s)?;
let (s, protocol_name) = parser::parse_string(s)?;
let (s, leader) = parser::parse_string(s)?;
let (s, member_id) = parser::parse_string(s)?;
let (s, members) = parse_array(parse_member)(s)?;
Ok((
s,
JoinGroupResponse {
header,
throttle_time_ms,
error_code,
generation_id,
protocol_name,
leader,
member_id,
members,
},
))
}
fn parse_member(s: NomBytes) -> IResult<NomBytes, Member> {
let (s, member_id) = parser::parse_string(s)?;
let (s, metadata) = parser::parse_bytes(s)?;
Ok((
s,
Member {
member_id,
metadata,
},
))
}