use crate::RDKafkaErrorCode;
use crate::util::{check_topic_result, dead_letter, retry_mailbox};
use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::sync::Arc;
pub struct Mailbox {
name: String,
num_partitions: i32,
fixed_replication: i32,
client: Arc<AdminClient<DefaultClientContext>>,
}
impl Mailbox {
pub(crate) fn new(
client: Arc<AdminClient<DefaultClientContext>>,
name: String,
num_partitions: i32,
fixed_replication: i32,
) -> Self {
Mailbox {
name,
num_partitions,
fixed_replication,
client,
}
}
pub async fn create(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
let dead_letter = dead_letter(&self.name);
let retry_mailbox = retry_mailbox(&self.name);
let topic1 = NewTopic::new(
&self.name,
self.num_partitions,
TopicReplication::Fixed(self.fixed_replication),
);
let topic2 = NewTopic::new(
&dead_letter,
1,
TopicReplication::Fixed(self.fixed_replication),
);
let topic3 = NewTopic::new(
&retry_mailbox,
self.num_partitions,
TopicReplication::Fixed(self.fixed_replication),
);
let r = self
.client
.create_topics(&[topic1, topic2, topic3], &AdminOptions::new())
.await?;
Ok(check_topic_result(r, RDKafkaErrorCode::TopicAlreadyExists))
}
pub async fn adjust(
&self,
num_partitions: i32,
) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
let par = NewPartitions::new(&self.name, num_partitions as usize);
self.client
.create_partitions(&[par], &AdminOptions::new())
.await
.map(|r| match r[0] {
Ok(_) => Ok(()),
Err(ref t) => Err(t.1),
})
}
pub async fn delete(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
let retry_mailbox = retry_mailbox(&self.name);
let dead_letter = dead_letter(&self.name);
let r = self
.client
.delete_topics(
&[&self.name, &dead_letter, &retry_mailbox],
&AdminOptions::new(),
)
.await?;
Ok(check_topic_result(
r,
RDKafkaErrorCode::UnknownTopicOrPartition,
))
}
}