kmailbox 0.1.5

A simple kafka mailbox
Documentation
use crate::RDKafkaErrorCode;
use crate::mail::RecvMail;
use crate::mail::SendMail;
use rdkafka::admin::TopicResult;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;

pub(crate) fn check_topic_result(
    v: Vec<TopicResult>,
    not: RDKafkaErrorCode,
) -> Result<(), RDKafkaErrorCode> {
    for tr in v.iter() {
        if let Err(t) = tr {
            if t.1 != not {
                return Err(t.1);
            }
        }
    }
    Ok(())
}

pub(crate) fn retry_mailbox(name: &str) -> String {
    name.to_string() + "-retry"
}

pub(crate) fn decode_retry_mailbox(name: &str) -> &str {
    name.trim_end_matches("-retry")
}

pub(crate) fn dead_letter(name: &str) -> String {
    name.to_string() + "-dead-letter"
}

pub(crate) fn decode_dead_letter(name: &str) -> &str {
    name.trim_end_matches("-dead-letter")
}

pub(crate) fn group(name: &str) -> String {
    name.to_string() + "-consumer"
}

pub(crate) fn group_with_no(name: &str, no: i32) -> String {
    group(name) + &no.to_string()
}

pub(crate) fn smail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a SendMail<'a>,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);

    if let Some(key) = mail.key {
        record = record.key(key);
    }

    if let Some(payload) = mail.payload {
        record = record.payload(payload);
    }

    if let Some(partition) = mail.partition {
        record = record.partition(partition);
    }

    let mut header = OwnedHeaders::new();
    for (k, v) in mail.headers.iter() {
        header = header.insert(Header { key: k, value: *v });
    }

    record.headers(header)
}

pub(crate) fn rmail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a RecvMail,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);

    if let Some(key) = mail.key() {
        record = record.key(key);
    }

    if let Some(payload) = mail.payload() {
        record = record.payload(payload);
    }

    record = record.partition(mail.partition());

    let mut header = OwnedHeaders::new();
    for (k, v) in mail.header_iter() {
        header = header.insert(Header {
            key: k,
            value: v.clone(),
        });
    }

    record.headers(header)
}