use bytes::Bytes;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser,
protocol::{parse_header_response, HeaderResponse},
};
/// Decoded form of a FindCoordinator response.
///
/// Fields are listed in the same order in which
/// `parse_find_coordinator_response` reads them from the input.
#[derive(Debug, PartialEq)]
pub struct FindCoordinatorResponse {
/// Response header, as produced by `parse_header_response`.
pub header: HeaderResponse,
/// Error code decoded by `parser::parse_kafka_code`.
pub error_code: KafkaCode,
/// Node id, read as a big-endian `i32`.
/// NOTE(review): presumably the coordinator broker's id — confirm against the protocol spec.
pub node_id: i32,
/// Host value as returned by `parser::parse_string`, kept as raw `Bytes`.
/// NOTE(review): presumably the coordinator's hostname; encoding is not validated here — confirm.
pub host: Bytes,
/// Port, read as a big-endian `i32`.
pub port: i32,
}
impl TryFrom<Bytes> for FindCoordinatorResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing FindCoordinatorResponse {:?}", s);
let (_, find_coordinator) = parse_find_coordinator_response(NomBytes::new(s.clone()))
.map_err(|err| {
tracing::error!("ERROR: Failed parsing FindCoordinatorResponse {:?}", err);
tracing::error!("ERROR: FindCoordinatorResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed FindCoordinatorResponse {:?}", find_coordinator);
Ok(find_coordinator)
}
}
/// Parses a [`FindCoordinatorResponse`] from the front of `s`.
///
/// Reads, in order: the response header, the Kafka error code, a big-endian
/// `i32` node id, a string host, and a big-endian `i32` port.
///
/// On success returns the unconsumed remainder of the input together with the
/// decoded response; any failing sub-parser's error is propagated unchanged.
pub fn parse_find_coordinator_response(s: NomBytes) -> IResult<NomBytes, FindCoordinatorResponse> {
    let (rest, header) = parse_header_response(s)?;
    let (rest, error_code) = parser::parse_kafka_code(rest)?;
    let (rest, node_id) = be_i32(rest)?;
    let (rest, host) = parser::parse_string(rest)?;
    let (remaining, port) = be_i32(rest)?;

    let response = FindCoordinatorResponse {
        header,
        error_code,
        node_id,
        host,
        port,
    };

    Ok((remaining, response))
}