1use crate::error::QueueError;
4use crate::message::{
5 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
6};
7use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
8use crate::providers::InMemoryProvider;
9use async_trait::async_trait;
10use chrono::Duration;
11
// Unit tests for this module live in the sibling file `client_tests.rs`
// and are only compiled for test builds.
#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;
15
/// Caller-facing interface for queue operations.
///
/// Implementors must be `Send + Sync`. Messages, receipt handles and
/// dead-letter reasons are taken by value. `StandardQueueClient` in this file
/// implements the trait by delegating to a boxed [`QueueProvider`].
#[async_trait]
pub trait QueueClient: Send + Sync {
    /// Sends a single `message` to `queue`.
    ///
    /// Returns a [`MessageId`] on success.
    /// NOTE(review): presumably the id assigned to the sent message — confirm.
    async fn send_message(
        &self,
        queue: &QueueName,
        message: Message,
    ) -> Result<MessageId, QueueError>;

    /// Sends several `messages` to `queue` in one call.
    ///
    /// Returns a `Vec<MessageId>` on success.
    /// NOTE(review): ordering relative to `messages` and partial-failure
    /// behavior are not visible here — confirm per provider.
    async fn send_messages(
        &self,
        queue: &QueueName,
        messages: Vec<Message>,
    ) -> Result<Vec<MessageId>, QueueError>;

    /// Receives at most one message from `queue`.
    ///
    /// NOTE(review): `Ok(None)` presumably means nothing arrived within
    /// `timeout` — confirm against the provider implementations.
    async fn receive_message(
        &self,
        queue: &QueueName,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Receives a batch of messages from `queue`.
    ///
    /// NOTE(review): `max_messages` presumably caps the size of the returned
    /// vector and `timeout` bounds the wait — confirm.
    async fn receive_messages(
        &self,
        queue: &QueueName,
        max_messages: u32,
        timeout: Duration,
    ) -> Result<Vec<ReceivedMessage>, QueueError>;

    /// Completes the message identified by `receipt`.
    /// NOTE(review): presumably removes it from the queue — confirm.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Abandons the message identified by `receipt`.
    /// NOTE(review): presumably makes it available for redelivery — confirm.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Dead-letters the message identified by `receipt`, passing along a
    /// free-form `reason`.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError>;

    /// Accepts a session on `queue` and returns a [`SessionClient`] for it.
    ///
    /// NOTE(review): `session_id == None` presumably lets the provider pick
    /// a session — confirm.
    async fn accept_session(
        &self,
        queue: &QueueName,
        session_id: Option<SessionId>,
    ) -> Result<Box<dyn SessionClient>, QueueError>;

    /// Reports which [`ProviderType`] backs this client.
    fn provider_type(&self) -> ProviderType;

    /// Reports whether sessions are supported.
    fn supports_sessions(&self) -> bool;

    /// Reports whether batching is supported.
    fn supports_batching(&self) -> bool;
}
94
/// Caller-facing interface for working with a single accepted session.
///
/// Implementors must be `Send + Sync`. Unlike [`QueueClient`], no queue name
/// is passed to the methods; the session is fixed by the instance.
#[async_trait]
pub trait SessionClient: Send + Sync {
    /// Receives at most one message from the session.
    ///
    /// NOTE(review): `Ok(None)` presumably means nothing arrived within
    /// `timeout` — confirm against the provider implementations.
    async fn receive_message(
        &self,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Completes the message identified by `receipt`.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Abandons the message identified by `receipt`.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Dead-letters the message identified by `receipt`, passing along a
    /// free-form `reason`.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError>;

    /// Requests a renewal of the session lock.
    /// NOTE(review): lock duration and renewal semantics are provider-defined — confirm.
    async fn renew_session_lock(&self) -> Result<(), QueueError>;

    /// Closes the session.
    async fn close_session(&self) -> Result<(), QueueError>;

    /// Borrows the id of the session this client is bound to.
    fn session_id(&self) -> &SessionId;

    /// Returns a [`Timestamp`] for the session's expiry.
    /// NOTE(review): presumably the lock expiry time — confirm.
    fn session_expires_at(&self) -> Timestamp;
}
129
/// Backend-side interface that a concrete queue provider implements.
///
/// It mirrors [`QueueClient`], with these differences visible in the
/// signatures: messages, receipt handles and reasons are borrowed rather than
/// owned, session support is reported as a [`SessionSupport`] value instead of
/// a `bool`, and a `max_batch_size` query is added. Implementors must be
/// `Send + Sync`.
#[async_trait]
pub trait QueueProvider: Send + Sync {
    /// Sends a single borrowed `message` to `queue`, returning a
    /// [`MessageId`] on success.
    async fn send_message(
        &self,
        queue: &QueueName,
        message: &Message,
    ) -> Result<MessageId, QueueError>;

