use crate::{ReadState, RecvMail, RetryState, rmail_to_kafka_record};
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use std::time::Duration;
pub struct Reader {
#[allow(dead_code)]
name: String,
dead_letter: String,
retry_name: String,
consumer: StreamConsumer,
retry: StreamConsumer,
producer: FutureProducer,
}
impl Reader {
pub(crate) fn new(
name: String,
dead_letter: String,
retry_name: String,
consumer: StreamConsumer,
retry: StreamConsumer,
producer: FutureProducer,
) -> Self {
Self {
name,
dead_letter,
retry_name,
consumer,
retry,
producer,
}
}
pub async fn read<P, Fut>(&self, process: P) -> Result<Option<ReadState>, KafkaError>
where
P: FnOnce(RecvMail) -> Fut,
Fut: Future<Output = (ReadState, RecvMail)>,
{
match self.consumer.recv().await {
Ok(m) => {
let mail = RecvMail::from(m.detach());
let (rs, mail) = process(mail).await;
let target = match rs {
ReadState::Fail => Some(&self.retry_name),
ReadState::DeadLetter => Some(&self.dead_letter),
_ => None,
};
self.rewrite(target, &mail).await?;
self.consumer.commit_message(&m, CommitMode::Sync)?;
Ok(Some(rs))
}
Err(KafkaError::PartitionEOF(_)) => Ok(None),
Err(e) => Err(e),
}
}
pub async fn retry<P, Fut>(&self, process: P) -> Result<Option<RetryState>, KafkaError>
where
P: FnOnce(RecvMail) -> Fut,
Fut: Future<Output = (RetryState, RecvMail)>,
{
match self.retry.recv().await {
Ok(m) => {
let mail = RecvMail::from(m.detach());
let (rs, mail) = process(mail).await;
let target = match rs {
RetryState::DeadLetter => Some(&self.dead_letter),
_ => None,
};
self.rewrite(target, &mail).await?;
self.retry.commit_message(&m, CommitMode::Sync)?;
Ok(Some(rs))
}
Err(KafkaError::PartitionEOF(_)) => Ok(None),
Err(e) => Err(e),
}
}
async fn rewrite<'a>(
&self,
target: Option<&'a String>,
mail: &'a RecvMail,
) -> Result<(), KafkaError> {
if let Some(target) = target {
let record = rmail_to_kafka_record(target, mail);
self.producer
.send(record, Duration::from_secs(3))
.await
.map_err(|e| e.0)?;
}
Ok(())
}
}