use crate::RDKafkaErrorCode;
use crate::mail::RecvMail;
use crate::mail::SendMail;
use rdkafka::admin::TopicResult;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;
/// Scans a batch of topic results for a failure other than the tolerated one.
///
/// Successful entries, and failed entries whose error code equals `not`,
/// are skipped.
///
/// # Errors
///
/// Returns the error code of the first entry in `v` whose code differs
/// from `not`.
pub(crate) fn check_topic_result(
    v: Vec<TopicResult>,
    not: RDKafkaErrorCode,
) -> Result<(), RDKafkaErrorCode> {
    let first_unexpected = v
        .iter()
        // Keep only the failed entries.
        .filter_map(|outcome| outcome.as_ref().err())
        // Each failure is a tuple; its second field is the error code.
        .map(|failure| failure.1)
        .find(|code| *code != not);
    match first_unexpected {
        Some(code) => Err(code),
        None => Ok(()),
    }
}
/// Returns the retry mailbox name for `name`: `name` followed by `-retry`.
pub(crate) fn retry_mailbox(name: &str) -> String {
    format!("{}-retry", name)
}
/// Recovers the original mailbox name from a retry mailbox name.
///
/// Removes a single trailing `-retry` from `name`. If `name` does not end
/// with `-retry`, it is returned unchanged.
///
/// Only one suffix is removed, so this is the exact inverse of
/// `retry_mailbox` even when the original name itself ends with `-retry`
/// (`"job-retry-retry"` decodes to `"job-retry"`, not `"job"`).
pub(crate) fn decode_retry_mailbox(name: &str) -> &str {
    // `strip_suffix` removes at most one occurrence, unlike
    // `trim_end_matches`, which strips every trailing repetition.
    name.strip_suffix("-retry").unwrap_or(name)
}
/// Returns the dead-letter mailbox name for `name`: `name` followed by
/// `-dead-letter`.
pub(crate) fn dead_letter(name: &str) -> String {
    format!("{}-dead-letter", name)
}
/// Recovers the original mailbox name from a dead-letter mailbox name.
///
/// Removes a single trailing `-dead-letter` from `name`. If `name` does not
/// end with `-dead-letter`, it is returned unchanged.
///
/// Only one suffix is removed, so this is the exact inverse of
/// `dead_letter` even when the original name itself ends with
/// `-dead-letter`.
pub(crate) fn decode_dead_letter(name: &str) -> &str {
    // `strip_suffix` removes at most one occurrence, unlike
    // `trim_end_matches`, which strips every trailing repetition.
    name.strip_suffix("-dead-letter").unwrap_or(name)
}
/// Returns the consumer group name for `name`: `name` followed by
/// `-consumer`.
pub(crate) fn group(name: &str) -> String {
    format!("{}-consumer", name)
}
/// Returns the numbered consumer group name for `name`: the result of
/// `group(name)` immediately followed by the decimal form of `no`.
pub(crate) fn group_with_no(name: &str, no: i32) -> String {
    let base = group(name);
    format!("{}{}", base, no)
}
/// Builds the Kafka producer record that sends `mail` to `name`.
///
/// The key, payload and partition are set on the record only when `mail`
/// carries them. Every entry of `mail.headers` is copied into a fresh
/// [`OwnedHeaders`], which is always attached to the record, even when it
/// ends up empty.
pub(crate) fn smail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a SendMail<'a>,
) -> FutureRecord<'a, [u8], [u8]> {
    let record = FutureRecord::to(name);
    let record = match mail.key {
        Some(key) => record.key(key),
        None => record,
    };
    let record = match mail.payload {
        Some(payload) => record.payload(payload),
        None => record,
    };
    let record = match mail.partition {
        Some(partition) => record.partition(partition),
        None => record,
    };
    // `OwnedHeaders::insert` consumes and returns the collection, so the
    // headers are threaded through a fold.
    let headers = mail
        .headers
        .iter()
        .fold(OwnedHeaders::new(), |acc, (k, v)| {
            acc.insert(Header { key: k, value: *v })
        });
    record.headers(headers)
}
/// Builds the Kafka producer record that re-sends a received `mail` to
/// `name`.
///
/// The key and payload are set only when `mail` has them. The partition is
/// always set to the value returned by `mail.partition()`. Every header
/// yielded by `mail.header_iter()` is copied into a fresh [`OwnedHeaders`],
/// which is always attached to the record.
pub(crate) fn rmail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a RecvMail,
) -> FutureRecord<'a, [u8], [u8]> {
    let record = FutureRecord::to(name);
    let record = match mail.key() {
        Some(key) => record.key(key),
        None => record,
    };
    let record = match mail.payload() {
        Some(payload) => record.payload(payload),
        None => record,
    };
    // Unlike key and payload, the partition is set unconditionally.
    let record = record.partition(mail.partition());
    // `OwnedHeaders::insert` consumes and returns the collection, so the
    // headers are threaded through a fold.
    let headers = mail
        .header_iter()
        .fold(OwnedHeaders::new(), |acc, (k, v)| {
            acc.insert(Header {
                key: k,
                value: v.clone(),
            })
        });
    record.headers(headers)
}