1use serde::{Serialize, de::DeserializeOwned};
4use std::time::Duration;
5use wae_types::{WaeError, WaeResult};
6
7pub type MessageId = String;
9
10pub type QueueName = String;
12
13#[derive(Debug, Clone, Default)]
15pub struct MessageMetadata {
16 pub id: Option<MessageId>,
18 pub correlation_id: Option<String>,
20 pub reply_to: Option<QueueName>,
22 pub content_type: Option<String>,
24 pub timestamp: Option<u64>,
26 pub priority: Option<u8>,
28 pub expiration: Option<u64>,
30 pub headers: std::collections::HashMap<String, String>,
32}
33
34#[derive(Debug, Clone)]
36pub struct RawMessage {
37 pub data: Vec<u8>,
39 pub metadata: MessageMetadata,
41}
42
43impl RawMessage {
44 pub fn new(data: Vec<u8>) -> Self {
46 Self { data, metadata: MessageMetadata::default() }
47 }
48
49 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
51 self.metadata.correlation_id = Some(id.into());
52 self
53 }
54
55 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
57 self.metadata.reply_to = Some(queue.into());
58 self
59 }
60
61 pub fn with_priority(mut self, priority: u8) -> Self {
63 self.metadata.priority = Some(priority.min(9));
64 self
65 }
66
67 pub fn with_expiration(mut self, ms: u64) -> Self {
69 self.metadata.expiration = Some(ms);
70 self
71 }
72
73 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
75 self.metadata.headers.insert(key.into(), value.into());
76 self
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct Message<T> {
83 pub payload: T,
85 pub metadata: MessageMetadata,
87}
88
89impl<T> Message<T> {
90 pub fn new(payload: T) -> Self {
92 Self { payload, metadata: MessageMetadata::default() }
93 }
94
95 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
97 self.metadata.correlation_id = Some(id.into());
98 self
99 }
100
101 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
103 self.metadata.reply_to = Some(queue.into());
104 self
105 }
106
107 pub fn with_priority(mut self, priority: u8) -> Self {
109 self.metadata.priority = Some(priority.min(9));
110 self
111 }
112
113 pub fn with_expiration(mut self, ms: u64) -> Self {
115 self.metadata.expiration = Some(ms);
116 self
117 }
118
119 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
121 self.metadata.headers.insert(key.into(), value.into());
122 self
123 }
124
125 pub fn into_raw(self) -> WaeResult<RawMessage>
127 where
128 T: Serialize,
129 {
130 let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
131 Ok(RawMessage { data, metadata: self.metadata })
132 }
133
134 pub fn to_raw(&self) -> WaeResult<RawMessage>
136 where
137 T: Serialize,
138 {
139 let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
140 Ok(RawMessage { data, metadata: self.metadata.clone() })
141 }
142}
143
144impl RawMessage {
145 pub fn into_typed<T: DeserializeOwned>(self) -> WaeResult<Message<T>> {
147 let payload = serde_json::from_slice(&self.data).map_err(|_e| WaeError::deserialization_failed("Message"))?;
148 Ok(Message { payload, metadata: self.metadata })
149 }
150}
151
152#[derive(Debug)]
154pub struct ReceivedRawMessage {
155 pub message: RawMessage,
157 pub delivery_tag: u64,
159 pub redelivery_count: u32,
161}
162
163#[derive(Debug)]
165pub struct ReceivedMessage<T> {
166 pub message: Message<T>,
168 pub delivery_tag: u64,
170 pub redelivery_count: u32,
172}
173
174#[derive(Debug, Clone)]
176pub struct QueueConfig {
177 pub name: QueueName,
179 pub durable: bool,
181 pub auto_delete: bool,
183 pub max_messages: Option<u64>,
185 pub max_message_size: Option<u64>,
187 pub message_ttl: Option<u64>,
189 pub dead_letter_queue: Option<QueueName>,
191}
192
193impl QueueConfig {
194 pub fn new(name: impl Into<String>) -> Self {
196 Self {
197 name: name.into(),
198 durable: true,
199 auto_delete: false,
200 max_messages: None,
201 max_message_size: None,
202 message_ttl: None,
203 dead_letter_queue: None,
204 }
205 }
206
207 pub fn durable(mut self, durable: bool) -> Self {
209 self.durable = durable;
210 self
211 }
212
213 pub fn auto_delete(mut self, auto_delete: bool) -> Self {
215 self.auto_delete = auto_delete;
216 self
217 }
218
219 pub fn max_messages(mut self, max: u64) -> Self {
221 self.max_messages = Some(max);
222 self
223 }
224
225 pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
227 self.message_ttl = Some(ttl_ms);
228 self
229 }
230
231 pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
233 self.dead_letter_queue = Some(queue.into());
234 self
235 }
236}
237
238#[derive(Debug, Clone)]
240pub struct ProducerConfig {
241 pub default_queue: Option<QueueName>,
243 pub confirm_timeout: Duration,
245 pub retry_count: u32,
247 pub retry_interval: Duration,
249}
250
251impl Default for ProducerConfig {
252 fn default() -> Self {
253 Self {
254 default_queue: None,
255 confirm_timeout: Duration::from_secs(5),
256 retry_count: 3,
257 retry_interval: Duration::from_millis(100),
258 }
259 }
260}
261
262#[derive(Debug, Clone)]
264pub struct ConsumerConfig {
265 pub queue: QueueName,
267 pub consumer_tag: Option<String>,
269 pub auto_ack: bool,
271 pub prefetch_count: u16,
273 pub exclusive: bool,
275}
276
277impl ConsumerConfig {
278 pub fn new(queue: impl Into<String>) -> Self {
280 Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
281 }
282
283 pub fn auto_ack(mut self, auto_ack: bool) -> Self {
285 self.auto_ack = auto_ack;
286 self
287 }
288
289 pub fn prefetch(mut self, count: u16) -> Self {
291 self.prefetch_count = count;
292 self
293 }
294}