queue-runtime 0.2.0

Multi-provider queue runtime for Queue-Keeper
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
//! Client traits and implementations for queue operations.

use crate::error::QueueError;
use crate::message::{
    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
};
use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
use crate::providers::InMemoryProvider;
use async_trait::async_trait;
use chrono::Duration;

#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;

/// Main interface for queue operations across all providers
///
/// This is the application-facing, provider-agnostic trait. Its methods take
/// owned arguments (`Message`, `ReceiptHandle`, `String`), whereas the
/// lower-level [`QueueProvider`] trait takes the same data by reference.
/// In this file it is implemented by [`StandardQueueClient`], which forwards
/// every call to a boxed [`QueueProvider`].
///
/// The `Send + Sync` supertraits allow a `Box<dyn QueueClient>` to be shared
/// across threads and async tasks.
#[async_trait]
pub trait QueueClient: Send + Sync {
    /// Send single message to queue
    ///
    /// Takes ownership of `message` and returns the [`MessageId`] reported for it.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] if the message could not be sent.
    async fn send_message(
        &self,
        queue: &QueueName,
        message: Message,
    ) -> Result<MessageId, QueueError>;

    /// Send multiple messages in batch (if supported)
    ///
    /// Use [`supports_batching`](QueueClient::supports_batching) to find out
    /// whether the underlying provider reports batch support.
    ///
    /// NOTE(review): the ordering of the returned IDs relative to `messages`
    /// and the behaviour on partial failure are not specified here — confirm
    /// against the provider implementations.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] if the send fails.
    async fn send_messages(
        &self,
        queue: &QueueName,
        messages: Vec<Message>,
    ) -> Result<Vec<MessageId>, QueueError>;

    /// Receive single message from queue
    ///
    /// Returns the next available message without regard to session ordering.
    ///
    /// NOTE(review): `Ok(None)` presumably means that no message became
    /// available within `timeout` — confirm against the provider implementations.
    ///
    /// # Note
    ///
    /// For session-ordered processing (where messages within a session must be
    /// handled strictly in order), use [`accept_session`](QueueClient::accept_session)
    /// instead and call `receive_message` on the returned [`SessionClient`].
    async fn receive_message(
        &self,
        queue: &QueueName,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Receive multiple messages from queue
    ///
    /// NOTE(review): `max_messages` is presumably an upper bound on the length
    /// of the returned vector, and an empty vector presumably means nothing
    /// arrived within `timeout` — confirm against the provider implementations.
    async fn receive_messages(
        &self,
        queue: &QueueName,
        max_messages: u32,
        timeout: Duration,
    ) -> Result<Vec<ReceivedMessage>, QueueError>;

    /// Mark message as successfully processed
    ///
    /// Consumes the [`ReceiptHandle`] identifying the received message.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Return message to queue for retry
    ///
    /// Consumes the [`ReceiptHandle`] identifying the received message.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Send message to dead letter queue
    ///
    /// `reason` is a free-form explanation that is passed on to the provider.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError>;

    /// Accept session for ordered processing
    ///
    /// Acquires an exclusive lock on the specified session and returns a
    /// [`SessionClient`] that delivers messages from that session in FIFO order.
    ///
    /// NOTE(review): `session_id` is optional; what `None` selects (presumably
    /// "any available session") is decided by the provider — confirm.
    ///
    /// # Note
    ///
    /// For unordered message consumption (no session guarantee required), use
    /// [`receive_message`](QueueClient::receive_message) directly, which is
    /// faster and does not require a session lock.
    async fn accept_session(
        &self,
        queue: &QueueName,
        session_id: Option<SessionId>,
    ) -> Result<Box<dyn SessionClient>, QueueError>;

    /// Get provider type
    fn provider_type(&self) -> ProviderType;

    /// Check if provider supports sessions
    ///
    /// Unlike [`QueueProvider::supports_sessions`], which returns a
    /// [`SessionSupport`] level, this is a plain yes/no answer.
    fn supports_sessions(&self) -> bool;

    /// Check if provider supports batch operations
    fn supports_batching(&self) -> bool;
}

/// Interface for session-based ordered message processing
///
/// Instances are obtained from [`QueueClient::accept_session`] as a
/// `Box<dyn SessionClient>`. Every method operates on the one session the
/// client was created for, so no queue name or session ID is passed per call.
#[async_trait]
pub trait SessionClient: Send + Sync {
    /// Receive message from session (maintains order)
    ///
    /// NOTE(review): `Ok(None)` presumably means that no message became
    /// available within `timeout` — confirm against the provider implementations.
    async fn receive_message(
        &self,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Complete message in session
    ///
    /// Consumes the [`ReceiptHandle`] identifying the received message.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Abandon message in session
    ///
    /// Consumes the [`ReceiptHandle`] identifying the received message.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;

    /// Send message to dead letter queue
    ///
    /// `reason` is a free-form explanation that is passed on to the provider.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError>;

    /// Renew session lock to prevent timeout
    ///
    /// See [`session_expires_at`](SessionClient::session_expires_at) for the
    /// expiry time reported for the session.
    async fn renew_session_lock(&self) -> Result<(), QueueError>;

    /// Close session and release lock
    async fn close_session(&self) -> Result<(), QueueError>;

    /// Get session ID
    ///
    /// Returns a borrow that lives as long as the client itself.
    fn session_id(&self) -> &SessionId;

    /// Get session expiry time
    fn session_expires_at(&self) -> Timestamp;
}

/// Interface implemented by specific queue providers (Azure, AWS, etc.)
///
/// This is the backend-facing counterpart of [`QueueClient`]. The operations
/// mirror that trait, but messages, receipts and reasons are taken by
/// reference rather than by value. [`StandardQueueClient`] holds a
/// `Box<dyn QueueProvider>` and forwards each [`QueueClient`] call to it.
#[async_trait]
pub trait QueueProvider: Send + Sync {
    /// Send single message
    ///
    /// Returns the [`MessageId`] reported for the sent message.
    async fn send_message(
        &self,
        queue: &QueueName,
        message: &Message,
    ) -> Result<MessageId, QueueError>;

