kmailbox 0.1.2

A simple kafka mailbox
Documentation
use crate::{ReadState, RecvMail, RetryState, rmail_to_kafka_record};
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use std::time::Duration;

/// Mailbox reader.
///
/// Bundles the Kafka clients and target names used by [`Reader::read`] and
/// [`Reader::retry`]: one consumer per mailbox plus a producer for
/// re-publishing mail that has to be retried or dead-lettered.
pub struct Reader {
    // Stored by `new` but not read by any method visible in this file,
    // hence the lint allowance.
    #[allow(dead_code)]
    name: String,
    /// Target name used when re-publishing mail flagged as dead letter
    /// (presumably a Kafka topic; confirm against `rmail_to_kafka_record`).
    dead_letter: String,
    /// Target name used when re-publishing mail whose processing failed in
    /// `read` (presumably a Kafka topic; confirm against `rmail_to_kafka_record`).
    retry_name: String,
    /// Regular consumer, polled by `read`.
    consumer: StreamConsumer,
    /// Retry consumer, polled by `retry`.
    retry: StreamConsumer,
    /// Producer used to re-publish mail to the retry or dead-letter target.
    producer: FutureProducer,
}

impl Reader {
    pub(crate) fn new(
        name: String,
        dead_letter: String,
        retry_name: String,
        consumer: StreamConsumer,
        retry: StreamConsumer,
        producer: FutureProducer,
    ) -> Self {
        Self {
            name,
            dead_letter,
            retry_name,
            consumer,
            retry,
            producer,
        }
    }

    /// 读取数据
    ///
    /// @return: ok(none)表示遇到oef
    pub async fn read<P, Fut>(&self, process: P) -> Result<Option<ReadState>, KafkaError>
    where
        P: FnOnce(RecvMail) -> Fut,
        Fut: Future<Output = (ReadState, RecvMail)>,
    {
        match self.consumer.recv().await {
            Ok(m) => {
                let mail = RecvMail::from(m.detach());
                let (rs, mail) = process(mail).await;

                let target = match rs {
                    ReadState::Fail => Some(&self.retry_name),
                    ReadState::DeadLetter => Some(&self.dead_letter),
                    _ => None,
                };

                self.rewrite(target, &mail).await?;

                // 同步提交, 可以接受慢一点
                self.consumer.commit_message(&m, CommitMode::Sync)?;
                Ok(Some(rs))
            }
            Err(KafkaError::PartitionEOF(_)) => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// 读取重试数据
    ///
    /// /// @return: ok(none)表示遇到oef
    pub async fn retry<P, Fut>(&self, process: P) -> Result<Option<RetryState>, KafkaError>
    where
        P: FnOnce(RecvMail) -> Fut,
        Fut: Future<Output = (RetryState, RecvMail)>,
    {
        match self.retry.recv().await {
            Ok(m) => {
                let mail = RecvMail::from(m.detach());
                let (rs, mail) = process(mail).await;

                let target = match rs {
                    RetryState::DeadLetter => Some(&self.dead_letter),
                    _ => None,
                };

                self.rewrite(target, &mail).await?;

                // 同步提交, 可以接受慢一点
                self.retry.commit_message(&m, CommitMode::Sync)?;
                Ok(Some(rs))
            }
            Err(KafkaError::PartitionEOF(_)) => Ok(None),
            Err(e) => Err(e),
        }
    }

    async fn rewrite<'a>(
        &self,
        target: Option<&'a String>,
        mail: &'a RecvMail,
    ) -> Result<(), KafkaError> {
        if let Some(target) = target {
            // 写入目标队列
            let record = rmail_to_kafka_record(target, mail);
            self.producer
                .send(record, Duration::from_secs(3))
                .await
                .map_err(|e| e.0)?;
        }

        Ok(())
    }
}