use bytes::BufMut;
use crate::{encode::ToByte, error::Result, protocol::HeaderRequest};
/// API key that `FetchRequest::new` hands to `HeaderRequest::new` to mark the request as a fetch.
const API_KEY_FETCH: i16 = 1;
/// API version that `FetchRequest::new` hands to `HeaderRequest::new`.
// NOTE(review): the fields written by `FetchRequest::encode` presumably have to match the wire
// layout of this version — confirm against the protocol spec before changing this value.
const API_VERSION: i16 = 4;
/// A fetch request: a request header, fetch-wide settings, and the topics/partitions to read.
///
/// The `ToByte` implementation writes the fields in declaration order.
// NOTE(review): the field set looks like the Kafka Fetch request (version 4) — confirm against
// the protocol guide.
#[derive(Debug, Clone)]
pub struct FetchRequest<'a> {
/// Request header; `FetchRequest::new` builds it from `API_KEY_FETCH`, `API_VERSION`,
/// the correlation id and the client id.
pub header: HeaderRequest<'a>,
/// Replica id; `FetchRequest::new` always sets this to `-1`.
// NOTE(review): presumably -1 identifies an ordinary (non-broker) consumer — confirm.
pub replica: i32,
/// Maximum wait, taken verbatim from the caller.
// NOTE(review): presumably milliseconds the server may block waiting for data — confirm.
pub max_wait_ms: i32,
/// Minimum byte count, taken verbatim from the caller.
// NOTE(review): presumably the amount of data the server should accumulate before replying — confirm.
pub min_bytes: i32,
/// Maximum byte count for the request as a whole, taken verbatim from the caller
/// (each `Partition` carries its own separate `max_bytes`).
pub max_bytes: i32,
/// Isolation level, taken verbatim from the caller.
// NOTE(review): valid values are not visible in this file — check the protocol spec.
pub isolation_level: i8,
/// Topics to fetch from; starts empty and is filled through `FetchRequest::add`.
pub topics: Vec<TopicPartition<'a>>,
}
/// One topic inside a fetch request together with the partitions requested from it.
///
/// Encoded as the topic name followed by the partition list.
#[derive(Debug, Clone)]
pub struct TopicPartition<'a> {
/// Name of the topic; borrowed from the caller for the lifetime `'a`.
pub topic_name: &'a str,
/// Partitions of this topic to fetch from.
pub partitions: Vec<Partition>,
}
/// A single partition entry of a fetch request.
///
/// Encoded as `partition_index`, `offset`, `max_bytes`, in that order.
#[derive(Debug, Clone)]
pub struct Partition {
/// Index of the partition within its topic.
pub partition_index: i32,
/// Offset associated with this partition.
// NOTE(review): presumably the offset at which fetching starts — confirm.
pub offset: i64,
/// Byte limit for this partition (distinct from `FetchRequest::max_bytes`).
// NOTE(review): presumably the maximum amount of data returned for this partition — confirm.
pub max_bytes: i32,
}
impl<'a> FetchRequest<'a> {
    /// Creates a fetch request with no topics.
    ///
    /// The header is built from `API_KEY_FETCH`, `API_VERSION`, `correlation_id` and
    /// `client_id`; `replica` is fixed to `-1`; the remaining arguments are stored unchanged.
    /// Use [`FetchRequest::add`] afterwards to register the partitions to read.
    pub fn new(
        correlation_id: i32,
        client_id: &'a str,
        max_wait_ms: i32,
        min_bytes: i32,
        max_bytes: i32,
        isolation_level: i8,
    ) -> FetchRequest<'a> {
        let header = HeaderRequest::new(API_KEY_FETCH, API_VERSION, correlation_id, client_id);
        Self {
            header,
            replica: -1,
            max_wait_ms,
            min_bytes,
            max_bytes,
            isolation_level,
            topics: Vec::new(),
        }
    }

    /// Registers a partition of `topic_name` to fetch from.
    ///
    /// * If the topic is not in the request yet, a new topic entry holding just this
    ///   partition is appended.
    /// * If the topic exists but does not list `partition_index`, the partition is appended
    ///   to that topic.
    /// * If the topic already lists `partition_index`, the call does nothing: the previously
    ///   stored `offset` and `max_bytes` are kept.
    pub fn add(&mut self, topic_name: &'a str, partition_index: i32, offset: i64, max_bytes: i32) {
        let requested = Partition {
            partition_index,
            offset,
            max_bytes,
        };

        // Known topic: extend its partition list unless the index is already there.
        if let Some(entry) = self
            .topics
            .iter_mut()
            .find(|entry| entry.topic_name == topic_name)
        {
            let already_listed = entry
                .partitions
                .iter()
                .any(|known| known.partition_index == partition_index);
            if !already_listed {
                entry.partitions.push(requested);
            }
            return;
        }

        // Unknown topic: start a new entry with this single partition.
        self.topics.push(TopicPartition {
            topic_name,
            partitions: vec![requested],
        });
    }
}
impl ToByte for FetchRequest<'_> {
    /// Writes the request into `buffer`: the header, then `replica`, `max_wait_ms`,
    /// `min_bytes`, `max_bytes`, `isolation_level` and finally the topic list.
    ///
    /// A trace-level log line with the `Debug` form of the request is emitted first.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by a field's `encode`.
    fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
        tracing::trace!("Encoding FetchRequest {:?}", self);

        // The header goes first.
        self.header.encode(buffer)?;

        // Fetch-wide settings, in declaration order.
        self.replica.encode(buffer)?;
        self.max_wait_ms.encode(buffer)?;
        self.min_bytes.encode(buffer)?;
        self.max_bytes.encode(buffer)?;
        self.isolation_level.encode(buffer)?;

        // The topic list is the last field, so its outcome is the outcome of the whole call.
        self.topics.encode(buffer)
    }
}
impl ToByte for TopicPartition<'_> {
    /// Writes the topic name followed by its partition list into `buffer`.
    ///
    /// # Errors
    ///
    /// Returns the error from encoding the name, or otherwise the result of encoding
    /// the partitions.
    fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
        self.topic_name.encode(buffer)?;
        // Final field: hand its result straight back to the caller.
        self.partitions.encode(buffer)
    }
}
impl ToByte for Partition {
    /// Writes `partition_index`, `offset` and `max_bytes` into `buffer`, in that order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by a field's `encode`.
    fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
        self.partition_index.encode(buffer)?;
        self.offset.encode(buffer)?;
        // Final field: hand its result straight back to the caller.
        self.max_bytes.encode(buffer)
    }
}