1#![warn(missing_docs)]
9
10use serde::{Serialize, de::DeserializeOwned};
11use std::time::Duration;
12use wae_types::{WaeError, WaeResult};
13
/// Unique identifier assigned to a message by the backend on send.
pub type MessageId = String;

/// Name of a queue as used by producers, consumers, and managers.
pub type QueueName = String;
19
/// Transport-level metadata carried alongside a message payload.
///
/// All fields are optional; the producer backends in this file fill in
/// `id` and `timestamp` when a message is sent.
#[derive(Debug, Clone, Default)]
pub struct MessageMetadata {
    /// Backend-assigned message id, set on send.
    pub id: Option<MessageId>,
    /// Correlation id used to match requests with replies.
    pub correlation_id: Option<String>,
    /// Queue a reply should be sent to.
    pub reply_to: Option<QueueName>,
    /// Content type of the payload, if known.
    pub content_type: Option<String>,
    /// Send time in milliseconds since the Unix epoch, set on send.
    pub timestamp: Option<u64>,
    /// Message priority; the builder methods clamp this to 0..=9.
    pub priority: Option<u8>,
    /// Expiration in milliseconds.
    /// NOTE(review): no backend in this file enforces expiration — confirm
    /// the intended semantics with the other backends.
    pub expiration: Option<u64>,
    /// Free-form string headers (also used internally to carry
    /// `x-redelivery-count` across requeues).
    pub headers: std::collections::HashMap<String, String>,
}
40
/// A message whose payload is raw bytes, as exchanged with backends.
#[derive(Debug, Clone)]
pub struct RawMessage {
    /// Serialized payload bytes.
    pub data: Vec<u8>,
    /// Metadata associated with the payload.
    pub metadata: MessageMetadata,
}
49
50impl RawMessage {
51 pub fn new(data: Vec<u8>) -> Self {
53 Self { data, metadata: MessageMetadata::default() }
54 }
55
56 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
58 self.metadata.correlation_id = Some(id.into());
59 self
60 }
61
62 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
64 self.metadata.reply_to = Some(queue.into());
65 self
66 }
67
68 pub fn with_priority(mut self, priority: u8) -> Self {
70 self.metadata.priority = Some(priority.min(9));
71 self
72 }
73
74 pub fn with_expiration(mut self, ms: u64) -> Self {
76 self.metadata.expiration = Some(ms);
77 self
78 }
79
80 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
82 self.metadata.headers.insert(key.into(), value.into());
83 self
84 }
85}
86
/// A message with a typed payload; converted to/from [`RawMessage`] via JSON.
#[derive(Debug, Clone)]
pub struct Message<T> {
    /// The typed payload.
    pub payload: T,
    /// Metadata associated with the payload.
    pub metadata: MessageMetadata,
}
95
96impl<T> Message<T> {
97 pub fn new(payload: T) -> Self {
99 Self { payload, metadata: MessageMetadata::default() }
100 }
101
102 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
104 self.metadata.correlation_id = Some(id.into());
105 self
106 }
107
108 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
110 self.metadata.reply_to = Some(queue.into());
111 self
112 }
113
114 pub fn with_priority(mut self, priority: u8) -> Self {
116 self.metadata.priority = Some(priority.min(9));
117 self
118 }
119
120 pub fn with_expiration(mut self, ms: u64) -> Self {
122 self.metadata.expiration = Some(ms);
123 self
124 }
125
126 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
128 self.metadata.headers.insert(key.into(), value.into());
129 self
130 }
131
132 pub fn into_raw(self) -> WaeResult<RawMessage>
134 where
135 T: Serialize,
136 {
137 let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
138 Ok(RawMessage { data, metadata: self.metadata })
139 }
140
141 pub fn to_raw(&self) -> WaeResult<RawMessage>
143 where
144 T: Serialize,
145 {
146 let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
147 Ok(RawMessage { data, metadata: self.metadata.clone() })
148 }
149}
150
151impl RawMessage {
152 pub fn into_typed<T: DeserializeOwned>(self) -> WaeResult<Message<T>> {
154 let payload = serde_json::from_slice(&self.data).map_err(|_e| WaeError::deserialization_failed("Message"))?;
155 Ok(Message { payload, metadata: self.metadata })
156 }
157}
158
/// A raw message delivered to a consumer, with delivery bookkeeping.
#[derive(Debug)]
pub struct ReceivedRawMessage {
    /// The delivered message.
    pub message: RawMessage,
    /// Tag used to ack/nack this specific delivery.
    pub delivery_tag: u64,
    /// How many times the message has been redelivered.
    pub redelivery_count: u32,
}
169
/// A typed message delivered to a consumer, with delivery bookkeeping.
#[derive(Debug)]
pub struct ReceivedMessage<T> {
    /// The delivered, deserialized message.
    pub message: Message<T>,
    /// Tag used to ack/nack this specific delivery.
    pub delivery_tag: u64,
    /// How many times the message has been redelivered.
    pub redelivery_count: u32,
}
180
/// Declarative configuration for a queue.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Queue name.
    pub name: QueueName,
    /// Whether the queue survives restarts (backend-dependent).
    pub durable: bool,
    /// Whether the queue is deleted automatically when unused.
    pub auto_delete: bool,
    /// Maximum number of messages held, if limited.
    pub max_messages: Option<u64>,
    /// Maximum size of a single message in bytes, if limited.
    pub max_message_size: Option<u64>,
    /// Per-message time-to-live in milliseconds, if set.
    pub message_ttl: Option<u64>,
    /// Queue that receives rejected (nacked, non-requeued) messages.
    pub dead_letter_queue: Option<QueueName>,
}
199
200impl QueueConfig {
201 pub fn new(name: impl Into<String>) -> Self {
203 Self {
204 name: name.into(),
205 durable: true,
206 auto_delete: false,
207 max_messages: None,
208 max_message_size: None,
209 message_ttl: None,
210 dead_letter_queue: None,
211 }
212 }
213
214 pub fn durable(mut self, durable: bool) -> Self {
216 self.durable = durable;
217 self
218 }
219
220 pub fn auto_delete(mut self, auto_delete: bool) -> Self {
222 self.auto_delete = auto_delete;
223 self
224 }
225
226 pub fn max_messages(mut self, max: u64) -> Self {
228 self.max_messages = Some(max);
229 self
230 }
231
232 pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
234 self.message_ttl = Some(ttl_ms);
235 self
236 }
237
238 pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
240 self.dead_letter_queue = Some(queue.into());
241 self
242 }
243}
244
/// Configuration for a [`MessageProducer`].
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Queue used by `send_default` when no explicit queue is given.
    pub default_queue: Option<QueueName>,
    /// How long to wait for a broker confirmation.
    pub confirm_timeout: Duration,
    /// Number of send retries.
    pub retry_count: u32,
    /// Delay between retries.
    pub retry_interval: Duration,
}
257
258impl Default for ProducerConfig {
259 fn default() -> Self {
260 Self {
261 default_queue: None,
262 confirm_timeout: Duration::from_secs(5),
263 retry_count: 3,
264 retry_interval: Duration::from_millis(100),
265 }
266 }
267}
268
/// Configuration for a [`MessageConsumer`].
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Queue to consume from.
    pub queue: QueueName,
    /// Optional explicit consumer tag.
    pub consumer_tag: Option<String>,
    /// Whether messages are acknowledged automatically on delivery.
    pub auto_ack: bool,
    /// Maximum number of unacknowledged messages in flight.
    pub prefetch_count: u16,
    /// Whether this consumer demands exclusive access to the queue.
    pub exclusive: bool,
}
283
284impl ConsumerConfig {
285 pub fn new(queue: impl Into<String>) -> Self {
287 Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
288 }
289
290 pub fn auto_ack(mut self, auto_ack: bool) -> Self {
292 self.auto_ack = auto_ack;
293 self
294 }
295
296 pub fn prefetch(mut self, count: u16) -> Self {
298 self.prefetch_count = count;
299 self
300 }
301}
302
/// Backend-specific send operations, implemented per transport
/// (see the `memory` and `redis_backend` modules in this file).
#[async_trait::async_trait]
pub trait ProducerBackend: Send + Sync {
    /// Sends one raw message to `queue`, returning the backend message id.
    async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId>;

    /// Sends to the configured default queue; errors when none is set.
    async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId>;

    /// Sends after `delay`; exact delay semantics are backend-specific.
    async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId>;

    /// Sends several messages, returning their ids in order.
    async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>>;

    /// Producer configuration in effect.
    fn config(&self) -> &ProducerConfig;
}
321
/// Typed façade over a [`ProducerBackend`]: serializes payloads to JSON
/// and delegates the raw send.
pub struct MessageProducer {
    // Boxed so any backend implementation can be used at runtime.
    backend: Box<dyn ProducerBackend>,
}
326
327impl MessageProducer {
328 pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
330 Self { backend }
331 }
332
333 pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> WaeResult<MessageId> {
335 let raw = message.to_raw()?;
336 self.backend.send_raw(queue, &raw).await
337 }
338
339 pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> WaeResult<MessageId> {
341 let raw = message.to_raw()?;
342 self.backend.send_raw_default(&raw).await
343 }
344
345 pub async fn send_delayed<T: Serialize + Send + Sync>(
347 &self,
348 queue: &str,
349 message: &Message<T>,
350 delay: Duration,
351 ) -> WaeResult<MessageId> {
352 let raw = message.to_raw()?;
353 self.backend.send_raw_delayed(queue, &raw, delay).await
354 }
355
356 pub async fn send_batch<T: Serialize + Send + Sync>(
358 &self,
359 queue: &str,
360 messages: &[Message<T>],
361 ) -> WaeResult<Vec<MessageId>> {
362 let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<WaeResult<_>>()?;
363 self.backend.send_raw_batch(queue, &raw_messages).await
364 }
365
366 pub fn config(&self) -> &ProducerConfig {
368 self.backend.config()
369 }
370}
371
/// Backend-specific receive/ack operations, implemented per transport.
#[async_trait::async_trait]
pub trait ConsumerBackend: Send + Sync {
    /// Receives the next raw message, or `None` when nothing is available.
    async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>>;

    /// Acknowledges the delivery identified by `delivery_tag`.
    async fn ack(&self, delivery_tag: u64) -> WaeResult<()>;

    /// Rejects the delivery; `requeue` asks the backend to redeliver it.
    async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()>;

    /// Consumer configuration in effect.
    fn config(&self) -> &ConsumerConfig;
}
387
/// Typed façade over a [`ConsumerBackend`]: deserializes received payloads
/// from JSON and forwards ack/nack calls.
pub struct MessageConsumer {
    // Boxed so any backend implementation can be used at runtime.
    backend: Box<dyn ConsumerBackend>,
}
392
393impl MessageConsumer {
394 pub fn new(backend: Box<dyn ConsumerBackend>) -> Self {
396 Self { backend }
397 }
398
399 pub async fn receive<T: DeserializeOwned + Send>(&self) -> WaeResult<Option<ReceivedMessage<T>>> {
401 let raw = match self.backend.receive_raw().await? {
402 Some(r) => r,
403 None => return Ok(None),
404 };
405
406 let message = raw.message.into_typed()?;
407 Ok(Some(ReceivedMessage { message, delivery_tag: raw.delivery_tag, redelivery_count: raw.redelivery_count }))
408 }
409
410 pub async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
412 self.backend.ack(delivery_tag).await
413 }
414
415 pub async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
417 self.backend.nack(delivery_tag, requeue).await
418 }
419
420 pub fn config(&self) -> &ConsumerConfig {
422 self.backend.config()
423 }
424}
425
/// Administrative queue operations, implemented per transport.
#[async_trait::async_trait]
pub trait QueueManager: Send + Sync {
    /// Creates the queue if needed and records its configuration.
    async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()>;

