use bytes::Bytes;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser::{self, parse_array},
protocol::{parse_header_response, HeaderResponse},
};
/// Decoded form of a Kafka `DeleteTopics` response.
///
/// Instances are produced by [`parse_delete_topics_response`] or through the
/// `TryFrom<Bytes>` implementation.
#[derive(Debug, PartialEq)]
pub struct DeleteTopicsResponse {
    /// Response header, decoded first by `parse_header_response`.
    pub header: HeaderResponse,
    /// Throttle time in milliseconds, read as a big-endian `i32` right after
    /// the header.
    pub throttle_time_ms: i32,
    /// One entry per topic contained in the response.
    pub topics: Vec<Topic>,
}
/// Per-topic entry of a [`DeleteTopicsResponse`].
#[derive(Debug, PartialEq)]
pub struct Topic {
    /// Topic name, kept as the raw bytes read from the response.
    pub name: Bytes,
    /// Result code for this topic; `KafkaCode::None` means no error.
    pub error_code: KafkaCode,
}
impl TryFrom<Bytes> for DeleteTopicsResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing DeleteTopicsResponse {:?}", s);
let (_, delete_topics) =
parse_delete_topics_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing DeleteTopicsResponse {:?}", err);
tracing::error!("ERROR: DeleteTopicsResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed DeleteTopicsResponse {:?}", delete_topics);
Ok(delete_topics)
}
}
impl DeleteTopicsResponse {
pub fn is_error(&self) -> Result<()> {
self.topics
.iter()
.map(|topic| topic.is_error())
.collect::<Result<Vec<()>>>()?;
Ok(())
}
}
impl Topic {
pub fn is_error(&self) -> Result<()> {
match self.error_code {
KafkaCode::None => Ok(()),
_ => Err(Error::KafkaError(self.error_code)),
}
}
}
/// Parses a complete `DeleteTopicsResponse` from `s`.
///
/// Fields are read in this order: the response header, a big-endian `i32`
/// throttle time, then an array of [`Topic`] entries. On success the
/// unconsumed input is returned alongside the decoded response.
pub fn parse_delete_topics_response(s: NomBytes) -> IResult<NomBytes, DeleteTopicsResponse> {
    let (rest, header) = parse_header_response(s)?;
    let (rest, throttle_time_ms) = be_i32(rest)?;
    let (rest, topics) = parse_array(parse_topic)(rest)?;

    let response = DeleteTopicsResponse {
        header,
        throttle_time_ms,
        topics,
    };
    Ok((rest, response))
}
/// Parses a single [`Topic`] entry: the topic name followed by its error code.
///
/// On success the unconsumed input is returned alongside the decoded topic.
fn parse_topic(s: NomBytes) -> IResult<NomBytes, Topic> {
    let (rest, name) = parser::parse_string(s)?;
    let (rest, error_code) = parser::parse_kafka_code(rest)?;

    let topic = Topic { name, error_code };
    Ok((rest, topic))
}