    /// Send multiple messages
    ///
    /// NOTE(review): the ordering of the returned IDs and the behaviour for
    /// slices longer than [`max_batch_size`](QueueProvider::max_batch_size)
    /// are left to each implementation — confirm.
    async fn send_messages(
        &self,
        queue: &QueueName,
        messages: &[Message],
    ) -> Result<Vec<MessageId>, QueueError>;

    /// Receive single message
    ///
    /// NOTE(review): `Ok(None)` presumably means that nothing arrived within
    /// `timeout` — confirm per implementation.
    async fn receive_message(
        &self,
        queue: &QueueName,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Receive multiple messages
    ///
    /// NOTE(review): `max_messages` is presumably an upper bound on the number
    /// of messages returned — confirm per implementation.
    async fn receive_messages(
        &self,
        queue: &QueueName,
        max_messages: u32,
        timeout: Duration,
    ) -> Result<Vec<ReceivedMessage>, QueueError>;

    /// Complete message processing
    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Abandon message for retry
    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Send to dead letter queue
    ///
    /// `reason` is a free-form explanation supplied by the caller.
    async fn dead_letter_message(
        &self,
        receipt: &ReceiptHandle,
        reason: &str,
    ) -> Result<(), QueueError>;

    /// Create session client
    ///
    /// Returns a boxed [`SessionProvider`] for the requested session.
    ///
    /// NOTE(review): what `session_id == None` selects (presumably any
    /// available session) is implementation-defined — confirm.
    async fn create_session_client(
        &self,
        queue: &QueueName,
        session_id: Option<SessionId>,
    ) -> Result<Box<dyn SessionProvider>, QueueError>;

    /// Get provider type
    fn provider_type(&self) -> ProviderType;

    /// Get session support level
    ///
    /// [`StandardQueueClient`] reports session support as `true` when this
    /// returns `SessionSupport::Native` or `SessionSupport::Emulated`.
    fn supports_sessions(&self) -> SessionSupport;

    /// Check batch operation support
    fn supports_batching(&self) -> bool;

    /// Get maximum batch size
    ///
    /// Not called anywhere in this file.
    fn max_batch_size(&self) -> u32;
}

/// Interface implemented by provider-specific session implementations
///
/// This is the backend-facing counterpart of [`SessionClient`]. The
/// operations mirror that trait, but receipts and reasons are taken by
/// reference. Instances are produced by
/// [`QueueProvider::create_session_client`].
#[async_trait]
pub trait SessionProvider: Send + Sync {
    /// Receive message from session
    ///
    /// NOTE(review): `Ok(None)` presumably means that nothing arrived within
    /// `timeout` — confirm per implementation.
    async fn receive_message(
        &self,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError>;

    /// Complete message
    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Abandon message
    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;

    /// Send to dead letter queue
    ///
    /// `reason` is a free-form explanation supplied by the caller.
    async fn dead_letter_message(
        &self,
        receipt: &ReceiptHandle,
        reason: &str,
    ) -> Result<(), QueueError>;

    /// Renew session lock
    async fn renew_session_lock(&self) -> Result<(), QueueError>;

    /// Close session
    async fn close_session(&self) -> Result<(), QueueError>;

    /// Get session ID
    ///
    /// Returns a borrow that lives as long as the provider itself.
    fn session_id(&self) -> &SessionId;

    /// Get session expiry time
    fn session_expires_at(&self) -> Timestamp;
}

/// Factory for creating queue clients with appropriate providers
///
/// This is a unit struct used as a namespace: its constructors are associated
/// functions, so no instance of the factory is ever needed.
pub struct QueueClientFactory;

impl QueueClientFactory {
    /// Build a [`QueueClient`] for the provider selected in `config`.
    ///
    /// The provider variant in `config.provider` decides which backend is
    /// constructed; the backend is then wrapped in a [`StandardQueueClient`]
    /// together with a copy of the full configuration.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] when the selected provider fails to
    /// initialise. The in-memory provider is constructed infallibly.
    pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
        // The match below moves `config.provider` out of `config`, so keep a
        // complete copy for the client before that happens.
        let retained = config.clone();

