1use super::{WaeResult, types::*};
4use serde::de::DeserializeOwned;
5
6#[async_trait::async_trait]
8pub trait ConsumerBackend: Send + Sync {
9 async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>>;
11
12 async fn ack(&self, delivery_tag: u64) -> WaeResult<()>;
14
15 async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()>;
17
18 fn config(&self) -> &ConsumerConfig;
20}
21
22pub struct MessageConsumer {
24 backend: Box<dyn ConsumerBackend>,
25}
26
27impl MessageConsumer {
28 pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
30 Self { backend }
31 }
32
33 pub async fn receive<T: DeserializeOwned + Send>(&self) -> WaeResult<Option<ReceivedMessage<T>>> {
35 let raw = match self.backend.receive_raw().await? {
36 Some(r) => r,
37 None => return Ok(None),
38 };
39
40 let message = raw.message.into_typed()?;
41 Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
42 }
43
44 pub async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
46 self.backend.ack(delivery_tag).await
47 }
48
49 pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
51 self.backend.nack(delivery_tag, requeue).await
52 }
53
54 pub fn config(&self) -> &ConsumerConfig {
56 self.backend.config()
57 }
58}