use bytes::Bytes;
use nom::AsBytes;
use crate::{
encode::ToByte,
error::{Error, Result},
protocol::HeaderRequest,
};
const API_KEY_METADATA: i16 = 14;
const API_VERSION: i16 = 2;
#[derive(Debug)]
pub struct SyncGroupRequest<'a> {
pub header: HeaderRequest<'a>,
pub group_id: &'a str,
pub generation_id: i32,
pub member_id: String,
pub assignments: Vec<Assignment<'a>>,
}
#[derive(Debug, Clone)]
pub struct Assignment<'a> {
pub member_id: String,
pub assignment: MemberAssignment<'a>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MemberAssignment<'a> {
pub version: i16,
pub partition_assignments: Vec<PartitionAssignment<'a>>,
pub user_data: Option<Bytes>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionAssignment<'a> {
pub topic_name: &'a str,
pub partitions: Vec<i32>,
}
impl Assignment<'_> {
pub fn new(member_id: Bytes, assignment: MemberAssignment) -> Result<Assignment> {
Ok(Assignment {
member_id: String::from_utf8(member_id.as_bytes().to_vec())
.map_err(|_| Error::DecodingUtf8Error)?,
assignment,
})
}
}
impl<'a> PartitionAssignment<'a> {
pub fn new(topic_name: &'a str, partitions: Vec<i32>) -> Self {
Self {
topic_name,
partitions,
}
}
}
impl<'a> SyncGroupRequest<'a> {
pub fn new(
correlation_id: i32,
client_id: &'a str,
group_id: &'a str,
generation_id: i32,
member_id: Bytes,
assignments: Vec<Assignment<'a>>,
) -> Result<Self> {
let header = HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id);
Ok(Self {
header,
group_id,
generation_id,
member_id: String::from_utf8(member_id.as_bytes().to_vec())
.map_err(|_| Error::DecodingUtf8Error)?,
assignments,
})
}
}
impl ToByte for SyncGroupRequest<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
tracing::trace!("Encoding SyncGroupRequest {:?}", self);
self.header.encode(buffer)?;
self.group_id.encode(buffer)?;
self.generation_id.encode(buffer)?;
self.member_id.encode(buffer)?;
self.assignments.encode(buffer)?;
Ok(())
}
}
impl ToByte for Assignment<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.member_id.encode(buffer)?;
let mut buf = Vec::with_capacity(4);
self.assignment.encode(&mut buf)?;
buf.encode(buffer)?;
Ok(())
}
}
impl ToByte for MemberAssignment<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.version.encode(buffer)?;
self.partition_assignments.encode(buffer)?;
self.user_data.encode(buffer)?;
Ok(())
}
}
impl ToByte for PartitionAssignment<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.topic_name.encode(buffer)?;
self.partitions.encode(buffer)?;
Ok(())
}
}