use super::{WaeResult, types::*};
use serde::de::DeserializeOwned;
#[async_trait::async_trait]
pub trait ConsumerBackend: Send + Sync {
async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>>;
async fn ack(&self, delivery_tag: u64) -> WaeResult<()>;
async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()>;
fn config(&self) -> &ConsumerConfig;
}
pub struct MessageConsumer {
backend: Box<dyn ConsumerBackend>,
}
impl MessageConsumer {
pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
Self { backend }
}
pub async fn receive<T: DeserializeOwned + Send>(&self) -> WaeResult<Option<ReceivedMessage<T>>> {
let raw = match self.backend.receive_raw().await? {
Some(r) => r,
None => return Ok(None),
};
let message = raw.message.into_typed()?;
Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
}
pub async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
self.backend.ack(delivery_tag).await
}
pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
self.backend.nack(delivery_tag, requeue).await
}
pub fn config(&self) -> &ConsumerConfig {
self.backend.config()
}
}