    /// Sends a slice of `messages` to `queue`, returning a `Vec<MessageId>`
    /// on success.
    async fn send_messages(
        &self,
        queue: &QueueName,
        messages: &[Message],
    ) -> Result<Vec<MessageId>, QueueError>;

    /// Receives at most one message from `queue`.
    ///
    /// NOTE(review): `Ok(None)` presumably means nothing arrived within
    /// `timeout` — confirm per provider.
    async fn receive_message(
        &self,
        queue: &QueueName,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Receives a batch of messages from `queue`.
    ///
    /// NOTE(review): `max_messages` presumably caps the size of the returned
    /// vector — confirm per provider.
    async fn receive_messages(
        &self,
        queue: &QueueName,
        max_messages: u32,
        timeout: Duration,
    ) -> Result<Vec<ReceivedMessage>, QueueError>;

    /// Completes the message identified by `receipt`.
    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Abandons the message identified by `receipt`.
    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Dead-letters the message identified by `receipt`, passing along `reason`.
    async fn dead_letter_message(
        &self,
        receipt: &ReceiptHandle,
        reason: &str,
    ) -> Result<(), QueueError>;

    /// Creates a [`SessionProvider`] for a session on `queue`.
    ///
    /// NOTE(review): `session_id == None` presumably lets the provider pick
    /// a session — confirm.
    async fn create_session_client(
        &self,
        queue: &QueueName,
        session_id: Option<SessionId>,
    ) -> Result<Box<dyn SessionProvider>, QueueError>;

    /// Identifies which [`ProviderType`] this provider is.
    fn provider_type(&self) -> ProviderType;

    /// Reports the provider's level of session support.
    fn supports_sessions(&self) -> SessionSupport;

    /// Reports whether the provider supports batching.
    fn supports_batching(&self) -> bool;

    /// Returns the provider's batch-size limit.
    /// NOTE(review): not called by any code visible in this file — confirm
    /// where (or whether) it is enforced.
    fn max_batch_size(&self) -> u32;
}
194
/// Backend-side interface for a single accepted session.
///
/// It mirrors [`SessionClient`], except that receipt handles and reasons are
/// borrowed rather than owned. Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionProvider: Send + Sync {
    /// Receives at most one message from the session.
    ///
    /// NOTE(review): `Ok(None)` presumably means nothing arrived within
    /// `timeout` — confirm per provider.
    async fn receive_message(
        &self,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Completes the message identified by `receipt`.
    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Abandons the message identified by `receipt`.
    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Dead-letters the message identified by `receipt`, passing along `reason`.
    async fn dead_letter_message(
        &self,
        receipt: &ReceiptHandle,
        reason: &str,
    ) -> Result<(), QueueError>;

    /// Requests a renewal of the session lock.
    /// NOTE(review): lock duration and renewal semantics are provider-defined — confirm.
    async fn renew_session_lock(&self) -> Result<(), QueueError>;

    /// Closes the session.
    async fn close_session(&self) -> Result<(), QueueError>;

    /// Borrows the id of the session this provider is bound to.
    fn session_id(&self) -> &SessionId;

    /// Returns a [`Timestamp`] for the session's expiry.
    /// NOTE(review): presumably the lock expiry time — confirm.
    fn session_expires_at(&self) -> Timestamp;
}
229
230pub struct QueueClientFactory;
232
233impl QueueClientFactory {
234 pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
236 let client_config = config.clone();
238
239 let provider: Box<dyn QueueProvider> = match config.provider {
241 ProviderConfig::InMemory(in_memory_config) => {
242 Box::new(InMemoryProvider::new(in_memory_config))
243 }
244 ProviderConfig::AzureServiceBus(azure_config) => {
245 let azure_provider = crate::providers::AzureServiceBusProvider::new(azure_config)
246 .await
247 .map_err(|e| e.to_queue_error())?;
248 Box::new(azure_provider)
249 }
250 ProviderConfig::AwsSqs(aws_config) => {
251 let aws_provider = crate::providers::AwsSqsProvider::new(aws_config)
252 .await
253 .map_err(|e| e.to_queue_error())?;
254 Box::new(aws_provider)
255 }
256 ProviderConfig::RabbitMq(rmq_config) => {
257 let rmq_provider = crate::providers::RabbitMqProvider::new(rmq_config)
258 .await
259 .map_err(|e| e.to_queue_error())?;
260 Box::new(rmq_provider)
261 }
262 ProviderConfig::Nats(nats_config) => {
263 let nats_provider = crate::providers::NatsProvider::new(nats_config)
264 .await
265 .map_err(|e| e.to_queue_error())?;
266 Box::new(nats_provider)
267 }
268 };
269
270 Ok(Box::new(StandardQueueClient::new(provider, client_config)))
272 }
273
274 pub fn create_test_client() -> Box<dyn QueueClient> {
276 let provider = InMemoryProvider::new(InMemoryConfig::default());
277 let config = QueueConfig::default();
278 Box::new(StandardQueueClient::new(Box::new(provider), config))
279 }
280}
281
282pub struct StandardQueueClient {
284 provider: Box<dyn QueueProvider>,
285 #[allow(dead_code)] config: QueueConfig,
287}
288
289impl StandardQueueClient {
290 pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
292 Self { provider, config }
293 }
294}
295
296#[async_trait]
297impl QueueClient for StandardQueueClient {
298 async fn send_message(
299 &self,
300 queue: &QueueName,
301 message: Message,
302 ) -> Result<MessageId, QueueError> {
303 self.provider.send_message(queue, &message).await
304 }
305
306 async fn send_messages(
307 &self,
308 queue: &QueueName,
309 messages: Vec<Message>,
310 ) -> Result<Vec<MessageId>, QueueError> {
311 self.provider.send_messages(queue, &messages).await
313 }
314
315 async fn receive_message(
316 &self,
317 queue: &QueueName,
318 timeout: Duration,
319 ) -> Result<Option<ReceivedMessage>, QueueError> {
320 self.provider.receive_message(queue, timeout).await
321 }
322
323 async fn receive_messages(
324 &self,
325 queue: &QueueName,
326 max_messages: u32,
327 timeout: Duration,
328 ) -> Result<Vec<ReceivedMessage>, QueueError> {
329 self.provider
330 .receive_messages(queue, max_messages, timeout)
331 .await
332 }
333
334 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
335 self.provider.complete_message(&receipt).await
336 }
337
338 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
339 self.provider.abandon_message(&receipt).await
340 }
341
342 async fn dead_letter_message(
343 &self,
344 receipt: ReceiptHandle,
345 reason: String,
346 ) -> Result<(), QueueError> {
347 self.provider.dead_letter_message(&receipt, &reason).await
348 }
349
350 async fn accept_session(
351 &self,
352 queue: &QueueName,
353 session_id: Option<SessionId>,
354 ) -> Result<Box<dyn SessionClient>, QueueError> {
355 let session_provider = self
356 .provider
357 .create_session_client(queue, session_id)
358 .await?;
359 Ok(Box::new(StandardSessionClient::new(session_provider)))
360 }
361
362 fn provider_type(&self) -> ProviderType {
363 self.provider.provider_type()
364 }
365
366 fn supports_sessions(&self) -> bool {
367 matches!(
368 self.provider.supports_sessions(),
369 SessionSupport::Native | SessionSupport::Emulated
370 )
371 }
372
373 fn supports_batching(&self) -> bool {
374 self.provider.supports_batching()
375 }
376}
377
378struct StandardSessionClient {
380 provider: Box<dyn SessionProvider>,
381}
382
383impl StandardSessionClient {
384 fn new(provider: Box<dyn SessionProvider>) -> Self {
385 Self { provider }
386 }
387}
388
389#[async_trait]
390impl SessionClient for StandardSessionClient {
391 async fn receive_message(
392 &self,
393 timeout: Duration,
394 ) -> Result<Option<ReceivedMessage>, QueueError> {
395 self.provider.receive_message(timeout).await
396 }
397
398 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
399 self.provider.complete_message(&receipt).await
400 }
401
402 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
403 self.provider.abandon_message(&receipt).await
404 }
405
406 async fn dead_letter_message(
407 &self,
408 receipt: ReceiptHandle,
409 reason: String,
410 ) -> Result<(), QueueError> {
411 self.provider.dead_letter_message(&receipt, &reason).await
412 }
413
414 async fn renew_session_lock(&self) -> Result<(), QueueError> {
415 self.provider.renew_session_lock().await
416 }
417
418 async fn close_session(&self) -> Result<(), QueueError> {
419 self.provider.close_session().await
420 }
421
422 fn session_id(&self) -> &SessionId {
423 self.provider.session_id()
424 }
425
426 fn session_expires_at(&self) -> Timestamp {
427 self.provider.session_expires_at()
428 }
429}