    /// Deletes the queue and its stored configuration.
    async fn delete_queue(&self, name: &str) -> WaeResult<()>;

    /// Whether a queue with this name exists.
    async fn queue_exists(&self, name: &str) -> WaeResult<bool>;

    /// Number of messages currently held by the queue.
    async fn queue_message_count(&self, name: &str) -> WaeResult<u64>;

    /// Removes all messages, returning how many were removed.
    async fn purge_queue(&self, name: &str) -> WaeResult<u64>;
}
444
/// Entry point for creating producers/consumers and managing queues.
///
/// NOTE(review): unlike every other async trait in this file, this one is
/// not annotated with `#[async_trait::async_trait]`, so it relies on native
/// `async fn` in traits and cannot be used as `dyn QueueService` — confirm
/// the asymmetry is intentional (fixing it requires touching both impls).
pub trait QueueService: Send + Sync {
    /// Creates a producer using `config`.
    async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer>;

    /// Creates a consumer, declaring its queue first if needed.
    async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer>;

    /// Access to queue management operations.
    fn manager(&self) -> &dyn QueueManager;

    /// Releases any backend resources.
    async fn close(&self) -> WaeResult<()>;
}
460
461pub mod memory {
463 use super::*;
464 use std::{
465 collections::{HashMap, VecDeque},
466 sync::Arc,
467 };
468 use tokio::sync::RwLock;
469
    /// A delivered-but-unacknowledged message, parked until ack/nack.
    struct PendingMessage {
        /// Raw payload bytes.
        data: Vec<u8>,
        /// Metadata captured at delivery time.
        metadata: MessageMetadata,
        /// How many times this message has been redelivered.
        redelivery_count: u32,
        /// Tag under which the message was delivered.
        /// NOTE(review): written on delivery but never read back in this
        /// file — confirm whether the field can be dropped.
        delivery_tag: u64,
    }
477
    /// In-memory storage for a single queue.
    struct QueueStorage {
        /// Messages ready for delivery, in FIFO order.
        messages: VecDeque<(Vec<u8>, MessageMetadata)>,
        /// Delivered messages awaiting ack/nack, keyed by delivery tag.
        pending_messages: HashMap<u64, PendingMessage>,
        /// Next delivery tag to hand out; monotonically increasing.
        next_delivery_tag: u64,
    }
484
485 impl QueueStorage {
486 fn new() -> Self {
487 Self { messages: VecDeque::new(), pending_messages: HashMap::new(), next_delivery_tag: 1 }
488 }
489 }
490
    /// In-process queue manager; all state lives in shared maps guarded by
    /// async `RwLock`s so producers/consumers can share it via `Arc`.
    pub struct MemoryQueueManager {
        // Queue name -> message storage.
        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
        // Queue name -> declared config (consulted for dead-letter routing).
        configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
    }
496
497 impl MemoryQueueManager {
498 pub fn new() -> Self {
500 Self { queues: Arc::new(RwLock::new(HashMap::new())), configs: Arc::new(RwLock::new(HashMap::new())) }
501 }
502 }
503
504 impl Default for MemoryQueueManager {
505 fn default() -> Self {
506 Self::new()
507 }
508 }
509
510 #[async_trait::async_trait]
511 impl QueueManager for MemoryQueueManager {
512 async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
513 let mut queues = self.queues.write().await;
514 let mut configs = self.configs.write().await;
515
516 if !queues.contains_key(&config.name) {
517 queues.insert(config.name.clone(), QueueStorage::new());
518 }
519 configs.insert(config.name.clone(), config.clone());
520 Ok(())
521 }
522
523 async fn delete_queue(&self, name: &str) -> WaeResult<()> {
524 let mut queues = self.queues.write().await;
525 let mut configs = self.configs.write().await;
526 queues.remove(name);
527 configs.remove(name);
528 Ok(())
529 }
530
531 async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
532 let queues = self.queues.read().await;
533 Ok(queues.contains_key(name))
534 }
535
536 async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
537 let queues = self.queues.read().await;
538 Ok(queues.get(name).map(|q| q.messages.len() as u64 + q.pending_messages.len() as u64).unwrap_or(0))
539 }
540
541 async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
542 let mut queues = self.queues.write().await;
543 if let Some(queue) = queues.get_mut(name) {
544 let count = queue.messages.len() as u64 + queue.pending_messages.len() as u64;
545 queue.messages.clear();
546 queue.pending_messages.clear();
547 return Ok(count);
548 }
549 Ok(0)
550 }
551 }
552
    /// Producer backend writing directly into a [`MemoryQueueManager`]'s maps.
    pub struct MemoryProducerBackend {
        /// Producer configuration (default queue, retry policy, ...).
        config: ProducerConfig,
        // Shared handle to the manager's queue map (cloned Arc, same data).
        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
        // Kept so queues can be declared on demand before sending.
        manager: Arc<MemoryQueueManager>,
    }
559
    impl MemoryProducerBackend {
        /// Creates a producer backend sharing the manager's queue map.
        pub fn new(config: ProducerConfig, manager: Arc<MemoryQueueManager>) -> Self {
            Self { config, queues: manager.queues.clone(), manager }
        }

        /// Shared send path: stamps a fresh UUID id and a Unix-epoch
        /// millisecond timestamp onto the metadata, then appends the message
        /// to the queue's ready list.
        ///
        /// Silently drops the message when the queue has not been declared —
        /// callers are expected to declare the queue first.
        async fn send_raw_internal(&self, queue: &str, data: Vec<u8>, mut metadata: MessageMetadata) -> WaeResult<MessageId> {
            let id = uuid::Uuid::new_v4().to_string();
            metadata.id = Some(id.clone());
            // SystemTime is at or after the epoch on any sane clock; the
            // unwrap only fires if the clock reads before 1970.
            metadata.timestamp =
                Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);

            let mut queues = self.queues.write().await;
            if let Some(q) = queues.get_mut(queue) {
                q.messages.push_back((data, metadata));
            }
            Ok(id)
        }
    }
580
581 #[async_trait::async_trait]
582 impl ProducerBackend for MemoryProducerBackend {
583 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
584 self.manager.declare_queue(&QueueConfig::new(queue)).await?;
585
586 let id = uuid::Uuid::new_v4().to_string();
587 let mut metadata = message.metadata.clone();
588 metadata.id = Some(id.clone());
589 metadata.timestamp =
590 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
591
592 let mut queues = self.queues.write().await;
593 if let Some(q) = queues.get_mut(queue) {
594 q.messages.push_back((message.data.clone(), metadata));
595 }
596 Ok(id)
597 }
598
599 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
600 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
601 self.send_raw(queue, message).await
602 }
603
604 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
605 tokio::time::sleep(delay).await;
606 self.send_raw(queue, message).await
607 }
608
609 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
610 let mut ids = Vec::with_capacity(messages.len());
611 for msg in messages {
612 ids.push(self.send_raw(queue, msg).await?);
613 }
614 Ok(ids)
615 }
616
617 fn config(&self) -> &ProducerConfig {
618 &self.config
619 }
620 }
621
    /// Consumer backend reading directly from a [`MemoryQueueManager`]'s maps.
    pub struct MemoryConsumerBackend {
        /// Consumer configuration (queue name, prefetch, ...).
        config: ConsumerConfig,
        // Shared handle to the manager's queue map (cloned Arc, same data).
        queues: Arc<RwLock<HashMap<String, QueueStorage>>>,
        // Kept for config lookups (dead-letter routing) and DLQ declaration.
        manager: Arc<MemoryQueueManager>,
    }
628
629 impl MemoryConsumerBackend {
630 pub fn new(config: ConsumerConfig, manager: Arc<MemoryQueueManager>) -> Self {
632 Self { config, queues: manager.queues.clone(), manager: manager.clone() }
633 }
634 }
635
    #[async_trait::async_trait]
    impl ConsumerBackend for MemoryConsumerBackend {
        /// Pops the next ready message, assigns a fresh delivery tag, and
        /// parks a copy in `pending_messages` until it is acked or nacked.
        /// Returns `Ok(None)` for an unknown queue or when nothing is ready.
        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
            let mut queues = self.queues.write().await;
            if let Some(queue) = queues.get_mut(&self.config.queue) {
                if let Some((data, metadata)) = queue.messages.pop_front() {
                    let delivery_tag = queue.next_delivery_tag;
                    queue.next_delivery_tag += 1;

                    // The redelivery count travels in a header so it survives requeues.
                    let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);

                    let pending_msg =
                        PendingMessage { data: data.clone(), metadata: metadata.clone(), redelivery_count, delivery_tag };
                    queue.pending_messages.insert(delivery_tag, pending_msg);

                    let message = RawMessage { data, metadata };
                    return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
                }
            }
            Ok(None)
        }

        /// Acknowledges a delivery by dropping its pending copy.
        /// Unknown tags (or queues) are silently ignored.
        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
            let mut queues = self.queues.write().await;
            if let Some(queue) = queues.get_mut(&self.config.queue) {
                queue.pending_messages.remove(&delivery_tag);
            }
            Ok(())
        }

        /// Rejects a pending delivery. With `requeue` the message goes to
        /// the back of the queue with a bumped redelivery count; otherwise
        /// it is routed to the configured dead-letter queue, or dropped
        /// when none is configured.
        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
            // Snapshot the queue config up front and release the configs
            // lock before taking the queues lock — avoids holding both.
            let configs = self.manager.configs.read().await;
            let queue_config = configs.get(&self.config.queue).cloned();
            drop(configs);

            let mut queues = self.queues.write().await;

            if let Some(queue) = queues.get_mut(&self.config.queue) {
                if let Some(mut pending_msg) = queue.pending_messages.remove(&delivery_tag) {
                    if requeue {
                        pending_msg.redelivery_count += 1;
                        pending_msg
                            .metadata
                            .headers
                            .insert("x-redelivery-count".to_string(), pending_msg.redelivery_count.to_string());

                        queue.messages.push_back((pending_msg.data, pending_msg.metadata));
                    }
                    else {
                        if let Some(dlq_name) = queue_config.and_then(|c| c.dead_letter_queue) {
                            // Must release the queues guard before
                            // declare_queue re-takes the write lock below;
                            // holding it here would deadlock.
                            drop(queues);

                            self.manager.declare_queue(&QueueConfig::new(&dlq_name)).await?;

                            // Re-acquire after the declare; the DLQ now exists.
                            let mut queues = self.queues.write().await;
                            if let Some(dlq) = queues.get_mut(&dlq_name) {
                                dlq.messages.push_back((pending_msg.data, pending_msg.metadata));
                            }
                        }
                    }
                }
            }

            Ok(())
        }

        /// Consumer configuration in effect.
        fn config(&self) -> &ConsumerConfig {
            &self.config
        }
    }
706
    /// In-process [`QueueService`] backed by a [`MemoryQueueManager`].
    pub struct MemoryQueueService {
        // Shared manager handed to every producer/consumer this service creates.
        manager: Arc<MemoryQueueManager>,
    }
