Skip to main content

wae_queue/
producers.rs

1//! 消息生产者实现
2
3use super::{WaeResult, types::*};
4use serde::Serialize;
5use std::time::Duration;
6
7/// 消息生产者后端 trait (dyn 兼容)
8#[async_trait::async_trait]
9pub trait ProducerBackend: Send + Sync {
10    /// 发送原始消息到指定队列
11    async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId>;
12
13    /// 发送原始消息到默认队列
14    async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId>;
15
16    /// 发送延迟消息
17    async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId>;
18
19    /// 批量发送消息
20    async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>>;
21
22    /// 获取生产者配置
23    fn config(&self) -> &ProducerConfig;
24}
25
26/// 消息生产者 (提供泛型封装)
27pub struct MessageProducer {
28    backend: Box<dyn ProducerBackend>,
29}
30
31impl MessageProducer {
32    /// 从后端创建生产者
33    pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
34        Self { backend }
35    }
36
37    /// 发送消息到指定队列
38    pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> WaeResult<MessageId> {
39        let raw = message.to_raw()?;
40        self.backend.send_raw(queue, &raw).await
41    }
42
43    /// 发送消息到默认队列
44    pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> WaeResult<MessageId> {
45        let raw = message.to_raw()?;
46        self.backend.send_raw_default(&raw).await
47    }
48
49    /// 发送延迟消息
50    pub async fn send_delayed<T: Serialize + Send + Sync>(
51        &self,
52        queue: &str,
53        message: &Message<T>,
54        delay: Duration,
55    ) -> WaeResult<MessageId> {
56        let raw = message.to_raw()?;
57        self.backend.send_raw_delayed(queue, &raw, delay).await
58    }
59
60    /// 批量发送消息
61    pub async fn send_batch<T: Serialize + Send + Sync>(
62        &self,
63        queue: &str,
64        messages: &[Message<T>],
65    ) -> WaeResult<Vec<MessageId>> {
66        let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<WaeResult<_>>()?;
67        self.backend.send_raw_batch(queue, &raw_messages).await
68    }
69
70    /// 获取配置
71    pub fn config(&self) -> &ProducerConfig {
72        self.backend.config()
73    }
74}