Skip to main content

wae_queue/backends/
memory.rs

1//! 内存队列实现
2
3use super::super::*;
4use std::{
5    collections::{HashMap, VecDeque},
6    sync::Arc,
7};
8use tokio::sync::RwLock;
9
10/// 待处理消息状态
11struct PendingMessage {
12    data: Vec<u8>,
13    metadata: MessageMetadata,
14    redelivery_count: u32,
15    delivery_tag: u64,
16}
17
18/// 内存队列存储
19struct QueueStorage {
20    messages: VecDeque<(Vec<u8>, MessageMetadata)>,
21    pending_messages: HashMap<u64, PendingMessage>,
22    next_delivery_tag: u64,
23}
24
25impl QueueStorage {
26    fn new() -> Self {
27        Self { messages: VecDeque::new(), pending_messages: HashMap::new(), next_delivery_tag: 1 }
28    }
29}
30
31/// 内存队列管理器
32pub struct MemoryQueueManager {
33    queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
34    configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
35}
36
37impl MemoryQueueManager {
38    /// 创建新的内存队列管理器
39    pub fn new() -> Self {
40        Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
41    }
42}
43
44impl Default for MemoryQueueManager {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50#[async_trait::async_trait]
51impl QueueManager for MemoryQueueManager {
52    async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
53        let mut queues = self.queues.write().await;
54        let mut configs = self.configs.write().await;
55
56        if !queues.contains_key(&config.name) {
57            queues.insert(config.name.clone(), QueueStorage::new());
58        }
59        configs.insert(config.name.clone(), config.clone());
60        Ok(())
61    }
62
63    async fn delete_queue(&self, name: &str) -> WaeResult<()> {
64        let mut queues = self.queues.write().await;
65        let mut configs = self.configs.write().await;
66        queues.remove(name);
67        configs.remove(name);
68        Ok(())
69    }
70
71    async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
72        let queues = self.queues.read().await;
73        Ok(queues.contains_key(name))
74    }
75
76    async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
77        let queues = self.queues.read().await;
78        Ok(queues.get(name).map(|q| q.messages.len() as u64 + q.pending_messages.len() as u64).unwrap_or(0))
79    }
80
81    async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
82        let mut queues = self.queues.write().await;
83        if let Some(queue) = queues.get_mut(name) {
84            let count = queue.messages.len() as u64 + queue.pending_messages.len() as u64;
85            queue.messages.clear();
86            queue.pending_messages.clear();
87            return Ok(count);
88        }
89        Ok(0)
90    }
91}
92
93/// 内存生产者后端
94pub struct MemoryProducerBackend {
95    config: ProducerConfig,
96    queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
97    manager: Arc<MemoryQueueManager>,
98}
99
100impl MemoryProducerBackend {
101    /// 创建新的内存生产者后端
102    pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
103        Self { config, queues: manager.queues.clone(), manager }
104    }
105
106    /// 内部发送消息到指定队列(不声明队列)
107    async fn send_raw_internal(&self, queue: &str, data: Vec<u8>, mut metadata: MessageMetadata) -> WaeResult<MessageId> {
108        let id = uuid::Uuid::new_v4().to_string();
109        metadata.id = Some(id.clone());
110        metadata.timestamp =
111            Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
112
113        let mut queues = self.queues.write().await;
114        if let Some(q) = queues.get_mut(queue) {
115            q.messages.push_back((data, metadata));
116        }
117        Ok(id)
118    }
119}
120
121#[async_trait::async_trait]
122impl ProducerBackend for MemoryProducerBackend {
123    async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
124        self.manager.declare_queue(&QueueConfig::new(queue)).await?;
125
126        let id = uuid::Uuid::new_v4().to_string();
127        let mut metadata = message.metadata.clone();
128        metadata.id = Some(id.clone());
129        metadata.timestamp =
130            Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
131
132        let mut queues = self.queues.write().await;
133        if let Some(q) = queues.get_mut(queue) {
134            q.messages.push_back((message.data.clone(), metadata));
135        }
136        Ok(id)
137    }
138
139    async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
140        let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
141        self.send_raw(queue, message).await
142    }
143
144    async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
145        tokio::time::sleep(delay).await;
146        self.send_raw(queue, message).await
147    }
148
149    async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
150        let mut ids = Vec::with_capacity(messages.len());
151        for msg in messages {
152            ids.push(self.send_raw(queue, msg).await?);
153        }
154        Ok(ids)
155    }
156
157    fn config(&self) -> &ProducerConfig {
158        &self.config
159    }
160}
161
162/// 内存消费者后端
163pub struct MemoryConsumerBackend {
164    config: ConsumerConfig,
165    queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
166    manager: Arc<MemoryQueueManager>,
167}
168
169impl MemoryConsumerBackend {
170    /// 创建新的内存消费者后端
171    pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
172        Self { config, queues: manager.queues.clone(), manager: manager.clone() }
173    }
174}
175
176#[async_trait::async_trait]
177impl ConsumerBackend for MemoryConsumerBackend {
178    async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
179        let mut queues = self.queues.write().await;
180        if let Some(queue) = queues.get_mut(&self.config.queue) {
181            if let Some((data, metadata)) = queue.messages.pop_front() {
182                let delivery_tag = queue.next_delivery_tag;
183                queue.next_delivery_tag += 1;
184
185                let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
186
187                let pending_msg =
188                    PendingMessage { data: data.clone(), metadata: metadata.clone(), redelivery_count, delivery_tag };
189                queue.pending_messages.insert(delivery_tag, pending_msg);
190
191                let message = RawMessage { data, metadata };
192                return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
193            }
194        }
195        Ok(None)
196    }
197
198    async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
199        let mut queues = self.queues.write().await;
200        if let Some(queue) = queues.get_mut(&self.config.queue) {
201            queue.pending_messages.remove(&delivery_tag);
202        }
203        Ok(())
204    }
205
206    async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
207        let configs = self.manager.configs.read().await;
208        let queue_config = configs.get(&self.config.queue).cloned();
209        drop(configs);
210
211        let mut queues = self.queues.write().await;
212
213        if let Some(queue) = queues.get_mut(&self.config.queue) {
214            if let Some(mut pending_msg) = queue.pending_messages.remove(&delivery_tag) {
215                if requeue {
216                    pending_msg.redelivery_count += 1;
217                    pending_msg
218                        .metadata
219                        .headers
220                        .insert("x-redelivery-count".to_string(), pending_msg.redelivery_count.to_string());
221
222                    queue.messages.push_back((pending_msg.data, pending_msg.metadata));
223                }
224                else {
225                    if let Some(dlq_name) = queue_config.and_then(|c| c.dead_letter_queue) {
226                        drop(queues);
227
228                        self.manager.declare_queue(&QueueConfig::new(&dlq_name)).await?;
229
230                        let mut queues = self.queues.write().await;
231                        if let Some(dlq) = queues.get_mut(&dlq_name) {
232                            dlq.messages.push_back((pending_msg.data, pending_msg.metadata));
233                        }
234                    }
235                }
236            }
237        }
238
239        Ok(())
240    }
241
242    fn config(&self) -> &ConsumerConfig {
243        &self.config
244    }
245}
246
247/// 内存队列服务
248pub struct MemoryQueueService {
249    manager: Arc<MemoryQueueManager>,
250}
251
252impl MemoryQueueService {
253    /// 创建新的内存队列服务
254    pub fn new() -> Self {
255        Self { manager: Arc::new(MemoryQueueManager::new()) }
256    }
257}
258
259impl Default for MemoryQueueService {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265impl QueueService for MemoryQueueService {
266    async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
267        Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
268    }
269
270    async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
271        self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
272        Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
273    }
274
275    fn manager(&self) -> &dyn QueueManager {
276        self.manager.as_ref() as &dyn QueueManager
277    }
278
279    async fn close(&self) -> WaeResult<()> {
280        Ok(())
281    }
282}