use crate::RDKafkaErrorCode;
use crate::util::{check_topic_result, dead_letter, retry_mailbox};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::sync::Arc;
pub struct Mailbox {
name: String,
fixed_replication: i32,
client: Arc<AdminClient<DefaultClientContext>>,
}
impl Mailbox {
pub(crate) fn new(
client: Arc<AdminClient<DefaultClientContext>>,
name: String,
fixed_replication: i32,
) -> Self {
Mailbox {
name,
fixed_replication,
client,
}
}
pub async fn create(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
let dead_letter = dead_letter(&self.name);
let retry_mailbox = retry_mailbox(&self.name);
let topic1 = NewTopic::new(
&self.name,
1,
TopicReplication::Fixed(self.fixed_replication),
);
let topic2 = NewTopic::new(
&dead_letter,
1,
TopicReplication::Fixed(self.fixed_replication),
);
let topic3 = NewTopic::new(
&retry_mailbox,
1,
TopicReplication::Fixed(self.fixed_replication),
);
let r = self
.client
.create_topics(&[topic1, topic2, topic3], &AdminOptions::new())
.await?;
Ok(check_topic_result(r, RDKafkaErrorCode::TopicAlreadyExists))
}
pub async fn delete(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
let dead_letter = dead_letter(&self.name);
let retry_mailbox = retry_mailbox(&self.name);
let r = self
.client
.delete_topics(
&[&self.name, &dead_letter, &retry_mailbox],
&AdminOptions::new(),
)
.await?;
Ok(check_topic_result(
r,
RDKafkaErrorCode::UnknownTopicOrPartition,
))
}
}