use rdkafka::Message;
use rdkafka::admin::TopicResult;
pub use rdkafka::error::KafkaError;
pub use rdkafka::error::RDKafkaErrorCode;
use rdkafka::message::{
BorrowedHeaders, Header, Headers, HeadersIter, OwnedHeaders, OwnedMessage, ToBytes,
};
use rdkafka::producer::FutureRecord;
use std::collections::HashMap;
pub use build::*;
pub use read::Reader;
pub use write::Writer;
pub mod build;
pub mod exclusive;
pub mod read;
pub mod shared;
pub mod write;
/// Outcome states associated with reading a message.
///
/// NOTE(review): this file only declares the variants; what each one triggers
/// is decided by the code that produces and consumes them (presumably the
/// reader modules) — confirm there before relying on the hints below.
#[derive(Debug)]
pub enum ReadState {
/// Presumably: the message was handled successfully — TODO confirm.
Success,
/// Presumably: handling failed and may be retried — TODO confirm.
Fail,
/// Presumably: the message should be dropped — TODO confirm.
Discard,
/// Presumably: the message should be routed to the dead-letter mailbox — TODO confirm.
DeadLetter,
}
/// Outcome states associated with retrying a message.
///
/// Unlike `ReadState`, there is no `Fail` variant here.
///
/// NOTE(review): the exact meaning of each variant is defined by the code that
/// produces and consumes them, which is not visible in this file — confirm there.
#[derive(Debug)]
pub enum RetryState {
/// Presumably: the retry succeeded — TODO confirm.
Success,
/// Presumably: the message should be dropped — TODO confirm.
Discard,
/// Presumably: the message should be routed to the dead-letter mailbox — TODO confirm.
DeadLetter,
}
/// An outgoing message assembled from borrowed data.
///
/// Every byte slice and header name is borrowed for `'a`; the struct owns only
/// the `HashMap` that indexes the headers. `smail_to_kafka_record` turns a
/// `SendMail` into a Kafka producer record.
#[derive(Debug, Clone)]
pub struct SendMail<'a> {
/// Optional message key bytes.
pub key: Option<&'a [u8]>,
/// Optional message payload bytes.
pub payload: Option<&'a [u8]>,
/// Headers keyed by name; a header may have no value (`None`).
/// Being a map, each name appears at most once and iteration order is unspecified.
pub headers: HashMap<&'a str, Option<&'a [u8]>>,
}
impl<'a> SendMail<'a> {
    /// Creates an empty mail: no key, no payload and no headers.
    pub fn new() -> Self {
        Self {
            key: None,
            payload: None,
            headers: HashMap::new(),
        }
    }

    /// Sets the message key to the byte representation of `key`.
    ///
    /// The bytes are borrowed from `key` for `'a`; nothing is copied.
    pub fn key<K: ToBytes + ?Sized>(mut self, key: &'a K) -> Self {
        self.key = Some(key.to_bytes());
        self
    }

    /// Sets the message payload to the byte representation of `p`.
    ///
    /// The bytes are borrowed from `p` for `'a`; nothing is copied.
    pub fn payload<P: ToBytes + ?Sized>(mut self, p: &'a P) -> Self {
        self.payload = Some(p.to_bytes());
        self
    }

    /// Replaces the whole header map with `headers`, discarding any headers
    /// that were set before.
    pub fn headers(mut self, headers: HashMap<&'a str, Option<&'a [u8]>>) -> Self {
        self.headers = headers;
        self
    }

    /// Inserts a single header named `k`.
    ///
    /// `v` may be `None` for a header without a value. An existing header with
    /// the same name is overwritten, because headers are stored in a map.
    pub fn add_header<V: ToBytes + ?Sized>(mut self, k: &'a str, v: Option<&'a V>) -> Self {
        self.headers.insert(k, v.map(|value| value.to_bytes()));
        self
    }
}

/// `Default` mirrors [`SendMail::new`], so the type works with
/// `..Default::default()`, `unwrap_or_default()` and similar generic code.
impl<'a> Default for SendMail<'a> {
    fn default() -> Self {
        Self::new()
    }
}
/// Wrapper around an owned rdkafka message (`OwnedMessage`).
///
/// The wrapped message is crate-private; external code goes through the
/// read-only accessors defined on `RecvMail`.
#[derive(Debug, Clone)]
pub struct RecvMail {
/// The underlying owned Kafka message.
pub(crate) msg: OwnedMessage,
}
impl RecvMail {
    /// Wraps an owned Kafka message.
    pub(crate) fn from(msg: OwnedMessage) -> Self {
        Self { msg }
    }

    /// Compares the message key with the byte representation of `key`.
    ///
    /// A message without a key is considered equal to an empty `key`.
    pub fn equal_key<K: ToBytes + ?Sized>(&self, key: &K) -> bool {
        let wanted = key.to_bytes();
        match self.msg.key() {
            Some(actual) => actual == wanted,
            None => wanted.is_empty(),
        }
    }