711
712 impl MemoryQueueService {
713 pub fn new() -> Self {
715 Self { manager: Arc::new(MemoryQueueManager::new()) }
716 }
717 }
718
719 impl Default for MemoryQueueService {
720 fn default() -> Self {
721 Self::new()
722 }
723 }
724
725 impl QueueService for MemoryQueueService {
726 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
727 Ok(MessageProducer::new(Box::new(MemoryProducerBackend::new(config, self.manager.clone()))))
728 }
729
730 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
731 self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
732 Ok(MessageConsumer::new(Box::new(MemoryConsumerBackend::new(config, self.manager.clone()))))
733 }
734
735 fn manager(&self) -> &dyn QueueManager {
736 self.manager.as_ref() as &dyn QueueManager
737 }
738
739 async fn close(&self) -> WaeResult<()> {
740 Ok(())
741 }
742 }
743}
744
745#[cfg(feature = "redis-backend")]
747pub mod redis_backend {
748 use super::*;
749 use ::redis::{AsyncCommands, Client, FromRedisValue, RedisResult, streams::StreamReadOptions};
750 use base64::{Engine as _, engine::general_purpose};
751 use std::{collections::HashMap, sync::Arc};
752 use tokio::sync::Mutex;
753
    /// Queue manager backed by Redis streams (one stream per queue, plus a
    /// shared consumer group).
    pub struct RedisQueueManager {
        /// Redis client; connections are created per operation.
        client: Client,
        // Queue name -> declared config, cached locally for dead-letter lookups.
        configs: Arc<Mutex<HashMap<String, QueueConfig>>>,
    }
759
760 impl RedisQueueManager {
761 pub fn new(client: Client) -> Self {
763 Self { client, configs: Arc::new(Mutex::new(HashMap::new())) }
764 }
765
766 pub async fn from_url(url: &str) -> WaeResult<Self> {
768 let client = Client::open(url).map_err(|e| WaeError::internal(format!("Failed to create Redis client: {}", e)))?;
769 Ok(Self::new(client))
770 }
771
772 fn stream_name(queue: &str) -> String {
773 format!("wae:stream:{}", queue)
774 }
775
776 fn group_name() -> &'static str {
777 "wae-consumer-group"
778 }
779 }
780
    #[async_trait::async_trait]
    impl QueueManager for RedisQueueManager {
        /// Declares a queue: creates the backing stream and the shared
        /// consumer group (MKSTREAM creates the stream if absent), then
        /// caches the config for dead-letter lookups.
        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
            let mut conn = self
                .client
                .get_async_connection()
                .await
                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;

            let stream_name = Self::stream_name(&config.name);
            let group_name = Self::group_name();

            // Result deliberately ignored: Redis answers BUSYGROUP when the
            // group already exists, which is the normal re-declare case.
            let _: RedisResult<()> = conn.xgroup_create_mkstream(&stream_name, group_name, "0").await;

            let mut configs = self.configs.lock().await;
            configs.insert(config.name.clone(), config.clone());

            Ok(())
        }

        /// Deletes the queue's stream and drops the cached config.
        /// Deletion errors are intentionally ignored (best-effort).
        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
            let mut conn = self
                .client
                .get_async_connection()
                .await
                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;

            let stream_name = Self::stream_name(name);

            let _: RedisResult<()> = conn.del(&stream_name).await;

            let mut configs = self.configs.lock().await;
            configs.remove(name);

            Ok(())
        }

        /// Whether the queue's backing stream key exists in Redis.
        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
            let mut conn = self
                .client
                .get_async_connection()
                .await
                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;

            let stream_name = Self::stream_name(name);
            let exists: bool = conn
                .exists(&stream_name)
                .await
                .map_err(|e| WaeError::internal(format!("Failed to check if stream exists: {}", e)))?;

            Ok(exists)
        }

        /// Number of entries in the stream (XLEN).
        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
            let mut conn = self
                .client
                .get_async_connection()
                .await
                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;

            let stream_name = Self::stream_name(name);
            let len: u64 =
                conn.xlen(&stream_name).await.map_err(|e| WaeError::internal(format!("Failed to get stream length: {}", e)))?;

            Ok(len)
        }

        /// Deletes the stream and returns how many entries it held.
        /// NOTE(review): XLEN and DEL are two separate commands, so the
        /// returned count can be stale if producers write in between.
        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
            let mut conn = self
                .client
                .get_async_connection()
                .await
                .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;

            let stream_name = Self::stream_name(name);
            let len: u64 =
                conn.xlen(&stream_name).await.map_err(|e| WaeError::internal(format!("Failed to get stream length: {}", e)))?;

            let _: RedisResult<()> = conn.del(&stream_name).await;

            Ok(len)
        }
    }
864
    /// Producer backend appending messages to Redis streams via XADD.
    pub struct RedisProducerBackend {
        /// Producer configuration (default queue, retry policy, ...).
        config: ProducerConfig,
        // Cloned from the manager so sends don't need to go through it.
        client: Client,
        // Kept so queues can be declared on demand before sending.
        manager: Arc<RedisQueueManager>,
    }
871
872 impl RedisProducerBackend {
873 pub fn new(config: ProducerConfig, manager: Arc<RedisQueueManager>) -> Self {
875 Self { config, client: manager.client.clone(), manager }
876 }
877
878 fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
879 let mut fields = HashMap::new();
880
881 if let Some(id) = &metadata.id {
882 fields.insert("id".to_string(), id.clone());
883 }
884 if let Some(correlation_id) = &metadata.correlation_id {
885 fields.insert("correlation_id".to_string(), correlation_id.clone());
886 }
887 if let Some(reply_to) = &metadata.reply_to {
888 fields.insert("reply_to".to_string(), reply_to.clone());
889 }
890 if let Some(content_type) = &metadata.content_type {
891 fields.insert("content_type".to_string(), content_type.clone());
892 }
893 if let Some(timestamp) = metadata.timestamp {
894 fields.insert("timestamp".to_string(), timestamp.to_string());
895 }
896 if let Some(priority) = metadata.priority {
897 fields.insert("priority".to_string(), priority.to_string());
898 }
899 if let Some(expiration) = metadata.expiration {
900 fields.insert("expiration".to_string(), expiration.to_string());
901 }
902
903 for (key, value) in &metadata.headers {
904 fields.insert(format!("header:{}", key), value.clone());
905 }
906
907 fields
908 }
909 }
910
911 #[async_trait::async_trait]
912 impl ProducerBackend for RedisProducerBackend {
913 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
914 self.manager.declare_queue(&QueueConfig::new(queue)).await?;
915
916 let mut conn = self
917 .client
918 .get_async_connection()
919 .await
920 .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
921
922 let stream_name = RedisQueueManager::stream_name(queue);
923 let data_b64 = general_purpose::STANDARD.encode(&message.data);
924
925 let mut fields_vec = Vec::new();
926 let mut fields = Self::encode_metadata(&message.metadata);
927 fields.insert("data".to_string(), data_b64);
928 for (k, v) in fields {
929 fields_vec.push((k, v));
930 }
931
932 let id: String = conn
933 .xadd(&stream_name, "*", &fields_vec)
934 .await
935 .map_err(|e| WaeError::internal(format!("Failed to add message to stream: {}", e)))?;
936
937 Ok(id)
938 }
939
940 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
941 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
942 self.send_raw(queue, message).await
943 }
944
945 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
946 let delayed_queue = format!("{}:delayed", queue);
947 let delayed_stream = RedisQueueManager::stream_name(&delayed_queue);
948
949 let mut conn = self
950 .client
951 .get_async_connection()
952 .await
953 .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
954
955 let data_b64 = general_purpose::STANDARD.encode(&message.data);
956 let mut fields = Self::encode_metadata(&message.metadata);
957 fields.insert("data".to_string(), data_b64);
958 fields.insert("target_queue".to_string(), queue.to_string());
959
960 let mut fields_vec = Vec::new();
961 for (k, v) in fields {
962 fields_vec.push((k, v));
963 }
964
965 let id: String = conn
966 .xadd(&delayed_stream, "*", &fields_vec)
967 .await
968 .map_err(|e| WaeError::internal(format!("Failed to add delayed message: {}", e)))?;
969
970 let message_clone = message.clone();
971 tokio::spawn({
972 let client = self.client.clone();
973 let manager = self.manager.clone();
974 let queue = queue.to_string();
975 let delayed_stream = delayed_stream.clone();
976 let id = id.clone();
977 let delay = delay;
978
979 async move {
980 tokio::time::sleep(delay).await;
981
982 if let Ok(mut conn) = client.get_async_connection().await {
983 let _: RedisResult<()> = conn.xdel(&delayed_stream, &[&id]).await;
984
985 let mut producer = RedisProducerBackend::new(ProducerConfig::default(), manager.clone());
986 let _ = producer.send_raw(&queue, &message_clone).await;
987 }
988 }
989 });
990
991 Ok(id)
992 }
993
994 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
995 let mut ids = Vec::with_capacity(messages.len());
996 for msg in messages {
997 ids.push(self.send_raw(queue, msg).await?);
998 }
999 Ok(ids)
1000 }
1001
1002 fn config(&self) -> &ProducerConfig {
1003 &self.config
1004 }
1005 }
1006
    /// Consumer backend reading from Redis streams via a consumer group.
    pub struct RedisConsumerBackend {
        /// Consumer configuration (queue name, prefetch, ...).
        config: ConsumerConfig,
        // Cloned from the manager so reads don't need to go through it.
        client: Client,
        // Kept for re-declaring queues and dead-letter config lookups.
        manager: Arc<RedisQueueManager>,
        /// Unique per-instance consumer name within the shared group.
        consumer_name: String,
        // Local delivery tag -> Redis stream entry id, for ack/nack.
        delivery_tags: Arc<Mutex<HashMap<u64, String>>>,
        // Next local delivery tag to hand out.
        next_delivery_tag: Arc<Mutex<u64>>,
    }