        let backend: Box<dyn QueueProvider> = match config.provider {
            ProviderConfig::InMemory(settings) => Box::new(InMemoryProvider::new(settings)),
            ProviderConfig::AzureServiceBus(settings) => Box::new(
                crate::providers::AzureServiceBusProvider::new(settings)
                    .await
                    .map_err(|err| err.to_queue_error())?,
            ),
            ProviderConfig::AwsSqs(settings) => Box::new(
                crate::providers::AwsSqsProvider::new(settings)
                    .await
                    .map_err(|err| err.to_queue_error())?,
            ),
            ProviderConfig::RabbitMq(settings) => Box::new(
                crate::providers::RabbitMqProvider::new(settings)
                    .await
                    .map_err(|err| err.to_queue_error())?,
            ),
            ProviderConfig::Nats(settings) => Box::new(
                crate::providers::NatsProvider::new(settings)
                    .await
                    .map_err(|err| err.to_queue_error())?,
            ),
        };

        Ok(Box::new(StandardQueueClient::new(backend, retained)))
    }

    /// Build a client backed by the in-memory provider, using default
    /// in-memory settings and a default [`QueueConfig`].
    pub fn create_test_client() -> Box<dyn QueueClient> {
        let backend: Box<dyn QueueProvider> =
            Box::new(InMemoryProvider::new(InMemoryConfig::default()));
        Box::new(StandardQueueClient::new(backend, QueueConfig::default()))
    }
}

/// Standard queue client implementation
///
/// Wraps a boxed [`QueueProvider`] and implements [`QueueClient`] by
/// forwarding each call to it.
pub struct StandardQueueClient {
    /// Backend that carries out every queue operation.
    provider: Box<dyn QueueProvider>,
    /// Configuration the client was created with. It is stored but not read
    /// anywhere in this file, hence the lint allowance below.
    #[allow(dead_code)] // Will be used for retry logic and timeouts in future
    config: QueueConfig,
}

impl StandardQueueClient {
    /// Create new standard queue client with provider
    pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
        Self { provider, config }
    }
}

#[async_trait]
impl QueueClient for StandardQueueClient {
    /// Hands the message to the provider by reference and returns its result.
    async fn send_message(
        &self,
        queue: &QueueName,
        message: Message,
    ) -> Result<MessageId, QueueError> {
        let backend = &self.provider;
        backend.send_message(queue, &message).await
    }

    /// Hands the whole batch to the provider as a borrowed slice.
    ///
    /// NOTE(review): no batch-support or batch-size check is made here; the
    /// provider presumably enforces its own limits — confirm.
    async fn send_messages(
        &self,
        queue: &QueueName,
        messages: Vec<Message>,
    ) -> Result<Vec<MessageId>, QueueError> {
        let batch = messages.as_slice();
        self.provider.send_messages(queue, batch).await
    }

