use bytes::Bytes;
use nom::{number::complete::be_i32, IResult};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser, protocol,
};
#[derive(Debug, Default, PartialEq)]
pub struct MetadataResponse {
pub header_response: protocol::HeaderResponse,
pub brokers: Vec<Broker>,
pub controller_id: i32,
pub topics: Vec<Topic>,
}
impl MetadataResponse {
pub fn is_error(&self) -> Result<()> {
self.topics
.iter()
.map(|topic| topic.is_error())
.collect::<Result<Vec<()>>>()?;
Ok(())
}
}
impl TryFrom<Bytes> for MetadataResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing MetadataResponse {:?}", s);
let (_, metadata) = parse_metadata_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing MetadataResponse {:?}", err);
tracing::error!("ERROR: MetadataResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed MetadataResponse {:?}", metadata);
Ok(metadata)
}
}
pub fn parse_metadata_response(s: NomBytes) -> IResult<NomBytes, MetadataResponse> {
let (s, header_response) = protocol::parse_header_response(s)?;
let (s, brokers) = parser::parse_array(parse_broker)(s)?;
let (s, controller_id) = be_i32(s)?;
let (s, topics) = parser::parse_array(parse_topic)(s)?;
Ok((
s,
MetadataResponse {
header_response,
brokers,
controller_id,
topics,
},
))
}
#[derive(Debug, Clone, PartialEq)]
pub struct Broker {
pub node_id: i32,
pub host: Bytes,
pub port: i32,
pub rack: Option<Bytes>,
}
fn parse_broker(s: NomBytes) -> IResult<NomBytes, Broker> {
let (s, node_id) = be_i32(s)?;
let (s, host) = parser::parse_string(s)?;
let (s, port) = be_i32(s)?;
let (s, rack) = parser::parse_nullable_string(s)?;
Ok((
s,
Broker {
node_id,
host,
port,
rack,
},
))
}
#[derive(Debug, Clone, PartialEq)]
pub struct Topic {
pub error_code: KafkaCode,
pub name: Bytes,
pub is_internal: bool,
pub partitions: Vec<Partition>,
}
impl Topic {
pub fn is_error(&self) -> Result<()> {
if self.error_code != KafkaCode::None {
tracing::error!(
"ERROR: Kafka Error {:?} in topic {:?}",
self.error_code,
self.name
);
return Err(Error::KafkaError(self.error_code));
}
self.partitions
.iter()
.map(|partition| partition.is_error(self.name.clone()))
.collect::<Result<Vec<()>>>()?;
Ok(())
}
}
fn parse_topic(s: NomBytes) -> IResult<NomBytes, Topic> {
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, name) = parser::parse_string(s)?;
let (s, is_internal) = parser::parse_boolean(s)?;
let (s, partitions) = parser::parse_array(parse_partition)(s)?;
Ok((
s,
Topic {
error_code,
name,
is_internal,
partitions,
},
))
}
#[derive(Debug, Clone, PartialEq)]
pub struct Partition {
pub error_code: KafkaCode,
pub partition_index: i32,
pub leader_id: i32,
pub replica_nodes: Vec<i32>,
pub isr_nodes: Vec<i32>,
}
impl Partition {
pub fn is_error(&self, topic_name: Bytes) -> Result<()> {
if self.error_code != KafkaCode::None {
tracing::error!(
"ERROR: Kafka Error {:?} in topic {:?} partition {}",
self.error_code,
topic_name,
self.partition_index
);
Err(Error::KafkaError(self.error_code))
} else {
Ok(())
}
}
}
fn parse_partition(s: NomBytes) -> IResult<NomBytes, Partition> {
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, partition_index) = be_i32(s)?;
let (s, leader_id) = be_i32(s)?;
let (s, replica_nodes) = parser::parse_array(be_i32)(s)?;
let (s, isr_nodes) = parser::parse_array(be_i32)(s)?;
Ok((
s,
Partition {
error_code,
partition_index,
leader_id,
replica_nodes,
isr_nodes,
},
))
}