1016
1017 impl RedisConsumerBackend {
1018 pub fn new(config: ConsumerConfig, manager: Arc<RedisQueueManager>) -> Self {
1020 let consumer_name = format!("wae-consumer-{}", uuid::Uuid::new_v4());
1021 Self {
1022 config,
1023 client: manager.client.clone(),
1024 manager,
1025 consumer_name,
1026 delivery_tags: Arc::new(Mutex::new(HashMap::new())),
1027 next_delivery_tag: Arc::new(Mutex::new(1)),
1028 }
1029 }
1030
1031 fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
1032 let mut metadata = MessageMetadata::default();
1033
1034 if let Some(id) = fields.get("id") {
1035 metadata.id = Some(id.clone());
1036 }
1037 if let Some(correlation_id) = fields.get("correlation_id") {
1038 metadata.correlation_id = Some(correlation_id.clone());
1039 }
1040 if let Some(reply_to) = fields.get("reply_to") {
1041 metadata.reply_to = Some(reply_to.clone());
1042 }
1043 if let Some(content_type) = fields.get("content_type") {
1044 metadata.content_type = Some(content_type.clone());
1045 }
1046 if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
1047 metadata.timestamp = Some(timestamp);
1048 }
1049 if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
1050 metadata.priority = Some(priority);
1051 }
1052 if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
1053 metadata.expiration = Some(expiration);
1054 }
1055
1056 for (key, value) in fields {
1057 if let Some(header_key) = key.strip_prefix("header:") {
1058 metadata.headers.insert(header_key.to_string(), value.clone());
1059 }
1060 }
1061
1062 metadata
1063 }
1064 }
1065
1066 #[async_trait::async_trait]
1067 impl ConsumerBackend for RedisConsumerBackend {
1068 async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
1069 let mut conn = self
1070 .client
1071 .get_async_connection()
1072 .await
1073 .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1074
1075 let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1076 let group_name = RedisQueueManager::group_name();
1077
1078 let opts = StreamReadOptions::default().group(group_name, &self.consumer_name).count(1).block(1000);
1079
1080 let result: RedisResult<redis::streams::StreamReadReply> = conn.xread_options(&[&stream_name], &[">"], &opts).await;
1081
1082 match result {
1083 Ok(streams) => {
1084 if let Some(stream) = streams.keys.into_iter().next() {
1085 if let Some(entry) = stream.ids.into_iter().next() {
1086 let mut fields = HashMap::new();
1087 for (key, value) in entry.map {
1088 if let Ok(s) = String::from_redis_value(&value) {
1089 fields.insert(key, s);
1090 }
1091 }
1092
1093 let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
1094 let data = general_purpose::STANDARD
1095 .decode(data_b64)
1096 .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
1097
1098 let metadata = Self::decode_metadata(&fields);
1099
1100 let mut next_tag = self.next_delivery_tag.lock().await;
1101 let delivery_tag = *next_tag;
1102 *next_tag += 1;
1103 drop(next_tag);
1104
1105 let mut delivery_tags = self.delivery_tags.lock().await;
1106 delivery_tags.insert(delivery_tag, entry.id.clone());
1107
1108 let redelivery_count =
1109 metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);
1110
1111 let message = RawMessage { data, metadata };
1112 return Ok(Some(ReceivedRawMessage { message, delivery_tag, redelivery_count }));
1113 }
1114 }
1115 Ok(None)
1116 }
1117 Err(e) => {
1118 if e.to_string().contains("NOGROUP") {
1119 self.manager.declare_queue(&QueueConfig::new(&self.config.queue)).await?;
1120 Ok(None)
1121 }
1122 else {
1123 Err(WaeError::internal(format!("Failed to read from stream: {}", e)))
1124 }
1125 }
1126 }
1127 }
1128
1129 async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
1130 let mut conn = self
1131 .client
1132 .get_async_connection()
1133 .await
1134 .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1135
1136 let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1137 let group_name = RedisQueueManager::group_name();
1138
1139 let mut delivery_tags = self.delivery_tags.lock().await;
1140 if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
1141 let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1142 let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1143 }
1144
1145 Ok(())
1146 }
1147
1148 async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
1149 let mut conn = self
1150 .client
1151 .get_async_connection()
1152 .await
1153 .map_err(|e| WaeError::internal(format!("Failed to get Redis connection: {}", e)))?;
1154
1155 let stream_name = RedisQueueManager::stream_name(&self.config.queue);
1156 let group_name = RedisQueueManager::group_name();
1157
1158 let mut delivery_tags = self.delivery_tags.lock().await;
1159 if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
1160 if requeue {
1161 let _: RedisResult<()> =
1162 conn.xclaim(&stream_name, group_name, &self.consumer_name, 0, &[&message_id]).await;
1163 }
1164 else {
1165 let configs = self.manager.configs.lock().await;
1166 let dlq_name = configs.get(&self.config.queue).and_then(|c| c.dead_letter_queue.clone());
1167 drop(configs);
1168
1169 if let Some(dlq_name) = dlq_name {
1170 let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1171 let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1172 }
1173 else {
1174 let _: RedisResult<()> = conn.xack(&stream_name, group_name, &[&message_id]).await;
1175 let _: RedisResult<()> = conn.xdel(&stream_name, &[&message_id]).await;
1176 }
1177 }
1178 }
1179
1180 Ok(())
1181 }
1182
        /// Returns the consumer configuration this backend was created with.
        fn config(&self) -> &ConsumerConfig {
            &self.config
        }
1186 }
1187
    /// A queue service implementation backed by Redis Streams.
    pub struct RedisQueueService {
        // Shared manager owning the Redis client and per-queue configuration.
        manager: Arc<RedisQueueManager>,
    }
1192
1193 impl RedisQueueService {
1194 pub fn new(manager: Arc<RedisQueueManager>) -> Self {
1196 Self { manager }
1197 }
1198
1199 pub async fn from_url(url: &str) -> WaeResult<Self> {
1201 let manager = Arc::new(RedisQueueManager::from_url(url).await?);
1202 Ok(Self::new(manager))
1203 }
1204 }
1205
1206 impl QueueService for RedisQueueService {
1207 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
1208 Ok(MessageProducer::new(Box::new(RedisProducerBackend::new(config, self.manager.clone()))))
1209 }
1210
1211 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
1212 self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
1213 Ok(MessageConsumer::new(Box::new(RedisConsumerBackend::new(config, self.manager.clone()))))
1214 }
1215
1216 fn manager(&self) -> &dyn QueueManager {
1217 self.manager.as_ref() as &dyn QueueManager
1218 }
1219
1220 async fn close(&self) -> WaeResult<()> {
1221 Ok(())
1222 }
1223 }
1224}
1225
/// Convenience constructor for the in-memory queue service (useful in tests).
pub fn memory_queue_service() -> memory::MemoryQueueService {
    memory::MemoryQueueService::new()
}
1230
1231#[cfg(feature = "kafka-backend")]
1233pub mod kafka_backend {
1234 use super::*;
1235 use base64::{Engine as _, engine::general_purpose};
1236 use rdkafka::{
1237 ClientConfig,
1238 consumer::{Consumer, DefaultConsumerContext, StreamConsumer},
1239 message::{Headers, OwnedMessage},
1240 producer::{DeliveryFuture, FutureProducer, FutureRecord},
1241 util::Timeout,
1242 };
1243 use std::{collections::HashMap, sync::Arc, time::Duration};
1244 use tokio::sync::Mutex;
1245 use uuid::Uuid;
1246
    /// Connection and tuning options for the Kafka backend.
    #[derive(Debug, Clone)]
    pub struct KafkaConfig {
        /// Comma-separated broker list (`bootstrap.servers`).
        pub brokers: String,
        /// Optional `client.id` reported to the brokers.
        pub client_id: Option<String>,
        /// Extra librdkafka settings applied when building producers.
        pub producer_config: HashMap<String, String>,
        /// Extra librdkafka settings applied when building consumers.
        pub consumer_config: HashMap<String, String>,
    }
1259
1260 impl Default for KafkaConfig {
1261 fn default() -> Self {
1262 Self {
1263 brokers: "localhost:9092".to_string(),
1264 client_id: Some("wae-kafka".to_string()),
1265 producer_config: HashMap::new(),
1266 consumer_config: HashMap::new(),
1267 }
1268 }
1269 }
1270
1271 impl KafkaConfig {
1272 pub fn new(brokers: impl Into<String>) -> Self {
1274 Self { brokers: brokers.into(), ..Default::default() }
1275 }
1276 }
1277
    /// Queue manager for the Kafka backend.
    ///
    /// Topic administration is left to the brokers; this type mainly carries the
    /// merged client configuration.
    pub struct KafkaQueueManager {
        // Combined producer/consumer client settings derived from `KafkaConfig`.
        config: ClientConfig,
    }
1282
1283 impl KafkaQueueManager {
1284 pub fn new(config: &KafkaConfig) -> Self {
1286 let mut client_config = ClientConfig::new();
1287 client_config.set("bootstrap.servers", &config.brokers);
1288 if let Some(client_id) = &config.client_id {
1289 client_config.set("client.id", client_id);
1290 }
1291 for (key, value) in &config.producer_config {
1292 client_config.set(key, value);
1293 }
1294 for (key, value) in &config.consumer_config {
1295 client_config.set(key, value);
1296 }
1297 Self { config: client_config }
1298 }
1299
1300 fn topic_name(queue: &str) -> String {
1302 queue.to_string()
1303 }
1304 }
1305
1306 #[async_trait::async_trait]
1307 impl QueueManager for KafkaQueueManager {
1308 async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
1309 Ok(())
1310 }
1311
1312 async fn delete_queue(&self, name: &str) -> WaeResult<()> {
1313 Ok(())
1314 }
1315
1316 async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
1317 Ok(true)
1318 }
1319
1320 async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
1321 Ok(0)
1322 }
1323
1324 async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
1325 Ok(0)
1326 }
1327 }
1328
    /// Producer backend that publishes JSON-encoded messages to Kafka topics.
    pub struct KafkaProducerBackend {
        // Producer settings, including the optional default queue.
        config: ProducerConfig,
        // Shared rdkafka producer.
        producer: Arc<FutureProducer>,
        // Shared manager (topic administration is a no-op on Kafka).
        manager: Arc<KafkaQueueManager>,
    }
1335
1336 impl KafkaProducerBackend {
1337 pub fn new(config: ProducerConfig, manager: Arc<KafkaQueueManager>, producer: Arc<FutureProducer>) -> Self {
1339 Self { config, producer, manager }
1340 }
1341
1342 fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
1344 let mut fields = HashMap::new();
1345
1346 if let Some(id) = &metadata.id {
1347 fields.insert("id".to_string(), id.clone());
1348 }
1349 if let Some(correlation_id) = &metadata.correlation_id {
1350 fields.insert("correlation_id".to_string(), correlation_id.clone());
1351 }
1352 if let Some(reply_to) = &metadata.reply_to {
1353 fields.insert("reply_to".to_string(), reply_to.clone());
1354 }
1355 if let Some(content_type) = &metadata.content_type {
1356 fields.insert("content_type".to_string(), content_type.clone());
1357 }
1358 if let Some(timestamp) = metadata.timestamp {
1359 fields.insert("timestamp".to_string(), timestamp.to_string());
1360 }
1361 if let Some(priority) = metadata.priority {
1362 fields.insert("priority".to_string(), priority.to_string());
1363 }
1364 if let Some(expiration) = metadata.expiration {
1365 fields.insert("expiration".to_string(), expiration.to_string());
1366 }
1367
1368 for (key, value) in &metadata.headers {
1369 fields.insert(format!("header:{}", key), value.clone());
1370 }
1371
1372 fields
1373 }
1374 }
1375
1376 #[async_trait::async_trait]
1377 impl ProducerBackend for KafkaProducerBackend {
1378 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
1379 let id = Uuid::new_v4().to_string();
1380 let mut metadata = message.metadata.clone();
1381 metadata.id = Some(id.clone());
1382 if metadata.timestamp.is_none() {
1383 metadata.timestamp =
1384 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
1385 }
1386
1387 let topic = KafkaQueueManager::topic_name(queue);
1388 let data_b64 = general_purpose::STANDARD.encode(&message.data);
1389 let mut fields = Self::encode_metadata(&metadata);
1390 fields.insert("data".to_string(), data_b64);
1391
1392 let payload = serde_json::to_vec(&fields)
1393 .map_err(|e| WaeError::internal(format!("Failed to serialize Kafka message: {}", e)))?;
1394
1395 let record = FutureRecord::to(&topic).payload(&payload).key(&id);
1396
1397 let result = self.producer.send(record, Timeout::After(Duration::from_secs(5))).await;
1398
1399 match result {
1400 Ok((_, _)) => Ok(id),
1401 Err((e, _)) => Err(WaeError::internal(format!("Failed to send Kafka message: {}", e))),
1402 }
1403 }
1404
1405 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
1406 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
1407 self.send_raw(queue, message).await
1408 }
1409
1410 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
1411 let id = Uuid::new_v4().to_string();
1412 let message_clone = message.clone();
1413 let producer = self.producer.clone();
1414 let manager = self.manager.clone();
1415 let queue = queue.to_string();
1416
1417 tokio::spawn(async move {
1418 tokio::time::sleep(delay).await;
1419 let mut producer_config = ProducerConfig::default();
1420 producer_config.default_queue = Some(queue.clone());
1421 let mut producer = KafkaProducerBackend::new(producer_config, manager.clone(), producer.clone());
1422 let _ = producer.send_raw(&queue, &message_clone).await;
1423 });
1424
1425 Ok(id)
1426 }
1427
1428 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
1429 let mut ids = Vec::with_capacity(messages.len());
1430 for msg in messages {
1431 ids.push(self.send_raw(queue, msg).await?);
1432 }
1433 Ok(ids)
1434 }
1435
1436 fn config(&self) -> &ProducerConfig {
1437 &self.config
1438 }
1439 }
1440
    /// Consumer backend reading JSON-encoded messages from a Kafka topic.
    pub struct KafkaConsumerBackend {
        // Consumer settings; `config.queue` doubles as the topic name.
        config: ConsumerConfig,
        consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
        manager: Arc<KafkaQueueManager>,
        // Maps locally-generated delivery tags to (partition, offset) so ack/nack
        // can address the right record.
        delivery_tags: Arc<Mutex<HashMap<u64, (i32, i64)>>>,
        // Monotonic source for the next delivery tag.
        next_delivery_tag: Arc<Mutex<u64>>,
    }
