Skip to main content

queue_runtime/
client.rs

1//! Client traits and implementations for queue operations.
2
3use crate::error::QueueError;
4use crate::message::{
5    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
6};
7use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
8use crate::providers::InMemoryProvider;
9use async_trait::async_trait;
10use chrono::Duration;
11
12#[cfg(test)]
13#[path = "client_tests.rs"]
14mod tests;
15
16/// Main interface for queue operations across all providers
17#[async_trait]
18pub trait QueueClient: Send + Sync {
19    /// Send single message to queue
20    async fn send_message(
21        &self,
22        queue: &QueueName,
23        message: Message,
24    ) -> Result<MessageId, QueueError>;
25
26    /// Send multiple messages in batch (if supported)
27    async fn send_messages(
28        &self,
29        queue: &QueueName,
30        messages: Vec<Message>,
31    ) -> Result<Vec<MessageId>, QueueError>;
32
33    /// Receive single message from queue
34    ///
35    /// Returns the next available message without regard to session ordering.
36    ///
37    /// # Note
38    ///
39    /// For session-ordered processing (where messages within a session must be
40    /// handled strictly in order), use [`accept_session`](QueueClient::accept_session)
41    /// instead and call `receive_message` on the returned [`SessionClient`].
42    async fn receive_message(
43        &self,
44        queue: &QueueName,
45        timeout: Duration,
46    ) -> Result<Option<ReceivedMessage>, QueueError>;
47
48    /// Receive multiple messages from queue
49    async fn receive_messages(
50        &self,
51        queue: &QueueName,
52        max_messages: u32,
53        timeout: Duration,
54    ) -> Result<Vec<ReceivedMessage>, QueueError>;
55
56    /// Mark message as successfully processed
57    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
58
59    /// Return message to queue for retry
60    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
61
62    /// Send message to dead letter queue
63    async fn dead_letter_message(
64        &self,
65        receipt: ReceiptHandle,
66        reason: String,
67    ) -> Result<(), QueueError>;
68
69    /// Accept session for ordered processing
70    ///
71    /// Acquires an exclusive lock on the specified session and returns a
72    /// [`SessionClient`] that delivers messages from that session in FIFO order.
73    ///
74    /// # Note
75    ///
76    /// For unordered message consumption (no session guarantee required), use
77    /// [`receive_message`](QueueClient::receive_message) directly, which is
78    /// faster and does not require a session lock.
79    async fn accept_session(
80        &self,
81        queue: &QueueName,
82        session_id: Option<SessionId>,
83    ) -> Result<Box<dyn SessionClient>, QueueError>;
84
85    /// Get provider type
86    fn provider_type(&self) -> ProviderType;
87
88    /// Check if provider supports sessions
89    fn supports_sessions(&self) -> bool;
90
91    /// Check if provider supports batch operations
92    fn supports_batching(&self) -> bool;
93}
94
95/// Interface for session-based ordered message processing
96#[async_trait]
97pub trait SessionClient: Send + Sync {
98    /// Receive message from session (maintains order)
99    async fn receive_message(
100        &self,
101        timeout: Duration,
102    ) -> Result<Option<ReceivedMessage>, QueueError>;
103
104    /// Complete message in session
105    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
106
107    /// Abandon message in session
108    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
109
110    /// Send message to dead letter queue
111    async fn dead_letter_message(
112        &self,
113        receipt: ReceiptHandle,
114        reason: String,
115    ) -> Result<(), QueueError>;
116
117    /// Renew session lock to prevent timeout
118    async fn renew_session_lock(&self) -> Result<(), QueueError>;
119
120    /// Close session and release lock
121    async fn close_session(&self) -> Result<(), QueueError>;
122
123    /// Get session ID
124    fn session_id(&self) -> &SessionId;
125
126    /// Get session expiry time
127    fn session_expires_at(&self) -> Timestamp;
128}
129
130/// Interface implemented by specific queue providers (Azure, AWS, etc.)
131#[async_trait]
132pub trait QueueProvider: Send + Sync {
133    /// Send single message
134    async fn send_message(
135        &self,
136        queue: &QueueName,
137        message: &Message,
138    ) -> Result<MessageId, QueueError>;
139
140    /// Send multiple messages
141    async fn send_messages(
142        &self,
143        queue: &QueueName,
144        messages: &[Message],
145    ) -> Result<Vec<MessageId>, QueueError>;
146
147    /// Receive single message
148    async fn receive_message(
149        &self,
150        queue: &QueueName,
151        timeout: Duration,
152    ) -> Result<Option<ReceivedMessage>, QueueError>;
153
154    /// Receive multiple messages
155    async fn receive_messages(
156        &self,
157        queue: &QueueName,
158        max_messages: u32,
159        timeout: Duration,
160    ) -> Result<Vec<ReceivedMessage>, QueueError>;
161
162    /// Complete message processing
163    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
164
165    /// Abandon message for retry
166    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
167
168    /// Send to dead letter queue
169    async fn dead_letter_message(
170        &self,
171        receipt: &ReceiptHandle,
172        reason: &str,
173    ) -> Result<(), QueueError>;
174
175    /// Create session client
176    async fn create_session_client(
177        &self,
178        queue: &QueueName,
179        session_id: Option<SessionId>,
180    ) -> Result<Box<dyn SessionProvider>, QueueError>;
181
182    /// Get provider type
183    fn provider_type(&self) -> ProviderType;
184
185    /// Get session support level
186    fn supports_sessions(&self) -> SessionSupport;
187
188    /// Check batch operation support
189    fn supports_batching(&self) -> bool;
190
191    /// Get maximum batch size
192    fn max_batch_size(&self) -> u32;
193}
194
195/// Interface implemented by provider-specific session implementations
196#[async_trait]
197pub trait SessionProvider: Send + Sync {
198    /// Receive message from session
199    async fn receive_message(
200        &self,
201        timeout: Duration,
202    ) -> Result<Option<ReceivedMessage>, QueueError>;
203
204    /// Complete message
205    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
206
207    /// Abandon message
208    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
209
210    /// Send to dead letter queue
211    async fn dead_letter_message(
212        &self,
213        receipt: &ReceiptHandle,
214        reason: &str,
215    ) -> Result<(), QueueError>;
216
217    /// Renew session lock
218    async fn renew_session_lock(&self) -> Result<(), QueueError>;
219
220    /// Close session
221    async fn close_session(&self) -> Result<(), QueueError>;
222
223    /// Get session ID
224    fn session_id(&self) -> &SessionId;
225
226    /// Get session expiry time
227    fn session_expires_at(&self) -> Timestamp;
228}
229
230/// Factory for creating queue clients with appropriate providers
231pub struct QueueClientFactory;
232
233impl QueueClientFactory {
234    /// Create queue client from configuration
235    pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
236        // Clone config for client since we need to move parts for provider
237        let client_config = config.clone();
238
239        // Create provider based on configuration
240        let provider: Box<dyn QueueProvider> = match config.provider {
241            ProviderConfig::InMemory(in_memory_config) => {
242                Box::new(InMemoryProvider::new(in_memory_config))
243            }
244            ProviderConfig::AzureServiceBus(azure_config) => {
245                let azure_provider = crate::providers::AzureServiceBusProvider::new(azure_config)
246                    .await
247                    .map_err(|e| e.to_queue_error())?;
248                Box::new(azure_provider)
249            }
250            ProviderConfig::AwsSqs(aws_config) => {
251                let aws_provider = crate::providers::AwsSqsProvider::new(aws_config)
252                    .await
253                    .map_err(|e| e.to_queue_error())?;
254                Box::new(aws_provider)
255            }
256            ProviderConfig::RabbitMq(rmq_config) => {
257                let rmq_provider = crate::providers::RabbitMqProvider::new(rmq_config)
258                    .await
259                    .map_err(|e| e.to_queue_error())?;
260                Box::new(rmq_provider)
261            }
262            ProviderConfig::Nats(nats_config) => {
263                let nats_provider = crate::providers::NatsProvider::new(nats_config)
264                    .await
265                    .map_err(|e| e.to_queue_error())?;
266                Box::new(nats_provider)
267            }
268        };
269
270        // Wrap provider in StandardQueueClient
271        Ok(Box::new(StandardQueueClient::new(provider, client_config)))
272    }
273
274    /// Create test client with in-memory provider
275    pub fn create_test_client() -> Box<dyn QueueClient> {
276        let provider = InMemoryProvider::new(InMemoryConfig::default());
277        let config = QueueConfig::default();
278        Box::new(StandardQueueClient::new(Box::new(provider), config))
279    }
280}
281
282/// Standard queue client implementation
283pub struct StandardQueueClient {
284    provider: Box<dyn QueueProvider>,
285    #[allow(dead_code)] // Will be used for retry logic and timeouts in future
286    config: QueueConfig,
287}
288
289impl StandardQueueClient {
290    /// Create new standard queue client with provider
291    pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
292        Self { provider, config }
293    }
294}
295
296#[async_trait]
297impl QueueClient for StandardQueueClient {
298    async fn send_message(
299        &self,
300        queue: &QueueName,
301        message: Message,
302    ) -> Result<MessageId, QueueError> {
303        self.provider.send_message(queue, &message).await
304    }
305
306    async fn send_messages(
307        &self,
308        queue: &QueueName,
309        messages: Vec<Message>,
310    ) -> Result<Vec<MessageId>, QueueError> {
311        // Pass slice of messages to provider
312        self.provider.send_messages(queue, &messages).await
313    }
314
315    async fn receive_message(
316        &self,
317        queue: &QueueName,
318        timeout: Duration,
319    ) -> Result<Option<ReceivedMessage>, QueueError> {
320        self.provider.receive_message(queue, timeout).await
321    }
322
323    async fn receive_messages(
324        &self,
325        queue: &QueueName,
326        max_messages: u32,
327        timeout: Duration,
328    ) -> Result<Vec<ReceivedMessage>, QueueError> {
329        self.provider
330            .receive_messages(queue, max_messages, timeout)
331            .await
332    }
333
334    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
335        self.provider.complete_message(&receipt).await
336    }
337
338    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
339        self.provider.abandon_message(&receipt).await
340    }
341
342    async fn dead_letter_message(
343        &self,
344        receipt: ReceiptHandle,
345        reason: String,
346    ) -> Result<(), QueueError> {
347        self.provider.dead_letter_message(&receipt, &reason).await
348    }
349
350    async fn accept_session(
351        &self,
352        queue: &QueueName,
353        session_id: Option<SessionId>,
354    ) -> Result<Box<dyn SessionClient>, QueueError> {
355        let session_provider = self
356            .provider
357            .create_session_client(queue, session_id)
358            .await?;
359        Ok(Box::new(StandardSessionClient::new(session_provider)))
360    }
361
362    fn provider_type(&self) -> ProviderType {
363        self.provider.provider_type()
364    }
365
366    fn supports_sessions(&self) -> bool {
367        matches!(
368            self.provider.supports_sessions(),
369            SessionSupport::Native | SessionSupport::Emulated
370        )
371    }
372
373    fn supports_batching(&self) -> bool {
374        self.provider.supports_batching()
375    }
376}
377
378/// Standard session client implementation
379struct StandardSessionClient {
380    provider: Box<dyn SessionProvider>,
381}
382
383impl StandardSessionClient {
384    fn new(provider: Box<dyn SessionProvider>) -> Self {
385        Self { provider }
386    }
387}
388
389#[async_trait]
390impl SessionClient for StandardSessionClient {
391    async fn receive_message(
392        &self,
393        timeout: Duration,
394    ) -> Result<Option<ReceivedMessage>, QueueError> {
395        self.provider.receive_message(timeout).await
396    }
397
398    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
399        self.provider.complete_message(&receipt).await
400    }
401
402    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
403        self.provider.abandon_message(&receipt).await
404    }
405
406    async fn dead_letter_message(
407        &self,
408        receipt: ReceiptHandle,
409        reason: String,
410    ) -> Result<(), QueueError> {
411        self.provider.dead_letter_message(&receipt, &reason).await
412    }
413
414    async fn renew_session_lock(&self) -> Result<(), QueueError> {
415        self.provider.renew_session_lock().await
416    }
417
418    async fn close_session(&self) -> Result<(), QueueError> {
419        self.provider.close_session().await
420    }
421
422    fn session_id(&self) -> &SessionId {
423        self.provider.session_id()
424    }
425
426    fn session_expires_at(&self) -> Timestamp {
427        self.provider.session_expires_at()
428    }
429}