kmailbox 0.1.3

A simple kafka mailbox
Documentation
use crate::{RDKafkaErrorCode, check_topic_result};
use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::sync::Arc;

/// 专享邮箱
pub struct Mailbox {
    name: String,
    num_partitions: i32,
    fixed_replication: i32,
    client: Arc<AdminClient<DefaultClientContext>>,
}

impl Mailbox {
    pub(crate) fn new(
        client: Arc<AdminClient<DefaultClientContext>>,
        name: String,
        num_partitions: i32,
        fixed_replication: i32,
    ) -> Self {
        Mailbox {
            name,
            num_partitions,
            fixed_replication,
            client,
        }
    }

    /// 创建邮箱, 如果邮箱已存在也会返回成功
    pub async fn create(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let dead_letter = crate::dead_letter(self.name.clone());
        let retry_mailbox = crate::retry_mailbox(self.name.clone());
        let topic1 = NewTopic::new(
            &self.name,
            self.num_partitions,
            TopicReplication::Fixed(self.fixed_replication),
        );
        let topic2 = NewTopic::new(
            &dead_letter,
            1,
            TopicReplication::Fixed(self.fixed_replication),
        );
        let topic3 = NewTopic::new(
            &retry_mailbox,
            self.num_partitions,
            TopicReplication::Fixed(self.fixed_replication),
        );

        let r = self
            .client
            .create_topics(&[topic1, topic2, topic3], &AdminOptions::new())
            .await?;
        Ok(check_topic_result(r, RDKafkaErrorCode::TopicAlreadyExists))
    }

    /// 调整邮箱
    pub async fn adjust(
        &self,
        num_partitions: i32,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let par = NewPartitions::new(&self.name, num_partitions as usize);
        self.client
            .create_partitions(&[par], &AdminOptions::new())
            .await
            .map(|r| match r[0] {
                Ok(_) => Ok(()),
                Err(ref t) => Err(t.1),
            })
    }

    /// 删除邮箱
    pub async fn delete(&self) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let retry_mailbox = crate::retry_mailbox(self.name.clone());
        let dead_letter = crate::dead_letter(self.name.clone());

        let r = self
            .client
            .delete_topics(
                &[&self.name, &dead_letter, &retry_mailbox],
                &AdminOptions::new(),
            )
            .await?;
        Ok(check_topic_result(
            r,
            RDKafkaErrorCode::UnknownTopicOrPartition,
        ))
    }
}