1449
1450 impl KafkaConsumerBackend {
1451 pub fn new(
1453 config: ConsumerConfig,
1454 manager: Arc<KafkaQueueManager>,
1455 consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
1456 ) -> Self {
1457 Self {
1458 config,
1459 consumer,
1460 manager,
1461 delivery_tags: Arc::new(Mutex::new(HashMap::new())),
1462 next_delivery_tag: Arc::new(Mutex::new(1)),
1463 }
1464 }
1465
1466 fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
1468 let mut metadata = MessageMetadata::default();
1469
1470 if let Some(id) = fields.get("id") {
1471 metadata.id = Some(id.clone());
1472 }
1473 if let Some(correlation_id) = fields.get("correlation_id") {
1474 metadata.correlation_id = Some(correlation_id.clone());
1475 }
1476 if let Some(reply_to) = fields.get("reply_to") {
1477 metadata.reply_to = Some(reply_to.clone());
1478 }
1479 if let Some(content_type) = fields.get("content_type") {
1480 metadata.content_type = Some(content_type.clone());
1481 }
1482 if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
1483 metadata.timestamp = Some(timestamp);
1484 }
1485 if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
1486 metadata.priority = Some(priority);
1487 }
1488 if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
1489 metadata.expiration = Some(expiration);
1490 }
1491
1492 for (key, value) in fields {
1493 if let Some(header_key) = key.strip_prefix("header:") {
1494 metadata.headers.insert(header_key.to_string(), value.clone());
1495 }
1496 }
1497
1498 metadata
1499 }
1500 }
1501
    #[async_trait::async_trait]
    impl ConsumerBackend for KafkaConsumerBackend {
        /// Polls the consumer for up to one second; returns `Ok(None)` on timeout.
        ///
        /// On success the record's (partition, offset) is registered under a fresh
        /// locally-generated delivery tag so `ack`/`nack` can commit or seek later.
        async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
            match tokio::time::timeout(Duration::from_secs(1), self.consumer.recv()).await {
                Ok(Ok(message)) => {
                    // Payload is the JSON map produced by the Kafka producer backend:
                    // metadata fields plus base64-encoded "data".
                    let payload = message.payload().ok_or_else(|| WaeError::internal("Kafka message missing payload"))?;
                    let fields: HashMap<String, String> = serde_json::from_slice(payload)
                        .map_err(|e| WaeError::internal(format!("Failed to deserialize Kafka message: {}", e)))?;
                    let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
                    let data = general_purpose::STANDARD
                        .decode(data_b64)
                        .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
                    let metadata = Self::decode_metadata(&fields);

                    // Issue the next synthetic delivery tag.
                    let mut next_tag = self.next_delivery_tag.lock().await;
                    let delivery_tag = *next_tag;
                    *next_tag += 1;
                    drop(next_tag);

                    let mut delivery_tags = self.delivery_tags.lock().await;
                    delivery_tags.insert(delivery_tag, (message.partition(), message.offset()));

                    // Redelivery tracking rides in a producer-set header; absent means 0.
                    let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);

                    let raw_message = RawMessage { data, metadata };
                    Ok(Some(ReceivedRawMessage { message: raw_message, delivery_tag, redelivery_count }))
                }
                Ok(Err(e)) => Err(WaeError::internal(format!("Failed to receive from Kafka: {}", e))),
                // Timeout: no message available right now.
                Err(_) => Ok(None),
            }
        }

        /// Commits offset+1 for the record behind `delivery_tag`.
        ///
        /// NOTE(review): committing a single record's offset can move the group
        /// offset past earlier uncommitted records on the same partition — confirm
        /// out-of-order ack is acceptable here. Also confirm `commit_offset` /
        /// `TopicPartitionList::with_partition_offset` exist in the pinned rdkafka
        /// version (the stock API exposes `commit` and `add_partition_offset`).
        async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
            let mut delivery_tags = self.delivery_tags.lock().await;
            if let Some((partition, offset)) = delivery_tags.remove(&delivery_tag) {
                self.consumer
                    .commit_offset(
                        &rdkafka::TopicPartitionList::new().with_partition_offset(
                            &self.config.queue,
                            partition,
                            rdkafka::Offset::Offset(offset + 1),
                        ),
                        rdkafka::consumer::CommitMode::Async,
                    )
                    .map_err(|e| WaeError::internal(format!("Failed to commit Kafka offset: {}", e)))?;
            }
            Ok(())
        }

        /// With `requeue`, seeks the partition back to the record's offset so it is
        /// fetched again; otherwise commits past it (dropping the message).
        ///
        /// NOTE(review): `seek(...).await` implies an async seek; stock rdkafka's
        /// `seek` is synchronous — confirm against the pinned version.
        async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
            let mut delivery_tags = self.delivery_tags.lock().await;
            if let Some((partition, offset)) = delivery_tags.remove(&delivery_tag) {
                if requeue {
                    self.consumer
                        .seek(
                            &self.config.queue,
                            partition,
                            rdkafka::Offset::Offset(offset),
                            Timeout::After(Duration::from_secs(5)),
                        )
                        .await
                        .map_err(|e| WaeError::internal(format!("Failed to seek Kafka offset: {}", e)))?;
                }
                else {
                    self.consumer
                        .commit_offset(
                            &rdkafka::TopicPartitionList::new().with_partition_offset(
                                &self.config.queue,
                                partition,
                                rdkafka::Offset::Offset(offset + 1),
                            ),
                            rdkafka::consumer::CommitMode::Async,
                        )
                        .map_err(|e| WaeError::internal(format!("Failed to commit Kafka offset: {}", e)))?;
                }
            }
            Ok(())
        }

        /// Returns the consumer configuration this backend was created with.
        fn config(&self) -> &ConsumerConfig {
            &self.config
        }
    }
1585
    /// A queue service implementation backed by Apache Kafka.
    pub struct KafkaQueueService {
        manager: Arc<KafkaQueueManager>,
        // One shared producer reused by every `create_producer` call.
        producer: Arc<FutureProducer>,
        // Template settings from which each consumer's StreamConsumer is created.
        consumer_config: ClientConfig,
    }
1592
    impl KafkaQueueService {
        /// Builds the shared producer and the consumer configuration template.
        ///
        /// The consumer template disables auto-commit (offsets are committed on
        /// ack/nack) and starts new groups from the earliest offset.
        pub async fn new(config: KafkaConfig) -> WaeResult<Self> {
            let manager = Arc::new(KafkaQueueManager::new(&config));

            let mut producer_config = ClientConfig::new();
            producer_config.set("bootstrap.servers", &config.brokers);
            if let Some(client_id) = &config.client_id {
                producer_config.set("client.id", client_id);
            }
            for (key, value) in &config.producer_config {
                producer_config.set(key, value);
            }
            let producer: FutureProducer =
                producer_config.create().map_err(|e| WaeError::internal(format!("Failed to create Kafka producer: {}", e)))?;

            let mut consumer_config = ClientConfig::new();
            consumer_config.set("bootstrap.servers", &config.brokers);
            // NOTE(review): the group id is randomized per service instance, so
            // separate instances each receive every message rather than
            // load-balancing a shared group — confirm this is intended.
            consumer_config.set("group.id", format!("wae-{}", Uuid::new_v4()));
            consumer_config.set("enable.auto.commit", "false");
            consumer_config.set("auto.offset.reset", "earliest");
            if let Some(client_id) = &config.client_id {
                consumer_config.set("client.id", client_id);
            }
            // User overrides are applied last and may replace the settings above.
            for (key, value) in &config.consumer_config {
                consumer_config.set(key, value);
            }

            Ok(Self { manager, producer: Arc::new(producer), consumer_config })
        }
    }
