Skip to main content

queue_runtime/
client.rs

1//! Client traits and implementations for queue operations.
2
3use crate::error::QueueError;
4use crate::message::{
5    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
6};
7use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
8use crate::providers::InMemoryProvider;
9use async_trait::async_trait;
10use chrono::Duration;
11
12#[cfg(test)]
13#[path = "client_tests.rs"]
14mod tests;
15
16/// Main interface for queue operations across all providers
17#[async_trait]
18pub trait QueueClient: Send + Sync {
19    /// Send single message to queue
20    async fn send_message(
21        &self,
22        queue: &QueueName,
23        message: Message,
24    ) -> Result<MessageId, QueueError>;
25
26    /// Send multiple messages in batch (if supported)
27    async fn send_messages(
28        &self,
29        queue: &QueueName,
30        messages: Vec<Message>,
31    ) -> Result<Vec<MessageId>, QueueError>;
32
33    /// Receive single message from queue
34    async fn receive_message(
35        &self,
36        queue: &QueueName,
37        timeout: Duration,
38    ) -> Result<Option<ReceivedMessage>, QueueError>;
39
40    /// Receive multiple messages from queue
41    async fn receive_messages(
42        &self,
43        queue: &QueueName,
44        max_messages: u32,
45        timeout: Duration,
46    ) -> Result<Vec<ReceivedMessage>, QueueError>;
47
48    /// Mark message as successfully processed
49    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
50
51    /// Return message to queue for retry
52    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
53
54    /// Send message to dead letter queue
55    async fn dead_letter_message(
56        &self,
57        receipt: ReceiptHandle,
58        reason: String,
59    ) -> Result<(), QueueError>;
60
61    /// Accept session for ordered processing
62    async fn accept_session(
63        &self,
64        queue: &QueueName,
65        session_id: Option<SessionId>,
66    ) -> Result<Box<dyn SessionClient>, QueueError>;
67
68    /// Get provider type
69    fn provider_type(&self) -> ProviderType;
70
71    /// Check if provider supports sessions
72    fn supports_sessions(&self) -> bool;
73
74    /// Check if provider supports batch operations
75    fn supports_batching(&self) -> bool;
76}
77
78/// Interface for session-based ordered message processing
79#[async_trait]
80pub trait SessionClient: Send + Sync {
81    /// Receive message from session (maintains order)
82    async fn receive_message(
83        &self,
84        timeout: Duration,
85    ) -> Result<Option<ReceivedMessage>, QueueError>;
86
87    /// Complete message in session
88    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
89
90    /// Abandon message in session
91    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
92
93    /// Send message to dead letter queue
94    async fn dead_letter_message(
95        &self,
96        receipt: ReceiptHandle,
97        reason: String,
98    ) -> Result<(), QueueError>;
99
100    /// Renew session lock to prevent timeout
101    async fn renew_session_lock(&self) -> Result<(), QueueError>;
102
103    /// Close session and release lock
104    async fn close_session(&self) -> Result<(), QueueError>;
105
106    /// Get session ID
107    fn session_id(&self) -> &SessionId;
108
109    /// Get session expiry time
110    fn session_expires_at(&self) -> Timestamp;
111}
112
113/// Interface implemented by specific queue providers (Azure, AWS, etc.)
114#[async_trait]
115pub trait QueueProvider: Send + Sync {
116    /// Send single message
117    async fn send_message(
118        &self,
119        queue: &QueueName,
120        message: &Message,
121    ) -> Result<MessageId, QueueError>;
122
123    /// Send multiple messages
124    async fn send_messages(
125        &self,
126        queue: &QueueName,
127        messages: &[Message],
128    ) -> Result<Vec<MessageId>, QueueError>;
129
130    /// Receive single message
131    async fn receive_message(
132        &self,
133        queue: &QueueName,
134        timeout: Duration,
135    ) -> Result<Option<ReceivedMessage>, QueueError>;
136
137    /// Receive multiple messages
138    async fn receive_messages(
139        &self,
140        queue: &QueueName,
141        max_messages: u32,
142        timeout: Duration,
143    ) -> Result<Vec<ReceivedMessage>, QueueError>;
144
145    /// Complete message processing
146    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
147
148    /// Abandon message for retry
149    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
150
151    /// Send to dead letter queue
152    async fn dead_letter_message(
153        &self,
154        receipt: &ReceiptHandle,
155        reason: &str,
156    ) -> Result<(), QueueError>;
157
158    /// Create session client
159    async fn create_session_client(
160        &self,
161        queue: &QueueName,
162        session_id: Option<SessionId>,
163    ) -> Result<Box<dyn SessionProvider>, QueueError>;
164
165    /// Get provider type
166    fn provider_type(&self) -> ProviderType;
167
168    /// Get session support level
169    fn supports_sessions(&self) -> SessionSupport;
170
171    /// Check batch operation support
172    fn supports_batching(&self) -> bool;
173
174    /// Get maximum batch size
175    fn max_batch_size(&self) -> u32;
176}
177
178/// Interface implemented by provider-specific session implementations
179#[async_trait]
180pub trait SessionProvider: Send + Sync {
181    /// Receive message from session
182    async fn receive_message(
183        &self,
184        timeout: Duration,
185    ) -> Result<Option<ReceivedMessage>, QueueError>;
186
187    /// Complete message
188    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
189
190    /// Abandon message
191    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
192
193    /// Send to dead letter queue
194    async fn dead_letter_message(
195        &self,
196        receipt: &ReceiptHandle,
197        reason: &str,
198    ) -> Result<(), QueueError>;
199
200    /// Renew session lock
201    async fn renew_session_lock(&self) -> Result<(), QueueError>;
202
203    /// Close session
204    async fn close_session(&self) -> Result<(), QueueError>;
205
206    /// Get session ID
207    fn session_id(&self) -> &SessionId;
208
209    /// Get session expiry time
210    fn session_expires_at(&self) -> Timestamp;
211}
212
213/// Factory for creating queue clients with appropriate providers
214pub struct QueueClientFactory;
215
216impl QueueClientFactory {
217    /// Create queue client from configuration
218    pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
219        // Clone config for client since we need to move parts for provider
220        let client_config = config.clone();
221
222        // Create provider based on configuration
223        let provider: Box<dyn QueueProvider> = match config.provider {
224            ProviderConfig::InMemory(in_memory_config) => {
225                Box::new(InMemoryProvider::new(in_memory_config))
226            }
227            ProviderConfig::AzureServiceBus(azure_config) => {
228                let azure_provider = crate::providers::AzureServiceBusProvider::new(azure_config)
229                    .await
230                    .map_err(|e| e.to_queue_error())?;
231                Box::new(azure_provider)
232            }
233            ProviderConfig::AwsSqs(aws_config) => {
234                let aws_provider = crate::providers::AwsSqsProvider::new(aws_config)
235                    .await
236                    .map_err(|e| e.to_queue_error())?;
237                Box::new(aws_provider)
238            }
239        };
240
241        // Wrap provider in StandardQueueClient
242        Ok(Box::new(StandardQueueClient::new(provider, client_config)))
243    }
244
245    /// Create test client with in-memory provider
246    pub fn create_test_client() -> Box<dyn QueueClient> {
247        let provider = InMemoryProvider::new(InMemoryConfig::default());
248        let config = QueueConfig::default();
249        Box::new(StandardQueueClient::new(Box::new(provider), config))
250    }
251}
252
253/// Standard queue client implementation
254pub struct StandardQueueClient {
255    provider: Box<dyn QueueProvider>,
256    #[allow(dead_code)] // Will be used for retry logic and timeouts in future
257    config: QueueConfig,
258}
259
260impl StandardQueueClient {
261    /// Create new standard queue client with provider
262    pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
263        Self { provider, config }
264    }
265}
266
267#[async_trait]
268impl QueueClient for StandardQueueClient {
269    async fn send_message(
270        &self,
271        queue: &QueueName,
272        message: Message,
273    ) -> Result<MessageId, QueueError> {
274        self.provider.send_message(queue, &message).await
275    }
276
277    async fn send_messages(
278        &self,
279        queue: &QueueName,
280        messages: Vec<Message>,
281    ) -> Result<Vec<MessageId>, QueueError> {
282        // Pass slice of messages to provider
283        self.provider.send_messages(queue, &messages).await
284    }
285
286    async fn receive_message(
287        &self,
288        queue: &QueueName,
289        timeout: Duration,
290    ) -> Result<Option<ReceivedMessage>, QueueError> {
291        self.provider.receive_message(queue, timeout).await
292    }
293
294    async fn receive_messages(
295        &self,
296        queue: &QueueName,
297        max_messages: u32,
298        timeout: Duration,
299    ) -> Result<Vec<ReceivedMessage>, QueueError> {
300        self.provider
301            .receive_messages(queue, max_messages, timeout)
302            .await
303    }
304
305    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
306        self.provider.complete_message(&receipt).await
307    }
308
309    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
310        self.provider.abandon_message(&receipt).await
311    }
312
313    async fn dead_letter_message(
314        &self,
315        receipt: ReceiptHandle,
316        reason: String,
317    ) -> Result<(), QueueError> {
318        self.provider.dead_letter_message(&receipt, &reason).await
319    }
320
321    async fn accept_session(
322        &self,
323        queue: &QueueName,
324        session_id: Option<SessionId>,
325    ) -> Result<Box<dyn SessionClient>, QueueError> {
326        let session_provider = self
327            .provider
328            .create_session_client(queue, session_id)
329            .await?;
330        Ok(Box::new(StandardSessionClient::new(session_provider)))
331    }
332
333    fn provider_type(&self) -> ProviderType {
334        self.provider.provider_type()
335    }
336
337    fn supports_sessions(&self) -> bool {
338        matches!(
339            self.provider.supports_sessions(),
340            SessionSupport::Native | SessionSupport::Emulated
341        )
342    }
343
344    fn supports_batching(&self) -> bool {
345        self.provider.supports_batching()
346    }
347}
348
349/// Standard session client implementation
350struct StandardSessionClient {
351    provider: Box<dyn SessionProvider>,
352}
353
354impl StandardSessionClient {
355    fn new(provider: Box<dyn SessionProvider>) -> Self {
356        Self { provider }
357    }
358}
359
360#[async_trait]
361impl SessionClient for StandardSessionClient {
362    async fn receive_message(
363        &self,
364        timeout: Duration,
365    ) -> Result<Option<ReceivedMessage>, QueueError> {
366        self.provider.receive_message(timeout).await
367    }
368
369    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
370        self.provider.complete_message(&receipt).await
371    }
372
373    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
374        self.provider.abandon_message(&receipt).await
375    }
376
377    async fn dead_letter_message(
378        &self,
379        receipt: ReceiptHandle,
380        reason: String,
381    ) -> Result<(), QueueError> {
382        self.provider.dead_letter_message(&receipt, &reason).await
383    }
384
385    async fn renew_session_lock(&self) -> Result<(), QueueError> {
386        self.provider.renew_session_lock().await
387    }
388
389    async fn close_session(&self) -> Result<(), QueueError> {
390        self.provider.close_session().await
391    }
392
393    fn session_id(&self) -> &SessionId {
394        self.provider.session_id()
395    }
396
397    fn session_expires_at(&self) -> Timestamp {
398        self.provider.session_expires_at()
399    }
400}