use crate::{encode::ToByte, protocol::HeaderRequest};
/// API key placed in the request header by `ListOffsetsRequest::new`.
///
/// NOTE(review): the name says METADATA, but this file builds ListOffsets requests. In the
/// Kafka protocol key 2 is ListOffsets (Metadata is 3), so the value looks right and only the
/// name appears misleading — confirm against the protocol guide and consider renaming.
const API_KEY_METADATA: i16 = 2;
/// API version placed in the request header by `ListOffsetsRequest::new`.
const API_VERSION: i16 = 1;
/// A ListOffsets request: a header, a replica id and the topics/partitions to query.
///
/// The `ToByte` implementation in this file writes the fields in declaration order
/// (header, replica id, topics).
#[derive(Debug)]
pub struct ListOffsetsRequest<'a> {
/// Request header; `new` builds it from the API key/version constants, the correlation id
/// and the client id.
pub header: HeaderRequest<'a>,
/// Replica id, encoded right after the header.
// NOTE(review): presumably the id of the requesting replica (conventionally -1 for plain
// clients in Kafka) — confirm against callers.
pub replica_id: i32,
/// Topics to query; starts empty and is filled through `add`.
pub topics: Vec<Topic<'a>>,
}
/// One topic entry of a `ListOffsetsRequest`: the topic name plus its partitions.
///
/// Encoded as the name followed by the partition list.
#[derive(Debug)]
pub struct Topic<'a> {
/// Topic name, borrowed from the caller for the lifetime of the request.
pub name: &'a str,
/// Partitions requested for this topic; `ListOffsetsRequest::add` keeps at most one entry
/// per partition index.
pub partitions: Vec<Partition>,
}
/// One partition entry of a topic in a `ListOffsetsRequest`.
///
/// This is plain data (two integers), so it is cheap to copy and can be compared directly,
/// which keeps call sites and tests free of manual field-by-field checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Partition {
    /// Index of the partition within its topic.
    pub partition_index: i32,
    /// Timestamp value sent for this partition.
    // NOTE(review): in the Kafka protocol -1 / -2 conventionally select the latest / earliest
    // offset — confirm that callers rely on this.
    pub timestamp: i64,
}
impl<'a> ListOffsetsRequest<'a> {
    /// Creates a request with no topics.
    ///
    /// The header is built from the module's API key/version constants together with
    /// `correlation_id` and `client_id`; `replica_id` is stored unchanged.
    pub fn new(correlation_id: i32, client_id: &'a str, replica_id: i32) -> Self {
        Self {
            header: HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id),
            replica_id,
            topics: Vec::new(),
        }
    }

    /// Registers `partition_index` of `topic_name` with the given `timestamp`.
    ///
    /// * Unknown topic: a new topic entry holding just this partition is appended.
    /// * Known topic, unknown partition: the partition is appended to that topic.
    /// * Known topic, known partition: nothing changes — the timestamp stored first is kept.
    pub fn add(&mut self, topic_name: &'a str, partition_index: i32, timestamp: i64) {
        let requested = Partition {
            partition_index,
            timestamp,
        };

        if let Some(existing) = self
            .topics
            .iter_mut()
            .find(|candidate| candidate.name == topic_name)
        {
            let is_new_partition = existing
                .partitions
                .iter()
                .all(|known| known.partition_index != partition_index);
            if is_new_partition {
                existing.partitions.push(requested);
            }
            return;
        }

        // First time this topic is seen: start its partition list with the requested entry.
        self.topics.push(Topic {
            name: topic_name,
            partitions: vec![requested],
        });
    }
}
impl ToByte for ListOffsetsRequest<'_> {
    /// Writes the request into `buffer`: header, then replica id, then the topic list.
    ///
    /// Stops at, and returns, the first error reported by a field encoder.
    fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
        tracing::trace!("Encoding ListOffsetRequest {:?}", self);

        self.header.encode(buffer)?;
        self.replica_id.encode(buffer)?;
        // The last field's result is the result of the whole encoding.
        self.topics.encode(buffer)
    }
}
impl ToByte for Topic<'_> {
    /// Writes the topic name followed by its partition list into `buffer`.
    ///
    /// Stops at, and returns, the first error reported by a field encoder.
    fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
        self.name.encode(buffer)?;
        // The last field's result is the result of the whole encoding.
        self.partitions.encode(buffer)
    }
}
impl ToByte for Partition {
    /// Writes the partition index followed by the timestamp into `buffer`.
    ///
    /// Stops at, and returns, the first error reported by a field encoder.
    fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
        self.partition_index.encode(buffer)?;
        // The last field's result is the result of the whole encoding.
        self.timestamp.encode(buffer)
    }
}