1624
1625 impl QueueService for KafkaQueueService {
1626 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
1627 Ok(MessageProducer::new(Box::new(KafkaProducerBackend::new(config, self.manager.clone(), self.producer.clone()))))
1628 }
1629
1630 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
1631 let consumer: StreamConsumer<DefaultConsumerContext> = self
1632 .consumer_config
1633 .create()
1634 .map_err(|e| WaeError::internal(format!("Failed to create Kafka consumer: {}", e)))?;
1635
1636 consumer
1637 .subscribe(&[&config.queue])
1638 .map_err(|e| WaeError::internal(format!("Failed to subscribe to Kafka topic: {}", e)))?;
1639
1640 let backend = KafkaConsumerBackend::new(config, self.manager.clone(), Arc::new(consumer));
1641
1642 Ok(MessageConsumer::new(Box::new(backend)))
1643 }
1644
1645 fn manager(&self) -> &dyn QueueManager {
1646 self.manager.as_ref() as &dyn QueueManager
1647 }
1648
1649 async fn close(&self) -> WaeResult<()> {
1650 Ok(())
1651 }
1652 }
1653}
1654
1655#[cfg(feature = "rabbitmq-backend")]
1657pub mod rabbitmq {
1658 use super::*;
1659 use lapin::{
1660 BasicProperties, Channel, Connection, ConnectionProperties, message::Delivery, options::*,
1661 publisher_confirm::Confirmation, types::*,
1662 };
1663 use std::sync::Arc;
1664 use tokio::sync::Semaphore;
1665 use uuid::Uuid;
1666
    /// Connection settings for the RabbitMQ backend.
    #[derive(Clone)]
    pub struct RabbitMQConfig {
        /// AMQP connection URL, including credentials and vhost.
        pub amqp_url: String,
        /// lapin connection properties (executor, locale, etc.).
        pub connection_properties: ConnectionProperties,
    }
1675
1676 impl Default for RabbitMQConfig {
1677 fn default() -> Self {
1678 Self {
1679 amqp_url: "amqp://guest:guest@localhost:5672/%2f".to_string(),
1680 connection_properties: ConnectionProperties::default(),
1681 }
1682 }
1683 }
1684
1685 impl RabbitMQConfig {
1686 pub fn new(amqp_url: impl Into<String>) -> Self {
1688 Self { amqp_url: amqp_url.into(), connection_properties: ConnectionProperties::default() }
1689 }
1690 }
1691
    /// Queue manager that administers RabbitMQ queues over a shared AMQP channel.
    pub struct RabbitMQQueueManager {
        // Channel used for declare/delete/purge operations.
        channel: Arc<Channel>,
    }
1696
1697 impl RabbitMQQueueManager {
1698 pub fn new(channel: Arc<Channel>) -> Self {
1700 Self { channel }
1701 }
1702
1703 async fn declare_queue_internal(&self, config: &QueueConfig) -> WaeResult<()> {
1705 let mut arguments = FieldTable::default();
1706
1707 if let Some(max_messages) = config.max_messages {
1708 arguments.insert("x-max-length".into(), AMQPValue::LongUInt(max_messages as u32));
1709 }
1710
1711 if let Some(max_message_size) = config.max_message_size {
1712 arguments.insert("x-max-length-bytes".into(), AMQPValue::LongUInt(max_message_size as u32));
1713 }
1714
1715 if let Some(message_ttl) = config.message_ttl {
1716 arguments.insert("x-message-ttl".into(), AMQPValue::LongUInt(message_ttl as u32));
1717 }
1718
1719 if let Some(dlq) = &config.dead_letter_queue {
1720 arguments.insert("x-dead-letter-exchange".into(), AMQPValue::ShortString("".into()));
1721 arguments.insert("x-dead-letter-routing-key".into(), AMQPValue::ShortString(dlq.clone().into()));
1722 }
1723
1724 self.channel
1725 .queue_declare(
1726 &config.name,
1727 QueueDeclareOptions {
1728 passive: false,
1729 durable: config.durable,
1730 exclusive: false,
1731 auto_delete: config.auto_delete,
1732 nowait: false,
1733 },
1734 arguments,
1735 )
1736 .await
1737 .map_err(|e| WaeError::internal(format!("RabbitMQ declare queue: {}", e)))?;
1738
1739 Ok(())
1740 }
1741 }
1742
    #[async_trait::async_trait]
    impl QueueManager for RabbitMQQueueManager {
        /// Declares (idempotently creates) the queue described by `config`.
        async fn declare_queue(&self, config: &QueueConfig) -> WaeResult<()> {
            self.declare_queue_internal(config).await
        }

        /// Deletes the queue with default options.
        async fn delete_queue(&self, name: &str) -> WaeResult<()> {
            self.channel
                .queue_delete(name, QueueDeleteOptions::default())
                .await
                .map_err(|e| WaeError::internal(format!("RabbitMQ delete queue: {}", e)))?;
            Ok(())
        }

        /// Probes for the queue with a passive declare.
        ///
        /// NOTE(review): in AMQP a failed passive declare (404) closes the channel,
        /// which would break later operations on this shared channel — confirm how
        /// lapin recovers here.
        async fn queue_exists(&self, name: &str) -> WaeResult<bool> {
            let result = self
                .channel
                .queue_declare(name, QueueDeclareOptions { passive: true, ..Default::default() }, FieldTable::default())
                .await;

            match result {
                Ok(_) => Ok(true),
                // A protocol error is treated as "queue does not exist".
                Err(lapin::Error::ProtocolError(_)) => Ok(false),
                Err(e) => Err(WaeError::internal(format!("RabbitMQ check queue exists: {}", e))),
            }
        }

        /// Reads the broker-reported message count via a passive declare.
        async fn queue_message_count(&self, name: &str) -> WaeResult<u64> {
            let queue = self
                .channel
                .queue_declare(name, QueueDeclareOptions { passive: true, ..Default::default() }, FieldTable::default())
                .await
                .map_err(|e| WaeError::internal(format!("RabbitMQ get queue message count: {}", e)))?;

            Ok(queue.message_count() as u64)
        }

        /// Removes all ready messages from the queue; returns how many were purged.
        async fn purge_queue(&self, name: &str) -> WaeResult<u64> {
            let result = self
                .channel
                .queue_purge(name, QueuePurgeOptions::default())
                .await
                .map_err(|e| WaeError::internal(format!("RabbitMQ purge queue: {}", e)))?;

            Ok(result as u64)
        }
    }
1790
    /// Producer backend publishing confirmed messages via the default exchange.
    pub struct RabbitMQProducerBackend {
        config: ProducerConfig,
        // Publish channel; put into confirm mode by RabbitMQQueueService::new.
        channel: Arc<Channel>,
        // Used to declare target queues before publishing.
        manager: Arc<RabbitMQQueueManager>,
    }
1797
1798 impl RabbitMQProducerBackend {
1799 pub fn new(config: ProducerConfig, channel: Arc<Channel>, manager: Arc<RabbitMQQueueManager>) -> Self {
1801 Self { config, channel, manager }
1802 }
1803
1804 fn metadata_to_properties(metadata: &MessageMetadata) -> BasicProperties {
1806 let mut props = BasicProperties::default();
1807
1808 if let Some(correlation_id) = &metadata.correlation_id {
1809 props = props.with_correlation_id(correlation_id.clone().into());
1810 }
1811
1812 if let Some(reply_to) = &metadata.reply_to {
1813 props = props.with_reply_to(reply_to.clone().into());
1814 }
1815
1816 if let Some(content_type) = &metadata.content_type {
1817 props = props.with_content_type(content_type.clone().into());
1818 }
1819
1820 if let Some(timestamp) = metadata.timestamp {
1821 props = props.with_timestamp(timestamp);
1822 }
1823
1824 if let Some(priority) = metadata.priority {
1825 props = props.with_priority(priority);
1826 }
1827
1828 if let Some(expiration) = metadata.expiration {
1829 props = props.with_expiration(expiration.to_string().into());
1830 }
1831
1832 let mut headers = FieldTable::default();
1833 for (key, value) in &metadata.headers {
1834 headers.insert(key.clone().into(), AMQPValue::LongString(value.clone().into()));
1835 }
1836
1837 if metadata.headers.len() > 0 {
1838 props = props.with_headers(headers);
1839 }
1840
1841 props
1842 }
1843
1844 async fn send_raw_internal(&self, queue: &str, message: &RawMessage, id: &str) -> WaeResult<()> {
1846 let mut metadata = message.metadata.clone();
1847 if metadata.timestamp.is_none() {
1848 metadata.timestamp =
1849 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
1850 }
1851
1852 let mut props = Self::metadata_to_properties(&metadata);
1853 props = props.with_message_id(id.into());
1854
1855 let confirm = self
1856 .channel
1857 .basic_publish("", queue, BasicPublishOptions::default(), &message.data, props)
1858 .await
1859 .map_err(|e| WaeError::internal(format!("RabbitMQ publish message: {}", e)))?
1860 .await
1861 .map_err(|e| WaeError::internal(format!("RabbitMQ wait for confirm: {}", e)))?;
1862
1863 match confirm {
1864 Confirmation::Ack(_) => Ok(()),
1865 Confirmation::Nack(_) => Err(WaeError::internal("RabbitMQ message nacked: Message was nacked by broker")),
1866 Confirmation::NotRequested => Ok(()),
1867 }
1868 }
1869 }
1870
1871 #[async_trait::async_trait]
1872 impl ProducerBackend for RabbitMQProducerBackend {
1873 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
1874 self.manager.declare_queue(&QueueConfig::new(queue)).await?;
1875
1876 let id = Uuid::new_v4().to_string();
1877 self.send_raw_internal(queue, message, &id).await?;
1878 Ok(id)
1879 }
1880
1881 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
1882 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
1883 self.send_raw(queue, message).await
1884 }
1885
1886 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
1887 let delay_ms = delay.as_millis() as u64;
1888 let delay_queue_name = format!("{}.delay.{}", queue, delay_ms);
1889
1890 let mut delay_queue_config = QueueConfig::new(&delay_queue_name);
1891 delay_queue_config = delay_queue_config.dead_letter_queue(queue);
1892 delay_queue_config.message_ttl = Some(delay_ms);
1893
1894 self.manager.declare_queue(&delay_queue_config).await?;
1895
1896 let id = Uuid::new_v4().to_string();
1897 self.send_raw_internal(&delay_queue_name, message, &id).await?;
1898 Ok(id)
1899 }
1900
1901 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
1902 let mut ids = Vec::with_capacity(messages.len());
1903 for msg in messages {
1904 ids.push(self.send_raw(queue, msg).await?);
1905 }
1906 Ok(ids)
1907 }
1908
1909 fn config(&self) -> &ProducerConfig {
1910 &self.config
1911 }
1912 }
1913
    /// Consumer backend wrapping a lapin consumer stream.
    pub struct RabbitMQConsumerBackend {
        config: ConsumerConfig,
        // Channel used for ack/nack by broker-assigned delivery tag.
        channel: Arc<Channel>,
        manager: Arc<RabbitMQQueueManager>,
        // Broker-side consumer; locked because polling needs exclusive access.
        consumer: Arc<tokio::sync::Mutex<lapin::Consumer>>,
        // NOTE(review): sized from prefetch_count at construction but not acquired
        // by any method visible in this file — confirm it is still needed.
        semaphore: Arc<Semaphore>,
    }
1922
    impl RabbitMQConsumerBackend {
        /// Sets channel QoS (prefetch) and registers a broker-side consumer on
        /// `config.queue`.
        pub async fn new(config: ConsumerConfig, channel: Arc<Channel>, manager: Arc<RabbitMQQueueManager>) -> WaeResult<Self> {
            channel
                .basic_qos(config.prefetch_count, BasicQosOptions::default())
                .await
                .map_err(|e| WaeError::internal(format!("RabbitMQ set qos: {}", e)))?;

            let consumer = channel
                .basic_consume(
                    &config.queue,
                    // An empty tag lets the broker generate a unique consumer tag.
                    config.consumer_tag.as_deref().unwrap_or(""),
                    BasicConsumeOptions {
                        no_local: false,
                        no_ack: config.auto_ack,
                        exclusive: config.exclusive,
                        nowait: false,
                    },
                    FieldTable::default(),
                )
                .await
                .map_err(|e| WaeError::internal(format!("RabbitMQ create consumer: {}", e)))?;

            let prefetch_count = config.prefetch_count;
            Ok(Self {
                config,
                channel,
                manager,
                consumer: Arc::new(tokio::sync::Mutex::new(consumer)),
                // NOTE(review): created but never acquired in the methods visible
                // here — confirm whether this semaphore is still needed.
                semaphore: Arc::new(Semaphore::new(prefetch_count as usize)),
            })
        }

        /// Converts a lapin delivery into the backend-agnostic `RawMessage`,
        /// copying over the standard AMQP properties and string headers.
        fn delivery_to_raw_message(delivery: &Delivery) -> RawMessage {
            let mut metadata = MessageMetadata::default();

            metadata.id = delivery.properties.message_id().clone().map(|s| s.to_string());
            metadata.correlation_id = delivery.properties.correlation_id().clone().map(|s| s.to_string());
            metadata.reply_to = delivery.properties.reply_to().clone().map(|s| s.to_string());
            metadata.content_type = delivery.properties.content_type().clone().map(|s| s.to_string());
            // Identity map — just copies the optional timestamp out of the property.
            metadata.timestamp = delivery.properties.timestamp().map(|t| t);
            metadata.priority = *delivery.properties.priority();
            // AMQP expirations are strings; only numeric values survive the parse.
            metadata.expiration = delivery.properties.expiration().clone().and_then(|s| s.as_str().parse().ok());

            if let Some(headers) = delivery.properties.headers() {
                // Only string-valued headers are preserved; other AMQP value types
                // are dropped.
                for (key, value) in headers.inner().iter() {
                    if let AMQPValue::LongString(s) = value {
                        metadata.headers.insert(key.to_string(), s.to_string());
                    }
                    else if let AMQPValue::ShortString(s) = value {
                        metadata.headers.insert(key.to_string(), s.to_string());
                    }
                }
            }

            RawMessage { data: delivery.data.clone(), metadata }
        }
    }
