use crate::config::{CommonConfig, ReadConfig, WriteConfig};
use crate::name::Name;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{Message, TopicPartitionList};
use std::time::Duration;
/// Kafka reader that consumes a primary topic together with its retry topic.
///
/// Depending on the outcome a handler reports, a consumed message may be
/// forwarded through `writer` to the retry topic or the dead-letter topic
/// described by `name`.
pub struct Reader {
/// Topic names this reader works with (`name.name`, `name.retry`, `name.dead`).
name: Name,
/// Consumer attached to the primary topic (`name.name`).
normal: StreamConsumer,
/// Consumer attached to the retry topic (`name.retry`).
retry: StreamConsumer,
/// Producer used to forward messages to the retry / dead-letter topics.
writer: FutureProducer,
}
impl Reader {
    /// Builds a reader for the topic family derived from `name`.
    ///
    /// Two consumers are created from `read` (one for the primary topic, one
    /// for the retry topic, each with its own consumer group id) and one
    /// producer is created from `write`.
    ///
    /// * `id == Some(p)`: the group ids are derived with `p`, and each
    ///   consumer is manually assigned partition `p` of its topic.
    /// * `id == None`: each consumer subscribes to its topic.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` raised while creating a client, assigning a
    /// partition, or subscribing.
    pub fn new<S: AsRef<str>>(
        read: &ReadConfig,
        write: &WriteConfig,
        name: S,
        id: Option<i32>,
    ) -> Result<Self, KafkaError> {
        let name = Name::new(name.as_ref());
        let (group1, group2) = match id {
            Some(id) => (name.group_with_id(id), name.retry_group_with_id(id)),
            None => (name.group(), name.retry_group()),
        };

        // Clients are created before any assignment/subscription so that a
        // configuration error surfaces first.
        let normal: StreamConsumer = read.clone().set_group_id(group1).inner.create()?;
        let retry: StreamConsumer = read.clone().set_group_id(group2).inner.create()?;
        let writer: FutureProducer = write.inner.create()?;

        match id {
            Some(partition) => {
                // Builds a list holding only `partition` of the given topic.
                let single_partition = |topic: &str| {
                    let mut tpl = TopicPartitionList::new();
                    tpl.add_partition(topic, partition);
                    tpl
                };
                normal.assign(&single_partition(&name.name))?;
                retry.assign(&single_partition(&name.retry))?;
            }
            None => {
                normal.subscribe(&[&name.name])?;
                retry.subscribe(&[&name.retry])?;
            }
        }

        Ok(Self {
            name,
            normal,
            retry,
            writer,
        })
    }

    /// Convenience constructor: default read/write configurations pointed at
    /// `brokers`, then [`Reader::new`].
    ///
    /// # Errors
    ///
    /// Same as [`Reader::new`].
    pub fn new_with_brokers<V: AsRef<str>>(
        brokers: &[V],
        name: V,
        id: Option<i32>,
    ) -> Result<Self, KafkaError> {
        let mut read_config = ReadConfig::default();
        read_config.set_brokers(brokers);
        let mut write_config = WriteConfig::default();
        write_config.set_brokers(brokers);
        Self::new(&read_config, &write_config, name, id)
    }

    /// Receives one message from the primary topic and processes it with `p`.
    ///
    /// An owned copy of the message is handed to `p`, which is invoked exactly
    /// once (hence the `FnOnce` bound). Its outcome decides the routing:
    ///
    /// * [`ReadState::Fail`]: forwarded to the retry topic.
    /// * [`ReadState::DeadLetter`]: forwarded to the dead-letter topic.
    /// * [`ReadState::Success`] / [`ReadState::Discard`]: not forwarded.
    ///
    /// The offset is committed only after any forwarding succeeded.
    ///
    /// Returns `Ok(Some(state))` with the handler's outcome, or `Ok(None)` when
    /// the consumer reports a partition EOF.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` from receiving, forwarding, or committing. When
    /// forwarding fails the offset is not committed.
    pub async fn read<P, Fut>(&self, p: P) -> Result<Option<ReadState>, KafkaError>
    where
        P: FnOnce(OwnedMessage) -> Fut,
        Fut: std::future::Future<Output = ReadState>,
    {
        let bm = match self.normal.recv().await {
            Ok(bm) => bm,
            Err(KafkaError::PartitionEOF(_)) => return Ok(None),
            Err(e) => return Err(e),
        };

        let rs = p(bm.detach()).await;
        // Exhaustive on purpose: a new variant must make a routing decision.
        let target = match rs {
            ReadState::Fail => Some(&self.name.retry),
            ReadState::DeadLetter => Some(&self.name.dead),
            ReadState::Success | ReadState::Discard => None,
        };
        self.resend(target, &bm, rs.is_dead()).await?;
        // NOTE(review): `CommitMode::Sync` is a blocking commit inside an async
        // fn; confirm this is acceptable for the executor in use.
        self.normal.commit_message(&bm, CommitMode::Sync)?;
        Ok(Some(rs))
    }

