Skip to main content

wae_queue/
consumers.rs

1//! 消息消费者实现
2
3use super::{WaeResult, types::*};
4use serde::de::DeserializeOwned;
5
6/// 消息消费者后端 trait (dyn 兼容)
7#[async_trait::async_trait]
8pub trait ConsumerBackend: Send + Sync {
9    /// 接收原始消息
10    async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>>;
11
12    /// 确认消息
13    async fn ack(&self, delivery_tag: u64) -> WaeResult<()>;
14
15    /// 拒绝消息
16    async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()>;
17
18    /// 获取消费者配置
19    fn config(&self) -> &ConsumerConfig;
20}
21
22/// 消息消费者 (提供泛型封装)
23pub struct MessageConsumer {
24    backend: Box<dyn ConsumerBackend>,
25}
26
27impl MessageConsumer {
28    /// 从后端创建消费者
29    pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
30        Self { backend }
31    }
32
33    /// 接收消息
34    pub async fn receive<T: DeserializeOwned + Send>(&self) -> WaeResult<Option<ReceivedMessage<T>>> {
35        let raw = match self.backend.receive_raw().await? {
36            Some(r) => r,
37            None => return Ok(None),
38        };
39
40        let message = raw.message.into_typed()?;
41        Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
42    }
43
44    /// 确认消息
45    pub async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
46        self.backend.ack(delivery_tag).await
47    }
48
49    /// 拒绝消息
50    pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
51        self.backend.nack(delivery_tag, requeue).await
52    }
53
54    /// 获取配置
55    pub fn config(&self) -> &ConsumerConfig {
56        self.backend.config()
57    }
58}