1982
1983 #[async_trait::async_trait]
1984 impl ConsumerBackend for RabbitMQConsumerBackend {
1985 async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
1986 use futures::StreamExt;
1987
1988 let mut consumer = self.consumer.lock().await;
1989 if let Some(delivery_result) = consumer.next().await {
1990 let delivery = delivery_result.map_err(|e| WaeError::internal(format!("RabbitMQ receive message: {}", e)))?;
1991
1992 let message = Self::delivery_to_raw_message(&delivery);
1993 let redelivery_count = 0;
1994
1995 Ok(Some(ReceivedRawMessage { message, delivery_tag: delivery.delivery_tag, redelivery_count }))
1996 }
1997 else {
1998 Ok(None)
1999 }
2000 }
2001
2002 async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
2003 self.channel
2004 .basic_ack(delivery_tag, BasicAckOptions::default())
2005 .await
2006 .map_err(|e| WaeError::internal(format!("RabbitMQ ack message: {}", e)))?;
2007 Ok(())
2008 }
2009
2010 async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
2011 self.channel
2012 .basic_nack(delivery_tag, BasicNackOptions { multiple: false, requeue })
2013 .await
2014 .map_err(|e| WaeError::internal(format!("RabbitMQ nack message: {}", e)))?;
2015 Ok(())
2016 }
2017
2018 fn config(&self) -> &ConsumerConfig {
2019 &self.config
2020 }
2021 }
2022
    /// A queue service implementation backed by RabbitMQ (via lapin).
    pub struct RabbitMQQueueService {
        // Held so `close()` can shut the connection down cleanly.
        connection: Arc<Connection>,
        // Single shared channel (in confirm mode) used by producers and consumers.
        channel: Arc<Channel>,
        manager: Arc<RabbitMQQueueManager>,
    }
