kmailbox 0.1.6

A simple kafka mailbox
Documentation
use rdkafka::Message;
use rdkafka::message::{BorrowedHeaders, Headers, HeadersIter, OwnedMessage, ToBytes};
use std::collections::HashMap;

/// 读取状态
#[derive(Debug)]
pub enum ReadState {
    /// 成功
    Success,
    /// 失败, 会进入重试邮箱
    Fail,
    /// 丢弃
    Discard,
    /// 进入死信邮箱
    DeadLetter,
}

/// 重试状态
#[derive(Debug)]
pub enum RetryState {
    /// 成功
    Success,
    /// 丢弃
    Discard,
    /// 进入死信邮箱
    DeadLetter,
}

/// 收到的邮件
#[derive(Debug, Clone)]
pub struct RecvMail {
    pub(crate) msg: OwnedMessage,
}

impl RecvMail {
    pub(crate) fn from(msg: OwnedMessage) -> Self {
        Self { msg }
    }

    pub fn equal_key<K: ToBytes + ?Sized>(&self, key: &K) -> bool {
        match self.msg.key() {
            None => key.to_bytes().len() == 0,
            Some(k) => k == key.to_bytes(),
        }
    }

    pub fn key_string(&self) -> String {
        match self.msg.key() {
            None => String::new(),
            Some(k) => String::from_utf8_lossy(k).to_string(),
        }
    }

    pub fn key(&self) -> Option<&[u8]> {
        self.msg.key()
    }

    pub fn payload(&self) -> Option<&[u8]> {
        self.msg.payload()
    }

    pub fn partition(&self) -> i32 {
        self.msg.partition()
    }

    pub fn offset(&self) -> i64 {
        self.msg.offset()
    }

    pub fn header_iter(&self) -> HeadIterator<'_> {
        match self.msg.headers() {
            None => HeadIterator { iter: None },
            Some(h) => HeadIterator {
                iter: Some(h.as_borrowed().iter()),
            },
        }
    }

    // pub fn timestamp(&self) -> Timestamp {
    //     self.msg.timestamp()
    // }
}

/// 迭代头部
pub struct HeadIterator<'a> {
    iter: Option<HeadersIter<'a, BorrowedHeaders>>,
}

impl<'a> Iterator for HeadIterator<'a> {
    type Item = (&'a str, Option<&'a [u8]>);

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.as_mut() {
            None => None,
            Some(iter) => {
                let i = iter.next()?;
                Some((i.key, i.value))
            }
        }
    }
}

/// 发送的邮件
#[derive(Debug, Clone)]
pub struct SendMail<'a> {
    pub key: Option<&'a [u8]>,
    pub payload: Option<&'a [u8]>,
    pub headers: HashMap<&'a str, Option<&'a [u8]>>,
    // 可选分区
    pub partition: Option<i32>,
}

impl<'a> SendMail<'a> {
    pub fn new() -> Self {
        Self {
            key: None,
            payload: None,
            headers: HashMap::new(),
            partition: None,
        }
    }

    pub fn key<K: ToBytes + ?Sized>(mut self, key: Option<&'a K>) -> Self {
        self.key = key.map(|k| k.to_bytes());
        self
    }

    pub fn payload<P: ToBytes + ?Sized>(mut self, p: Option<&'a P>) -> Self {
        self.payload = p.map(|p| p.to_bytes());
        self
    }

    pub fn headers(mut self, headers: HashMap<&'a str, Option<&'a [u8]>>) -> Self {
        self.headers = headers;
        self
    }

    pub fn partition(mut self, partition: Option<i32>) -> Self {
        self.partition = partition;
        self
    }

    pub fn add_header<V: ToBytes + ?Sized>(mut self, k: &'a str, v: Option<&'a V>) -> Self {
        self.headers.insert(k, v.map(|v| v.to_bytes()));
        self
    }
}

/// 邮箱元数据
#[derive(Clone, Debug)]
pub struct MetaMail {
    pub name: String,
    /// 分区id, 副本数
    pub partitions: Vec<(i32, usize)>,
}