Skip to main content

wae_queue/
lib.rs

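//! WAE Queue - message queue abstraction layer.
//!
//! Provides a unified abstraction over message queue capabilities, with support for multiple queue backends.
//!
//! Deeply integrated with the tokio runtime; every API is designed async-first.
//! Microservice-friendly: supports message acknowledgement, retries, delayed queues, and related features.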
7
8#![warn(missing_docs)]
9
10use serde::{Serialize, de::DeserializeOwned};
11use std::time::Duration;
12use wae_types::{WaeError, WaeResult};
13
/// Message identifier type.
pub type MessageId = String;

/// Queue name type.
pub type QueueName = String;
19
20/// 消息元数据
21#[derive(Debug, Clone, Default)]
22pub struct MessageMetadata {
23    /// 消息 ID
24    pub id: Option<MessageId>,
25    /// 关联 ID (用于消息关联)
26    pub correlation_id: Option<String>,
27    /// 回复队列
28    pub reply_to: Option<QueueName>,
29    /// 内容类型
30    pub content_type: Option<String>,
31    /// 时间戳
32    pub timestamp: Option<u64>,
33    /// 优先级 (0-9)
34    pub priority: Option<u8>,
35    /// 过期时间 (毫秒)
36    pub expiration: Option<u64>,
37    /// 自定义头信息
38    pub headers: std::collections::HashMap<String, String>,
39}
40
41/// 原始消息 (字节形式)
42#[derive(Debug, Clone)]
43pub struct RawMessage {
44    /// 消息数据
45    pub data: Vec<u8>,
46    /// 消息元数据
47    pub metadata: MessageMetadata,
48}
49
50impl RawMessage {
51    /// 创建新消息
52    pub fn new(data: Vec<u8>) -> Self {
53        Self { data, metadata: MessageMetadata::default() }
54    }
55
56    /// 设置关联 ID
57    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
58        self.metadata.correlation_id = Some(id.into());
59        self
60    }
61
62    /// 设置回复队列
63    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
64        self.metadata.reply_to = Some(queue.into());
65        self
66    }
67
68    /// 设置优先级
69    pub fn with_priority(mut self, priority: u8) -> Self {
70        self.metadata.priority = Some(priority.min(9));
71        self
72    }
73
74    /// 设置过期时间
75    pub fn with_expiration(mut self, ms: u64) -> Self {
76        self.metadata.expiration = Some(ms);
77        self
78    }
79
80    /// 添加自定义头
81    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
82        self.metadata.headers.insert(key.into(), value.into());
83        self
84    }
85}
86
87/// 消息封装 (泛型)
88#[derive(Debug, Clone)]
89pub struct Message<T> {
90    /// 消息体
91    pub payload: T,
92    /// 消息元数据
93    pub metadata: MessageMetadata,
94}
95
96impl<T> Message<T> {
97    /// 创建新消息
98    pub fn new(payload: T) -> Self {
99        Self { payload, metadata: MessageMetadata::default() }
100    }
101
102    /// 设置关联 ID
103    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
104        self.metadata.correlation_id = Some(id.into());
105        self
106    }
107
108    /// 设置回复队列
109    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
110        self.metadata.reply_to = Some(queue.into());
111        self
112    }
113
114    /// 设置优先级
115    pub fn with_priority(mut self, priority: u8) -> Self {
116        self.metadata.priority = Some(priority.min(9));
117        self
118    }
119
120    /// 设置过期时间
121    pub fn with_expiration(mut self, ms: u64) -> Self {
122        self.metadata.expiration = Some(ms);
123        self
124    }
125
126    /// 添加自定义头
127    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
128        self.metadata.headers.insert(key.into(), value.into());
129        self
130    }
131
132    /// 序列化为原始消息
133    pub fn into_raw(self) -> WaeResult<RawMessage>
134    where
135        T: Serialize,
136    {
137        let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
138        Ok(RawMessage { data, metadata: self.metadata })
139    }
140
141    /// 序列化为原始消息 (引用版本)
142    pub fn to_raw(&self) -> WaeResult<RawMessage>
143    where
144        T: Serialize,
145    {
146        let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
147        Ok(RawMessage { data, metadata: self.metadata.clone() })
148    }
149}
150
151impl RawMessage {
152    /// 反序列化为泛型消息
153    pub fn into_typed<T: DeserializeOwned>(self) -> WaeResult<Message<T>> {
154        let payload = serde_json::from_slice(&self.data).map_err(|_e| WaeError::deserialization_failed("Message"))?;
155        Ok(Message { payload, metadata: self.metadata })
156    }
157}
158
/// A received raw message (carries what is needed to acknowledge it).
#[derive(Debug)]
pub struct ReceivedRawMessage {
    /// Message content.
    pub message: RawMessage,
    /// Delivery tag (passed to `ack` / `nack` to confirm or reject this delivery).
    pub delivery_tag: u64,
    /// Number of times this message has been redelivered.
    pub redelivery_count: u32,
}
169
/// A received message (generic over the decoded payload type).
#[derive(Debug)]
pub struct ReceivedMessage<T> {
    /// Message content.
    pub message: Message<T>,
    /// Delivery tag (passed to `ack` / `nack` to confirm or reject this delivery).
    pub delivery_tag: u64,
    /// Number of times this message has been redelivered.
    pub redelivery_count: u32,
}
180
181/// 队列配置
182#[derive(Debug, Clone)]
183pub struct QueueConfig {
184    /// 队列名称
185    pub name: QueueName,
186    /// 是否持久化
187    pub durable: bool,
188    /// 是否自动删除 (当没有消费者时)
189    pub auto_delete: bool,
190    /// 最大消息数
191    pub max_messages: Option<u64>,
192    /// 最大消息大小 (字节)
193    pub max_message_size: Option<u64>,
194    /// 消息存活时间 (毫秒)
195    pub message_ttl: Option<u64>,
196    /// 死信队列
197    pub dead_letter_queue: Option<QueueName>,
198}
199
200impl QueueConfig {
201    /// 创建新的队列配置
202    pub fn new(name: impl Into<String>) -> Self {
203        Self {
204            name: name.into(),
205            durable: true,
206            auto_delete: false,
207            max_messages: None,
208            max_message_size: None,
209            message_ttl: None,
210            dead_letter_queue: None,
211        }
212    }
213
214    /// 设置持久化
215    pub fn durable(mut self, durable: bool) -> Self {
216        self.durable = durable;
217        self
218    }
219
220    /// 设置自动删除
221    pub fn auto_delete(mut self, auto_delete: bool) -> Self {
222        self.auto_delete = auto_delete;
223        self
224    }
225
226    /// 设置最大消息数
227    pub fn max_messages(mut self, max: u64) -> Self {
228        self.max_messages = Some(max);
229        self
230    }
231
232    /// 设置消息 TTL
233    pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
234        self.message_ttl = Some(ttl_ms);
235        self
236    }
237
238    /// 设置死信队列
239    pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
240        self.dead_letter_queue = Some(queue.into());
241        self
242    }
243}
244
245/// 生产者配置
246#[derive(Debug, Clone)]
247pub struct ProducerConfig {
248    /// 默认队列
249    pub default_queue: Option<QueueName>,
250    /// 消息确认超时
251    pub confirm_timeout: Duration,
252    /// 重试次数
253    pub retry_count: u32,
254    /// 重试间隔
255    pub retry_interval: Duration,
256}
257
258impl Default for ProducerConfig {
259    fn default() -> Self {
260        Self {
261            default_queue: None,
262            confirm_timeout: Duration::from_secs(5),
263            retry_count: 3,
264            retry_interval: Duration::from_millis(100),
265        }
266    }
267}
268
269/// 消费者配置
270#[derive(Debug, Clone)]
271pub struct ConsumerConfig {
272    /// 队列名称
273    pub queue: QueueName,
274    /// 消费者标签
275    pub consumer_tag: Option<String>,
276    /// 是否自动确认
277    pub auto_ack: bool,
278    /// 预取数量
279    pub prefetch_count: u16,
280    /// 是否独占
281    pub exclusive: bool,
282}
283
284impl ConsumerConfig {
285    /// 创建新的消费者配置
286    pub fn new(queue: impl Into<String>) -> Self {
287        Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
288    }
289
290    /// 设置自动确认
291    pub fn auto_ack(mut self, auto_ack: bool) -> Self {
292        self.auto_ack = auto_ack;
293        self
294    }
295
296    /// 设置预取数量
297    pub fn prefetch(mut self, count: u16) -> Self {
298        self.prefetch_count = count;
299        self
300    }
301}
302
/// Message producer backend trait (dyn-compatible).
///
/// Works on [`RawMessage`] values only; [`MessageProducer`] layers the generic,
/// serializing API on top of a boxed implementation of this trait.
#[async_trait::async_trait]
pub trait ProducerBackend: Send + Sync {
    /// Sends a raw message to the given queue and returns the ID assigned to it.
    async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId>;

    /// Sends a raw message to the default queue.
    async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId>;

    /// Sends a raw message to `queue` with the given `delay`.
    async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId>;

    /// Sends a batch of raw messages to `queue`, returning one ID per message.
    async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>>;

    /// Returns the producer configuration.
    fn config(&self) -> &ProducerConfig;
}
321
322/// 消息生产者 (提供泛型封装)
323pub struct MessageProducer {
324    backend: Box<dyn ProducerBackend>,
325}
326
327impl MessageProducer {
328    /// 从后端创建生产者
329    pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
330        Self { backend }
331    }
332
333    /// 发送消息到指定队列
334    pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> WaeResult<MessageId> {
335        let raw = message.to_raw()?;
336        self.backend.send_raw(queue, &raw).await
337    }
338
339    /// 发送消息到默认队列
340    pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> WaeResult<MessageId> {
341        let raw = message.to_raw()?;
342        self.backend.send_raw_default(&raw).await
343    }
344
345    /// 发送延迟消息
346    pub async fn send_delayed<T: Serialize + Send + Sync>(
347        &self,
348        queue: &str,
349        message: &Message<T>,
350        delay: Duration,
351    ) -> WaeResult<MessageId> {
352        let raw = message.to_raw()?;
353        self.backend.send_raw_delayed(queue, &raw, delay).await
354    }
355
356    /// 批量发送消息
357    pub async fn send_batch<T: Serialize + Send + Sync>(
358        &self,
359        queue: &str,
360        messages: &[Message<T>],
361    ) -> WaeResult<Vec<MessageId>> {
362        let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<WaeResult<_>>()?;
363        self.backend.send_raw_batch(queue, &raw_messages).await
364    }
365
366    /// 获取配置
367    pub fn config(&self) -> &ProducerConfig {
368        self.backend.config()
369    }
370}
371
/// Message consumer backend trait (dyn-compatible).
///
/// Works on raw messages only; [`MessageConsumer`] layers the generic,
/// deserializing API on top of a boxed implementation of this trait.
#[async_trait::async_trait]
pub trait ConsumerBackend: Send + Sync {
    /// Receives a raw message; the return type allows `Ok(None)` for "no message".
    async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>>;

    /// Acknowledges the message identified by `delivery_tag`.
    async fn ack(&self, delivery_tag: u64) -> WaeResult<()>;

    /// Rejects the message identified by `delivery_tag`; `requeue` asks for it to be queued again.
    async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()>;

    /// Returns the consumer configuration.
    fn config(&self) -> &ConsumerConfig;
}
387
388/// 消息消费者 (提供泛型封装)
389pub struct MessageConsumer {
390    backend: Box<dyn ConsumerBackend>,
391}
392
393impl MessageConsumer {
394    /// 从后端创建消费者
395    pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
396        Self { backend }
397    }
398
399    /// 接收消息
400    pub async fn receive<T: DeserializeOwned + Send>(&self) -> WaeResult<Option<ReceivedMessage<T>>> {
401        let raw = match self.backend.receive_raw().await? {
402            Some(r) => r,
403            None => return Ok(None),
404        };
405
406        let message = raw.message.into_typed()?;
407        Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
408    }
409
410    /// 确认消息
411    pub async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
412        self.backend.ack(delivery_tag).await
413    }
414
415    /// 拒绝消息
416    pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
417        self.backend.nack(delivery_tag, requeue).await
418    }
419
420    /// 获取配置
421    pub fn config(&self) -> &ConsumerConfig {
422        self.backend.config()
423    }
424}
425
/// Queue management trait.
#[async_trait::async_trait]
pub trait QueueManager: Send + Sync {
    /// Declares a queue described by `config`.
    async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()>;

    /// Deletes the queue `name`.
    async fn delete_queue(&self, name: &str) -> WaeResult<()>;

    /// Checks whether the queue `name` exists.
    async fn queue_exists(&self, name: &str) -> WaeResult<bool>;

    /// Returns the number of messages in the queue `name`.
    async fn queue_message_count(&self, name: &str) -> WaeResult<u64>;

    /// Empties the queue `name`; the returned `u64` is a message count
    /// (the implementations in this file return the number of messages removed).
    async fn purge_queue(&self, name: &str) -> WaeResult<u64>;
}
444
/// Message queue service trait.
///
/// NOTE(review): unlike the other traits in this file, this one carries no
/// `#[async_trait::async_trait]` attribute and so relies on native
/// `async fn` in traits — confirm that this is intentional.
pub trait QueueService: Send + Sync {
    /// Creates a producer.
    async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer>;

    /// Creates a consumer.
    async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer>;

    /// Returns the queue manager.
    fn manager(&self) -> &dyn QueueManager;

    /// Closes the connection.
    async fn close(&self) -> WaeResult<()>;
}
460
461/// 内存队列实现
462pub mod memory {
463    use super::*;
464    use std::{
465        collections::{HashMap, VecDeque},
466        sync::Arc,
467    };
468    use tokio::sync::RwLock;
469
    /// State of a message that has been handed to a consumer but not yet
    /// acknowledged or rejected.
    struct PendingMessage {
        // Payload bytes, kept so the message can be put back on a queue.
        data: Vec<u8>,
        // Metadata of the delivered message.
        metadata: MessageMetadata,
        // Redelivery count recorded when the message was handed out.
        redelivery_count: u32,
        // Delivery tag under which this entry is stored in `pending_messages`.
        delivery_tag: u64,
    }
477
478    /// 内存队列存储
479    struct QueueStorage {
480        messages: VecDeque<(Vec<u8>, MessageMetadata)>,
481        pending_messages: HashMap<u64, PendingMessage>,
482        next_delivery_tag: u64,
483    }
484
485    impl QueueStorage {
486        fn new() -> Self {
487            Self { messages: VecDeque::new(), pending_messages: HashMap::new(), next_delivery_tag: 1 }
488        }
489    }
490
491    /// 内存队列管理器
492    pub struct MemoryQueueManager {
493        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
494        configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
495    }
496
497    impl MemoryQueueManager {
498        /// 创建新的内存队列管理器
499        pub fn new() -> Self {
500            Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
501        }
502    }
503
504    impl Default for MemoryQueueManager {
505        fn default() -> Self {
506            Self::new()
507        }
508    }
509
510    #[async_trait::async_trait]
511    impl QueueManager for MemoryQueueManager {
512        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
513            let mut queues = self.queues.write().await;
514            let mut configs = self.configs.write().await;
515
516            if !queues.contains_key(&config.name) {
517                queues.insert(config.name.clone(), QueueStorage::new());
518            }
519            configs.insert(config.name.clone(), config.clone());
520            Ok(())
521        }
522
523        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
524            let mut queues = self.queues.write().await;
525            let mut configs = self.configs.write().await;
526            queues.remove(name);
527            configs.remove(name);
528            Ok(())
529        }
530
531        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
532            let queues = self.queues.read().await;
533            Ok(queues.contains_key(name))
534        }
535
536        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
537            let queues = self.queues.read().await;
538            Ok(queues.get(name).map(|q| q.messages.len() as u64 + q.pending_messages.len() as u64).unwrap_or(0))
539        }
540
541        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
542            let mut queues = self.queues.write().await;
543            if let Some(queue) = queues.get_mut(name) {
544                let count = queue.messages.len() as u64 + queue.pending_messages.len() as u64;
545                queue.messages.clear();
546                queue.pending_messages.clear();
547                return Ok(count);
548            }
549            Ok(0)
550        }
551    }
552
553    /// 内存生产者后端
554    pub struct MemoryProducerBackend {
555        config: ProducerConfig,
556        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
557        manager: Arc<MemoryQueueManager>,
558    }
559
560    impl MemoryProducerBackend {
561        /// 创建新的内存生产者后端
562        pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
563            Self { config, queues: manager.queues.clone(), manager }
564        }
565
566        /// 内部发送消息到指定队列(不声明队列)
567        async fn send_raw_internal(&self, queue: &str, data: Vec<u8>, mut metadata: MessageMetadata) -> WaeResult<MessageId> {
568            let id = uuid::Uuid::new_v4().to_string();
569            metadata.id = Some(id.clone());
570            metadata.timestamp =
571                Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
572
573            let mut queues = self.queues.write().await;
574            if let Some(q) = queues.get_mut(queue) {
575                q.messages.push_back((data, metadata));
576            }
577            Ok(id)
578        }
579    }
580
581    #[async_trait::async_trait]
582    impl ProducerBackend for MemoryProducerBackend {
583        async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
584            self.manager.declare_queue(&QueueConfig::new(queue)).await?;
585
586            let id = uuid::Uuid::new_v4().to_string();
587            let mut metadata = message.metadata.clone();
588            metadata.id = Some(id.clone());
589            metadata.timestamp =
590                Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
591
592            let mut queues = self.queues.write().await;
593            if let Some(q) = queues.get_mut(queue) {
594                q.messages.push_back((message.data.clone(), metadata));
595            }
596            Ok(id)
597        }
598
599        async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
600            let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
601            self.send_raw(queue, message).await
602        }
603
604        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
605            tokio::time::sleep(delay).await;
606            self.send_raw(queue, message).await
607        }
608
609        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
610            let mut ids = Vec::with_capacity(messages.len());
611            for msg in messages {
612                ids.push(self.send_raw(queue, msg).await?);
613            }
614            Ok(ids)
615        }
616
617        fn config(&self) -> &ProducerConfig {
618            &self.config
619        }
620    }
621
622    /// 内存消费者后端
623    pub struct MemoryConsumerBackend {
624        config: ConsumerConfig,
625        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
626        manager: Arc<MemoryQueueManager>,
627    }
628
629    impl MemoryConsumerBackend {
630        /// 创建新的内存消费者后端
631        pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
632            Self { config, queues: manager.queues.clone(), manager: manager.clone() }
633        }
634    }
635
636    #[async_trait::async_trait]
637    impl ConsumerBackend for MemoryConsumerBackend {
638        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
639            let mut queues = self.queues.write().await;
640            if let Some(queue) = queues.get_mut(&self.config.queue) {
641                if let Some((data, metadata)) = queue.messages.pop_front() {
642                    let delivery_tag = queue.next_delivery_tag;
643                    queue.next_delivery_tag += 1;
644
645                    let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
646
647                    let pending_msg =
648                        PendingMessage { data: data.clone(), metadata: metadata.clone(), redelivery_count, delivery_tag };
649                    queue.pending_messages.insert(delivery_tag, pending_msg);
650
651                    let message = RawMessage { data, metadata };
652                    return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
653                }
654            }
655            Ok(None)
656        }
657
658        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
659            let mut queues = self.queues.write().await;
660            if let Some(queue) = queues.get_mut(&self.config.queue) {
661                queue.pending_messages.remove(&delivery_tag);
662            }
663            Ok(())
664        }
665
666        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
667            let configs = self.manager.configs.read().await;
668            let queue_config = configs.get(&self.config.queue).cloned();
669            drop(configs);
670
671            let mut queues = self.queues.write().await;
672
673            if let Some(queue) = queues.get_mut(&self.config.queue) {
674                if let Some(mut pending_msg) = queue.pending_messages.remove(&delivery_tag) {
675                    if requeue {
676                        pending_msg.redelivery_count += 1;
677                        pending_msg
678                            .metadata
679                            .headers
680                            .insert("x-redelivery-count".to_string(), pending_msg.redelivery_count.to_string());
681
682                        queue.messages.push_back((pending_msg.data, pending_msg.metadata));
683                    }
684                    else {
685                        if let Some(dlq_name) = queue_config.and_then(|c| c.dead_letter_queue) {
686                            drop(queues);
687
688                            self.manager.declare_queue(&QueueConfig::new(&dlq_name)).await?;
689
690                            let mut queues = self.queues.write().await;
691                            if let Some(dlq) = queues.get_mut(&dlq_name) {
692                                dlq.messages.push_back((pending_msg.data, pending_msg.metadata));
693                            }
694                        }
695                    }
696                }
697            }
698
699            Ok(())
700        }
701
702        fn config(&self) -> &ConsumerConfig {
703            &self.config
704        }
705    }
706
707    /// 内存队列服务
708    pub struct MemoryQueueService {
709        manager: Arc<MemoryQueueManager>,
710    }
711
712    impl MemoryQueueService {
713        /// 创建新的内存队列服务
714        pub fn new() -> Self {
715            Self { manager: Arc::new(MemoryQueueManager::new()) }
716        }
717    }
718
719    impl Default for MemoryQueueService {
720        fn default() -> Self {
721            Self::new()
722        }
723    }
724
725    impl QueueService for MemoryQueueService {
726        async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
727            Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
728        }
729
730        async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
731            self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
732            Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
733        }
734
735        fn manager(&self) -> &dyn QueueManager {
736            self.manager.as_ref() as &dyn QueueManager
737        }
738
739        async fn close(&self) -> WaeResult<()> {
740            Ok(())
741        }
742    }
743}
744
745/// Redis Streams 队列实现
746#[cfg(feature = "redis-backend")]
747pub mod redis_backend {
748    use super::*;
749    use ::redis::{AsyncCommands, Client, FromRedisValue, RedisResult, streams::StreamReadOptions};
750    use base64::{Engine as _, engine::general_purpose};
751    use std::{collections::HashMap, sync::Arc};
752    use tokio::sync::Mutex;
753
754    /// Redis 队列管理器
755    pub struct RedisQueueManager {
756        client: Client,
757        configs: Arc<Mutex<HashMap<String, QueueConfig>>>,
758    }
759
760    impl RedisQueueManager {
761        /// 创建新的 Redis 队列管理器
762        pub fn new(client: Client) -> Self {
763            Self { client, configs: Arc::new(Mutex::new(HashMap::new())) }
764        }
765
766        /// 从 URL 创建 Redis 队列管理器
767        pub async fn from_url(url: &str) -> WaeResult<Self> {
768            let client = Client::open(url).map_err(|e| WaeError::internal(format!("Failed to create Redis client: {}", e)))?;
769            Ok(Self::new(client))
770        }
771
772        fn stream_name(queue: &str) -> String {
773            format!("wae:stream:{}", queue)
774        }
775
776        fn group_name() -> &'static str {
777            "wae-consumer-group"
778        }
779    }
780
781    #[async_trait::async_trait]
782    impl QueueManager for RedisQueueManager {
783        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
784            let mut conn = self
785                .client
786                .get_async_connection()
787                .await
788                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
789
790            let stream_name = Self::stream_name(&config.name);
791            let group_name = Self::group_name();
792
793            let _: RedisResult<()> = conn.xgroup_create_mkstream(&stream_name, group_name, "0").await;
794
795            let mut configs = self.configs.lock().await;
796            configs.insert(config.name.clone(), config.clone());
797
798            Ok(())
799        }
800
801        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
802            let mut conn = self
803                .client
804                .get_async_connection()
805                .await
806                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
807
808            let stream_name = Self::stream_name(name);
809
810            let _: RedisResult<()> = conn.del(&stream_name).await;
811
812            let mut configs = self.configs.lock().await;
813            configs.remove(name);
814
815            Ok(())
816        }
817
818        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
819            let mut conn = self
820                .client
821                .get_async_connection()
822                .await
823                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
824
825            let stream_name = Self::stream_name(name);
826            let exists: bool = conn
827                .exists(&stream_name)
828                .await
829                .map_err(|e| WaeError::internal(format!("Failed to check if stream exists: {}", e)))?;
830
831            Ok(exists)
832        }
833
834        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
835            let mut conn = self
836                .client
837                .get_async_connection()
838                .await
839                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
840
841            let stream_name = Self::stream_name(name);
842            let len: u64 =
843                conn.xlen(&stream_name).await.map_err(|e| WaeError::internal(format!("Failed to get stream length: {}", e)))?;
844
845            Ok(len)
846        }
847
848        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
849            let mut conn = self
850                .client
851                .get_async_connection()
852                .await
853                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
854
855            let stream_name = Self::stream_name(name);
856            let len: u64 =
857                conn.xlen(&stream_name).await.map_err(|e| WaeError::internal(format!("Failed to get stream length: {}", e)))?;
858
859            let _: RedisResult<()> = conn.del(&stream_name).await;
860
861            Ok(len)
862        }
863    }
864
865    /// Redis 生产者后端
866    pub struct RedisProducerBackend {
867        config: ProducerConfig,
868        client: Client,
869        manager: Arc<RedisQueueManager>,
870    }
871
872    impl RedisProducerBackend {
873        /// 创建新的 Redis 生产者后端
874        pub fn new(config: ProducerConfig, manager: Arc<RedisQueueManager>) -> Self {
875            Self { config, client: manager.client.clone(), manager }
876        }
877
878        fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
879            let mut fields = HashMap::new();
880
881            if let Some(id) = &metadata.id {
882                fields.insert("id".to_string(), id.clone());
883            }
884            if let Some(correlation_id) = &metadata.correlation_id {
885                fields.insert("correlation_id".to_string(), correlation_id.clone());
886            }
887            if let Some(reply_to) = &metadata.reply_to {
888                fields.insert("reply_to".to_string(), reply_to.clone());
889            }
890            if let Some(content_type) = &metadata.content_type {
891                fields.insert("content_type".to_string(), content_type.clone());
892            }
893            if let Some(timestamp) = metadata.timestamp {
894                fields.insert("timestamp".to_string(), timestamp.to_string());
895            }
896            if let Some(priority) = metadata.priority {
897                fields.insert("priority".to_string(), priority.to_string());
898            }
899            if let Some(expiration) = metadata.expiration {
900                fields.insert("expiration".to_string(), expiration.to_string());
901            }
902
903            for (key, value) in &metadata.headers {
904                fields.insert(format!("header:{}", key), value.clone());
905            }
906
907            fields
908        }
909    }
910
911    #[async_trait::async_trait]
912    impl ProducerBackend for RedisProducerBackend {
913        async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
914            self.manager.declare_queue(&QueueConfig::new(queue)).await?;
915
916            let mut conn = self
917                .client
918                .get_async_connection()
919                .await
920                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
921
922            let stream_name = RedisQueueManager::stream_name(queue);
923            let data_b64 = general_purpose::STANDARD.encode(&message.data);
924
925            let mut fields_vec = Vec::new();
926            let mut fields = Self::encode_metadata(&message.metadata);
927            fields.insert("data".to_string(), data_b64);
928            for (k, v) in fields {
929                fields_vec.push((k, v));
930            }
931
932            let id: String = conn
933                .xadd(&stream_name, "*", &fields_vec)
934                .await
935                .map_err(|e| WaeError::internal(format!("Failed to add message to stream: {}", e)))?;
936
937            Ok(id)
938        }
939
940        async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
941            let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
942            self.send_raw(queue, message).await
943        }
944
945        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
946            let delayed_queue = format!("{}:delayed", queue);
947            let delayed_stream = RedisQueueManager::stream_name(&delayed_queue);
948
949            let mut conn = self
950                .client
951                .get_async_connection()
952                .await
953                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
954
955            let data_b64 = general_purpose::STANDARD.encode(&message.data);
956            let mut fields = Self::encode_metadata(&message.metadata);
957            fields.insert("data".to_string(), data_b64);
958            fields.insert("target_queue".to_string(), queue.to_string());
959
960            let mut fields_vec = Vec::new();
961            for (k, v) in fields {
962                fields_vec.push((k, v));
963            }
964
965            let id: String = conn
966                .xadd(&delayed_stream, "*", &fields_vec)
967                .await
968                .map_err(|e| WaeError::internal(format!("Failed to add delayed message: {}", e)))?;
969
970            let message_clone = message.clone();
971            tokio::spawn({
972                let client = self.client.clone();
973                let manager = self.manager.clone();
974                let queue = queue.to_string();
975                let delayed_stream = delayed_stream.clone();
976                let id = id.clone();
977                let delay = delay;
978
979                async move {
980                    tokio::time::sleep(delay).await;
981
982                    if let Ok(mut conn) = client.get_async_connection().await {
983                        let _: RedisResult<()> = conn.xdel(&delayed_stream, &[&id]).await;
984
985                        let mut producer = RedisProducerBackend::new(ProducerConfig::default(), manager.clone());
986                        let _ = producer.send_raw(&queue, &message_clone).await;
987                    }
988                }
989            });
990
991            Ok(id)
992        }
993
994        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
995            let mut ids = Vec::with_capacity(messages.len());
996            for msg in messages {
997                ids.push(self.send_raw(queue, msg).await?);
998            }
999            Ok(ids)
1000        }
1001
1002        fn config(&self) -> &ProducerConfig {
1003            &self.config
1004        }
1005    }
1006
1007    /// Redis 消费者后端
1008    pub struct RedisConsumerBackend {
1009        config: ConsumerConfig,
1010        client: Client,
1011        manager: Arc<RedisQueueManager>,
1012        consumer_name: String,
1013        delivery_tags: Arc<Mutex<HashMap<u64, String>>>,
1014        next_delivery_tag: Arc<Mutex<u64>>,
1015    }
1016
1017    impl RedisConsumerBackend {
1018        /// 创建新的 Redis 消费者后端
1019        pub fn new(config: ConsumerConfig, manager: Arc<RedisQueueManager>) -> Self {
1020            let consumer_name = format!("wae-consumer-{}", uuid::Uuid::new_v4());
1021            Self {
1022                config,
1023                client: manager.client.clone(),
1024                manager,
1025                consumer_name,
1026                delivery_tags: Arc::new(Mutex::new(HashMap::new())),
1027                next_delivery_tag: Arc::new(Mutex::new(1)),
1028            }
1029        }
1030
1031        fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
1032            let mut metadata = MessageMetadata::default();
1033
1034            if let Some(id) = fields.get("id") {
1035                metadata.id = Some(id.clone());
1036            }
1037            if let Some(correlation_id) = fields.get("correlation_id") {
1038                metadata.correlation_id = Some(correlation_id.clone());
1039            }
1040            if let Some(reply_to) = fields.get("reply_to") {
1041                metadata.reply_to = Some(reply_to.clone());
1042            }
1043            if let Some(content_type) = fields.get("content_type") {
1044                metadata.content_type = Some(content_type.clone());
1045            }
1046            if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
1047                metadata.timestamp = Some(timestamp);
1048            }
1049            if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
1050                metadata.priority = Some(priority);
1051            }
1052            if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
1053                metadata.expiration = Some(expiration);
1054            }
1055
1056            for (key, value) in fields {
1057                if let Some(header_key) = key.strip_prefix("header:") {
1058                    metadata.headers.insert(header_key.to_string(), value.clone());
1059                }
1060            }
1061
1062            metadata
1063        }
1064    }
1065
1066    #[async_trait::async_trait]
1067    impl ConsumerBackend for RedisConsumerBackend {
1068        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
1069            let mut conn = self
1070                .client
1071                .get_async_connection()
1072                .await
1073                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1074
1075            let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1076            let group_name = RedisQueueManager::group_name();
1077
1078            let opts = StreamReadOptions::default().group(group_name, &self.consumer_name).count(1).block(1000);
1079
1080            let result: RedisResult<redis::streams::StreamReadReply> = conn.xread_options(&[&stream_name], &[">"], &opts).await;
1081
1082            match result {
1083                Ok(streams) => {
1084                    if let Some(stream) = streams.keys.into_iter().next() {
1085                        if let Some(entry) = stream.ids.into_iter().next() {
1086                            let mut fields = HashMap::new();
1087                            for (key, value) in entry.map {
1088                                if let Ok(s) = String::from_redis_value(&value) {
1089                                    fields.insert(key, s);
1090                                }
1091                            }
1092
1093                            let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
1094                            let data = general_purpose::STANDARD
1095                                .decode(data_b64)
1096                                .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
1097
1098                            let metadata = Self::decode_metadata(&fields);
1099
1100                            let mut next_tag = self.next_delivery_tag.lock().await;
1101                            let delivery_tag = *next_tag;
1102                            *next_tag += 1;
1103                            drop(next_tag);
1104
1105                            let mut delivery_tags = self.delivery_tags.lock().await;
1106                            delivery_tags.insert(delivery_tag, entry.id.clone());
1107
1108                            let redelivery_count =
1109                                metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
1110
1111                            let message = RawMessage { data, metadata };
1112                            return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
1113                        }
1114                    }
1115                    Ok(None)
1116                }
1117                Err(e) => {
1118                    if e.to_string().contains("NOGROUP") {
1119                        self.manager.declare_queue(&QueueConfig::new(&self.config.queue)).await?;
1120                        Ok(None)
1121                    }
1122                    else {
1123                        Err(WaeError::internal(format!("Failed to read from stream: {}", e)))
1124                    }
1125                }
1126            }
1127        }
1128
1129        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
1130            let mut conn = self
1131                .client
1132                .get_async_connection()
1133                .await
1134                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1135
1136            let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1137            let group_name = RedisQueueManager::group_name();
1138
1139            let mut delivery_tags = self.delivery_tags.lock().await;
1140            if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
1141                let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1142                let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1143            }
1144
1145            Ok(())
1146        }
1147
1148        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
1149            let mut conn = self
1150                .client
1151                .get_async_connection()
1152                .await
1153                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1154
1155            let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1156            let group_name = RedisQueueManager::group_name();
1157
1158            let mut delivery_tags = self.delivery_tags.lock().await;
1159            if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
1160                if requeue {
1161                    let _: RedisResult<()> =
1162                        conn.xclaim(&stream_name, group_name, &self.consumer_name, 0, &[&message_id]).await;
1163                }
1164                else {
1165                    let configs = self.manager.configs.lock().await;
1166                    let dlq_name = configs.get(&self.config.queue).and_then(|c| c.dead_letter_queue.clone());
1167                    drop(configs);
1168
1169                    if let Some(dlq_name) = dlq_name {
1170                        let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1171                        let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1172                    }
1173                    else {
1174                        let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1175                        let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1176                    }
1177                }
1178            }
1179
1180            Ok(())
1181        }
1182
1183        fn config(&self) -> &ConsumerConfig {
1184            &self.config
1185        }
1186    }
1187
1188    /// Redis 队列服务
1189    pub struct RedisQueueService {
1190        manager: Arc<RedisQueueManager>,
1191    }
1192
1193    impl RedisQueueService {
1194        /// 创建新的 Redis 队列服务
1195        pub fn new(manager: Arc<RedisQueueManager>) -> Self {
1196            Self { manager }
1197        }
1198
1199        /// 从 URL 创建 Redis 队列服务
1200        pub async fn from_url(url: &str) -> WaeResult<Self> {
1201            let manager = Arc::new(RedisQueueManager::from_url(url).await?);
1202            Ok(Self::new(manager))
1203        }
1204    }
1205
1206    impl QueueService for RedisQueueService {
1207        async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
1208            Ok(MessageProducer::new(Box::new(RedisProducerBackend::new(config, self.manager.clone()))))
1209        }
1210
1211        async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
1212            self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
1213            Ok(MessageConsumer::new(Box::new(RedisConsumerBackend::new(config, self.manager.clone()))))
1214        }
1215
1216        fn manager(&self) -> &dyn QueueManager {
1217            self.manager.as_ref() as &dyn QueueManager
1218        }
1219
1220        async fn close(&self) -> WaeResult<()> {
1221            Ok(())
1222        }
1223    }
1224}
1225
1226/// 便捷函数:创建内存队列服务
1227pub fn memory_queue_service() -> memory::MemoryQueueService {
1228    memory::MemoryQueueService::new()
1229}
1230
1231/// Kafka 队列实现
1232#[cfg(feature = "kafka-backend")]
1233pub mod kafka_backend {
1234    use super::*;
1235    use base64::{Engine as _, engine::general_purpose};
1236    use rdkafka::{
1237        ClientConfig,
1238        consumer::{Consumer, DefaultConsumerContext, StreamConsumer},
1239        message::{Headers, OwnedMessage},
1240        producer::{DeliveryFuture, FutureProducer, FutureRecord},
1241        util::Timeout,
1242    };
1243    use std::{collections::HashMap, sync::Arc, time::Duration};
1244    use tokio::sync::Mutex;
1245    use uuid::Uuid;
1246
1247    /// Kafka 连接配置
1248    #[derive(Debug, Clone)]
1249    pub struct KafkaConfig {
1250        /// Kafka Broker 地址 (例如: localhost:9092)
1251        pub brokers: String,
1252        /// 客户端 ID
1253        pub client_id: Option<String>,
1254        /// 生产者配置
1255        pub producer_config: HashMap<String, String>,
1256        /// 消费者配置
1257        pub consumer_config: HashMap<String, String>,
1258    }
1259
1260    impl Default for KafkaConfig {
1261        fn default() -> Self {
1262            Self {
1263                brokers: "localhost:9092".to_string(),
1264                client_id: Some("wae-kafka".to_string()),
1265                producer_config: HashMap::new(),
1266                consumer_config: HashMap::new(),
1267            }
1268        }
1269    }
1270
1271    impl KafkaConfig {
1272        /// 创建新的 Kafka 配置
1273        pub fn new(brokers: impl Into<String>) -> Self {
1274            Self { brokers: brokers.into(), ..Default::default() }
1275        }
1276    }
1277
1278    /// Kafka 队列管理器
1279    pub struct KafkaQueueManager {
1280        config: ClientConfig,
1281    }
1282
1283    impl KafkaQueueManager {
1284        /// 创建新的 Kafka 队列管理器
1285        pub fn new(config: &KafkaConfig) -> Self {
1286            let mut client_config = ClientConfig::new();
1287            client_config.set("bootstrap.servers", &config.brokers);
1288            if let Some(client_id) = &config.client_id {
1289                client_config.set("client.id", client_id);
1290            }
1291            for (key, value) in &config.producer_config {
1292                client_config.set(key, value);
1293            }
1294            for (key, value) in &config.consumer_config {
1295                client_config.set(key, value);
1296            }
1297            Self { config: client_config }
1298        }
1299
1300        /// 内部主题名称
1301        fn topic_name(queue: &str) -> String {
1302            queue.to_string()
1303        }
1304    }
1305
1306    #[async_trait::async_trait]
1307    impl QueueManager for KafkaQueueManager {
1308        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
1309            Ok(())
1310        }
1311
1312        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
1313            Ok(())
1314        }
1315
1316        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
1317            Ok(true)
1318        }
1319
1320        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
1321            Ok(0)
1322        }
1323
1324        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
1325            Ok(0)
1326        }
1327    }
1328
1329    /// Kafka 生产者后端
1330    pub struct KafkaProducerBackend {
1331        config: ProducerConfig,
1332        producer: Arc<FutureProducer>,
1333        manager: Arc<KafkaQueueManager>,
1334    }
1335
1336    impl KafkaProducerBackend {
1337        /// 创建新的 Kafka 生产者后端
1338        pub fn new(config: ProducerConfig, manager: Arc<KafkaQueueManager>, producer: Arc<FutureProducer>) -> Self {
1339            Self { config, producer, manager }
1340        }
1341
1342        /// 编码元数据
1343        fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
1344            let mut fields = HashMap::new();
1345
1346            if let Some(id) = &metadata.id {
1347                fields.insert("id".to_string(), id.clone());
1348            }
1349            if let Some(correlation_id) = &metadata.correlation_id {
1350                fields.insert("correlation_id".to_string(), correlation_id.clone());
1351            }
1352            if let Some(reply_to) = &metadata.reply_to {
1353                fields.insert("reply_to".to_string(), reply_to.clone());
1354            }
1355            if let Some(content_type) = &metadata.content_type {
1356                fields.insert("content_type".to_string(), content_type.clone());
1357            }
1358            if let Some(timestamp) = metadata.timestamp {
1359                fields.insert("timestamp".to_string(), timestamp.to_string());
1360            }
1361            if let Some(priority) = metadata.priority {
1362                fields.insert("priority".to_string(), priority.to_string());
1363            }
1364            if let Some(expiration) = metadata.expiration {
1365                fields.insert("expiration".to_string(), expiration.to_string());
1366            }
1367
1368            for (key, value) in &metadata.headers {
1369                fields.insert(format!("header:{}", key), value.clone());
1370            }
1371
1372            fields
1373        }
1374    }
1375
1376    #[async_trait::async_trait]
1377    impl ProducerBackend for KafkaProducerBackend {
1378        async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
1379            let id = Uuid::new_v4().to_string();
1380            let mut metadata = message.metadata.clone();
1381            metadata.id = Some(id.clone());
1382            if metadata.timestamp.is_none() {
1383                metadata.timestamp =
1384                    Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
1385            }
1386
1387            let topic = KafkaQueueManager::topic_name(queue);
1388            let data_b64 = general_purpose::STANDARD.encode(&message.data);
1389            let mut fields = Self::encode_metadata(&metadata);
1390            fields.insert("data".to_string(), data_b64);
1391
1392            let payload = serde_json::to_vec(&fields)
1393                .map_err(|e| WaeError::internal(format!("Failed to serialize Kafka message: {}", e)))?;
1394
1395            let record = FutureRecord::to(&topic).payload(&payload).key(&id);
1396
1397            let result = self.producer.send(record, Timeout::After(Duration::from_secs(5))).await;
1398
1399            match result {
1400                Ok((_, _)) => Ok(id),
1401                Err((e, _)) => Err(WaeError::internal(format!("Failed to send Kafka message: {}", e))),
1402            }
1403        }
1404
1405        async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
1406            let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
1407            self.send_raw(queue, message).await
1408        }
1409
1410        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
1411            let id = Uuid::new_v4().to_string();
1412            let message_clone = message.clone();
1413            let producer = self.producer.clone();
1414            let manager = self.manager.clone();
1415            let queue = queue.to_string();
1416
1417            tokio::spawn(async move {
1418                tokio::time::sleep(delay).await;
1419                let mut producer_config = ProducerConfig::default();
1420                producer_config.default_queue = Some(queue.clone());
1421                let mut producer = KafkaProducerBackend::new(producer_config, manager.clone(), producer.clone());
1422                let _ = producer.send_raw(&queue, &message_clone).await;
1423            });
1424
1425            Ok(id)
1426        }
1427
1428        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
1429            let mut ids = Vec::with_capacity(messages.len());
1430            for msg in messages {
1431                ids.push(self.send_raw(queue, msg).await?);
1432            }
1433            Ok(ids)
1434        }
1435
1436        fn config(&self) -> &ProducerConfig {
1437            &self.config
1438        }
1439    }
1440
1441    /// Kafka 消费者后端
1442    pub struct KafkaConsumerBackend {
1443        config: ConsumerConfig,
1444        consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
1445        manager: Arc<KafkaQueueManager>,
1446        delivery_tags: Arc<Mutex<HashMap<u64, (i32, i64)>>>,
1447        next_delivery_tag: Arc<Mutex<u64>>,
1448    }
1449
1450    impl KafkaConsumerBackend {
1451        /// 创建新的 Kafka 消费者后端
1452        pub fn new(
1453            config: ConsumerConfig,
1454            manager: Arc<KafkaQueueManager>,
1455            consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
1456        ) -> Self {
1457            Self {
1458                config,
1459                consumer,
1460                manager,
1461                delivery_tags: Arc::new(Mutex::new(HashMap::new())),
1462                next_delivery_tag: Arc::new(Mutex::new(1)),
1463            }
1464        }
1465
1466        /// 解码元数据
1467        fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
1468            let mut metadata = MessageMetadata::default();
1469
1470            if let Some(id) = fields.get("id") {
1471                metadata.id = Some(id.clone());
1472            }
1473            if let Some(correlation_id) = fields.get("correlation_id") {
1474                metadata.correlation_id = Some(correlation_id.clone());
1475            }
1476            if let Some(reply_to) = fields.get("reply_to") {
1477                metadata.reply_to = Some(reply_to.clone());
1478            }
1479            if let Some(content_type) = fields.get("content_type") {
1480                metadata.content_type = Some(content_type.clone());
1481            }
1482            if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
1483                metadata.timestamp = Some(timestamp);
1484            }
1485            if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
1486                metadata.priority = Some(priority);
1487            }
1488            if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
1489                metadata.expiration = Some(expiration);
1490            }
1491
1492            for (key, value) in fields {
1493                if let Some(header_key) = key.strip_prefix("header:") {
1494                    metadata.headers.insert(header_key.to_string(), value.clone());
1495                }
1496            }
1497
1498            metadata
1499        }
1500    }
1501
1502    #[async_trait::async_trait]
1503    impl ConsumerBackend for KafkaConsumerBackend {
1504        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
1505            match tokio::time::timeout(Duration::from_secs(1), self.consumer.recv()).await {
1506                Ok(Ok(message)) => {
1507                    let payload = message.payload().ok_or_else(|| WaeError::internal("Kafka message missing payload"))?;
1508                    let fields: HashMap<String, String> = serde_json::from_slice(payload)
1509                        .map_err(|e| WaeError::internal(format!("Failed to deserialize Kafka message: {}", e)))?;
1510                    let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
1511                    let data = general_purpose::STANDARD
1512                        .decode(data_b64)
1513                        .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
1514                    let metadata = Self::decode_metadata(&fields);
1515
1516                    let mut next_tag = self.next_delivery_tag.lock().await;
1517                    let delivery_tag = *next_tag;
1518                    *next_tag += 1;
1519                    drop(next_tag);
1520
1521                    let mut delivery_tags = self.delivery_tags.lock().await;
1522                    delivery_tags.insert(delivery_tag, (message.partition(), message.offset()));
1523
1524                    let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
1525
1526                    let raw_message = RawMessage { data, metadata };
1527                    Ok(Some(ReceivedRawMessage { message: raw_message, delivery_tag, redelivery_count }))
1528                }
1529                Ok(Err(e)) => Err(WaeError::internal(format!("Failed to receive from Kafka: {}", e))),
1530                Err(_) => Ok(None),
1531            }
1532        }
1533
1534        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
1535            let mut delivery_tags = self.delivery_tags.lock().await;
1536            if let Some((partition, offset)) = delivery_tags.remove(&delivery_tag) {
1537                self.consumer
1538                    .commit_offset(
1539                        &rdkafka::TopicPartitionList::new().with_partition_offset(
1540                            &self.config.queue,
1541                            partition,
1542                            rdkafka::Offset::Offset(offset + 1),
1543                        ),
1544                        rdkafka::consumer::CommitMode::Async,
1545                    )
1546                    .map_err(|e| WaeError::internal(format!("Failed to commit Kafka offset: {}", e)))?;
1547            }
1548            Ok(())
1549        }
1550
1551        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
1552            let mut delivery_tags = self.delivery_tags.lock().await;
1553            if let Some((partition, offset)) = delivery_tags.remove(&delivery_tag) {
1554                if requeue {
1555                    self.consumer
1556                        .seek(
1557                            &self.config.queue,
1558                            partition,
1559                            rdkafka::Offset::Offset(offset),
1560                            Timeout::After(Duration::from_secs(5)),
1561                        )
1562                        .await
1563                        .map_err(|e| WaeError::internal(format!("Failed to seek Kafka offset: {}", e)))?;
1564                }
1565                else {
1566                    self.consumer
1567                        .commit_offset(
1568                            &rdkafka::TopicPartitionList::new().with_partition_offset(
1569                                &self.config.queue,
1570                                partition,
1571                                rdkafka::Offset::Offset(offset + 1),
1572                            ),
1573                            rdkafka::consumer::CommitMode::Async,
1574                        )
1575                        .map_err(|e| WaeError::internal(format!("Failed to commit Kafka offset: {}", e)))?;
1576                }
1577            }
1578            Ok(())
1579        }
1580
1581        fn config(&self) -> &ConsumerConfig {
1582            &self.config
1583        }
1584    }
1585
1586    /// Kafka 队列服务
1587    pub struct KafkaQueueService {
1588        manager: Arc<KafkaQueueManager>,
1589        producer: Arc<FutureProducer>,
1590        consumer_config: ClientConfig,
1591    }
1592
1593    impl KafkaQueueService {
1594        /// 创建新的 Kafka 队列服务
1595        pub async fn new(config: KafkaConfig) -> WaeResult<Self> {
1596            let manager = Arc::new(KafkaQueueManager::new(&config));
1597
1598            let mut producer_config = ClientConfig::new();
1599            producer_config.set("bootstrap.servers", &config.brokers);
1600            if let Some(client_id) = &config.client_id {
1601                producer_config.set("client.id", client_id);
1602            }
1603            for (key, value) in &config.producer_config {
1604                producer_config.set(key, value);
1605            }
1606            let producer: FutureProducer =
1607                producer_config.create().map_err(|e| WaeError::internal(format!("Failed to create Kafka producer: {}", e)))?;
1608
1609            let mut consumer_config = ClientConfig::new();
1610            consumer_config.set("bootstrap.servers", &config.brokers);
1611            consumer_config.set("group.id", format!("wae-{}", Uuid::new_v4()));
1612            consumer_config.set("enable.auto.commit", "false");
1613            consumer_config.set("auto.offset.reset", "earliest");
1614            if let Some(client_id) = &config.client_id {
1615                consumer_config.set("client.id", client_id);
1616            }
1617            for (key, value) in &config.consumer_config {
1618                consumer_config.set(key, value);
1619            }
1620
1621            Ok(Self { manager, producer: Arc::new(producer), consumer_config })
1622        }
1623    }
1624
1625    impl QueueService for KafkaQueueService {
1626        async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
1627            Ok(MessageProducer::new(Box::new(KafkaProducerBackend::new(config, self.manager.clone(), self.producer.clone()))))
1628        }
1629
1630        async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
1631            let consumer: StreamConsumer<DefaultConsumerContext> = self
1632                .consumer_config
1633                .create()
1634                .map_err(|e| WaeError::internal(format!("Failed to create Kafka consumer: {}", e)))?;
1635
1636            consumer
1637                .subscribe(&[&config.queue])
1638                .map_err(|e| WaeError::internal(format!("Failed to subscribe to Kafka topic: {}", e)))?;
1639
1640            let backend = KafkaConsumerBackend::new(config, self.manager.clone(), Arc::new(consumer));
1641
1642            Ok(MessageConsumer::new(Box::new(backend)))
1643        }
1644
1645        fn manager(&self) -> &dyn QueueManager {
1646            self.manager.as_ref() as &dyn QueueManager
1647        }
1648
1649        async fn close(&self) -> WaeResult<()> {
1650            Ok(())
1651        }
1652    }
1653}
1654
1655/// RabbitMQ (AMQP) 队列实现
1656#[cfg(feature = "rabbitmq-backend")]
1657pub mod rabbitmq {
1658    use super::*;
1659    use lapin::{
1660        BasicProperties, Channel, Connection, ConnectionProperties, message::Delivery, options::*,
1661        publisher_confirm::Confirmation, types::*,
1662    };
1663    use std::sync::Arc;
1664    use tokio::sync::Semaphore;
1665    use uuid::Uuid;
1666
1667    /// RabbitMQ 连接配置
1668    #[derive(Clone)]
1669    pub struct RabbitMQConfig {
1670        /// AMQP URL (例如: amqp://guest:guest@localhost:5672/%2f)
1671        pub amqp_url: String,
1672        /// 连接属性
1673        pub connection_properties: ConnectionProperties,
1674    }
1675
1676    impl Default for RabbitMQConfig {
1677        fn default() -> Self {
1678            Self {
1679                amqp_url: "amqp://guest:guest@localhost:5672/%2f".to_string(),
1680                connection_properties: ConnectionProperties::default(),
1681            }
1682        }
1683    }
1684
1685    impl RabbitMQConfig {
1686        /// 创建新的 RabbitMQ 配置
1687        pub fn new(amqp_url: impl Into<String>) -> Self {
1688            Self { amqp_url: amqp_url.into(), connection_properties: ConnectionProperties::default() }
1689        }
1690    }
1691
1692    /// RabbitMQ 队列管理器
1693    pub struct RabbitMQQueueManager {
1694        channel: Arc<Channel>,
1695    }
1696
1697    impl RabbitMQQueueManager {
1698        /// 创建新的 RabbitMQ 队列管理器
1699        pub fn new(channel: Arc<Channel>) -> Self {
1700            Self { channel }
1701        }
1702
1703        /// 内部声明队列(使用 lapin API)
1704        async fn declare_queue_internal(&self, config: &QueueConfig) -> WaeResult<()> {
1705            let mut arguments = FieldTable::default();
1706
1707            if let Some(max_messages) = config.max_messages {
1708                arguments.insert("x-max-length".into(), AMQPValue::LongUInt(max_messages as u32));
1709            }
1710
1711            if let Some(max_message_size) = config.max_message_size {
1712                arguments.insert("x-max-length-bytes".into(), AMQPValue::LongUInt(max_message_size as u32));
1713            }
1714
1715            if let Some(message_ttl) = config.message_ttl {
1716                arguments.insert("x-message-ttl".into(), AMQPValue::LongUInt(message_ttl as u32));
1717            }
1718
1719            if let Some(dlq) = &config.dead_letter_queue {
1720                arguments.insert("x-dead-letter-exchange".into(), AMQPValue::ShortString("".into()));
1721                arguments.insert("x-dead-letter-routing-key".into(), AMQPValue::ShortString(dlq.clone().into()));
1722            }
1723
1724            self.channel
1725                .queue_declare(
1726                    &config.name,
1727                    QueueDeclareOptions {
1728                        passive: false,
1729                        durable: config.durable,
1730                        exclusive: false,
1731                        auto_delete: config.auto_delete,
1732                        nowait: false,
1733                    },
1734                    arguments,
1735                )
1736                .await
1737                .map_err(|e| WaeError::internal(format!("RabbitMQ declare queue: {}", e)))?;
1738
1739            Ok(())
1740        }
1741    }
1742
1743    #[async_trait::async_trait]
1744    impl QueueManager for RabbitMQQueueManager {
1745        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
1746            self.declare_queue_internal(config).await
1747        }
1748
1749        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
1750            self.channel
1751                .queue_delete(name, QueueDeleteOptions::default())
1752                .await
1753                .map_err(|e| WaeError::internal(format!("RabbitMQ delete queue: {}", e)))?;
1754            Ok(())
1755        }
1756
1757        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
1758            let result = self
1759                .channel
1760                .queue_declare(name, QueueDeclareOptions { passive: true, ..Default::default() }, FieldTable::default())
1761                .await;
1762
1763            match result {
1764                Ok(_) => Ok(true),
1765                Err(lapin::Error::ProtocolError(_)) => Ok(false),
1766                Err(e) => Err(WaeError::internal(format!("RabbitMQ check queue exists: {}", e))),
1767            }
1768        }
1769
1770        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
1771            let queue = self
1772                .channel
1773                .queue_declare(name, QueueDeclareOptions { passive: true, ..Default::default() }, FieldTable::default())
1774                .await
1775                .map_err(|e| WaeError::internal(format!("RabbitMQ get queue message count: {}", e)))?;
1776
1777            Ok(queue.message_count() as u64)
1778        }
1779
1780        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
1781            let result = self
1782                .channel
1783                .queue_purge(name, QueuePurgeOptions::default())
1784                .await
1785                .map_err(|e| WaeError::internal(format!("RabbitMQ purge queue: {}", e)))?;
1786
1787            Ok(result as u64)
1788        }
1789    }
1790
    /// RabbitMQ producer backend.
    pub struct RabbitMQProducerBackend {
        // Producer configuration supplied at construction time.
        config: ProducerConfig,
        // Shared AMQP channel supplied at construction time.
        channel: Arc<Channel>,
        // Queue manager supplied at construction time.
        manager: Arc<RabbitMQQueueManager>,
    }
1797
1798    impl RabbitMQProducerBackend {
1799        /// 创建新的 RabbitMQ 生产者后端
1800        pub fn new(config: ProducerConfig, channel: Arc<Channel>, manager: Arc<RabbitMQQueueManager>) -> Self {
1801            Self { config, channel, manager }
1802        }
1803
1804        /// 转换元数据为 RabbitMQ 头信息
1805        fn metadata_to_properties(metadata: &MessageMetadata) -> BasicProperties {
1806            let mut props = BasicProperties::default();
1807
1808            if let Some(correlation_id) = &metadata.correlation_id {
1809                props = props.with_correlation_id(correlation_id.clone().into());
1810            }
1811
1812            if let Some(reply_to) = &metadata.reply_to {
1813                props = props.with_reply_to(reply_to.clone().into());
1814            }
1815
1816            if let Some(content_type) = &metadata.content_type {
1817                props = props.with_content_type(content_type.clone().into());
1818            }
1819
1820            if let Some(timestamp) = metadata.timestamp {
1821                props = props.with_timestamp(timestamp);
1822            }
1823
1824            if let Some(priority) = metadata.priority {
1825                props = props.with_priority(priority);
1826            }
1827
1828            if let Some(expiration) = metadata.expiration {
1829                props = props.with_expiration(expiration.to_string().into());
1830            }
1831
1832            let mut headers = FieldTable::default();
1833            for (key, value) in &metadata.headers {
1834                headers.insert(key.clone().into(), AMQPValue::LongString(value.clone().into()));
1835            }
1836
1837            if metadata.headers.len() > 0 {
1838                props = props.with_headers(headers);
1839            }
1840
1841            props
1842        }
1843
1844        /// 内部发送原始消息
1845        async fn send_raw_internal(&self, queue: &str, message: &RawMessage, id: &str) -> WaeResult<()> {
1846            let mut metadata = message.metadata.clone();
1847            if metadata.timestamp.is_none() {
1848                metadata.timestamp =
1849                    Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
1850            }
1851
1852            let mut props = Self::metadata_to_properties(&metadata);
1853            props = props.with_message_id(id.into());
1854
1855            let confirm = self
1856                .channel
1857                .basic_publish("", queue, BasicPublishOptions::default(), &message.data, props)
1858                .await
1859                .map_err(|e| WaeError::internal(format!("RabbitMQ publish message: {}", e)))?
1860                .await
1861                .map_err(|e| WaeError::internal(format!("RabbitMQ wait for confirm: {}", e)))?;
1862
1863            match confirm {
1864                Confirmation::Ack(_) => Ok(()),
1865                Confirmation::Nack(_) => Err(WaeError::internal("RabbitMQ message nacked: Message was nacked by broker")),
1866                Confirmation::NotRequested => Ok(()),
1867            }
1868        }
1869    }
1870
1871    #[async_trait::async_trait]
1872    impl ProducerBackend for RabbitMQProducerBackend {
1873        async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
1874            self.manager.declare_queue(&QueueConfig::new(queue)).await?;
1875
1876            let id = Uuid::new_v4().to_string();
1877            self.send_raw_internal(queue, message, &id).await?;
1878            Ok(id)
1879        }
1880
1881        async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
1882            let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
1883            self.send_raw(queue, message).await
1884        }
1885
1886        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
1887            let delay_ms = delay.as_millis() as u64;
1888            let delay_queue_name = format!("{}.delay.{}", queue, delay_ms);
1889
1890            let mut delay_queue_config = QueueConfig::new(&delay_queue_name);
1891            delay_queue_config = delay_queue_config.dead_letter_queue(queue);
1892            delay_queue_config.message_ttl = Some(delay_ms);
1893
1894            self.manager.declare_queue(&delay_queue_config).await?;
1895
1896            let id = Uuid::new_v4().to_string();
1897            self.send_raw_internal(&delay_queue_name, message, &id).await?;
1898            Ok(id)
1899        }
1900
1901        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
1902            let mut ids = Vec::with_capacity(messages.len());
1903            for msg in messages {
1904                ids.push(self.send_raw(queue, msg).await?);
1905            }
1906            Ok(ids)
1907        }
1908
1909        fn config(&self) -> &ProducerConfig {
1910            &self.config
1911        }
1912    }
1913
    /// RabbitMQ consumer backend.
    pub struct RabbitMQConsumerBackend {
        // Consumer configuration supplied at construction time.
        config: ConsumerConfig,
        // Shared AMQP channel supplied at construction time.
        channel: Arc<Channel>,
        // Queue manager supplied at construction time.
        manager: Arc<RabbitMQQueueManager>,
        // lapin consumer stream, wrapped in an async mutex.
        consumer: Arc<tokio::sync::Mutex<lapin::Consumer>>,
        // Semaphore wrapped in an `Arc`.
        semaphore: Arc<Semaphore>,
    }
1922
1923    impl RabbitMQConsumerBackend {
1924        /// 创建新的 RabbitMQ 消费者后端
1925        pub async fn new(config: ConsumerConfig, channel: Arc<Channel>, manager: Arc<RabbitMQQueueManager>) -> WaeResult<Self> {
1926            channel
1927                .basic_qos(config.prefetch_count, BasicQosOptions::default())
1928                .await
1929                .map_err(|e| WaeError::internal(format!("RabbitMQ set qos: {}", e)))?;
1930
1931            let consumer = channel
1932                .basic_consume(
1933                    &config.queue,
1934                    config.consumer_tag.as_deref().unwrap_or(""),
1935                    BasicConsumeOptions {
1936                        no_local: false,
1937                        no_ack: config.auto_ack,
1938                        exclusive: config.exclusive,
1939                        nowait: false,
1940                    },
1941                    FieldTable::default(),
1942                )
1943                .await
1944                .map_err(|e| WaeError::internal(format!("RabbitMQ create consumer: {}", e)))?;
1945
1946            let prefetch_count = config.prefetch_count;
1947            Ok(Self {
1948                config,
1949                channel,
1950                manager,
1951                consumer: Arc::new(tokio::sync::Mutex::new(consumer)),
1952                semaphore: Arc::new(Semaphore::new(prefetch_count as usize)),
1953            })
1954        }
1955
1956        /// 转换 RabbitMQ 消息为原始消息
1957        fn delivery_to_raw_message(delivery: &Delivery) -> RawMessage {
1958            let mut metadata = MessageMetadata::default();
1959
1960            metadata.id = delivery.properties.message_id().clone().map(|s| s.to_string());
1961            metadata.correlation_id = delivery.properties.correlation_id().clone().map(|s| s.to_string());
1962            metadata.reply_to = delivery.properties.reply_to().clone().map(|s| s.to_string());
1963            metadata.content_type = delivery.properties.content_type().clone().map(|s| s.to_string());
1964            metadata.timestamp = delivery.properties.timestamp().map(|t| t);
1965            metadata.priority = *delivery.properties.priority();
1966            metadata.expiration = delivery.properties.expiration().clone().and_then(|s| s.as_str().parse().ok());
1967
1968            if let Some(headers) = delivery.properties.headers() {
1969                for (key, value) in headers.inner().iter() {
1970                    if let AMQPValue::LongString(s) = value {
1971                        metadata.headers.insert(key.to_string(), s.to_string());
1972                    }
1973                    else if let AMQPValue::ShortString(s) = value {
1974                        metadata.headers.insert(key.to_string(), s.to_string());
1975                    }
1976                }
1977            }
1978
1979            RawMessage { data: delivery.data.clone(), metadata }
1980        }
1981    }
1982
1983    #[async_trait::async_trait]
1984    impl ConsumerBackend for RabbitMQConsumerBackend {
1985        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
1986            use futures::StreamExt;
1987
1988            let mut consumer = self.consumer.lock().await;
1989            if let Some(delivery_result) = consumer.next().await {
1990                let delivery = delivery_result.map_err(|e| WaeError::internal(format!("RabbitMQ receive message: {}", e)))?;
1991
1992                let message = Self::delivery_to_raw_message(&delivery);
1993                let redelivery_count = 0;
1994
1995                Ok(Some(ReceivedRawMessage { message, delivery_tag: delivery.delivery_tag, redelivery_count }))
1996            }
1997            else {
1998                Ok(None)
1999            }
2000        }
2001
2002        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
2003            self.channel
2004                .basic_ack(delivery_tag, BasicAckOptions::default())
2005                .await
2006                .map_err(|e| WaeError::internal(format!("RabbitMQ ack message: {}", e)))?;
2007            Ok(())
2008        }
2009
2010        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
2011            self.channel
2012                .basic_nack(delivery_tag, BasicNackOptions { multiple: false, requeue })
2013                .await
2014                .map_err(|e| WaeError::internal(format!("RabbitMQ nack message: {}", e)))?;
2015            Ok(())
2016        }
2017
2018        fn config(&self) -> &ConsumerConfig {
2019            &self.config
2020        }
2021    }
2022
    /// RabbitMQ queue service.
    pub struct RabbitMQQueueService {
        // Shared handle to the broker connection.
        connection: Arc<Connection>,
        // Shared handle to an AMQP channel.
        channel: Arc<Channel>,
        // Shared handle to the queue manager.
        manager: Arc<RabbitMQQueueManager>,
    }
2029
2030    impl RabbitMQQueueService {
2031        /// 创建新的 RabbitMQ 队列服务
2032        pub async fn new(config: RabbitMQConfig) -> WaeResult<Self> {
2033            let connection = Connection::connect(&config.amqp_url, config.connection_properties)
2034                .await
2035                .map_err(|e| WaeError::internal(format!("RabbitMQ connect: {}", e)))?;
2036
2037            let channel =
2038                connection.create_channel().await.map_err(|e| WaeError::internal(format!("RabbitMQ create channel: {}", e)))?;
2039
2040            channel
2041                .confirm_select(ConfirmSelectOptions::default())
2042                .await
2043                .map_err(|e| WaeError::internal(format!("RabbitMQ enable confirm mode: {}", e)))?;
2044
2045            let connection = Arc::new(connection);
2046            let channel = Arc::new(channel);
2047            let manager = Arc::new(RabbitMQQueueManager::new(channel.clone()));
2048
2049            Ok(Self { connection, channel, manager })
2050        }
2051    }
2052
2053    impl QueueService for RabbitMQQueueService {
2054        async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
2055            Ok(MessageProducer::new(Box::new(RabbitMQProducerBackend::new(config, self.channel.clone(), self.manager.clone()))))
2056        }
2057
2058        async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
2059            self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
2060
2061            let backend = RabbitMQConsumerBackend::new(config, self.channel.clone(), self.manager.clone()).await?;
2062
2063            Ok(MessageConsumer::new(Box::new(backend)))
2064        }
2065
2066        fn manager(&self) -> &dyn QueueManager {
2067            self.manager.as_ref() as &dyn QueueManager
2068        }
2069
2070        async fn close(&self) -> WaeResult<()> {
2071            self.connection.close(0, "").await.map_err(|e| WaeError::internal(format!("RabbitMQ close connection: {}", e)))?;
2072            Ok(())
2073        }
2074    }
2075}
2076
2077/// Pulsar 队列实现
2078#[cfg(feature = "pulsar-backend")]
2079pub mod pulsar_backend {
2080    use super::*;
2081    use base64::{Engine as _, engine::general_purpose};
2082    use pulsar::{Authentication, Pulsar, TokioExecutor, consumer, message::proto::MessageIdData, producer};
2083    use std::{collections::HashMap, sync::Arc, time::Duration};
2084    use tokio::sync::Mutex;
2085    use uuid::Uuid;
2086
    /// Connection settings for a Pulsar cluster.
    #[derive(Debug, Clone)]
    pub struct PulsarConfig {
        /// Pulsar broker address (for example: pulsar://localhost:6650)
        pub brokers: String,
        /// Optional authentication settings
        pub authentication: Option<Authentication>,
        /// Producer options
        pub producer_options: producer::ProducerOptions,
        /// Consumer options
        pub consumer_options: consumer::ConsumerOptions,
    }
2099
2100    impl Default for PulsarConfig {
2101        fn default() -> Self {
2102            Self {
2103                brokers: "pulsar://localhost:6650".to_string(),
2104                authentication: None,
2105                producer_options: producer::ProducerOptions::default(),
2106                consumer_options: consumer::ConsumerOptions::default(),
2107            }
2108        }
2109    }
2110
2111    impl PulsarConfig {
2112        /// 创建新的 Pulsar 配置
2113        pub fn new(brokers: impl Into<String>) -> Self {
2114            Self { brokers: brokers.into(), ..Default::default() }
2115        }
2116    }
2117
    /// Pulsar queue manager.
    pub struct PulsarQueueManager {
        // Shared Pulsar client handle.
        client: Arc<Pulsar<TokioExecutor>>,
    }
2122
2123    impl PulsarQueueManager {
2124        /// 创建新的 Pulsar 队列管理器
2125        pub fn new(client: Arc<Pulsar<TokioExecutor>>) -> Self {
2126            Self { client }
2127        }
2128
2129        /// 从配置创建 Pulsar 队列管理器
2130        pub async fn from_config(config: &PulsarConfig) -> WaeResult<Self> {
2131            let mut builder = Pulsar::builder(&config.brokers, TokioExecutor);
2132            if let Some(auth) = &config.authentication {
2133                builder = builder.with_auth(auth.clone());
2134            }
2135            let client =
2136                builder.build().await.map_err(|e| WaeError::internal(format!("Failed to create Pulsar client: {}", e)))?;
2137            Ok(Self::new(Arc::new(client)))
2138        }
2139
2140        /// 内部主题名称
2141        fn topic_name(queue: &str) -> String {
2142            format!("persistent://public/default/{}", queue)
2143        }
2144    }
2145
    // Every method below ignores its argument and returns a fixed value.
    #[async_trait::async_trait]
    impl QueueManager for PulsarQueueManager {
        /// Does nothing and always returns `Ok(())`.
        async fn declare_queue(&self, _config: &QueueConfig) -> WaeResult<()> {
            Ok(())
        }

        /// Does nothing and always returns `Ok(())`.
        async fn delete_queue(&self, _name: &str) -> WaeResult<()> {
            Ok(())
        }

        /// Always returns `Ok(true)`, whatever the name.
        async fn queue_exists(&self, _name: &str) -> WaeResult<bool> {
            Ok(true)
        }

        /// Always returns `Ok(0)`, whatever the name.
        async fn queue_message_count(&self, _name: &str) -> WaeResult<u64> {
            Ok(0)
        }

        /// Does nothing and always returns `Ok(0)`.
        async fn purge_queue(&self, _name: &str) -> WaeResult<u64> {
            Ok(0)
        }
    }
2168
    /// Pulsar producer backend.
    pub struct PulsarProducerBackend {
        // Producer configuration supplied at construction time.
        config: ProducerConfig,
        // Shared handle to the underlying Pulsar producer.
        producer: Arc<producer::Producer<TokioExecutor>>,
        // Shared handle to the queue manager.
        manager: Arc<PulsarQueueManager>,
    }
2175
2176    impl PulsarProducerBackend {
2177        /// 创建新的 Pulsar 生产者后端
2178        pub fn new(
2179            config: ProducerConfig,
2180            manager: Arc<PulsarQueueManager>,
2181            producer: Arc<producer::Producer<TokioExecutor>>,
2182        ) -> Self {
2183            Self { config, producer, manager }
2184        }
2185
2186        /// 编码元数据
2187        fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
2188            let mut fields = HashMap::new();
2189
2190            if let Some(id) = &metadata.id {
2191                fields.insert("id".to_string(), id.clone());
2192            }
2193            if let Some(correlation_id) = &metadata.correlation_id {
2194                fields.insert("correlation_id".to_string(), correlation_id.clone());
2195            }
2196            if let Some(reply_to) = &metadata.reply_to {
2197                fields.insert("reply_to".to_string(), reply_to.clone());
2198            }
2199            if let Some(content_type) = &metadata.content_type {
2200                fields.insert("content_type".to_string(), content_type.clone());
2201            }
2202            if let Some(timestamp) = metadata.timestamp {
2203                fields.insert("timestamp".to_string(), timestamp.to_string());
2204            }
2205            if let Some(priority) = metadata.priority {
2206                fields.insert("priority".to_string(), priority.to_string());
2207            }
2208            if let Some(expiration) = metadata.expiration {
2209                fields.insert("expiration".to_string(), expiration.to_string());
2210            }
2211
2212            for (key, value) in &metadata.headers {
2213                fields.insert(format!("header:{}", key), value.clone());
2214            }
2215
2216            fields
2217        }
2218    }
2219
2220    #[async_trait::async_trait]
2221    impl ProducerBackend for PulsarProducerBackend {
2222        async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
2223            let id = Uuid::new_v4().to_string();
2224            let mut metadata = message.metadata.clone();
2225            metadata.id = Some(id.clone());
2226            if metadata.timestamp.is_none() {
2227                metadata.timestamp =
2228                    Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
2229            }
2230
2231            let data_b64 = general_purpose::STANDARD.encode(&message.data);
2232            let mut fields = Self::encode_metadata(&metadata);
2233            fields.insert("data".to_string(), data_b64);
2234
2235            let payload = serde_json::to_vec(&fields)
2236                .map_err(|e| WaeError::internal(format!("Failed to serialize Pulsar message: {}", e)))?;
2237
2238            let message_id = self
2239                .producer
2240                .send(payload)
2241                .await
2242                .map_err(|e| WaeError::internal(format!("Failed to send Pulsar message: {}", e)))?
2243                .await
2244                .map_err(|e| WaeError::internal(format!("Failed to confirm Pulsar message: {}", e)))?;
2245
2246            Ok(id)
2247        }
2248
2249        async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
2250            let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
2251            self.send_raw(queue, message).await
2252        }
2253
2254        async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
2255            let id = Uuid::new_v4().to_string();
2256            let message_clone = message.clone();
2257            let producer = self.producer.clone();
2258            let manager = self.manager.clone();
2259            let queue = queue.to_string();
2260
2261            tokio::spawn(async move {
2262                tokio::time::sleep(delay).await;
2263                let mut producer_config = ProducerConfig::default();
2264                producer_config.default_queue = Some(queue.clone());
2265                let mut producer_backend = PulsarProducerBackend::new(producer_config, manager.clone(), producer.clone());
2266                let _ = producer_backend.send_raw(&queue, &message_clone).await;
2267            });
2268
2269            Ok(id)
2270        }
2271
2272        async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
2273            let mut ids = Vec::with_capacity(messages.len());
2274            for msg in messages {
2275                ids.push(self.send_raw(queue, msg).await?);
2276            }
2277            Ok(ids)
2278        }
2279
2280        fn config(&self) -> &ProducerConfig {
2281            &self.config
2282        }
2283    }
2284
    /// Pulsar consumer backend.
    pub struct PulsarConsumerBackend {
        // Consumer configuration supplied at construction time.
        config: ConsumerConfig,
        // Underlying Pulsar consumer, wrapped in an async mutex.
        consumer: Arc<Mutex<consumer::Consumer<TokioExecutor>>>,
        // Shared handle to the queue manager.
        manager: Arc<PulsarQueueManager>,
        // Map from numeric delivery tag to Pulsar message id data.
        delivery_tags: Arc<Mutex<HashMap<u64, MessageIdData>>>,
        // Delivery-tag counter, wrapped in an async mutex.
        next_delivery_tag: Arc<Mutex<u64>>,
    }
2293
2294    impl PulsarConsumerBackend {
2295        /// 创建新的 Pulsar 消费者后端
2296        pub fn new(
2297            config: ConsumerConfig,
2298            manager: Arc<PulsarQueueManager>,
2299            consumer: consumer::Consumer<TokioExecutor>,
2300        ) -> Self {
2301            Self {
2302                config,
2303                consumer: Arc::new(Mutex::new(consumer)),
2304                manager,
2305                delivery_tags: Arc::new(Mutex::new(HashMap::new())),
2306                next_delivery_tag: Arc::new(Mutex::new(1)),
2307            }
2308        }
2309
2310        /// 解码元数据
2311        fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
2312            let mut metadata = MessageMetadata::default();
2313
2314            if let Some(id) = fields.get("id") {
2315                metadata.id = Some(id.clone());
2316            }
2317            if let Some(correlation_id) = fields.get("correlation_id") {
2318                metadata.correlation_id = Some(correlation_id.clone());
2319            }
2320            if let Some(reply_to) = fields.get("reply_to") {
2321                metadata.reply_to = Some(reply_to.clone());
2322            }
2323            if let Some(content_type) = fields.get("content_type") {
2324                metadata.content_type = Some(content_type.clone());
2325            }
2326            if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
2327                metadata.timestamp = Some(timestamp);
2328            }
2329            if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
2330                metadata.priority = Some(priority);
2331            }
2332            if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
2333                metadata.expiration = Some(expiration);
2334            }
2335
2336            for (key, value) in fields {
2337                if let Some(header_key) = key.strip_prefix("header:") {
2338                    metadata.headers.insert(header_key.to_string(), value.clone());
2339                }
2340            }
2341
2342            metadata
2343        }
2344    }
2345
2346    #[async_trait::async_trait]
2347    impl ConsumerBackend for PulsarConsumerBackend {
2348        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
2349            match tokio::time::timeout(Duration::from_secs(1), async {
2350                let mut consumer = self.consumer.lock().await;
2351                consumer.next().await
2352            })
2353            .await
2354            {
2355                Ok(Some(msg_result)) => {
2356                    let msg = msg_result.map_err(|e| WaeError::internal(format!("Failed to receive from Pulsar: {}", e)))?;
2357                    let payload = msg.payload.data.clone();
2358                    let fields: HashMap<String, String> = serde_json::from_slice(&payload)
2359                        .map_err(|e| WaeError::internal(format!("Failed to deserialize Pulsar message: {}", e)))?;
2360                    let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
2361                    let data = general_purpose::STANDARD
2362                        .decode(data_b64)
2363                        .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
2364                    let metadata = Self::decode_metadata(&fields);
2365
2366                    let mut next_tag = self.next_delivery_tag.lock().await;
2367                    let delivery_tag = *next_tag;
2368                    *next_tag += 1;
2369                    drop(next_tag);
2370
2371                    let mut delivery_tags = self.delivery_tags.lock().await;
2372                    delivery_tags.insert(delivery_tag, msg.message_id.clone());
2373
2374                    let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
2375
2376                    let raw_message = RawMessage { data, metadata };
2377                    Ok(Some(ReceivedRawMessage { message: raw_message, delivery_tag, redelivery_count }))
2378                }
2379                Ok(None) => Ok(None),
2380                Err(_) => Ok(None),
2381            }
2382        }
2383
2384        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
2385            let mut delivery_tags = self.delivery_tags.lock().await;
2386            if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
2387                let mut consumer = self.consumer.lock().await;
2388                consumer
2389                    .ack(&message_id)
2390                    .await
2391                    .map_err(|e| WaeError::internal(format!("Failed to ack Pulsar message: {}", e)))?;
2392            }
2393            Ok(())
2394        }
2395
2396        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
2397            let mut delivery_tags = self.delivery_tags.lock().await;
2398            if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
2399                let mut consumer = self.consumer.lock().await;
2400                if requeue {
2401                    consumer
2402                        .nack_with_redelivery(&message_id)
2403                        .await
2404                        .map_err(|e| WaeError::internal(format!("Failed to nack Pulsar message with redelivery: {}", e)))?;
2405                }
2406                else {
2407                    consumer
2408                        .ack(&message_id)
2409                        .await
2410                        .map_err(|e| WaeError::internal(format!("Failed to ack Pulsar message in nack: {}", e)))?;
2411                }
2412            }
2413            Ok(())
2414        }
2415
2416        fn config(&self) -> &ConsumerConfig {
2417            &self.config
2418        }
2419    }
2420
    /// Pulsar queue service.
    pub struct PulsarQueueService {
        // Shared handle to the queue manager.
        manager: Arc<PulsarQueueManager>,
        // Shared handle to the Pulsar client.
        client: Arc<Pulsar<TokioExecutor>>,
        // Producer options.
        producer_options: producer::ProducerOptions,
        // Consumer options.
        consumer_options: consumer::ConsumerOptions,
    }
2428
2429    impl PulsarQueueService {
2430        /// 创建新的 Pulsar 队列服务
2431        pub async fn new(config: PulsarConfig) -> WaeResult<Self> {
2432            let manager = PulsarQueueManager::from_config(&config).await?;
2433            let client = manager.client.clone();
2434            Ok(Self {
2435                manager: Arc::new(manager),
2436                client,
2437                producer_options: config.producer_options,
2438                consumer_options: config.consumer_options,
2439            })
2440        }
2441    }
2442
2443    impl QueueService for PulsarQueueService {
2444        async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
2445            let topic = config
2446                .default_queue
2447                .as_ref()
2448                .map(|q| PulsarQueueManager::topic_name(q))
2449                .unwrap_or_else(|| PulsarQueueManager::topic_name("default"));
2450
2451            let producer: producer::Producer<TokioExecutor> = self
2452                .client
2453                .producer()
2454                .with_options(self.producer_options.clone())
2455                .with_topic(topic)
2456                .build()
2457                .await
2458                .map_err(|e| WaeError::internal(format!("Failed to create Pulsar producer: {}", e)))?;
2459
2460            Ok(MessageProducer::new(Box::new(PulsarProducerBackend::new(config, self.manager.clone(), Arc::new(producer)))))
2461        }
2462
2463        async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
2464            let topic = PulsarQueueManager::topic_name(&config.queue);
2465            let consumer_name = config.consumer_tag.clone().unwrap_or_else(|| format!("wae-consumer-{}", Uuid::new_v4()));
2466            let subscription = format!("wae-subscription-{}", Uuid::new_v4());
2467
2468            let consumer: consumer::Consumer<TokioExecutor> = self
2469                .client
2470                .consumer()
2471                .with_options(self.consumer_options.clone())
2472                .with_topic(topic)
2473                .with_consumer_name(consumer_name)
2474                .with_subscription(subscription)
2475                .with_subscription_type(consumer::SubscriptionType::Exclusive)
2476                .build()
2477                .await
2478                .map_err(|e| WaeError::internal(format!("Failed to create Pulsar consumer: {}", e)))?;
2479
2480            let backend = PulsarConsumerBackend::new(config, self.manager.clone(), consumer);
2481
2482            Ok(MessageConsumer::new(Box::new(backend)))
2483        }
2484
2485        fn manager(&self) -> &dyn QueueManager {
2486            self.manager.as_ref() as &dyn QueueManager
2487        }
2488
2489        async fn close(&self) -> WaeResult<()> {
2490            Ok(())
2491        }
2492    }
2493}