2029
2030 impl RabbitMQQueueService {
2031 pub async fn new(config: RabbitMQConfig) -> WaeResult<Self> {
2033 let connection = Connection::connect(&config.amqp_url, config.connection_properties)
2034 .await
2035 .map_err(|e| WaeError::internal(format!("RabbitMQ connect: {}", e)))?;
2036
2037 let channel =
2038 connection.create_channel().await.map_err(|e| WaeError::internal(format!("RabbitMQ create channel: {}", e)))?;
2039
2040 channel
2041 .confirm_select(ConfirmSelectOptions::default())
2042 .await
2043 .map_err(|e| WaeError::internal(format!("RabbitMQ enable confirm mode: {}", e)))?;
2044
2045 let connection = Arc::new(connection);
2046 let channel = Arc::new(channel);
2047 let manager = Arc::new(RabbitMQQueueManager::new(channel.clone()));
2048
2049 Ok(Self { connection, channel, manager })
2050 }
2051 }
2052
2053 impl QueueService for RabbitMQQueueService {
2054 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
2055 Ok(MessageProducer::new(Box::new(RabbitMQProducerBackend::new(config, self.channel.clone(), self.manager.clone()))))
2056 }
2057
2058 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
2059 self.manager.declare_queue(&QueueConfig::new(&config.queue)).await?;
2060
2061 let backend = RabbitMQConsumerBackend::new(config, self.channel.clone(), self.manager.clone()).await?;
2062
2063 Ok(MessageConsumer::new(Box::new(backend)))
2064 }
2065
2066 fn manager(&self) -> &dyn QueueManager {
2067 self.manager.as_ref() as &dyn QueueManager
2068 }
2069
2070 async fn close(&self) -> WaeResult<()> {
2071 self.connection.close(0, "").await.map_err(|e| WaeError::internal(format!("RabbitMQ close connection: {}", e)))?;
2072 Ok(())
2073 }
2074 }
2075}
2076
2077#[cfg(feature = "pulsar-backend")]
2079pub mod pulsar_backend {
2080 use super::*;
2081 use base64::{Engine as _, engine::general_purpose};
2082 use pulsar::{Authentication, Pulsar, TokioExecutor, consumer, message::proto::MessageIdData, producer};
2083 use std::{collections::HashMap, sync::Arc, time::Duration};
2084 use tokio::sync::Mutex;
2085 use uuid::Uuid;
2086
    /// Connection and client options for the Pulsar backend.
    #[derive(Debug, Clone)]
    pub struct PulsarConfig {
        /// Pulsar service URL, e.g. `pulsar://host:6650`.
        pub brokers: String,
        /// Optional broker authentication.
        pub authentication: Option<Authentication>,
        /// Options applied when creating producers.
        pub producer_options: producer::ProducerOptions,
        /// Options applied when creating consumers.
        pub consumer_options: consumer::ConsumerOptions,
    }
2099
2100 impl Default for PulsarConfig {
2101 fn default() -> Self {
2102 Self {
2103 brokers: "pulsar://localhost:6650".to_string(),
2104 authentication: None,
2105 producer_options: producer::ProducerOptions::default(),
2106 consumer_options: consumer::ConsumerOptions::default(),
2107 }
2108 }
2109 }
2110
2111 impl PulsarConfig {
2112 pub fn new(brokers: impl Into<String>) -> Self {
2114 Self { brokers: brokers.into(), ..Default::default() }
2115 }
2116 }
2117
    /// Queue manager for the Pulsar backend; topic administration is left to the
    /// brokers, so the trait's admin operations are implemented as no-ops.
    pub struct PulsarQueueManager {
        // Shared Pulsar client.
        client: Arc<Pulsar<TokioExecutor>>,
    }
2122
2123 impl PulsarQueueManager {
2124 pub fn new(client: Arc<Pulsar<TokioExecutor>>) -> Self {
2126 Self { client }
2127 }
2128
2129 pub async fn from_config(config: &PulsarConfig) -> WaeResult<Self> {
2131 let mut builder = Pulsar::builder(&config.brokers, TokioExecutor);
2132 if let Some(auth) = &config.authentication {
2133 builder = builder.with_auth(auth.clone());
2134 }
2135 let client =
2136 builder.build().await.map_err(|e| WaeError::internal(format!("Failed to create Pulsar client: {}", e)))?;
2137 Ok(Self::new(Arc::new(client)))
2138 }
2139
2140 fn topic_name(queue: &str) -> String {
2142 format!("persistent://public/default/{}", queue)
2143 }
2144 }
2145
    #[async_trait::async_trait]
    impl QueueManager for PulsarQueueManager {
        /// No-op: topic management is delegated to the brokers/admin tooling.
        async fn declare_queue(&self, _config: &QueueConfig) -> WaeResult<()> {
            Ok(())
        }

        /// No-op: topic deletion is not performed by this client.
        async fn delete_queue(&self, _name: &str) -> WaeResult<()> {
            Ok(())
        }

        /// Always reports `true`; existence is not checked against the brokers.
        async fn queue_exists(&self, _name: &str) -> WaeResult<bool> {
            Ok(true)
        }

        /// Always reports `0`; backlog size is not queried.
        async fn queue_message_count(&self, _name: &str) -> WaeResult<u64> {
            Ok(0)
        }

        /// No-op purge; reports `0` messages removed.
        async fn purge_queue(&self, _name: &str) -> WaeResult<u64> {
            Ok(0)
        }
    }
2168
    /// Producer backend that publishes JSON-encoded messages to Pulsar topics.
    pub struct PulsarProducerBackend {
        // Producer settings, including the optional default queue.
        config: ProducerConfig,
        // Shared Pulsar producer.
        producer: Arc<producer::Producer<TokioExecutor>>,
        manager: Arc<PulsarQueueManager>,
    }
2175
2176 impl PulsarProducerBackend {
2177 pub fn new(
2179 config: ProducerConfig,
2180 manager: Arc<PulsarQueueManager>,
2181 producer: Arc<producer::Producer<TokioExecutor>>,
2182 ) -> Self {
2183 Self { config, producer, manager }
2184 }
2185
2186 fn encode_metadata(metadata: &MessageMetadata) -> HashMap<String, String> {
2188 let mut fields = HashMap::new();
2189
2190 if let Some(id) = &metadata.id {
2191 fields.insert("id".to_string(), id.clone());
2192 }
2193 if let Some(correlation_id) = &metadata.correlation_id {
2194 fields.insert("correlation_id".to_string(), correlation_id.clone());
2195 }
2196 if let Some(reply_to) = &metadata.reply_to {
2197 fields.insert("reply_to".to_string(), reply_to.clone());
2198 }
2199 if let Some(content_type) = &metadata.content_type {
2200 fields.insert("content_type".to_string(), content_type.clone());
2201 }
2202 if let Some(timestamp) = metadata.timestamp {
2203 fields.insert("timestamp".to_string(), timestamp.to_string());
2204 }
2205 if let Some(priority) = metadata.priority {
2206 fields.insert("priority".to_string(), priority.to_string());
2207 }
2208 if let Some(expiration) = metadata.expiration {
2209 fields.insert("expiration".to_string(), expiration.to_string());
2210 }
2211
2212 for (key, value) in &metadata.headers {
2213 fields.insert(format!("header:{}", key), value.clone());
2214 }
2215
2216 fields
2217 }
2218 }
2219
2220 #[async_trait::async_trait]
2221 impl ProducerBackend for PulsarProducerBackend {
2222 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId> {
2223 let id = Uuid::new_v4().to_string();
2224 let mut metadata = message.metadata.clone();
2225 metadata.id = Some(id.clone());
2226 if metadata.timestamp.is_none() {
2227 metadata.timestamp =
2228 Some(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64);
2229 }
2230
2231 let data_b64 = general_purpose::STANDARD.encode(&message.data);
2232 let mut fields = Self::encode_metadata(&metadata);
2233 fields.insert("data".to_string(), data_b64);
2234
2235 let payload = serde_json::to_vec(&fields)
2236 .map_err(|e| WaeError::internal(format!("Failed to serialize Pulsar message: {}", e)))?;
2237
2238 let message_id = self
2239 .producer
2240 .send(payload)
2241 .await
2242 .map_err(|e| WaeError::internal(format!("Failed to send Pulsar message: {}", e)))?
2243 .await
2244 .map_err(|e| WaeError::internal(format!("Failed to confirm Pulsar message: {}", e)))?;
2245
2246 Ok(id)
2247 }
2248
2249 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId> {
2250 let queue = self.config.default_queue.as_ref().ok_or_else(|| WaeError::config_missing("default_queue"))?;
2251 self.send_raw(queue, message).await
2252 }
2253
2254 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId> {
2255 let id = Uuid::new_v4().to_string();
2256 let message_clone = message.clone();
2257 let producer = self.producer.clone();
2258 let manager = self.manager.clone();
2259 let queue = queue.to_string();
2260
2261 tokio::spawn(async move {
2262 tokio::time::sleep(delay).await;
2263 let mut producer_config = ProducerConfig::default();
2264 producer_config.default_queue = Some(queue.clone());
2265 let mut producer_backend = PulsarProducerBackend::new(producer_config, manager.clone(), producer.clone());
2266 let _ = producer_backend.send_raw(&queue, &message_clone).await;
2267 });
2268
2269 Ok(id)
2270 }
2271
2272 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>> {
2273 let mut ids = Vec::with_capacity(messages.len());
2274 for msg in messages {
2275 ids.push(self.send_raw(queue, msg).await?);
2276 }
2277 Ok(ids)
2278 }
2279
2280 fn config(&self) -> &ProducerConfig {
2281 &self.config
2282 }
2283 }
2284
/// Consumer half of the Pulsar backend.
///
/// Pulsar identifies messages by `MessageIdData`, while the consumer contract
/// speaks in opaque `u64` delivery tags, so this type maintains the mapping
/// between the two.
pub struct PulsarConsumerBackend {
    // Consumer settings (queue name, consumer tag, ...).
    config: ConsumerConfig,
    // Pulsar consumer; mutex-guarded because receive, ack and nack all need
    // mutable access.
    consumer: Arc<Mutex<consumer::Consumer<TokioExecutor>>>,
    // Shared queue manager (not used directly by the consumer logic below).
    manager: Arc<PulsarQueueManager>,
    // delivery tag -> Pulsar message id, consulted on ack/nack.
    delivery_tags: Arc<Mutex<HashMap<u64, MessageIdData>>>,
    // Next delivery tag to hand out; starts at 1.
    next_delivery_tag: Arc<Mutex<u64>>,
}
2293
2294 impl PulsarConsumerBackend {
2295 pub fn new(
2297 config: ConsumerConfig,
2298 manager: Arc<PulsarQueueManager>,
2299 consumer: consumer::Consumer<TokioExecutor>,
2300 ) -> Self {
2301 Self {
2302 config,
2303 consumer: Arc::new(Mutex::new(consumer)),
2304 manager,
2305 delivery_tags: Arc::new(Mutex::new(HashMap::new())),
2306 next_delivery_tag: Arc::new(Mutex::new(1)),
2307 }
2308 }
2309
2310 fn decode_metadata(fields: &HashMap<String, String>) -> MessageMetadata {
2312 let mut metadata = MessageMetadata::default();
2313
2314 if let Some(id) = fields.get("id") {
2315 metadata.id = Some(id.clone());
2316 }
2317 if let Some(correlation_id) = fields.get("correlation_id") {
2318 metadata.correlation_id = Some(correlation_id.clone());
2319 }
2320 if let Some(reply_to) = fields.get("reply_to") {
2321 metadata.reply_to = Some(reply_to.clone());
2322 }
2323 if let Some(content_type) = fields.get("content_type") {
2324 metadata.content_type = Some(content_type.clone());
2325 }
2326 if let Some(timestamp) = fields.get("timestamp").and_then(|s| s.parse().ok()) {
2327 metadata.timestamp = Some(timestamp);
2328 }
2329 if let Some(priority) = fields.get("priority").and_then(|s| s.parse().ok()) {
2330 metadata.priority = Some(priority);
2331 }
2332 if let Some(expiration) = fields.get("expiration").and_then(|s| s.parse().ok()) {
2333 metadata.expiration = Some(expiration);
2334 }
2335
2336 for (key, value) in fields {
2337 if let Some(header_key) = key.strip_prefix("header:") {
2338 metadata.headers.insert(header_key.to_string(), value.clone());
2339 }
2340 }
2341
2342 metadata
2343 }
2344 }
2345
#[async_trait::async_trait]
impl ConsumerBackend for PulsarConsumerBackend {
    /// Polls the Pulsar consumer for up to one second; returns `Ok(None)` on
    /// timeout or when the underlying stream is exhausted.
    ///
    /// Each delivered message is assigned a monotonically increasing local
    /// delivery tag, recorded against its Pulsar `MessageIdData` so that
    /// `ack`/`nack` can resolve it later.
    ///
    /// NOTE(review): the consumer mutex is held while awaiting `next()`, so a
    /// concurrent `ack`/`nack` may block for up to the 1 s timeout — confirm
    /// this contention is acceptable for callers.
    async fn receive_raw(&self) -> WaeResult<Option<ReceivedRawMessage>> {
        match tokio::time::timeout(Duration::from_secs(1), async {
            let mut consumer = self.consumer.lock().await;
            consumer.next().await
        })
        .await
        {
            Ok(Some(msg_result)) => {
                let msg = msg_result.map_err(|e| WaeError::internal(format!("Failed to receive from Pulsar: {}", e)))?;
                let payload = msg.payload.data.clone();
                // The payload is the JSON map written by the producer backend:
                // flattened metadata plus a base64-encoded "data" entry.
                let fields: HashMap<String, String> = serde_json::from_slice(&payload)
                    .map_err(|e| WaeError::internal(format!("Failed to deserialize Pulsar message: {}", e)))?;
                let data_b64 = fields.get("data").ok_or_else(|| WaeError::internal("Missing data field"))?;
                let data = general_purpose::STANDARD
                    .decode(data_b64)
                    .map_err(|e| WaeError::internal(format!("Failed to decode data: {}", e)))?;
                let metadata = Self::decode_metadata(&fields);

                // Allocate the next local delivery tag; the counter lock is
                // dropped before taking the map lock to keep critical
                // sections small and avoid holding both at once.
                let mut next_tag = self.next_delivery_tag.lock().await;
                let delivery_tag = *next_tag;
                *next_tag += 1;
                drop(next_tag);

                let mut delivery_tags = self.delivery_tags.lock().await;
                delivery_tags.insert(delivery_tag, msg.message_id.clone());

                // Redelivery count travels as an application header; missing
                // or unparseable values default to 0.
                let redelivery_count = metadata.headers.get("x-redelivery-count").and_then(|s| s.parse().ok()).unwrap_or(0);

                let raw_message = RawMessage { data, metadata };
                Ok(Some(ReceivedRawMessage { message: raw_message, delivery_tag, redelivery_count }))
            }
            // Consumer stream ended: report "nothing available".
            Ok(None) => Ok(None),
            // Timed out waiting for a message: also "nothing available".
            Err(_) => Ok(None),
        }
    }

    /// Acknowledges the message behind `delivery_tag`. Unknown tags (already
    /// acked/nacked, or never issued) are silently ignored.
    async fn ack(&self, delivery_tag: u64) -> WaeResult<()> {
        let mut delivery_tags = self.delivery_tags.lock().await;
        if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
            let mut consumer = self.consumer.lock().await;
            consumer
                .ack(&message_id)
                .await
                .map_err(|e| WaeError::internal(format!("Failed to ack Pulsar message: {}", e)))?;
        }
        Ok(())
    }

    /// Negatively acknowledges the message behind `delivery_tag`.
    ///
    /// With `requeue` the message is scheduled for redelivery; without it the
    /// message is acked instead — i.e. dropped, with no dead-letter routing.
    /// Unknown tags are silently ignored.
    async fn nack(&self, delivery_tag: u64, requeue: bool) -> WaeResult<()> {
        let mut delivery_tags = self.delivery_tags.lock().await;
        if let Some(message_id) = delivery_tags.remove(&delivery_tag) {
            let mut consumer = self.consumer.lock().await;
            if requeue {
                consumer
                    .nack_with_redelivery(&message_id)
                    .await
                    .map_err(|e| WaeError::internal(format!("Failed to nack Pulsar message with redelivery: {}", e)))?;
            }
            else {
                consumer
                    .ack(&message_id)
                    .await
                    .map_err(|e| WaeError::internal(format!("Failed to ack Pulsar message in nack: {}", e)))?;
            }
        }
        Ok(())
    }

    fn config(&self) -> &ConsumerConfig {
        &self.config
    }
}
2420
/// `QueueService` implementation backed by Apache Pulsar.
pub struct PulsarQueueService {
    // Shared manager, exposed through `manager()`.
    manager: Arc<PulsarQueueManager>,
    // Client handle used to build producers and consumers.
    client: Arc<Pulsar<TokioExecutor>>,
    // Options cloned into every producer this service creates.
    producer_options: producer::ProducerOptions,
    // Options cloned into every consumer this service creates.
    consumer_options: consumer::ConsumerOptions,
}
2428
2429 impl PulsarQueueService {
2430 pub async fn new(config: PulsarConfig) -> WaeResult<Self> {
2432 let manager = PulsarQueueManager::from_config(&config).await?;
2433 let client = manager.client.clone();
2434 Ok(Self {
2435 manager: Arc::new(manager),
2436 client,
2437 producer_options: config.producer_options,
2438 consumer_options: config.consumer_options,
2439 })
2440 }
2441 }
2442
2443 impl QueueService for PulsarQueueService {
2444 async fn create_producer(&self, config: ProducerConfig) -> WaeResult<MessageProducer> {
2445 let topic = config
2446 .default_queue
2447 .as_ref()
2448 .map(|q| PulsarQueueManager::topic_name(q))
2449 .unwrap_or_else(|| PulsarQueueManager::topic_name("default"));
2450
2451 let producer: producer::Producer<TokioExecutor> = self
2452 .client
2453 .producer()
2454 .with_options(self.producer_options.clone())
2455 .with_topic(topic)
2456 .build()
2457 .await
2458 .map_err(|e| WaeError::internal(format!("Failed to create Pulsar producer: {}", e)))?;
2459
2460 Ok(MessageProducer::new(Box::new(PulsarProducerBackend::new(config, self.manager.clone(), Arc::new(producer)))))
2461 }
2462
2463 async fn create_consumer(&self, config: ConsumerConfig) -> WaeResult<MessageConsumer> {
2464 let topic = PulsarQueueManager::topic_name(&config.queue);
2465 let consumer_name = config.consumer_tag.clone().unwrap_or_else(|| format!("wae-consumer-{}", Uuid::new_v4()));
2466 let subscription = format!("wae-subscription-{}", Uuid::new_v4());
2467
2468 let consumer: consumer::Consumer<TokioExecutor> = self
2469 .client
2470 .consumer()
2471 .with_options(self.consumer_options.clone())
2472 .with_topic(topic)
2473 .with_consumer_name(consumer_name)
2474 .with_subscription(subscription)
2475 .with_subscription_type(consumer::SubscriptionType::Exclusive)
2476 .build()
2477 .await
2478 .map_err(|e| WaeError::internal(format!("Failed to create Pulsar consumer: {}", e)))?;
2479
2480 let backend = PulsarConsumerBackend::new(config, self.manager.clone(), consumer);
2481
2482 Ok(MessageConsumer::new(Box::new(backend)))
2483 }
2484
2485 fn manager(&self) -> &dyn QueueManager {
2486 self.manager.as_ref() as &dyn QueueManager
2487 }
2488
2489 async fn close(&self) -> WaeResult<()> {
2490 Ok(())
2491 }
2492 }
2493}