kinbox 0.1.3

A simple kafka inbox
Documentation
use crate::config::{CommonConfig, ReadConfig, WriteConfig};
use crate::name::Name;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{Message, TopicPartitionList};
use std::time::Duration;

/// 读邮箱
pub struct Reader {
    name: Name,
    normal: StreamConsumer,
    retry: StreamConsumer,
    writer: FutureProducer,
}

impl Reader {
    /// id为none表示不分区
    pub fn new<S: AsRef<str>>(
        read: &ReadConfig,
        write: &WriteConfig,
        name: S,
        id: Option<i32>,
    ) -> Result<Self, KafkaError> {
        let name = Name::new(name.as_ref());
        let (group1, group2) = if let Some(id) = id {
            (name.group_with_id(id), name.retry_group_with_id(id))
        } else {
            (name.group(), name.retry_group())
        };

        let normal: StreamConsumer = read.clone().set_group_id(group1).inner.create()?;
        let retry: StreamConsumer = read.clone().set_group_id(group2).inner.create()?;
        let writer: FutureProducer = write.inner.create()?;

        if let Some(id) = id {
            {
                let mut tpl = TopicPartitionList::new();
                tpl.add_partition(&name.name, id);
                normal.assign(&tpl)?;
            }
            {
                let mut tpl = TopicPartitionList::new();
                tpl.add_partition(&name.retry, id);
                retry.assign(&tpl)?;
            }
        } else {
            normal.subscribe(&[&name.name])?;
            retry.subscribe(&[&name.retry])?;
        }

        Ok(Self {
            name,
            normal,
            retry,
            writer,
        })
    }

    pub fn new_with_brokers<V: AsRef<str>>(
        brokers: &[V],
        name: V,
        id: Option<i32>,
    ) -> Result<Self, KafkaError> {
        let mut c = ReadConfig::default();
        c.set_brokers(brokers);
        let mut c2 = WriteConfig::default();
        c2.set_brokers(brokers);
        Reader::new(&c, &c2, name, id)
    }

    /// 读取数据
    /// 返回值: ok(None)表示遇到oef, ok(state)表示处理结果
    pub async fn read<P, Fut>(&self, p: P) -> Result<Option<ReadState>, KafkaError>
    where
        P: Fn(OwnedMessage) -> Fut,
        Fut: Future<Output = ReadState>,
    {
        match self.normal.recv().await {
            Ok(bm) => {
                let rs = p(bm.detach()).await;
                let target = match rs {
                    ReadState::Fail => Some(&self.name.retry),
                    ReadState::DeadLetter => Some(&self.name.dead),
                    _ => None,
                };

                // 重发送
                self.resend(target, &bm, rs.is_dead()).await?;

                // 同步提交, 可以接受慢一点
                self.normal.commit_message(&bm, CommitMode::Sync)?;
                Ok(Some(rs))
            }
            Err(KafkaError::PartitionEOF(_)) => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// 重试数据
    pub async fn retry<P, Fut>(&self, p: P) -> Result<Option<RetryState>, KafkaError>
    where
        P: Fn(OwnedMessage) -> Fut,
        Fut: Future<Output = RetryState>,
    {
        match self.retry.recv().await {
            Ok(bm) => {
                let rs = p(bm.detach()).await;
                let target = match rs {
                    RetryState::DeadLetter => Some(&self.name.dead),
                    _ => None,
                };

                // 重发送
                self.resend(target, &bm, rs.is_dead()).await?;

                // 同步提交, 可以接受慢一点
                self.retry.commit_message(&bm, CommitMode::Sync)?;
                Ok(Some(rs))
            }
            Err(KafkaError::PartitionEOF(_)) => Ok(None),
            Err(e) => Err(e),
        }
    }
}

impl Reader {
    async fn resend<'a>(
        &self,
        target: Option<&'a String>,
        bm: &BorrowedMessage<'a>,
        is_dead: bool,
    ) -> Result<(), KafkaError> {
        if let Some(target) = target {
            let mut record = FutureRecord::to(target);
            record.partition = if is_dead {
                Some(0)
            } else {
                Some(bm.partition())
            };
            record.payload = bm.payload();
            record.key = bm.key();
            record.headers = bm.headers().map(|h| h.detach());
            self.writer
                .send(record, Duration::from_secs(3))
                .await
                .map_err(|e| e.0)?;
        }
        Ok(())
    }
}

/// 读取状态
#[derive(Debug)]
pub enum ReadState {
    /// 成功
    Success,
    /// 失败, 会进入重试邮箱
    Fail,
    /// 丢弃
    Discard,
    /// 进入死信邮箱
    DeadLetter,
}

impl ReadState {
    pub fn is_success(&self) -> bool {
        matches!(self, ReadState::Success)
    }

    pub fn is_dead(&self) -> bool {
        matches!(self, ReadState::DeadLetter)
    }

    pub fn is_discard(&self) -> bool {
        matches!(self, ReadState::Discard)
    }

    pub fn is_fail(&self) -> bool {
        matches!(self, ReadState::Fail)
    }
}

/// 重试状态
#[derive(Debug)]
pub enum RetryState {
    /// 成功
    Success,
    /// 丢弃
    Discard,
    /// 进入死信邮箱
    DeadLetter,
}

impl RetryState {
    pub fn is_success(&self) -> bool {
        matches!(self, RetryState::Success)
    }

    pub fn is_dead(&self) -> bool {
        matches!(self, RetryState::DeadLetter)
    }

    pub fn is_discard(&self) -> bool {
        matches!(self, RetryState::Discard)
    }
}