use bytes::Bytes;
use nom::{
number::complete::{be_i32, be_i64},
IResult,
};
use nombytes::NomBytes;
use crate::{
error::{Error, KafkaCode, Result},
parser,
protocol::{parse_header_response, HeaderResponse},
};
#[derive(Debug, PartialEq)]
pub struct ProduceResponse {
pub header: HeaderResponse,
pub responses: Vec<Response>,
}
#[derive(Debug, PartialEq)]
pub struct Response {
pub name: Bytes,
pub partition_responses: Vec<PartitionResponse>,
}
#[derive(Debug, PartialEq)]
pub struct PartitionResponse {
pub index: i32,
pub error_code: KafkaCode,
pub base_offset: i64,
pub log_append_time: i64,
}
impl TryFrom<Bytes> for ProduceResponse {
type Error = Error;
fn try_from(s: Bytes) -> Result<Self> {
tracing::trace!("Parsing ProduceResponse {:?}", s);
let (_, produce_fetch) =
parse_produce_fetch_response(NomBytes::new(s.clone())).map_err(|err| {
tracing::error!("ERROR: Failed parsing ProduceResponse {:?}", err);
tracing::error!("ERROR: ProduceResponse Bytes {:?}", s);
Error::ParsingError(s)
})?;
tracing::trace!("Parsed ProduceResponse {:?}", produce_fetch);
Ok(produce_fetch)
}
}
pub fn parse_produce_fetch_response(s: NomBytes) -> IResult<NomBytes, ProduceResponse> {
let (s, header) = parse_header_response(s)?;
let (s, responses) = parser::parse_array(parse_response)(s)?;
Ok((s, ProduceResponse { header, responses }))
}
pub fn parse_response(s: NomBytes) -> IResult<NomBytes, Response> {
let (s, name) = parser::parse_string(s)?;
let (s, partition_responses) = parser::parse_array(parse_partition_response)(s)?;
Ok((
s,
Response {
name,
partition_responses,
},
))
}
pub fn parse_partition_response(s: NomBytes) -> IResult<NomBytes, PartitionResponse> {
let (s, index) = be_i32(s)?;
let (s, error_code) = parser::parse_kafka_code(s)?;
let (s, base_offset) = be_i64(s)?;
let (s, log_append_time) = be_i64(s)?;
Ok((
s,
PartitionResponse {
index,
error_code,
base_offset,
log_append_time,
},
))
}