use rdkafka::Message;
use rdkafka::message::{BorrowedHeaders, Headers, HeadersIter, OwnedMessage, ToBytes};
use std::collections::HashMap;
#[derive(Debug)]
pub enum ReadState {
Success,
Fail,
Discard,
DeadLetter,
}
#[derive(Debug)]
pub enum RetryState {
Success,
Discard,
DeadLetter,
}
#[derive(Debug, Clone)]
pub struct RecvMail {
pub(crate) msg: OwnedMessage,
}
impl RecvMail {
pub(crate) fn from(msg: OwnedMessage) -> Self {
Self { msg }
}
pub fn equal_key<K: ToBytes + ?Sized>(&self, key: &K) -> bool {
match self.msg.key() {
None => key.to_bytes().len() == 0,
Some(k) => k == key.to_bytes(),
}
}
pub fn key_string(&self) -> String {
match self.msg.key() {
None => String::new(),
Some(k) => String::from_utf8_lossy(k).to_string(),
}
}
pub fn key(&self) -> Option<&[u8]> {
self.msg.key()
}
pub fn payload(&self) -> Option<&[u8]> {
self.msg.payload()
}
pub fn partition(&self) -> i32 {
self.msg.partition()
}
pub fn offset(&self) -> i64 {
self.msg.offset()
}
pub fn header_iter(&self) -> HeadIterator<'_> {
match self.msg.headers() {
None => HeadIterator { iter: None },
Some(h) => HeadIterator {
iter: Some(h.as_borrowed().iter()),
},
}
}
}
pub struct HeadIterator<'a> {
iter: Option<HeadersIter<'a, BorrowedHeaders>>,
}
impl<'a> Iterator for HeadIterator<'a> {
type Item = (&'a str, Option<&'a [u8]>);
fn next(&mut self) -> Option<Self::Item> {
match self.iter.as_mut() {
None => None,
Some(iter) => {
let i = iter.next()?;
Some((i.key, i.value))
}
}
}
}
#[derive(Debug, Clone)]
pub struct SendMail<'a> {
pub key: Option<&'a [u8]>,
pub payload: Option<&'a [u8]>,
pub headers: HashMap<&'a str, Option<&'a [u8]>>,
pub partition: Option<i32>,
}
impl<'a> SendMail<'a> {
pub fn new() -> Self {
Self {
key: None,
payload: None,
headers: HashMap::new(),
partition: None,
}
}
pub fn key<K: ToBytes + ?Sized>(mut self, key: Option<&'a K>) -> Self {
self.key = key.map(|k| k.to_bytes());
self
}
pub fn payload<P: ToBytes + ?Sized>(mut self, p: Option<&'a P>) -> Self {
self.payload = p.map(|p| p.to_bytes());
self
}
pub fn headers(mut self, headers: HashMap<&'a str, Option<&'a [u8]>>) -> Self {
self.headers = headers;
self
}
pub fn partition(mut self, partition: Option<i32>) -> Self {
self.partition = partition;
self
}
pub fn add_header<V: ToBytes + ?Sized>(mut self, k: &'a str, v: Option<&'a V>) -> Self {
self.headers.insert(k, v.map(|v| v.to_bytes()));
self
}
}
#[derive(Clone, Debug)]
pub struct MetaMail {
pub name: String,
pub partitions: Vec<(i32, usize)>,
}