wae_queue/backends/
memory.rs1use super::super::*;
4use std::{
5 collections::{HashMap, VecDeque},
6 sync::Arc,
7};
8use tokio::sync::RwLock;
9
10struct PendingMessage {
12 data: Vec<u8>,
13 metadata: MessageMetadata,
14 redelivery_count: u32,
15 delivery_tag: u64,
16}
17
18struct QueueStorage {
20 messages: VecDeque<(Vec<u8>, MessageMetadata)>,
21 pending_messages: HashMap<u64, PendingMessage>,
22 next_delivery_tag: u64,
23}
24
25impl QueueStorage {
26 fn new() -> Self {
27 Self { messages: VecDeque::new(), pending_messages: HashMap::new(), next_delivery_tag: 1 }
28 }
29}
30
31pub struct MemoryQueueManager {
33 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
34 configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
35}
36
37impl MemoryQueueManager {
38 pub fn new() -> Self {
40 Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
41 }
42}
43
44impl Default for MemoryQueueManager {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50#[async_trait::async_trait]
51impl QueueManager for MemoryQueueManager {
52 async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
53 let mut queues = self.queues.write().await;
54 let mut configs = self.configs.write().await;
55
56 if !queues.contains_key(&config.name) {
57 queues.insert(config.name.clone(), QueueStorage::new());
58 }
59 configs.insert(config.name.clone(), config.clone());
60 Ok(())
61 }
62
63 async fn delete_queue(&self, name: &str) -> WaeResult<()> {
64 let mut queues = self.queues.write().await;
65 let mut configs = self.configs.write().await;
66 queues.remove(name);
67 configs.remove(name);
68 Ok(())
69 }
70
71 async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
72 let queues = self.queues.read().await;
73 Ok(queues.contains_key(name))
74 }
75
76 async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
77 let queues = self.queues.read().await;
78 Ok(queues.get(name).map(|q| q.messages.len() as u64 + q.pending_messages.len() as u64).unwrap_or(0))
79 }
80
81 async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
82 let mut queues = self.queues.write().await;
83 if let Some(queue) = queues.get_mut(name) {
84 let count = queue.messages.len() as u64 + queue.pending_messages.len() as u64;
85 queue.messages.clear();
86 queue.pending_messages.clear();
87 return Ok(count);
88 }
89 Ok(0)
90 }
91}
92
93pub struct MemoryProducerBackend {
95 config: ProducerConfig,
96 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
97 manager: Arc<MemoryQueueManager>,
98}
99
100impl MemoryProducerBackend {
101 pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
103 Self { config, queues: manager.queues.clone(), manager }
104 }
105
106 async fn send_raw_internal(&self, queue: &str, data: Vec<u8>, mut metadata: MessageMetadata) -> WaeResult<MessageId> {
108 let id = uuid::Uuid::new_v4().to_string();
109 metadata.id = Some(id.clone());
110 metadata.timestamp =
111 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
112
113 let mut queues = self.queues.write().await;
114 if let Some(q) = queues.get_mut(queue) {
115 q.messages.push_back((data, metadata));
116 }
117 Ok(id)
118 }
119}
120
121#[async_trait::async_trait]
122impl ProducerBackend for MemoryProducerBackend {
123 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
124 self.manager.declare_queue(&QueueConfig::new(queue)).await?;
125
126 let id = uuid::Uuid::new_v4().to_string();
127 let mut metadata = message.metadata.clone();
128 metadata.id = Some(id.clone());
129 metadata.timestamp =
130 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
131
132 let mut queues = self.queues.write().await;
133 if let Some(q) = queues.get_mut(queue) {
134 q.messages.push_back((message.data.clone(), metadata));
135 }
136 Ok(id)
137 }
138
139 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
140 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
141 self.send_raw(queue, message).await
142 }
143
144 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
145 tokio::time::sleep(delay).await;
146 self.send_raw(queue, message).await
147 }
148
149 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
150 let mut ids = Vec::with_capacity(messages.len());
151 for msg in messages {
152 ids.push(self.send_raw(queue, msg).await?);
153 }
154 Ok(ids)
155 }
156
157 fn config(&self) -> &ProducerConfig {
158 &self.config
159 }
160}
161
162pub struct MemoryConsumerBackend {
164 config: ConsumerConfig,
165 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
166 manager: Arc<MemoryQueueManager>,
167}
168
169impl MemoryConsumerBackend {
170 pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
172 Self { config, queues: manager.queues.clone(), manager: manager.clone() }
173 }
174}
175
176#[async_trait::async_trait]
177impl ConsumerBackend for MemoryConsumerBackend {
178 async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
179 let mut queues = self.queues.write().await;
180 if let Some(queue) = queues.get_mut(&self.config.queue) {
181 if let Some((data, metadata)) = queue.messages.pop_front() {
182 let delivery_tag = queue.next_delivery_tag;
183 queue.next_delivery_tag += 1;
184
185 let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
186
187 let pending_msg =
188 PendingMessage { data: data.clone(), metadata: metadata.clone(), redelivery_count, delivery_tag };
189 queue.pending_messages.insert(delivery_tag, pending_msg);
190
191 let message = RawMessage { data, metadata };
192 return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
193 }
194 }
195 Ok(None)
196 }
197
198 async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
199 let mut queues = self.queues.write().await;
200 if let Some(queue) = queues.get_mut(&self.config.queue) {
201 queue.pending_messages.remove(&delivery_tag);
202 }
203 Ok(())
204 }
205
206 async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
207 let configs = self.manager.configs.read().await;
208 let queue_config = configs.get(&self.config.queue).cloned();
209 drop(configs);
210
211 let mut queues = self.queues.write().await;
212
213 if let Some(queue) = queues.get_mut(&self.config.queue) {
214 if let Some(mut pending_msg) = queue.pending_messages.remove(&delivery_tag) {
215 if requeue {
216 pending_msg.redelivery_count += 1;
217 pending_msg
218 .metadata
219 .headers
220 .insert("x-redelivery-count".to_string(), pending_msg.redelivery_count.to_string());
221
222 queue.messages.push_back((pending_msg.data, pending_msg.metadata));
223 }
224 else {
225 if let Some(dlq_name) = queue_config.and_then(|c| c.dead_letter_queue) {
226 drop(queues);
227
228 self.manager.declare_queue(&QueueConfig::new(&dlq_name)).await?;
229
230 let mut queues = self.queues.write().await;
231 if let Some(dlq) = queues.get_mut(&dlq_name) {
232 dlq.messages.push_back((pending_msg.data, pending_msg.metadata));
233 }
234 }
235 }
236 }
237 }
238
239 Ok(())
240 }
241
242 fn config(&self) -> &ConsumerConfig {
243 &self.config
244 }
245}
246
247pub struct MemoryQueueService {
249 manager: Arc<MemoryQueueManager>,
250}
251
252impl MemoryQueueService {
253 pub fn new() -> Self {
255 Self { manager: Arc::new(MemoryQueueManager::new()) }
256 }
257}
258
259impl Default for MemoryQueueService {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265impl QueueService for MemoryQueueService {
266 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
267 Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
268 }
269
270 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
271 self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
272 Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
273 }
274
275 fn manager(&self) -> &dyn QueueManager {
276 self.manager.as_ref() as &dyn QueueManager
277 }
278
279 async fn close(&self) -> WaeResult<()> {
280 Ok(())
281 }
282}