    /// Delegates a single receive to the provider.
    async fn receive_message(
        &self,
        queue: &QueueName,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError> {
        let backend = &self.provider;
        backend.receive_message(queue, timeout).await
    }

    /// Delegates a multi-message receive to the provider.
    async fn receive_messages(
        &self,
        queue: &QueueName,
        max_messages: u32,
        timeout: Duration,
    ) -> Result<Vec<ReceivedMessage>, QueueError> {
        let backend = &self.provider;
        backend.receive_messages(queue, max_messages, timeout).await
    }

    /// Asks the provider to complete the message behind `receipt`.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
        let handle = &receipt;
        self.provider.complete_message(handle).await
    }

    /// Asks the provider to abandon the message behind `receipt`.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
        let handle = &receipt;
        self.provider.abandon_message(handle).await
    }

    /// Asks the provider to dead-letter the message, passing `reason` along.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError> {
        let backend = &self.provider;
        backend
            .dead_letter_message(&receipt, reason.as_str())
            .await
    }

    /// Obtains a provider-level session and wraps it as a [`SessionClient`].
    ///
    /// NOTE(review): session support is not checked here; an unsupported
    /// request is presumably rejected by the provider — confirm.
    async fn accept_session(
        &self,
        queue: &QueueName,
        session_id: Option<SessionId>,
    ) -> Result<Box<dyn SessionClient>, QueueError> {
        let backend = &self.provider;
        let raw_session = backend.create_session_client(queue, session_id).await?;
        let client: Box<dyn SessionClient> = Box::new(StandardSessionClient::new(raw_session));
        Ok(client)
    }

    /// Reports the provider's own type.
    fn provider_type(&self) -> ProviderType {
        let backend = &self.provider;
        backend.provider_type()
    }

    /// `true` when the provider reports native or emulated session support.
    fn supports_sessions(&self) -> bool {
        let level = self.provider.supports_sessions();
        matches!(level, SessionSupport::Native | SessionSupport::Emulated)
    }

    /// Reports the provider's batching capability unchanged.
    fn supports_batching(&self) -> bool {
        let backend = &self.provider;
        backend.supports_batching()
    }
}

/// Standard session client implementation
///
/// Wraps a boxed [`SessionProvider`] and implements [`SessionClient`] by
/// forwarding each call to it. The type is private to this module; callers
/// only ever see it as a `Box<dyn SessionClient>`.
struct StandardSessionClient {
    /// Provider-specific session that carries out every operation.
    provider: Box<dyn SessionProvider>,
}

impl StandardSessionClient {
    fn new(provider: Box<dyn SessionProvider>) -> Self {
        Self { provider }
    }
}

#[async_trait]
impl SessionClient for StandardSessionClient {
    /// Delegates the receive to the wrapped session.
    async fn receive_message(
        &self,
        timeout: Duration,
    ) -> Result<Option<ReceivedMessage>, QueueError> {
        let session = &self.provider;
        session.receive_message(timeout).await
    }

    /// Asks the wrapped session to complete the message behind `receipt`.
    async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
        let handle = &receipt;
        self.provider.complete_message(handle).await
    }

    /// Asks the wrapped session to abandon the message behind `receipt`.
    async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
        let handle = &receipt;
        self.provider.abandon_message(handle).await
    }

    /// Asks the wrapped session to dead-letter the message, passing `reason` along.
    async fn dead_letter_message(
        &self,
        receipt: ReceiptHandle,
        reason: String,
    ) -> Result<(), QueueError> {
        let session = &self.provider;
        session
            .dead_letter_message(&receipt, reason.as_str())
            .await
    }

    /// Delegates lock renewal to the wrapped session.
    async fn renew_session_lock(&self) -> Result<(), QueueError> {
        let session = &self.provider;
        session.renew_session_lock().await
    }

    /// Delegates closing to the wrapped session.
    async fn close_session(&self) -> Result<(), QueueError> {
        let session = &self.provider;
        session.close_session().await
    }

    /// Borrows the session ID from the wrapped session.
    fn session_id(&self) -> &SessionId {
        let session = &self.provider;
        session.session_id()
    }

    /// Reports the expiry time given by the wrapped session.
    fn session_expires_at(&self) -> Timestamp {
        let session = &self.provider;
        session.session_expires_at()
    }
}