    /// Receives one message from the retry topic and processes it with `p`.
    ///
    /// An owned copy of the message is handed to `p`, which is invoked exactly
    /// once (hence the `FnOnce` bound). Its outcome decides the routing:
    ///
    /// * [`RetryState::DeadLetter`]: forwarded to the dead-letter topic.
    /// * [`RetryState::Success`] / [`RetryState::Discard`]: not forwarded.
    ///
    /// The offset is committed only after any forwarding succeeded.
    ///
    /// Returns `Ok(Some(state))` with the handler's outcome, or `Ok(None)` when
    /// the consumer reports a partition EOF.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` from receiving, forwarding, or committing. When
    /// forwarding fails the offset is not committed.
    pub async fn retry<P, Fut>(&self, p: P) -> Result<Option<RetryState>, KafkaError>
    where
        P: FnOnce(OwnedMessage) -> Fut,
        Fut: std::future::Future<Output = RetryState>,
    {
        let bm = match self.retry.recv().await {
            Ok(bm) => bm,
            Err(KafkaError::PartitionEOF(_)) => return Ok(None),
            Err(e) => return Err(e),
        };

        let rs = p(bm.detach()).await;
        // Exhaustive on purpose: a new variant must make a routing decision.
        let target = match rs {
            RetryState::DeadLetter => Some(&self.name.dead),
            RetryState::Success | RetryState::Discard => None,
        };
        self.resend(target, &bm, rs.is_dead()).await?;
        // NOTE(review): `CommitMode::Sync` is a blocking commit inside an async
        // fn; confirm this is acceptable for the executor in use.
        self.retry.commit_message(&bm, CommitMode::Sync)?;
        Ok(Some(rs))
    }
}
impl Reader {
    /// Forwards `bm` to the `target` topic when one was selected.
    ///
    /// With `target == None` nothing is produced. Otherwise a record carrying
    /// the payload, key and (detached) headers of `bm` is sent through the
    /// producer with a 3-second timeout argument. Dead-lettered messages
    /// (`is_dead == true`) always go to partition 0; any other forward reuses
    /// the partition number of the source message.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported by the producer when the send fails.
    async fn resend<'a>(
        &self,
        target: Option<&'a String>,
        bm: &BorrowedMessage<'a>,
        is_dead: bool,
    ) -> Result<(), KafkaError> {
        let topic = match target {
            Some(topic) => topic,
            None => return Ok(()),
        };

        let partition = if is_dead { 0 } else { bm.partition() };

        let mut outgoing = FutureRecord::to(topic);
        outgoing.partition = Some(partition);
        outgoing.payload = bm.payload();
        outgoing.key = bm.key();
        outgoing.headers = bm.headers().map(|h| h.detach());

        let delivery = self.writer.send(outgoing, Duration::from_secs(3)).await;
        // The error side also carries the undelivered message; only the
        // Kafka error is propagated.
        delivery.map(|_| ()).map_err(|failure| failure.0)
    }
}
/// Outcome a handler reports for a message read from the primary topic.
///
/// `Reader::read` uses it to decide whether the message is forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadState {
    /// Processing succeeded; the message is not forwarded.
    Success,
    /// Processing failed; the message is forwarded to the retry topic.
    Fail,
    /// The message is dropped without being forwarded.
    Discard,
    /// The message is forwarded to the dead-letter topic.
    DeadLetter,
}

impl ReadState {
    /// Returns `true` for [`ReadState::Success`].
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Success)
    }

    /// Returns `true` for [`ReadState::DeadLetter`].
    pub fn is_dead(&self) -> bool {
        matches!(self, Self::DeadLetter)
    }

    /// Returns `true` for [`ReadState::Discard`].
    pub fn is_discard(&self) -> bool {
        matches!(self, Self::Discard)
    }

    /// Returns `true` for [`ReadState::Fail`].
    pub fn is_fail(&self) -> bool {
        matches!(self, Self::Fail)
    }
}
/// Outcome a handler reports for a message read from the retry topic.
///
/// `Reader::retry` uses it to decide whether the message is forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryState {
    /// Processing succeeded; the message is not forwarded.
    Success,
    /// The message is dropped without being forwarded.
    Discard,
    /// The message is forwarded to the dead-letter topic.
    DeadLetter,
}

impl RetryState {
    /// Returns `true` for [`RetryState::Success`].
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Success)
    }

    /// Returns `true` for [`RetryState::DeadLetter`].
    pub fn is_dead(&self) -> bool {
        matches!(self, Self::DeadLetter)
    }

    /// Returns `true` for [`RetryState::Discard`].
    pub fn is_discard(&self) -> bool {
        matches!(self, Self::Discard)
    }
}