use bytes::Bytes;
use nom::{
number::complete::{be_i32, be_i64},
IResult,
};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser,
protocol::{parse_header_response, HeaderResponse},
};
#[derive(Debug, PartialEq)]
pub struct ListOffsetsResponse {
pub header: HeaderResponse,
pub topics: Vec<Topic>,
}
#[derive(Debug, PartialEq)]
pub struct Topic {
pub name: Bytes,
pub partitions: Vec<Partition>,
}
#[derive(Debug, PartialEq)]
pub struct Partition {
pub partition_index: i32,
pub error_code: KafkaCode,
pub timestamp: i64,
pub offset: i64,
}
impl TryFrom<Bytes> for ListOffsetsResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing ListOffsetsResponse {:?}", s);
let (_, list_offsets) =
parse_list_offsets_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing ListOffsetsResponse {:?}", err);
tracing::error!("ERROR: ListOffsetsResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed ListOffsetsResponse {:?}", list_offsets);
Ok(list_offsets)
}
}
impl ListOffsetsResponse {
pub fn into_box_iter(self) -> Box<impl Iterator<Item = (Bytes, Partition)>> {
Box::new(self.topics.into_iter().flat_map(|topic| {
topic
.partitions
.into_iter()
.map(move |partition| (topic.name.clone(), partition))
}))
}
}
pub fn parse_list_offsets_response(s: NomBytes) -> IResult<NomBytes, ListOffsetsResponse> {
let (s, header) = parse_header_response(s)?;
let (s, topics) = parser::parse_array(parse_topic)(s)?;
Ok((s, ListOffsetsResponse { header, topics }))
}
fn parse_topic(s: NomBytes) -> IResult<NomBytes, Topic> {
let (s, name) = parser::parse_string(s)?;
let (s, partitions) = parser::parse_array(parse_partition)(s)?;
Ok((s, Topic { name, partitions }))
}
fn parse_partition(s: NomBytes) -> IResult<NomBytes, Partition> {
let (s, partition_index) = be_i32(s)?;
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, timestamp) = be_i64(s)?;
let (s, offset) = be_i64(s)?;
Ok((
s,
Partition {
partition_index,
error_code,
timestamp,
offset,
},
))
}