kmailbox 0.1.3

A simple Kafka mailbox
Documentation
use rdkafka::Message;
use rdkafka::admin::TopicResult;
pub use rdkafka::error::KafkaError;
pub use rdkafka::error::RDKafkaErrorCode;
use rdkafka::message::{
    BorrowedHeaders, Header, Headers, HeadersIter, OwnedHeaders, OwnedMessage, ToBytes,
};
use rdkafka::producer::FutureRecord;
use std::collections::HashMap;

// re-exports
pub use build::*;
pub use read::Reader;
pub use write::Writer;

pub mod build;
pub mod exclusive;
pub mod read;
pub mod shared;
pub mod write;

/// Read state.
///
/// The enum is field-less, so it is cheap to copy and can be compared
/// directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadState {
    /// Success.
    Success,
    /// Failure; the mail will enter the retry mailbox.
    Fail,
    /// Discard the mail.
    Discard,
    /// The mail enters the dead-letter mailbox.
    DeadLetter,
}

/// Retry state.
///
/// The enum is field-less, so it is cheap to copy and can be compared
/// directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryState {
    /// Success.
    Success,
    /// Discard the mail.
    Discard,
    /// The mail enters the dead-letter mailbox.
    DeadLetter,
}

/// A mail to be sent.
///
/// Every field borrows its data for the lifetime `'a`; nothing is copied.
/// `SendMail::default()` yields a mail with no key, no payload and no
/// headers.
#[derive(Debug, Clone, Default)]
pub struct SendMail<'a> {
    /// Optional key bytes.
    pub key: Option<&'a [u8]>,
    /// Optional payload bytes.
    pub payload: Option<&'a [u8]>,
    /// Header name mapped to its optional value bytes.
    pub headers: HashMap<&'a str, Option<&'a [u8]>>,
}

impl<'a> SendMail<'a> {
    pub fn new() -> Self {
        Self {
            key: None,
            payload: None,
            headers: HashMap::new(),
        }
    }

    pub fn key<K: ToBytes + ?Sized>(mut self, key: &'a K) -> Self {
        self.key = Some(key.to_bytes());
        self
    }

    pub fn payload<P: ToBytes + ?Sized>(mut self, p: &'a P) -> Self {
        self.payload = Some(p.to_bytes());
        self
    }

    pub fn headers(mut self, headers: HashMap<&'a str, Option<&'a [u8]>>) -> Self {
        self.headers = headers;
        self
    }

    pub fn add_header<V: ToBytes + ?Sized>(mut self, k: &'a str, v: Option<&'a V>) -> Self {
        self.headers.insert(k, v.map(|v| v.to_bytes()));
        self
    }
}

/// A received mail.
#[derive(Debug, Clone)]
pub struct RecvMail {
    pub(crate) msg: OwnedMessage,
}

impl RecvMail {
    /// Wraps an owned message as a received mail.
    pub(crate) fn from(msg: OwnedMessage) -> Self {
        RecvMail { msg }
    }

    /// Returns `true` when the mail's key equals the bytes of `key`.
    ///
    /// A mail without a key is considered equal to an empty `key`.
    pub fn equal_key<K: ToBytes + ?Sized>(&self, key: &K) -> bool {
        let expected = key.to_bytes();
        match self.msg.key() {
            Some(actual) => actual == expected,
            None => expected.is_empty(),
        }
    }

    /// Returns the key as a `String`, replacing invalid UTF-8 lossily.
    ///
    /// A mail without a key yields an empty string.
    pub fn key_string(&self) -> String {
        self.msg
            .key()
            .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
            .unwrap_or_default()
    }

    /// Returns the raw key bytes, if any.
    pub fn key(&self) -> Option<&[u8]> {
        self.msg.key()
    }

    /// Returns the raw payload bytes, if any.
    pub fn payload(&self) -> Option<&[u8]> {
        self.msg.payload()
    }

    /// Returns the partition reported by the underlying message.
    pub fn partition(&self) -> i32 {
        self.msg.partition()
    }

    /// Returns the offset reported by the underlying message.
    pub fn offset(&self) -> i64 {
        self.msg.offset()
    }

    /// Returns an iterator over the mail's headers.
    ///
    /// The iterator is empty when the message carries no headers.
    pub fn header_iter(&self) -> HeadIterator<'_> {
        let iter = self
            .msg
            .headers()
            .map(|owned| owned.as_borrowed().iter());
        HeadIterator { iter }
    }

    // pub fn timestamp(&self) -> Timestamp {
    //     self.msg.timestamp()
    // }
}

