1use crate::error::QueueError;
4use crate::message::{
5 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
6};
7use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
8use crate::providers::InMemoryProvider;
9use async_trait::async_trait;
10use chrono::Duration;
11
12#[cfg(test)]
13#[path = "client_tests.rs"]
14mod tests;
15
16#[async_trait]
18pub trait QueueClient: Send + Sync {
19 async fn send_message(
21 &self,
22 queue: &QueueName,
23 message: Message,
24 ) -> Result<MessageId, QueueError>;
25
26 async fn send_messages(
28 &self,
29 queue: &QueueName,
30 messages: Vec<Message>,
31 ) -> Result<Vec<MessageId>, QueueError>;
32
33 async fn receive_message(
35 &self,
36 queue: &QueueName,
37 timeout: Duration,
38 ) -> Result<Option<ReceivedMessage>, QueueError>;
39
40 async fn receive_messages(
42 &self,
43 queue: &QueueName,
44 max_messages: u32,
45 timeout: Duration,
46 ) -> Result<Vec<ReceivedMessage>, QueueError>;
47
48 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
50
51 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
53
54 async fn dead_letter_message(
56 &self,
57 receipt: ReceiptHandle,
58 reason: String,
59 ) -> Result<(), QueueError>;
60
61 async fn accept_session(
63 &self,
64 queue: &QueueName,
65 session_id: Option<SessionId>,
66 ) -> Result<Box<dyn SessionClient>, QueueError>;
67
68 fn provider_type(&self) -> ProviderType;
70
71 fn supports_sessions(&self) -> bool;
73
74 fn supports_batching(&self) -> bool;
76}
77
78#[async_trait]
80pub trait SessionClient: Send + Sync {
81 async fn receive_message(
83 &self,
84 timeout: Duration,
85 ) -> Result<Option<ReceivedMessage>, QueueError>;
86
87 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
89
90 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
92
93 async fn dead_letter_message(
95 &self,
96 receipt: ReceiptHandle,
97 reason: String,
98 ) -> Result<(), QueueError>;
99
100 async fn renew_session_lock(&self) -> Result<(), QueueError>;
102
103 async fn close_session(&self) -> Result<(), QueueError>;
105
106 fn session_id(&self) -> &SessionId;
108
109 fn session_expires_at(&self) -> Timestamp;
111}
112
113#[async_trait]
115pub trait QueueProvider: Send + Sync {
116 async fn send_message(
118 &self,
119 queue: &QueueName,
120 message: &Message,
121 ) -> Result<MessageId, QueueError>;
122
123 async fn send_messages(
125 &self,
126 queue: &QueueName,
127 messages: &[Message],
128 ) -> Result<Vec<MessageId>, QueueError>;
129
130 async fn receive_message(
132 &self,
133 queue: &QueueName,
134 timeout: Duration,
135 ) -> Result<Option<ReceivedMessage>, QueueError>;
136
137 async fn receive_messages(
139 &self,
140 queue: &QueueName,
141 max_messages: u32,
142 timeout: Duration,
143 ) -> Result<Vec<ReceivedMessage>, QueueError>;
144
145 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
147
148 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
150
151 async fn dead_letter_message(
153 &self,
154 receipt: &ReceiptHandle,
155 reason: &str,
156 ) -> Result<(), QueueError>;
157
158 async fn create_session_client(
160 &self,
161 queue: &QueueName,
162 session_id: Option<SessionId>,
163 ) -> Result<Box<dyn SessionProvider>, QueueError>;
164
165 fn provider_type(&self) -> ProviderType;
167
168 fn supports_sessions(&self) -> SessionSupport;
170
171 fn supports_batching(&self) -> bool;
173
174 fn max_batch_size(&self) -> u32;
176}
177
178#[async_trait]
180pub trait SessionProvider: Send + Sync {
181 async fn receive_message(
183 &self,
184 timeout: Duration,
185 ) -> Result<Option<ReceivedMessage>, QueueError>;
186
187 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
189
190 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
192
193 async fn dead_letter_message(
195 &self,
196 receipt: &ReceiptHandle,
197 reason: &str,
198 ) -> Result<(), QueueError>;
199
200 async fn renew_session_lock(&self) -> Result<(), QueueError>;
202
203 async fn close_session(&self) -> Result<(), QueueError>;
205
206 fn session_id(&self) -> &SessionId;
208
209 fn session_expires_at(&self) -> Timestamp;
211}
212
213pub struct QueueClientFactory;
215
216impl QueueClientFactory {
217 pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
219 let client_config = config.clone();
221
222 let provider: Box<dyn QueueProvider> = match config.provider {
224 ProviderConfig::InMemory(in_memory_config) => {
225 Box::new(InMemoryProvider::new(in_memory_config))
226 }
227 ProviderConfig::AzureServiceBus(azure_config) => {
228 let azure_provider = crate::providers::AzureServiceBusProvider::new(azure_config)
229 .await
230 .map_err(|e| e.to_queue_error())?;
231 Box::new(azure_provider)
232 }
233 ProviderConfig::AwsSqs(aws_config) => {
234 let aws_provider = crate::providers::AwsSqsProvider::new(aws_config)
235 .await
236 .map_err(|e| e.to_queue_error())?;
237 Box::new(aws_provider)
238 }
239 };
240
241 Ok(Box::new(StandardQueueClient::new(provider, client_config)))
243 }
244
245 pub fn create_test_client() -> Box<dyn QueueClient> {
247 let provider = InMemoryProvider::new(InMemoryConfig::default());
248 let config = QueueConfig::default();
249 Box::new(StandardQueueClient::new(Box::new(provider), config))
250 }
251}
252
253pub struct StandardQueueClient {
255 provider: Box<dyn QueueProvider>,
256 #[allow(dead_code)] config: QueueConfig,
258}
259
260impl StandardQueueClient {
261 pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
263 Self { provider, config }
264 }
265}
266
267#[async_trait]
268impl QueueClient for StandardQueueClient {
269 async fn send_message(
270 &self,
271 queue: &QueueName,
272 message: Message,
273 ) -> Result<MessageId, QueueError> {
274 self.provider.send_message(queue, &message).await
275 }
276
277 async fn send_messages(
278 &self,
279 queue: &QueueName,
280 messages: Vec<Message>,
281 ) -> Result<Vec<MessageId>, QueueError> {
282 self.provider.send_messages(queue, &messages).await
284 }
285
286 async fn receive_message(
287 &self,
288 queue: &QueueName,
289 timeout: Duration,
290 ) -> Result<Option<ReceivedMessage>, QueueError> {
291 self.provider.receive_message(queue, timeout).await
292 }
293
294 async fn receive_messages(
295 &self,
296 queue: &QueueName,
297 max_messages: u32,
298 timeout: Duration,
299 ) -> Result<Vec<ReceivedMessage>, QueueError> {
300 self.provider
301 .receive_messages(queue, max_messages, timeout)
302 .await
303 }
304
305 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
306 self.provider.complete_message(&receipt).await
307 }
308
309 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
310 self.provider.abandon_message(&receipt).await
311 }
312
313 async fn dead_letter_message(
314 &self,
315 receipt: ReceiptHandle,
316 reason: String,
317 ) -> Result<(), QueueError> {
318 self.provider.dead_letter_message(&receipt, &reason).await
319 }
320
321 async fn accept_session(
322 &self,
323 queue: &QueueName,
324 session_id: Option<SessionId>,
325 ) -> Result<Box<dyn SessionClient>, QueueError> {
326 let session_provider = self
327 .provider
328 .create_session_client(queue, session_id)
329 .await?;
330 Ok(Box::new(StandardSessionClient::new(session_provider)))
331 }
332
333 fn provider_type(&self) -> ProviderType {
334 self.provider.provider_type()
335 }
336
337 fn supports_sessions(&self) -> bool {
338 matches!(
339 self.provider.supports_sessions(),
340 SessionSupport::Native | SessionSupport::Emulated
341 )
342 }
343
344 fn supports_batching(&self) -> bool {
345 self.provider.supports_batching()
346 }
347}
348
349struct StandardSessionClient {
351 provider: Box<dyn SessionProvider>,
352}
353
354impl StandardSessionClient {
355 fn new(provider: Box<dyn SessionProvider>) -> Self {
356 Self { provider }
357 }
358}
359
360#[async_trait]
361impl SessionClient for StandardSessionClient {
362 async fn receive_message(
363 &self,
364 timeout: Duration,
365 ) -> Result<Option<ReceivedMessage>, QueueError> {
366 self.provider.receive_message(timeout).await
367 }
368
369 async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
370 self.provider.complete_message(&receipt).await
371 }
372
373 async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
374 self.provider.abandon_message(&receipt).await
375 }
376
377 async fn dead_letter_message(
378 &self,
379 receipt: ReceiptHandle,
380 reason: String,
381 ) -> Result<(), QueueError> {
382 self.provider.dead_letter_message(&receipt, &reason).await
383 }
384
385 async fn renew_session_lock(&self) -> Result<(), QueueError> {
386 self.provider.renew_session_lock().await
387 }
388
389 async fn close_session(&self) -> Result<(), QueueError> {
390 self.provider.close_session().await
391 }
392
393 fn session_id(&self) -> &SessionId {
394 self.provider.session_id()
395 }
396
397 fn session_expires_at(&self) -> Timestamp {
398 self.provider.session_expires_at()
399 }
400}