Skip to main content

wae_queue/
lib.rs

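//! WAE Queue - message queue abstraction layer
//!
//! Provides a unified abstraction over message-queue capabilities, with support for multiple queue backends.
//!
//! Deeply integrated with the tokio runtime; every API is designed async-first.
//! Microservice-friendly, with support for message acknowledgement, retries, delayed queues and similar features.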
7
8#![warn(missing_docs)]
9
10use serde::{Serialize, de::DeserializeOwned};
11use std::{fmt, time::Duration};
12
/// Errors that message-queue operations can return.
///
/// Each variant wraps a free-form detail string; [`fmt::Display`] renders it
/// as `"<variant prefix>: <detail>"`.
#[derive(Debug)]
pub enum QueueError {
    /// Connecting to the queue failed.
    ConnectionFailed(String),

    /// Serialization failed.
    SerializationFailed(String),

    /// Deserialization failed.
    DeserializationFailed(String),

    /// The queue does not exist.
    QueueNotFound(String),

    /// Sending a message failed.
    SendFailed(String),

    /// Receiving a message failed.
    ReceiveFailed(String),

    /// Acknowledging a message failed.
    AckFailed(String),

    /// The operation timed out.
    Timeout(String),

    /// Internal service error.
    Internal(String),
}

impl fmt::Display for QueueError {
    /// Writes the variant's fixed prefix followed by `": "` and the detail string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the prefix first so the actual formatting happens in one place.
        let (prefix, detail) = match self {
            Self::ConnectionFailed(detail) => ("Queue connection failed", detail),
            Self::SerializationFailed(detail) => ("Serialization failed", detail),
            Self::DeserializationFailed(detail) => ("Deserialization failed", detail),
            Self::QueueNotFound(detail) => ("Queue not found", detail),
            Self::SendFailed(detail) => ("Failed to send message", detail),
            Self::ReceiveFailed(detail) => ("Failed to receive message", detail),
            Self::AckFailed(detail) => ("Failed to acknowledge message", detail),
            Self::Timeout(detail) => ("Operation timeout", detail),
            Self::Internal(detail) => ("Queue internal error", detail),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for QueueError {}
61
62/// 消息队列操作结果类型
63pub type QueueResult<T> = Result<T, QueueError>;
64
65/// 消息 ID 类型
66pub type MessageId = String;
67
68/// 队列名称类型
69pub type QueueName = String;
70
71/// 消息元数据
72#[derive(Debug, Clone, Default)]
73pub struct MessageMetadata {
74    /// 消息 ID
75    pub id: Option<MessageId>,
76    /// 关联 ID (用于消息关联)
77    pub correlation_id: Option<String>,
78    /// 回复队列
79    pub reply_to: Option<QueueName>,
80    /// 内容类型
81    pub content_type: Option<String>,
82    /// 时间戳
83    pub timestamp: Option<u64>,
84    /// 优先级 (0-9)
85    pub priority: Option<u8>,
86    /// 过期时间 (毫秒)
87    pub expiration: Option<u64>,
88    /// 自定义头信息
89    pub headers: std::collections::HashMap<String, String>,
90}
91
92/// 原始消息 (字节形式)
93#[derive(Debug, Clone)]
94pub struct RawMessage {
95    /// 消息数据
96    pub data: Vec<u8>,
97    /// 消息元数据
98    pub metadata: MessageMetadata,
99}
100
101impl RawMessage {
102    /// 创建新消息
103    pub fn new(data: Vec<u8>) -> Self {
104        Self { data, metadata: MessageMetadata::default() }
105    }
106
107    /// 设置关联 ID
108    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
109        self.metadata.correlation_id = Some(id.into());
110        self
111    }
112
113    /// 设置回复队列
114    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
115        self.metadata.reply_to = Some(queue.into());
116        self
117    }
118
119    /// 设置优先级
120    pub fn with_priority(mut self, priority: u8) -> Self {
121        self.metadata.priority = Some(priority.min(9));
122        self
123    }
124
125    /// 设置过期时间
126    pub fn with_expiration(mut self, ms: u64) -> Self {
127        self.metadata.expiration = Some(ms);
128        self
129    }
130
131    /// 添加自定义头
132    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133        self.metadata.headers.insert(key.into(), value.into());
134        self
135    }
136}
137
138/// 消息封装 (泛型)
139#[derive(Debug, Clone)]
140pub struct Message<T> {
141    /// 消息体
142    pub payload: T,
143    /// 消息元数据
144    pub metadata: MessageMetadata,
145}
146
147impl<T> Message<T> {
148    /// 创建新消息
149    pub fn new(payload: T) -> Self {
150        Self { payload, metadata: MessageMetadata::default() }
151    }
152
153    /// 设置关联 ID
154    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
155        self.metadata.correlation_id = Some(id.into());
156        self
157    }
158
159    /// 设置回复队列
160    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
161        self.metadata.reply_to = Some(queue.into());
162        self
163    }
164
165    /// 设置优先级
166    pub fn with_priority(mut self, priority: u8) -> Self {
167        self.metadata.priority = Some(priority.min(9));
168        self
169    }
170
171    /// 设置过期时间
172    pub fn with_expiration(mut self, ms: u64) -> Self {
173        self.metadata.expiration = Some(ms);
174        self
175    }
176
177    /// 添加自定义头
178    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
179        self.metadata.headers.insert(key.into(), value.into());
180        self
181    }
182
183    /// 序列化为原始消息
184    pub fn into_raw(self) -> QueueResult<RawMessage>
185    where
186        T: Serialize,
187    {
188        let data = serde_json::to_vec(&self.payload).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;
189        Ok(RawMessage { data, metadata: self.metadata })
190    }
191
192    /// 序列化为原始消息 (引用版本)
193    pub fn to_raw(&self) -> QueueResult<RawMessage>
194    where
195        T: Serialize,
196    {
197        let data = serde_json::to_vec(&self.payload).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;
198        Ok(RawMessage { data, metadata: self.metadata.clone() })
199    }
200}
201
202impl RawMessage {
203    /// 反序列化为泛型消息
204    pub fn into_typed<T: DeserializeOwned>(self) -> QueueResult<Message<T>> {
205        let payload = serde_json::from_slice(&self.data).map_err(|e| QueueError::DeserializationFailed(e.to_string()))?;
206        Ok(Message { payload, metadata: self.metadata })
207    }
208}
209
210/// 接收到的原始消息 (带确认能力)
211#[derive(Debug)]
212pub struct ReceivedRawMessage {
213    /// 消息内容
214    pub message: RawMessage,
215    /// 消息 ID (用于确认)
216    pub delivery_tag: u64,
217    /// 重投递次数
218    pub redelivery_count: u32,
219}
220
221/// 接收到的消息 (泛型)
222#[derive(Debug)]
223pub struct ReceivedMessage<T> {
224    /// 消息内容
225    pub message: Message<T>,
226    /// 消息 ID (用于确认)
227    pub delivery_tag: u64,
228    /// 重投递次数
229    pub redelivery_count: u32,
230}
231
232/// 队列配置
233#[derive(Debug, Clone)]
234pub struct QueueConfig {
235    /// 队列名称
236    pub name: QueueName,
237    /// 是否持久化
238    pub durable: bool,
239    /// 是否自动删除 (当没有消费者时)
240    pub auto_delete: bool,
241    /// 最大消息数
242    pub max_messages: Option<u64>,
243    /// 最大消息大小 (字节)
244    pub max_message_size: Option<u64>,
245    /// 消息存活时间 (毫秒)
246    pub message_ttl: Option<u64>,
247    /// 死信队列
248    pub dead_letter_queue: Option<QueueName>,
249}
250
251impl QueueConfig {
252    /// 创建新的队列配置
253    pub fn new(name: impl Into<String>) -> Self {
254        Self {
255            name: name.into(),
256            durable: true,
257            auto_delete: false,
258            max_messages: None,
259            max_message_size: None,
260            message_ttl: None,
261            dead_letter_queue: None,
262        }
263    }
264
265    /// 设置持久化
266    pub fn durable(mut self, durable: bool) -> Self {
267        self.durable = durable;
268        self
269    }
270
271    /// 设置自动删除
272    pub fn auto_delete(mut self, auto_delete: bool) -> Self {
273        self.auto_delete = auto_delete;
274        self
275    }
276
277    /// 设置最大消息数
278    pub fn max_messages(mut self, max: u64) -> Self {
279        self.max_messages = Some(max);
280        self
281    }
282
283    /// 设置消息 TTL
284    pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
285        self.message_ttl = Some(ttl_ms);
286        self
287    }
288
289    /// 设置死信队列
290    pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
291        self.dead_letter_queue = Some(queue.into());
292        self
293    }
294}
295
296/// 生产者配置
297#[derive(Debug, Clone)]
298pub struct ProducerConfig {
299    /// 默认队列
300    pub default_queue: Option<QueueName>,
301    /// 消息确认超时
302    pub confirm_timeout: Duration,
303    /// 重试次数
304    pub retry_count: u32,
305    /// 重试间隔
306    pub retry_interval: Duration,
307}
308
309impl Default for ProducerConfig {
310    fn default() -> Self {
311        Self {
312            default_queue: None,
313            confirm_timeout: Duration::from_secs(5),
314            retry_count: 3,
315            retry_interval: Duration::from_millis(100),
316        }
317    }
318}
319
320/// 消费者配置
321#[derive(Debug, Clone)]
322pub struct ConsumerConfig {
323    /// 队列名称
324    pub queue: QueueName,
325    /// 消费者标签
326    pub consumer_tag: Option<String>,
327    /// 是否自动确认
328    pub auto_ack: bool,
329    /// 预取数量
330    pub prefetch_count: u16,
331    /// 是否独占
332    pub exclusive: bool,
333}
334
335impl ConsumerConfig {
336    /// 创建新的消费者配置
337    pub fn new(queue: impl Into<String>) -> Self {
338        Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
339    }
340
341    /// 设置自动确认
342    pub fn auto_ack(mut self, auto_ack: bool) -> Self {
343        self.auto_ack = auto_ack;
344        self
345    }
346
347    /// 设置预取数量
348    pub fn prefetch(mut self, count: u16) -> Self {
349        self.prefetch_count = count;
350        self
351    }
352}
353
354/// 消息生产者后端 trait (dyn 兼容)
355#[async_trait::async_trait]
356pub trait ProducerBackend: Send + Sync {
357    /// 发送原始消息到指定队列
358    async fn send_raw(&self, queue: &str, message: &RawMessage) -> QueueResult<MessageId>;
359
360    /// 发送原始消息到默认队列
361    async fn send_raw_default(&self, message: &RawMessage) -> QueueResult<MessageId>;
362
363    /// 发送延迟消息
364    async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> QueueResult<MessageId>;
365
366    /// 批量发送消息
367    async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> QueueResult<Vec<MessageId>>;
368
369    /// 获取生产者配置
370    fn config(&self) -> &ProducerConfig;
371}
372
373/// 消息生产者 (提供泛型封装)
374pub struct MessageProducer {
375    backend: Box<dyn ProducerBackend>,
376}
377
378impl MessageProducer {
379    /// 从后端创建生产者
380    pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
381        Self { backend }
382    }
383
384    /// 发送消息到指定队列
385    pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> QueueResult<MessageId> {
386        let raw = message.to_raw()?;
387        self.backend.send_raw(queue, &raw).await
388    }
389
390    /// 发送消息到默认队列
391    pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> QueueResult<MessageId> {
392        let raw = message.to_raw()?;
393        self.backend.send_raw_default(&raw).await
394    }
395
396    /// 发送延迟消息
397    pub async fn send_delayed<T: Serialize + Send + Sync>(
398        &self,
399        queue: &str,
400        message: &Message<T>,
401        delay: Duration,
402    ) -> QueueResult<MessageId> {
403        let raw = message.to_raw()?;
404        self.backend.send_raw_delayed(queue, &raw, delay).await
405    }
406
407    /// 批量发送消息
408    pub async fn send_batch<T: Serialize + Send + Sync>(
409        &self,
410        queue: &str,
411        messages: &[Message<T>],
412    ) -> QueueResult<Vec<MessageId>> {
413        let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<QueueResult<_>>()?;
414        self.backend.send_raw_batch(queue, &raw_messages).await
415    }
416
417    /// 获取配置
418    pub fn config(&self) -> &ProducerConfig {
419        self.backend.config()
420    }
421}
422
423/// 消息消费者后端 trait (dyn 兼容)
424#[async_trait::async_trait]
425pub trait ConsumerBackend: Send + Sync {
426    /// 接收原始消息
427    async fn receive_raw(&self) -> QueueResult<Option<ReceivedRawMessage>>;
428
429    /// 确认消息
430    async fn ack(&self, delivery_tag: u64) -> QueueResult<()>;
431
432    /// 拒绝消息
433    async fn nack(&self, delivery_tag: u64, requeue: bool) -> QueueResult<()>;
434
435    /// 获取消费者配置
436    fn config(&self) -> &ConsumerConfig;
437}
438
439/// 消息消费者 (提供泛型封装)
440pub struct MessageConsumer {
441    backend: Box<dyn ConsumerBackend>,
442}
443
444impl MessageConsumer {
445    /// 从后端创建消费者
446    pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
447        Self { backend }
448    }
449
450    /// 接收消息
451    pub async fn receive<T: DeserializeOwned + Send>(&self) -> QueueResult<Option<ReceivedMessage<T>>> {
452        let raw = match self.backend.receive_raw().await? {
453            Some(r) => r,
454            None => return Ok(None),
455        };
456
457        let message = raw.message.into_typed()?;
458        Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
459    }
460
461    /// 确认消息
462    pub async fn ack(&self, delivery_tag: u64) -> QueueResult<()> {
463        self.backend.ack(delivery_tag).await
464    }
465
466    /// 拒绝消息
467    pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> QueueResult<()> {
468        self.backend.nack(delivery_tag, requeue).await
469    }
470
471    /// 获取配置
472    pub fn config(&self) -> &ConsumerConfig {
473        self.backend.config()
474    }
475}
476
477/// 队列管理 trait
478#[async_trait::async_trait]
479pub trait QueueManager: Send + Sync {
480    /// 声明队列
481    async fn declare_queue(&self, config: &QueueConfig) -> QueueResult<()>;
482
483    /// 删除队列
484    async fn delete_queue(&self, name: &str) -> QueueResult<()>;
485
486    /// 检查队列是否存在
487    async fn queue_exists(&self, name: &str) -> QueueResult<bool>;
488
489    /// 获取队列消息数量
490    async fn queue_message_count(&self, name: &str) -> QueueResult<u64>;
491
492    /// 清空队列
493    async fn purge_queue(&self, name: &str) -> QueueResult<u64>;
494}
495
496/// 消息队列服务 trait
497
498pub trait QueueService: Send + Sync {
499    /// 创建生产者
500    async fn create_producer(&self, config: ProducerConfig) -> QueueResult<MessageProducer>;
501
502    /// 创建消费者
503    async fn create_consumer(&self, config: ConsumerConfig) -> QueueResult<MessageConsumer>;
504
505    /// 获取队列管理器
506    fn manager(&self) -> &dyn QueueManager;
507
508    /// 关闭连接
509    async fn close(&self) -> QueueResult<()>;
510}
511
512/// 内存队列实现
513pub mod memory {
514    use super::*;
515    use std::{
516        collections::{HashMap, VecDeque},
517        sync::Arc,
518    };
519    use tokio::sync::RwLock;
520
521    /// 内存队列存储
522    struct QueueStorage {
523        messages: VecDeque<(u64, Vec<u8>, MessageMetadata)>,
524        next_delivery_tag: u64,
525    }
526
527    impl QueueStorage {
528        fn new() -> Self {
529            Self { messages: VecDeque::new(), next_delivery_tag: 1 }
530        }
531    }
532
533    /// 内存队列管理器
534    pub struct MemoryQueueManager {
535        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
536        configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
537    }
538
539    impl MemoryQueueManager {
540        /// 创建新的内存队列管理器
541        pub fn new() -> Self {
542            Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
543        }
544    }
545
546    impl Default for MemoryQueueManager {
547        fn default() -> Self {
548            Self::new()
549        }
550    }
551
552    #[async_trait::async_trait]
553    impl QueueManager for MemoryQueueManager {
554        async fn declare_queue(&self, config: &QueueConfig) -> QueueResult<()> {
555            let mut queues = self.queues.write().await;
556            let mut configs = self.configs.write().await;
557
558            if !queues.contains_key(&config.name) {
559                queues.insert(config.name.clone(), QueueStorage::new());
560            }
561            configs.insert(config.name.clone(), config.clone());
562            Ok(())
563        }
564
565        async fn delete_queue(&self, name: &str) -> QueueResult<()> {
566            let mut queues = self.queues.write().await;
567            let mut configs = self.configs.write().await;
568            queues.remove(name);
569            configs.remove(name);
570            Ok(())
571        }
572
573        async fn queue_exists(&self, name: &str) -> QueueResult<bool> {
574            let queues = self.queues.read().await;
575            Ok(queues.contains_key(name))
576        }
577
578        async fn queue_message_count(&self, name: &str) -> QueueResult<u64> {
579            let queues = self.queues.read().await;
580            Ok(queues.get(name).map(|q| q.messages.len() as u64).unwrap_or(0))
581        }
582
583        async fn purge_queue(&self, name: &str) -> QueueResult<u64> {
584            let mut queues = self.queues.write().await;
585            if let Some(queue) = queues.get_mut(name) {
586                let count = queue.messages.len() as u64;
587                queue.messages.clear();
588                return Ok(count);
589            }
590            Ok(0)
591        }
592    }
593
594    /// 内存生产者后端
595    pub struct MemoryProducerBackend {
596        config: ProducerConfig,
597        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
598        manager: Arc<MemoryQueueManager>,
599    }
600
601    impl MemoryProducerBackend {
602        /// 创建新的内存生产者后端
603        pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
604            Self { config, queues: manager.queues.clone(), manager }
605        }
606    }
607
608    #[async_trait::async_trait]
609    impl ProducerBackend for MemoryProducerBackend {
610        async fn send_raw(&self, queue: &str, message: &RawMessage) -> QueueResult<MessageId> {
611            self.manager.declare_queue(&QueueConfig::new(queue)).await?;
612
613            let id = uuid::Uuid::new_v4().to_string();
614            let mut metadata = message.metadata.clone();
615            metadata.id = Some(id.clone());
616            metadata.timestamp =
617                Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
618
619            let mut queues = self.queues.write().await;
620            if let Some(q) = queues.get_mut(queue) {
621                q.messages.push_back((q.next_delivery_tag, message.data.clone(), metadata));
622                q.next_delivery_tag += 1;
623            }
624            Ok(id)
625        }
626
627        async fn send_raw_default(&self, message: &RawMessage) -> QueueResult<MessageId> {
628            let queue = self
629                .config
630                .default_queue
631                .as_ref()
632                .ok_or_else(|| QueueError::QueueNotFound("No default queue configured".into()))?;
633            self.send_raw(queue, message).await
634        }
635
636        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> QueueResult<MessageId> {
637            tokio::time::sleep(delay).await;
638            self.send_raw(queue, message).await
639        }
640
641        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> QueueResult<Vec<MessageId>> {
642            let mut ids = Vec::with_capacity(messages.len());
643            for msg in messages {
644                ids.push(self.send_raw(queue, msg).await?);
645            }
646            Ok(ids)
647        }
648
649        fn config(&self) -> &ProducerConfig {
650            &self.config
651        }
652    }
653
654    /// 内存消费者后端
655    pub struct MemoryConsumerBackend {
656        config: ConsumerConfig,
657        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
658    }
659
660    impl MemoryConsumerBackend {
661        /// 创建新的内存消费者后端
662        pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
663            Self { config, queues: manager.queues.clone() }
664        }
665    }
666
667    #[async_trait::async_trait]
668    impl ConsumerBackend for MemoryConsumerBackend {
669        async fn receive_raw(&self) -> QueueResult<Option<ReceivedRawMessage>> {
670            let mut queues = self.queues.write().await;
671            if let Some(queue) = queues.get_mut(&self.config.queue) {
672                if let Some((delivery_tag, data, metadata)) = queue.messages.pop_front() {
673                    let message = RawMessage { data, metadata };
674                    return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count: 0 }));
675                }
676            }
677            Ok(None)
678        }
679
680        async fn ack(&self, _delivery_tag: u64) -> QueueResult<()> {
681            Ok(())
682        }
683
684        async fn nack(&self, _delivery_tag: u64, _requeue: bool) -> QueueResult<()> {
685            Ok(())
686        }
687
688        fn config(&self) -> &ConsumerConfig {
689            &self.config
690        }
691    }
692
693    /// 内存队列服务
694    pub struct MemoryQueueService {
695        manager: Arc<MemoryQueueManager>,
696    }
697
698    impl MemoryQueueService {
699        /// 创建新的内存队列服务
700        pub fn new() -> Self {
701            Self { manager: Arc::new(MemoryQueueManager::new()) }
702        }
703    }
704
705    impl Default for MemoryQueueService {
706        fn default() -> Self {
707            Self::new()
708        }
709    }
710
711    impl QueueService for MemoryQueueService {
712        async fn create_producer(&self, config: ProducerConfig) -> QueueResult<MessageProducer> {
713            Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
714        }
715
716        async fn create_consumer(&self, config: ConsumerConfig) -> QueueResult<MessageConsumer> {
717            self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
718            Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
719        }
720
721        fn manager(&self) -> &dyn QueueManager {
722            self.manager.as_ref() as &dyn QueueManager
723        }
724
725        async fn close(&self) -> QueueResult<()> {
726            Ok(())
727        }
728    }
729}
730
731/// 便捷函数:创建内存队列服务
732pub fn memory_queue_service() -> memory::MemoryQueueService {
733    memory::MemoryQueueService::new()
734}