    /// Returns the key as a `String`, replacing invalid UTF-8 sequences with
    /// the replacement character. A missing key yields an empty string.
    pub fn key_string(&self) -> String {
        self.msg
            .key()
            .map_or_else(String::new, |raw| String::from_utf8_lossy(raw).into_owned())
    }

    /// Returns the raw key bytes, if the message has a key.
    pub fn key(&self) -> Option<&[u8]> {
        self.msg.key()
    }

    /// Returns the raw payload bytes, if the message has a payload.
    pub fn payload(&self) -> Option<&[u8]> {
        self.msg.payload()
    }

    /// Returns the partition reported by the underlying message.
    pub fn partition(&self) -> i32 {
        self.msg.partition()
    }

    /// Returns the offset reported by the underlying message.
    pub fn offset(&self) -> i64 {
        self.msg.offset()
    }

    /// Returns an iterator over the message headers as `(name, value)` pairs.
    ///
    /// The iterator is empty when the message carries no headers.
    pub fn header_iter(&self) -> HeadIterator<'_> {
        HeadIterator {
            iter: self.msg.headers().map(|owned| owned.as_borrowed().iter()),
        }
    }
}
/// Iterator over the headers of a message, yielding `(name, value)` pairs.
///
/// Created by `RecvMail::header_iter`.
pub struct HeadIterator<'a> {
// `None` when the message has no headers, so iteration ends immediately.
iter: Option<HeadersIter<'a, BorrowedHeaders>>,
}
impl<'a> Iterator for HeadIterator<'a> {
    /// Header name paired with its optional raw value.
    type Item = (&'a str, Option<&'a [u8]>);

    /// Yields the next header, or `None` when there are no headers at all or
    /// the underlying iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let header = self.iter.as_mut()?.next()?;
        Some((header.key, header.value))
    }
}
/// Collapses a nested Kafka result into a plain success flag.
///
/// Returns `true` only when both the outer and the inner result are `Ok`;
/// a `KafkaError` or an `RDKafkaErrorCode` yields `false`. The error details
/// are discarded.
pub fn check_result(r: Result<Result<(), RDKafkaErrorCode>, KafkaError>) -> bool {
    matches!(r, Ok(Ok(())))
}
/// Inspects per-topic results and reports the first error that matters.
///
/// Failures whose error code equals `not` are ignored. The first failure with
/// any other code is returned as `Err(code)`. If every entry succeeded or
/// failed only with `not`, the result is `Ok(())`.
pub(crate) fn check_topic_result(
    v: Vec<TopicResult>,
    not: RDKafkaErrorCode,
) -> Result<(), RDKafkaErrorCode> {
    let offending = v
        .iter()
        .filter_map(|outcome| outcome.as_ref().err())
        // The second tuple element of a failed topic result is its error code.
        .map(|failure| failure.1)
        .find(|code| *code != not);

    match offending {
        Some(code) => Err(code),
        None => Ok(()),
    }
}
/// Derives the retry mailbox name by appending `-retry` to `name`.
pub(crate) fn retry_mailbox(mut name: String) -> String {
    name.push_str("-retry");
    name
}
/// Derives the dead-letter mailbox name by appending `-dead-letter` to `name`.
pub(crate) fn dead_letter(mut name: String) -> String {
    name.push_str("-dead-letter");
    name
}
/// Derives the consumer group name by appending `-consumer` to `name`.
pub(crate) fn group(mut name: String) -> String {
    name.push_str("-consumer");
    name
}
/// Derives a numbered consumer group name: the result of `group(name)`
/// followed directly by the decimal form of `no` (no separator is inserted).
pub(crate) fn group_with_no(name: String, no: i32) -> String {
    let mut numbered = group(name);
    numbered.push_str(&no.to_string());
    numbered
}
/// Builds a producer record addressed to topic `name` from an outgoing mail.
///
/// The key and payload are set only when present on `mail`. A headers object
/// is always attached, even when `mail` has no headers. Header order follows
/// the map's iteration order, which is unspecified.
pub(crate) fn smail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a SendMail<'a>,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);
    if let Some(key) = mail.key {
        record = record.key(key);
    }
    if let Some(payload) = mail.payload {
        record = record.payload(payload);
    }

    let headers = mail
        .headers
        .iter()
        .fold(OwnedHeaders::new(), |acc, (&key, &value)| {
            acc.insert(Header { key, value })
        });
    record.headers(headers)
}
/// Builds a producer record addressed to topic `name` from a received mail.
///
/// The key and payload are copied over by reference only when present. All
/// headers of `mail` are re-inserted in iteration order; a headers object is
/// attached even when the message carried none.
pub(crate) fn rmail_to_kafka_record<'a>(
    name: &'a str,
    mail: &'a RecvMail,
) -> FutureRecord<'a, [u8], [u8]> {
    let mut record = FutureRecord::to(name);
    if let Some(key) = mail.key() {
        record = record.key(key);
    }
    if let Some(payload) = mail.payload() {
        record = record.payload(payload);
    }

    let headers = mail
        .header_iter()
        .fold(OwnedHeaders::new(), |acc, (key, value)| {
            acc.insert(Header { key, value })
        });
    record.headers(headers)
}