1use super::{WaeResult, types::*};
4use serde::Serialize;
5use std::time::Duration;
6
7#[async_trait::async_trait]
9pub trait ProducerBackend: Send + Sync {
10 async fn send_raw(&self, queue: &str, message: &RawMessage) -> WaeResult<MessageId>;
12
13 async fn send_raw_default(&self, message: &RawMessage) -> WaeResult<MessageId>;
15
16 async fn send_raw_delayed(&self, queue: &str, message: &RawMessage, delay: Duration) -> WaeResult<MessageId>;
18
19 async fn send_raw_batch(&self, queue: &str, messages: &[RawMessage]) -> WaeResult<Vec<MessageId>>;
21
22 fn config(&self) -> &ProducerConfig;
24}
25
26pub struct MessageProducer {
28 backend: Box<dyn ProducerBackend>,
29}
30
31impl MessageProducer {
32 pub fn new(backend: Box<dyn ProducerBackend>) -> Self {
34 Self { backend }
35 }
36
37 pub async fn send<T: Serialize + Send + Sync>(&self, queue: &str, message: &Message<T>) -> WaeResult<MessageId> {
39 let raw = message.to_raw()?;
40 self.backend.send_raw(queue, &raw).await
41 }
42
43 pub async fn send_default<T: Serialize + Send + Sync>(&self, message: &Message<T>) -> WaeResult<MessageId> {
45 let raw = message.to_raw()?;
46 self.backend.send_raw_default(&raw).await
47 }
48
49 pub async fn send_delayed<T: Serialize + Send + Sync>(
51 &self,
52 queue: &str,
53 message: &Message<T>,
54 delay: Duration,
55 ) -> WaeResult<MessageId> {
56 let raw = message.to_raw()?;
57 self.backend.send_raw_delayed(queue, &raw, delay).await
58 }
59
60 pub async fn send_batch<T: Serialize + Send + Sync>(
62 &self,
63 queue: &str,
64 messages: &[Message<T>],
65 ) -> WaeResult<Vec<MessageId>> {
66 let raw_messages: Vec<RawMessage> = messages.iter().map(|m| m.to_raw()).collect::<WaeResult<_>>()?;
67 self.backend.send_raw_batch(queue, &raw_messages).await
68 }
69
70 pub fn config(&self) -> &ProducerConfig {
72 self.backend.config()
73 }
74}