use crate::{
encode::ToByte,
error::{Error, Result},
protocol::HeaderRequest,
};
use bytes::Bytes;
use nom::AsBytes;
/// API key placed in the request header built by `OffsetCommitRequest::new`.
// NOTE(review): the name says "METADATA", but in this file the constant is only used
// for the offset-commit request header; 8 is presumably the Kafka OffsetCommit API key.
// Confirm against the protocol reference and consider renaming.
const API_KEY_METADATA: i16 = 8;
/// API version placed in the request header built by `OffsetCommitRequest::new`.
const API_VERSION: i16 = 2;
/// Offset-commit request: a request header followed by the group fields and the
/// per-topic offsets to commit.
///
/// Build it with `OffsetCommitRequest::new`, then register offsets with `add`.
/// The `ToByte` implementation encodes the fields in declaration order.
#[derive(Debug)]
pub struct OffsetCommitRequest<'a> {
/// Request header (API key, API version, correlation id, client id).
pub header: HeaderRequest<'a>,
/// Identifier of the group the offsets are committed for.
pub group_id: &'a str,
/// Generation id or member epoch of the committing member (per the field name).
pub generation_id_or_member_epoch: i32,
/// Member id, validated as UTF-8 when the request is constructed.
pub member_id: String,
/// Retention time in milliseconds (per the field name); encoded as an `i64`.
pub retention_time_ms: i64,
/// Topics whose partition offsets are committed; starts empty and is filled by `add`.
pub topics: Vec<Topic<'a>>,
}
/// One topic entry of an offset-commit request: the topic name plus the partitions
/// for which an offset is committed.
#[derive(Debug)]
pub struct Topic<'a> {
/// Topic name; `OffsetCommitRequest::add` uses it to find an existing entry.
pub name: &'a str,
/// Partition entries recorded for this topic.
pub partitions: Vec<Partition<'a>>,
}
/// One partition entry of an offset-commit request.
#[derive(Debug)]
pub struct Partition<'a> {
/// Index of the partition within its topic.
pub partition_index: i32,
/// Offset to commit for this partition.
pub committed_offset: i64,
/// Optional metadata encoded right after the offset.
pub committed_metadata: Option<&'a str>,
}
impl<'a> OffsetCommitRequest<'a> {
pub fn new(
correlation_id: i32,
client_id: &'a str,
group_id: &'a str,
generation_id_or_member_epoch: i32,
member_id: Bytes,
retention_time_ms: i64,
) -> Result<Self> {
let header = HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id);
Ok(Self {
header,
group_id,
generation_id_or_member_epoch,
member_id: String::from_utf8(member_id.as_bytes().to_vec())
.map_err(|_| Error::DecodingUtf8Error)?,
retention_time_ms,
topics: vec![],
})
}
pub fn add(
&mut self,
topic_name: &'a str,
partition_index: i32,
committed_offset: i64,
committed_metadata: Option<&'a str>,
) {
match self
.topics
.iter_mut()
.find(|topic| topic.name == topic_name)
{
None => self.topics.push(Topic {
name: topic_name,
partitions: vec![Partition {
partition_index,
committed_offset,
committed_metadata,
}],
}),
Some(topic) => {
match topic
.partitions
.iter_mut()
.find(|partition| partition.partition_index == partition_index)
{
None => topic.partitions.push(Partition {
partition_index,
committed_offset,
committed_metadata,
}),
Some(partition) => {
tracing::warn!(
"Overwriting commit offset for {} {}",
topic_name,
partition_index
);
partition.committed_offset = committed_offset;
}
}
}
}
}
}
/// Encodes the request by writing each field to the buffer in declaration order.
impl ToByte for OffsetCommitRequest<'_> {
    /// Writes the header, group id, generation id / member epoch, member id, retention
    /// time and topics into `buffer`, stopping at the first field that fails to encode.
    fn encode<B: bytes::BufMut>(&self, buffer: &mut B) -> Result<()> {
        tracing::trace!("Encoding OffsetCommitRequest {:?}", self);
        self.header.encode(buffer)?;
        self.group_id.encode(buffer)?;
        self.generation_id_or_member_epoch.encode(buffer)?;
        self.member_id.encode(buffer)?;
        self.retention_time_ms.encode(buffer)?;
        // The last field's result is the result of the whole call.
        self.topics.encode(buffer)
    }
}
/// Encodes a topic entry: its name followed by its partition entries.
impl ToByte for Topic<'_> {
    /// Writes `name` and then `partitions` into `buffer`, stopping at the first failure.
    fn encode<B: bytes::BufMut>(&self, buffer: &mut B) -> Result<()> {
        self.name.encode(buffer)?;
        self.partitions.encode(buffer)
    }
}
/// Encodes a partition entry: index, committed offset, then the optional metadata.
impl ToByte for Partition<'_> {
    /// Writes `partition_index`, `committed_offset` and `committed_metadata` into
    /// `buffer` in that order, stopping at the first failure.
    fn encode<B: bytes::BufMut>(&self, buffer: &mut B) -> Result<()> {
        self.partition_index.encode(buffer)?;
        self.committed_offset.encode(buffer)?;
        self.committed_metadata.encode(buffer)
    }
}