use crate::traits::{
ConsumerError, MessageConsumer, MessageDisposition, MessagePublisher, PublisherError, Sent,
SentBatch,
};
use crate::CanonicalMessage;
use async_trait::async_trait;
use std::any::Any;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Adapter that exposes a [`MessageConsumer`] through the [`MessagePublisher`] trait.
///
/// Each `send` call reads one message from the wrapped consumer and hands it
/// back as the publish response; the message passed to `send` is not forwarded
/// anywhere.
pub struct ReaderPublisher {
// The consumer is kept behind an async (`tokio`) mutex because the trait methods
// only receive `&self` while the consumer is locked and bound mutably for each read.
consumer: Arc<Mutex<Box<dyn MessageConsumer>>>,
}
impl ReaderPublisher {
    /// Creates a publisher that reads from `consumer` whenever it is asked to send.
    ///
    /// The consumer is moved into a shared, mutex-protected handle owned by the
    /// returned value.
    pub fn new(consumer: Box<dyn MessageConsumer>) -> Self {
        let consumer = Arc::new(Mutex::new(consumer));
        Self { consumer }
    }
}
#[async_trait]
impl MessagePublisher for ReaderPublisher {
async fn send(&self, _message: CanonicalMessage) -> Result<Sent, PublisherError> {
let mut consumer = self.consumer.lock().await;
match consumer.receive().await {
Ok(received) => {
if let Err(e) = (received.commit)(MessageDisposition::Ack).await {
return Err(PublisherError::Retryable(anyhow::anyhow!(
"Failed to commit message in ReaderPublisher: {}",
e
)));
}
Ok(Sent::Response(received.message))
}
Err(e) => match e {
ConsumerError::EndOfStream => Err(PublisherError::NonRetryable(anyhow::anyhow!(e))),
_ => Err(PublisherError::Retryable(anyhow::anyhow!(e))),
},
}
}
async fn send_batch(
&self,
messages: Vec<CanonicalMessage>,
) -> Result<SentBatch, PublisherError> {
let count = messages.len();
if count == 0 {
return Ok(SentBatch::Ack);
}
let mut consumer = self.consumer.lock().await;
match consumer.receive_batch(count).await {
Ok(batch) => {
let received_count = batch.messages.len();
if received_count > 0 {
if let Err(e) =
(batch.commit)(vec![MessageDisposition::Ack; received_count]).await
{
return Err(PublisherError::Retryable(anyhow::anyhow!(
"Failed to commit batch in ReaderPublisher: {}",
e
)));
}
}
Ok(SentBatch::Ack)
}
Err(e) => match e {
ConsumerError::EndOfStream => Err(PublisherError::NonRetryable(anyhow::anyhow!(e))),
_ => Err(PublisherError::Retryable(anyhow::anyhow!(e))),
},
}
}
fn as_any(&self) -> &dyn Any {
self
}
}