use crate::{encode::ToByte, error::Result, protocol::HeaderRequest};
// API key passed to `HeaderRequest::new` when a `CreateTopicsRequest` is built.
// NOTE(review): the identifier says METADATA, but 19 is the CreateTopics key in the
// Kafka protocol's API key table — the value looks right for this request and the
// name looks stale; consider renaming (e.g. API_KEY_CREATE_TOPICS). Confirm first.
const API_KEY_METADATA: i16 = 19;
// API version passed to `HeaderRequest::new` alongside the API key.
const API_VERSION: i16 = 3;
/// Request asking for one or more topics to be created.
///
/// Start from [`CreateTopicsRequest::new`] and register topics with
/// [`CreateTopicsRequest::add`]. When encoded, the header is written first,
/// followed by `topics`, `timeout_ms` and `validate_only`, in that order.
#[derive(Debug)]
pub struct CreateTopicsRequest<'a> {
    /// Request header, built by `new` from the API key, API version,
    /// correlation id and client id.
    pub header: HeaderRequest<'a>,
    /// Topics to create. `add` skips names that are already present here.
    pub topics: Vec<Topic<'a>>,
    /// Timeout for the request, in milliseconds.
    pub timeout_ms: i32,
    /// Flag encoded last in the request body.
    // NOTE(review): presumably asks the broker to validate the request without
    // creating anything — confirm against the Kafka CreateTopics schema.
    pub validate_only: bool,
}
/// A single topic entry inside a [`CreateTopicsRequest`].
///
/// Encoded as `name`, `num_partitions`, `replication_factor`, `assignments`
/// and `configs`, in that order.
#[derive(Debug)]
pub struct Topic<'a> {
    /// Name of the topic to create.
    pub name: &'a str,
    /// Number of partitions requested for the topic.
    pub num_partitions: i32,
    /// Replication factor requested for the topic.
    pub replication_factor: i16,
    /// Manual partition assignments; `CreateTopicsRequest::add` leaves this empty.
    pub assignments: Vec<Assignment>,
    /// Topic-level configuration entries; `CreateTopicsRequest::add` leaves this empty.
    pub configs: Vec<Config>,
}
/// Manual assignment entry for one partition of a topic being created.
///
/// Encoded as `partition_index` followed by `broker_ids`.
///
/// The fields are public so that code outside this module can construct values
/// for the public `Topic::assignments` list; with private fields and no
/// constructor the type could not be built by callers at all.
#[derive(Debug)]
pub struct Assignment {
    /// Index of the partition this assignment applies to.
    pub partition_index: i32,
    /// Ids of the brokers associated with the partition.
    // NOTE(review): presumably the brokers that should host the partition's
    // replicas — confirm against the Kafka CreateTopics schema.
    pub broker_ids: Vec<i32>,
}
/// A single configuration entry attached to a topic being created.
///
/// Encoded as `name` followed by `value`.
///
/// The fields are public so that code outside this module can construct values
/// for the public `Topic::configs` list; with private fields and no constructor
/// the type could not be built by callers at all.
#[derive(Debug)]
pub struct Config {
    /// Configuration key.
    pub name: String,
    /// Configuration value; `None` when no value is supplied.
    pub value: Option<String>,
}
impl<'a> CreateTopicsRequest<'a> {
    /// Builds a request with an empty topic list.
    ///
    /// The header is created from this module's API key and version constants
    /// together with `correlation_id` and `client_id`. `timeout_ms` and
    /// `validate_only` are stored unchanged. Topics are added afterwards with
    /// [`CreateTopicsRequest::add`].
    ///
    /// # Errors
    ///
    /// The current implementation never fails; it always returns `Ok`.
    pub fn new(
        correlation_id: i32,
        client_id: &'a str,
        timeout_ms: i32,
        validate_only: bool,
    ) -> Result<Self> {
        let header = HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id);
        let request = Self {
            header,
            timeout_ms,
            validate_only,
            topics: Vec::new(),
        };
        Ok(request)
    }

    /// Registers a topic to be created, unless one with the same name is already listed.
    ///
    /// A new entry starts with empty `assignments` and `configs`. If `topic_name`
    /// is already present the call is a no-op: the existing entry keeps its
    /// original `num_partitions` and `replication_factor`.
    pub fn add(&mut self, topic_name: &'a str, num_partitions: i32, replication_factor: i16) {
        let already_listed = self
            .topics
            .iter()
            .any(|existing| existing.name == topic_name);
        if already_listed {
            // Duplicate names are ignored; the first registration wins.
            return;
        }

        self.topics.push(Topic {
            name: topic_name,
            num_partitions,
            replication_factor,
            assignments: Vec::new(),
            configs: Vec::new(),
        });
    }
}
impl ToByte for CreateTopicsRequest<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
tracing::trace!("Encoding CreateTopicsRequest {:?}", self);
self.header.encode(buffer)?;
self.topics.encode(buffer)?;
self.timeout_ms.encode(buffer)?;
self.validate_only.encode(buffer)?;
Ok(())
}
}
impl ToByte for Topic<'_> {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.name.encode(buffer)?;
self.num_partitions.encode(buffer)?;
self.replication_factor.encode(buffer)?;
self.assignments.encode(buffer)?;
self.configs.encode(buffer)?;
Ok(())
}
}
impl ToByte for Assignment {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.partition_index.encode(buffer)?;
self.broker_ids.encode(buffer)?;
Ok(())
}
}
impl ToByte for Config {
fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
self.name.encode(buffer)?;
self.value.encode(buffer)?;
Ok(())
}
}