1#![warn(missing_docs)]
9
10use serde::{Serialize, de::DeserializeOwned};
11use std::{fmt, time::Duration};
12
/// Errors reported by queue operations.
///
/// Every variant carries a free-form, human-readable detail string that is
/// appended to a fixed, variant-specific prefix when the error is displayed.
#[derive(Debug)]
pub enum QueueError {
    /// Connecting to the queue failed.
    ConnectionFailed(String),

    /// A payload could not be serialized.
    SerializationFailed(String),

    /// Received bytes could not be deserialized into the requested type.
    DeserializationFailed(String),

    /// The requested queue does not exist or no queue was specified.
    QueueNotFound(String),

    /// Sending a message failed.
    SendFailed(String),

    /// Receiving a message failed.
    ReceiveFailed(String),

    /// Acknowledging a message failed.
    AckFailed(String),

    /// An operation timed out.
    Timeout(String),

    /// Any other failure inside the queue implementation.
    Internal(String),
}

impl fmt::Display for QueueError {
    /// Writes `"<prefix>: <detail>"`, where the prefix depends on the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the prefix first so the actual formatting happens in one place.
        let (prefix, detail) = match self {
            Self::ConnectionFailed(detail) => ("Queue connection failed", detail),
            Self::SerializationFailed(detail) => ("Serialization failed", detail),
            Self::DeserializationFailed(detail) => ("Deserialization failed", detail),
            Self::QueueNotFound(detail) => ("Queue not found", detail),
            Self::SendFailed(detail) => ("Failed to send message", detail),
            Self::ReceiveFailed(detail) => ("Failed to receive message", detail),
            Self::AckFailed(detail) => ("Failed to acknowledge message", detail),
            Self::Timeout(detail) => ("Operation timeout", detail),
            Self::Internal(detail) => ("Queue internal error", detail),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for QueueError {}
61
62pub type QueueResult<T> = Result<T, QueueError>;
64
65pub type MessageId = String;
67
68pub type QueueName = String;
70
71#[derive(Debug, Clone, Default)]
73pub struct MessageMetadata {
74 pub id: Option<MessageId>,
76 pub correlation_id: Option<String>,
78 pub reply_to: Option<QueueName>,
80 pub content_type: Option<String>,
82 pub timestamp: Option<u64>,
84 pub priority: Option<u8>,
86 pub expiration: Option<u64>,
88 pub headers: std::collections::HashMap<String, String>,
90}
91
92#[derive(Debug, Clone)]
94pub struct RawMessage {
95 pub data: Vec<u8>,
97 pub metadata: MessageMetadata,
99}
100
101impl RawMessage {
102 pub fn new(data: Vec<u8>) -> Self {
104 Self { data, metadata: MessageMetadata::default() }
105 }
106
107 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
109 self.metadata.correlation_id = Some(id.into());
110 self
111 }
112
113 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
115 self.metadata.reply_to = Some(queue.into());
116 self
117 }
118
119 pub fn with_priority(mut self, priority: u8) -> Self {
121 self.metadata.priority = Some(priority.min(9));
122 self
123 }
124
125 pub fn with_expiration(mut self, ms: u64) -> Self {
127 self.metadata.expiration = Some(ms);
128 self
129 }
130
131 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133 self.metadata.headers.insert(key.into(), value.into());
134 self
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct Message<T> {
141 pub payload: T,
143 pub metadata: MessageMetadata,
145}
146
147impl<T> Message<T> {
148 pub fn new(payload: T) -> Self {
150 Self { payload, metadata: MessageMetadata::default() }
151 }
152
153 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
155 self.metadata.correlation_id = Some(id.into());
156 self
157 }
158
159 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
161 self.metadata.reply_to = Some(queue.into());
162 self
163 }
164
165 pub fn with_priority(mut self, priority: u8) -> Self {
167 self.metadata.priority = Some(priority.min(9));
168 self
169 }
170
171 pub fn with_expiration(mut self, ms: u64) -> Self {
173 self.metadata.expiration = Some(ms);
174 self
175 }
176
177 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
179 self.metadata.headers.insert(key.into(), value.into());
180 self
181 }
182
183 pub fn into_raw(self) -> QueueResult<RawMessage>
185 where
186 T: Serialize,
187 {
188 let data = serde_json::to_vec(&self.payload).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;
189 Ok(RawMessage { data, metadata: self.metadata })
190 }
191
192 pub fn to_raw(&self) -> QueueResult<RawMessage>
194 where
195 T: Serialize,
196 {
197 let data = serde_json::to_vec(&self.payload).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;
198 Ok(RawMessage { data, metadata: self.metadata.clone() })
199 }
200}
201
202impl RawMessage {
203 pub fn into_typed<T: DeserializeOwned>(self) -> QueueResult<Message<T>> {
205 let payload = serde_json::from_slice(&self.data).map_err(|e| QueueError::DeserializationFailed(e.to_string()))?;
206 Ok(Message { payload, metadata: self.metadata })
207 }
208}
209
210#[derive(Debug)]
212pub struct ReceivedRawMessage {
213 pub message: RawMessage,
215 pub delivery_tag: u64,
217 pub redelivery_count: u32,
219}
220
221#[derive(Debug)]
223pub struct ReceivedMessage<T> {
224 pub message: Message<T>,
226 pub delivery_tag: u64,
228 pub redelivery_count: u32,
230}
231
232#[derive(Debug, Clone)]
234pub struct QueueConfig {
235 pub name: QueueName,
237 pub durable: bool,
239 pub auto_delete: bool,
241 pub max_messages: Option<u64>,
243 pub max_message_size: Option<u64>,
245 pub message_ttl: Option<u64>,
247 pub dead_letter_queue: Option<QueueName>,
249}
250
251impl QueueConfig {
252 pub fn new(name: impl Into<String>) -> Self {
254 Self {
255 name: name.into(),
256 durable: true,
257 auto_delete: false,
258 max_messages: None,
259 max_message_size: None,
260 message_ttl: None,
261 dead_letter_queue: None,
262 }
263 }
264
265 pub fn durable(mut self, durable: bool) -> Self {
267 self.durable = durable;
268 self
269 }
270
271 pub fn auto_delete(mut self, auto_delete: bool) -> Self {
273 self.auto_delete = auto_delete;
274 self
275 }
276
277 pub fn max_messages(mut self, max: u64) -> Self {
279 self.max_messages = Some(max);
280 self
281 }
282
283 pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
285 self.message_ttl = Some(ttl_ms);
286 self
287 }
288
289 pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
291 self.dead_letter_queue = Some(queue.into());
292 self
293 }
294}
295
296#[derive(Debug, Clone)]
298pub struct ProducerConfig {
299 pub default_queue: Option<QueueName>,
301 pub confirm_timeout: Duration,
303 pub retry_count: u32,
305 pub retry_interval: Duration,
307}
308
309impl Default for ProducerConfig {
310 fn default() -> Self {
311 Self {
312 default_queue: None,
313 confirm_timeout: Duration::from_secs(5),
314 retry_count: 3,
315 retry_interval: Duration::from_millis(100),
316 }
317 }
318}
319
320#[derive(Debug, Clone)]
322pub struct ConsumerConfig {
323 pub queue: QueueName,
325 pub consumer_tag: Option<String>,
327 pub auto_ack: bool,
329 pub prefetch_count: u16,
331 pub exclusive: bool,
333}
334
335impl ConsumerConfig {
336 pub fn new(queue: impl Into<String>) -> Self {
338 Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
339 }
340
341 pub fn auto_ack(mut self, auto_ack: bool) -> Self {
343 self.auto_ack = auto_ack;
344 self
345 }
346
347 pub fn prefetch(mut self, count: u16) -> Self {
349 self.prefetch_count = count;
350 self
351 }
352}
353
354#[async_trait::async_trait]
356pub trait ProducerBackend: Send + Sync {
357 async fn send_raw(&self, queue: &str, message: &RawMessage) -> QueueResult<MessageId>;
359
360 async fn send_raw_default(&self, message: &RawMessage) -> QueueResult<MessageId>;
362
363 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> QueueResult<MessageId>;
365
366 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> QueueResult<Vec<MessageId>>;
368
369 fn config(&self) -> &ProducerConfig;
371}
372
373pub struct MessageProducer {
375 backend: Box<dyn ProducerBackend>,
376}
377
378impl MessageProducer {
379 pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
381 Self { backend }
382 }
383
384 pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> QueueResult<MessageId> {
386 let raw = message.to_raw()?;
387 self.backend.send_raw(queue, &raw).await
388 }
389
390 pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> QueueResult<MessageId> {
392 let raw = message.to_raw()?;
393 self.backend.send_raw_default(&raw).await
394 }
395
396 pub async fn send_delayed<T: Serialize + Send + Sync>(
398 &self,
399 queue: &str,
400 message: &Message<T>,
401 delay: Duration,
402 ) -> QueueResult<MessageId> {
403 let raw = message.to_raw()?;
404 self.backend.send_raw_delayed(queue, &raw, delay).await
405 }
406
407 pub async fn send_batch<T: Serialize + Send + Sync>(
409 &self,
410 queue: &str,
411 messages: &[Message<T>],
412 ) -> QueueResult<Vec<MessageId>> {
413 let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<QueueResult<_>>()?;
414 self.backend.send_raw_batch(queue, &raw_messages).await
415 }
416
417 pub fn config(&self) -> &ProducerConfig {
419 self.backend.config()
420 }
421}
422
423#[async_trait::async_trait]
425pub trait ConsumerBackend: Send + Sync {
426 async fn receive_raw(&self) -> QueueResult<Option<ReceivedRawMessage>>;
428
429 async fn ack(&self, delivery_tag: u64) -> QueueResult<()>;
431
432 async fn nack(&self, delivery_tag: u64, requeue: bool) -> QueueResult<()>;
434
435 fn config(&self) -> &ConsumerConfig;
437}
438
439pub struct MessageConsumer {
441 backend: Box<dyn ConsumerBackend>,
442}
443
444impl MessageConsumer {
445 pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
447 Self { backend }
448 }
449
450 pub async fn receive<T: DeserializeOwned + Send>(&self) -> QueueResult<Option<ReceivedMessage<T>>> {
452 let raw = match self.backend.receive_raw().await? {
453 Some(r) => r,
454 None => return Ok(None),
455 };
456
457 let message = raw.message.into_typed()?;
458 Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
459 }
460
461 pub async fn ack(&self, delivery_tag: u64) -> QueueResult<()> {
463 self.backend.ack(delivery_tag).await
464 }
465
466 pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> QueueResult<()> {
468 self.backend.nack(delivery_tag, requeue).await
469 }
470
471 pub fn config(&self) -> &ConsumerConfig {
473 self.backend.config()
474 }
475}
476
477#[async_trait::async_trait]
479pub trait QueueManager: Send + Sync {
480 async fn declare_queue(&self, config: &QueueConfig) -> QueueResult<()>;
482
483 async fn delete_queue(&self, name: &str) -> QueueResult<()>;
485
486 async fn queue_exists(&self, name: &str) -> QueueResult<bool>;
488
489 async fn queue_message_count(&self, name: &str) -> QueueResult<u64>;
491
492 async fn purge_queue(&self, name: &str) -> QueueResult<u64>;
494}
495
496pub trait QueueService: Send + Sync {
499 async fn create_producer(&self, config: ProducerConfig) -> QueueResult<MessageProducer>;
501
502 async fn create_consumer(&self, config: ConsumerConfig) -> QueueResult<MessageConsumer>;
504
505 fn manager(&self) -> &dyn QueueManager;
507
508 async fn close(&self) -> QueueResult<()>;
510}
511
512pub mod memory {
514 use super::*;
515 use std::{
516 collections::{HashMap, VecDeque},
517 sync::Arc,
518 };
519 use tokio::sync::RwLock;
520
521 struct QueueStorage {
523 messages: VecDeque<(u64, Vec<u8>, MessageMetadata)>,
524 next_delivery_tag: u64,
525 }
526
527 impl QueueStorage {
528 fn new() -> Self {
529 Self { messages: VecDeque::new(), next_delivery_tag: 1 }
530 }
531 }
532
533 pub struct MemoryQueueManager {
535 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
536 configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
537 }
538
539 impl MemoryQueueManager {
540 pub fn new() -> Self {
542 Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
543 }
544 }
545
546 impl Default for MemoryQueueManager {
547 fn default() -> Self {
548 Self::new()
549 }
550 }
551
552 #[async_trait::async_trait]
553 impl QueueManager for MemoryQueueManager {
554 async fn declare_queue(&self, config: &QueueConfig) -> QueueResult<()> {
555 let mut queues = self.queues.write().await;
556 let mut configs = self.configs.write().await;
557
558 if !queues.contains_key(&config.name) {
559 queues.insert(config.name.clone(), QueueStorage::new());
560 }
561 configs.insert(config.name.clone(), config.clone());
562 Ok(())
563 }
564
565 async fn delete_queue(&self, name: &str) -> QueueResult<()> {
566 let mut queues = self.queues.write().await;
567 let mut configs = self.configs.write().await;
568 queues.remove(name);
569 configs.remove(name);
570 Ok(())
571 }
572
573 async fn queue_exists(&self, name: &str) -> QueueResult<bool> {
574 let queues = self.queues.read().await;
575 Ok(queues.contains_key(name))
576 }
577
578 async fn queue_message_count(&self, name: &str) -> QueueResult<u64> {
579 let queues = self.queues.read().await;
580 Ok(queues.get(name).map(|q| q.messages.len() as u64).unwrap_or(0))
581 }
582
583 async fn purge_queue(&self, name: &str) -> QueueResult<u64> {
584 let mut queues = self.queues.write().await;
585 if let Some(queue) = queues.get_mut(name) {
586 let count = queue.messages.len() as u64;
587 queue.messages.clear();
588 return Ok(count);
589 }
590 Ok(0)
591 }
592 }
593
594 pub struct MemoryProducerBackend {
596 config: ProducerConfig,
597 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
598 manager: Arc<MemoryQueueManager>,
599 }
600
601 impl MemoryProducerBackend {
602 pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
604 Self { config, queues: manager.queues.clone(), manager }
605 }
606 }
607
608 #[async_trait::async_trait]
609 impl ProducerBackend for MemoryProducerBackend {
610 async fn send_raw(&self, queue: &str, message: &RawMessage) -> QueueResult<MessageId> {
611 self.manager.declare_queue(&QueueConfig::new(queue)).await?;
612
613 let id = uuid::Uuid::new_v4().to_string();
614 let mut metadata = message.metadata.clone();
615 metadata.id = Some(id.clone());
616 metadata.timestamp =
617 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
618
619 let mut queues = self.queues.write().await;
620 if let Some(q) = queues.get_mut(queue) {
621 q.messages.push_back((q.next_delivery_tag, message.data.clone(), metadata));
622 q.next_delivery_tag += 1;
623 }
624 Ok(id)
625 }
626
627 async fn send_raw_default(&self, message: &RawMessage) -> QueueResult<MessageId> {
628 let queue = self
629 .config
630 .default_queue
631 .as_ref()
632 .ok_or_else(|| QueueError::QueueNotFound("No default queue configured".into()))?;
633 self.send_raw(queue, message).await
634 }
635
636 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> QueueResult<MessageId> {
637 tokio::time::sleep(delay).await;
638 self.send_raw(queue, message).await
639 }
640
641 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> QueueResult<Vec<MessageId>> {
642 let mut ids = Vec::with_capacity(messages.len());
643 for msg in messages {
644 ids.push(self.send_raw(queue, msg).await?);
645 }
646 Ok(ids)
647 }
648
649 fn config(&self) -> &ProducerConfig {
650 &self.config
651 }
652 }
653
654 pub struct MemoryConsumerBackend {
656 config: ConsumerConfig,
657 queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
658 }
659
660 impl MemoryConsumerBackend {
661 pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
663 Self { config, queues: manager.queues.clone() }
664 }
665 }
666
667 #[async_trait::async_trait]
668 impl ConsumerBackend for MemoryConsumerBackend {
669 async fn receive_raw(&self) -> QueueResult<Option<ReceivedRawMessage>> {
670 let mut queues = self.queues.write().await;
671 if let Some(queue) = queues.get_mut(&self.config.queue) {
672 if let Some((delivery_tag, data, metadata)) = queue.messages.pop_front() {
673 let message = RawMessage { data, metadata };
674 return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count: 0 }));
675 }
676 }
677 Ok(None)
678 }
679
680 async fn ack(&self, _delivery_tag: u64) -> QueueResult<()> {
681 Ok(())
682 }
683
684 async fn nack(&self, _delivery_tag: u64, _requeue: bool) -> QueueResult<()> {
685 Ok(())
686 }
687
688 fn config(&self) -> &ConsumerConfig {
689 &self.config
690 }
691 }
692
693 pub struct MemoryQueueService {
695 manager: Arc<MemoryQueueManager>,
696 }
697
698 impl MemoryQueueService {
699 pub fn new() -> Self {
701 Self { manager: Arc::new(MemoryQueueManager::new()) }
702 }
703 }
704
705 impl Default for MemoryQueueService {
706 fn default() -> Self {
707 Self::new()
708 }
709 }
710
711 impl QueueService for MemoryQueueService {
712 async fn create_producer(&self, config: ProducerConfig) -> QueueResult<MessageProducer> {
713 Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
714 }
715
716 async fn create_consumer(&self, config: ConsumerConfig) -> QueueResult<MessageConsumer> {
717 self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
718 Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
719 }
720
721 fn manager(&self) -> &dyn QueueManager {
722 self.manager.as_ref() as &dyn QueueManager
723 }
724
725 async fn close(&self) -> QueueResult<()> {
726 Ok(())
727 }
728 }
729}
730
731pub fn memory_queue_service() -> memory::MemoryQueueService {
733 memory::MemoryQueueService::new()
734}