/// Iterator over headers.
///
/// Yields `(name, optional value bytes)` pairs; when `iter` is `None` it
/// yields nothing.
pub struct HeadIterator<'a> {
    iter: Option<HeadersIter<'a, BorrowedHeaders>>,
}

impl<'a> Iterator for HeadIterator<'a> {
    type Item = (&'a str, Option<&'a [u8]>);

    /// Advances the wrapped header iterator, if there is one.
    fn next(&mut self) -> Option<Self::Item> {
        // Both a missing inner iterator and an exhausted one end iteration.
        let header = self.iter.as_mut()?.next()?;
        Some((header.key, header.value))
    }
}

/// Returns `true` only when both the outer and the inner result are `Ok`.
///
/// Any `KafkaError` or `RDKafkaErrorCode` yields `false`.
pub fn check_result(r: Result<Result<(), RDKafkaErrorCode>, KafkaError>) -> bool {
    matches!(r, Ok(Ok(())))
}

/// Scans topic results for the first error whose code differs from `not`.
///
/// Successful entries and errors carrying the code `not` are skipped.
/// Returns `Err` with the first other error code found, or `Ok(())` when
/// there is none.
pub(crate) fn check_topic_result(
    v: Vec<TopicResult>,
    not: RDKafkaErrorCode,
) -> Result<(), RDKafkaErrorCode> {
    let first_unexpected = v
        .iter()
        .filter_map(|outcome| outcome.as_ref().err())
        .map(|failure| failure.1)
        .find(|code| *code != not);

    match first_unexpected {
        Some(code) => Err(code),
        None => Ok(()),
    }
}

/// Returns `name` with the suffix `-retry` appended.
pub(crate) fn retry_mailbox(name: String) -> String {
    let mut mailbox = name;
    mailbox.push_str("-retry");
    mailbox
}

/// Returns `name` with the suffix `-dead-letter` appended.
pub(crate) fn dead_letter(name: String) -> String {
    let mut mailbox = name;
    mailbox.push_str("-dead-letter");
    mailbox
}

/// Returns `name` with the suffix `-consumer` appended.
pub(crate) fn group(name: String) -> String {
    let mut group_id = name;
    group_id.push_str("-consumer");
    group_id
}

/// Returns the result of `group(name)` followed by the decimal form of `no`.
pub(crate) fn group_with_no(name: String, no: i32) -> String {
    let mut group_id = group(name);
    group_id.push_str(&no.to_string());
    group_id
}

/// Builds a `FutureRecord` addressed to `name` from a [`SendMail`].
///
/// The key and payload are set only when the mail has them. Every entry of
/// `mail.headers` is copied into a fresh `OwnedHeaders`, which is always
/// attached to the record, even when the mail has no headers. Headers are
/// inserted in the map's iteration order.
pub(crate) fn smail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a SendMail<'a>,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);
    // `Option<&[u8]>` is `Copy`, so it can be matched directly without
    // the `is_some()` / `clone().unwrap()` dance.
    if let Some(key) = mail.key {
        record = record.key(key);
    }
    if let Some(payload) = mail.payload {
        record = record.payload(payload);
    }

    // `OwnedHeaders::insert` consumes and returns the collection, which
    // maps naturally onto a fold.
    let headers = mail
        .headers
        .iter()
        .fold(OwnedHeaders::new(), |acc, (k, v)| {
            acc.insert(Header { key: k, value: *v })
        });

    record.headers(headers)
}

/// Builds a `FutureRecord` addressed to `name` from a [`RecvMail`].
///
/// The key and payload are set only when the mail has them. Every header
/// yielded by `mail.header_iter()` is copied, in order, into a fresh
/// `OwnedHeaders`, which is always attached to the record, even when the
/// mail has no headers.
pub(crate) fn rmail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a RecvMail,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);
    // Each accessor is called once and matched directly; the returned
    // `Option<&[u8]>` is `Copy`, so no `clone()` or `unwrap()` is needed.
    if let Some(key) = mail.key() {
        record = record.key(key);
    }
    if let Some(payload) = mail.payload() {
        record = record.payload(payload);
    }

    // `OwnedHeaders::insert` consumes and returns the collection, which
    // maps naturally onto a fold.
    let headers = mail
        .header_iter()
        .fold(OwnedHeaders::new(), |acc, (key, value)| {
            acc.insert(Header { key, value })
        });

    record.headers(headers)
}