kmailbox 0.1.6

A simple kafka mailbox
Documentation
use crate::RDKafkaErrorCode;
use crate::util::{check_topic_result, dead_letter, retry_mailbox};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::sync::Arc;

/// 共享邮箱
pub struct Mailbox {
    name: String,
    fixed_replication: i32,
    client: Arc<AdminClient<DefaultClientContext>>,
}

impl Mailbox {
    pub(crate) fn new(
        client: Arc<AdminClient<DefaultClientContext>>,
        name: String,
        fixed_replication: i32,
    ) -> Self {
        Mailbox {
            name,
            fixed_replication,
            client,
        }
    }

    /// 创建邮箱, 如果邮箱已存在也会返回成功
    pub async fn create(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let dead_letter = dead_letter(&self.name);
        let retry_mailbox = retry_mailbox(&self.name);
        let topic1 = NewTopic::new(
            &self.name,
            1,
            TopicReplication::Fixed(self.fixed_replication),
        );
        let topic2 = NewTopic::new(
            &dead_letter,
            1,
            TopicReplication::Fixed(self.fixed_replication),
        );
        let topic3 = NewTopic::new(
            &retry_mailbox,
            1,
            TopicReplication::Fixed(self.fixed_replication),
        );

        let r = self
            .client
            .create_topics(&[topic1, topic2, topic3], &AdminOptions::new())
            .await?;
        Ok(check_topic_result(r, RDKafkaErrorCode::TopicAlreadyExists))
    }

    /// 删除邮箱, 如果邮箱不存在也会成功
    pub async fn delete(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let dead_letter = dead_letter(&self.name);
        let retry_mailbox = retry_mailbox(&self.name);

        let r = self
            .client
            .delete_topics(
                &[&self.name, &dead_letter, &retry_mailbox],
                &AdminOptions::new(),
            )
            .await?;
        Ok(check_topic_result(
            r,
            RDKafkaErrorCode::UnknownTopicOrPartition,
        ))
    }
}