use bytes::Bytes;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser::{self, parse_array},
protocol::{parse_header_response, HeaderResponse},
};
#[derive(Debug, PartialEq)]
pub struct CreateTopicsResponse {
pub header: HeaderResponse,
pub throttle_time_ms: i32,
pub topics: Vec<Topic>,
}
#[derive(Debug, PartialEq)]
pub struct Topic {
pub name: Bytes,
pub error_code: KafkaCode,
pub error_message: Option<Bytes>,
}
impl TryFrom<Bytes> for CreateTopicsResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing CreateTopicsResponse {:?}", s);
let (_, create_topics) =
parse_create_topics_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing CreateTopicsResponse {:?}", err);
tracing::error!("ERROR: CreateTopicsResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed CreateTopicsResponse {:?}", create_topics);
Ok(create_topics)
}
}
impl CreateTopicsResponse {
pub fn is_error(&self) -> Result<()> {
self.topics
.iter()
.map(|topic| topic.is_error())
.collect::<Result<Vec<()>>>()?;
Ok(())
}
}
impl Topic {
pub fn is_error(&self) -> Result<()> {
match self.error_code {
KafkaCode::None => Ok(()),
_ => {
tracing::error!("Kafka error: {:?}", self.error_message);
Err(Error::KafkaError(self.error_code))
}
}
}
}
pub fn parse_create_topics_response(s: NomBytes) -> IResult<NomBytes, CreateTopicsResponse> {
let (s, header) = parse_header_response(s)?;
let (s, throttle_time_ms) = be_i32(s)?;
let (s, topics) = parse_array(parse_topic)(s)?;
Ok((
s,
CreateTopicsResponse {
header,
throttle_time_ms,
topics,
},
))
}
fn parse_topic(s: NomBytes) -> IResult<NomBytes, Topic> {
let (s, name) = parser::parse_string(s)?;
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, error_message) = parser::parse_nullable_string(s)?;
Ok((
s,
Topic {
name,
error_code,
error_message,
},
))
}