queue-runtime 0.2.0

Multi-provider queue runtime for Queue-Keeper
# Provider Implementation Differences


This document outlines the key differences between the provider implementations in queue-runtime (Azure Service Bus, AWS SQS, In-Memory, RabbitMQ, and NATS JetStream), helping developers understand provider-specific behavior and choose the appropriate provider for their use case.

## Overview


The queue-runtime provides a unified abstraction over different cloud message queue providers. While the API remains consistent, each provider has unique characteristics, capabilities, and limitations that affect implementation decisions.

## Provider Comparison Matrix


| Feature | Azure Service Bus | AWS SQS | In-Memory | RabbitMQ | NATS JetStream |
|---------|------------------|----------|-----------|----------|----------------|
| **Ordered Processing** | Native Sessions | FIFO Queues | Simple Ordering | Per-session queues | Per-session subjects |
| **Dead Letter Queues** | Built-in | Built-in | Simulated | Via DLX | Via Term + DLQ stream |
| **Message Deduplication** | Native Support | Content-based | Hash-based | None | None |
| **Batch Operations** | Send/Receive Batches | Send/Receive Batches | Full Batching | Sequential loop | Sequential loop |
| **Message Size Limit** | 1 MB (Premium) | 256 KB | Memory limited | 128 MB | 1 MB (server default) |
| **Session Support** | First-class | Via Message Groups | Thread-based | Emulated (sub-queues) | Emulated (filter subjects) |
| **Transactional Operations** | Limited | No | Yes | No | No |
| **Peek Operations** | Supported | No | Yes | No | No |
| **Authentication** | Azure AD/Managed Identity | IAM Roles | N/A | AMQP credentials | NATS credentials/NKey |
| **Dead-letter reason** | Preserved | Preserved | N/A | Not forwarded (AMQP limitation) | Forwarded as header |
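
The size limits in the matrix lend themselves to a pre-send check. The following is a minimal sketch, not part of the queue-runtime API: the `Provider` enum and both functions are hypothetical names, and the table's "KB" and "MB" are assumed to mean binary units.

```rust
/// Providers covered by the comparison matrix (hypothetical enum for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Provider {
    AzureServiceBus,
    AwsSqs,
    InMemory,
    RabbitMq,
    NatsJetStream,
}

const KIB: usize = 1024;
const MIB: usize = 1024 * KIB;

/// Size limit from the matrix, in bytes. `None` means "bounded only by memory".
/// Assumption: "KB" and "MB" in the table are read as binary units.
pub fn max_message_size(provider: Provider) -> Option<usize> {
    match provider {
        Provider::AzureServiceBus => Some(MIB),
        Provider::AwsSqs => Some(256 * KIB),
        Provider::InMemory => None,
        Provider::RabbitMq => Some(128 * MIB),
        Provider::NatsJetStream => Some(MIB),
    }
}

/// Returns true when a payload of `len` bytes fits within the provider's limit.
pub fn fits(provider: Provider, len: usize) -> bool {
    max_message_size(provider).map_or(true, |limit| len <= limit)
}
```

A caller could run `fits` before sending and route oversized payloads elsewhere; the actual limits depend on tier and broker configuration.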

## Azure Service Bus Implementation


### Session Management


**Native Session Support**:

- Sessions provide strict FIFO ordering within session boundaries
- Session acceptance locks session for exclusive processing
- Session timeout and renewal mechanisms
- Session state storage for stateful processing

**Implementation Characteristics**:

- Session ID directly maps to Service Bus session ID
- Session acceptance provides exclusive lock until completion or timeout
- Messages without session ID go to default session queue
- Session completion releases lock for next consumer

### Message Features


**Advanced Message Properties**:

- Rich message properties and headers
- Custom message annotations
- Message scheduling and deferred delivery
- Message peeking without consumption

**Error Handling**:

- Native dead letter queue with detailed failure reason
- Message abandonment returns message to queue
- Delivery count tracking with automatic DLQ transfer
- Error description and exception details preserved
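
The interaction between abandonment, delivery count tracking, and automatic DLQ transfer can be sketched as a small decision function. This is an illustrative model, not the provider's code: `on_abandon` and `AbandonOutcome` are hypothetical names, and the sketch assumes `delivery_count` includes the delivery being abandoned.

```rust
/// Outcome of abandoning a message (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum AbandonOutcome {
    /// The message returns to the queue; the next receiver sees this count.
    Requeued { delivery_count: u32 },
    /// The delivery limit was reached; the message moves to the dead letter queue.
    DeadLettered { reason: String },
}

/// Decide what happens when a message that has been delivered
/// `delivery_count` times (including the current delivery) is abandoned.
pub fn on_abandon(delivery_count: u32, max_delivery_count: u32) -> AbandonOutcome {
    if delivery_count >= max_delivery_count {
        AbandonOutcome::DeadLettered {
            // Reason string Service Bus records for this kind of transfer.
            reason: "MaxDeliveryCountExceeded".to_string(),
        }
    } else {
        AbandonOutcome::Requeued {
            delivery_count: delivery_count.saturating_add(1),
        }
    }
}
```

With a limit of 3, abandoning the first delivery requeues the message with a count of 2, and abandoning the third delivery dead-letters it.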

### Connection Management


**Connection Efficiency**:

- Connection pooling and multiplexing
- Long-lived connections with automatic renewal
- Exponential backoff for connection failures
- Circuit breaker patterns for fault tolerance

```rust
use std::time::Duration;

// Azure Service Bus specific configuration
pub struct AzureServiceBusConfig {
    pub connection_string: Option<String>,
    pub namespace: String,
    pub shared_access_key_name: Option<String>,
    pub shared_access_key: Option<String>,
    pub managed_identity_client_id: Option<String>,
    pub max_concurrent_calls: u32,
    pub prefetch_count: u32,
    pub max_auto_lock_renewal_duration: Duration,
    pub transport_type: TransportType,
}

// Wire transport used for the Service Bus connection
#[derive(Debug, Clone)]
pub enum TransportType {
    Amqp,
    AmqpWebSockets,
}
```

## AWS SQS Implementation


**Status**: ✅ **Implemented** - Production-ready HTTP REST API implementation

### FIFO Queue Behavior


**Message Group Ordering**:

- FIFO queues ensure ordering within message groups
- Message group ID determines processing order
- Deduplication based on message content or deduplication ID
- Higher throughput with multiple message groups

**Implementation Approach**:

- Uses direct HTTP REST API calls (not AWS SDK)
- Enables proper unit testing with mocked HTTP responses
- Implements AWS Signature Version 4 request signing
- Automatic credential chain: explicit credentials → environment variables → ECS task metadata → EC2 instance metadata
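
The credential chain above is a first-match search over ordered sources. Here is a minimal sketch of that idea, assuming each source can be modelled as a closure; `Credentials`, `Source`, `Lookup`, and `resolve_credentials` are hypothetical names, and the real provider may structure this differently.

```rust
/// Resolved AWS credentials; the session token is optional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    pub access_key_id: String,
    pub secret_access_key: String,
    pub session_token: Option<String>,
}

/// Credential sources in the priority order listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Source {
    Explicit,
    Environment,
    EcsTaskMetadata,
    Ec2InstanceMetadata,
}

/// A lazily evaluated lookup. Real ECS and EC2 lookups would issue HTTP calls;
/// closures keep this sketch testable without network access.
pub type Lookup = Box<dyn Fn() -> Option<Credentials>>;

/// Walk the chain in order and stop at the first source that yields credentials.
/// Later sources are never queried once an earlier one succeeds.
pub fn resolve_credentials(chain: Vec<(Source, Lookup)>) -> Option<(Source, Credentials)> {
    chain
        .into_iter()
        .find_map(|(source, lookup)| lookup().map(|creds| (source, creds)))
}
```

Because `find_map` short-circuits, the metadata endpoints are contacted only when explicit and environment credentials are both absent.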

**Implementation Mapping**:

- Session ID maps to SQS message group ID
- Content-based deduplication for message uniqueness
- Receive request deduplication for exactly-once delivery
- FIFO queue naming convention with `.fifo` suffix
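
The mapping above can be illustrated with two small helpers. This is a sketch under stated assumptions: `fifo_queue_name`, `FifoSendParams`, and `fifo_params` are hypothetical names, and the `"default"` fallback group for messages without a session is an assumption, not documented behavior.

```rust
/// Append the `.fifo` suffix that FIFO queue names require, if it is missing.
pub fn fifo_queue_name(name: &str) -> String {
    if name.ends_with(".fifo") {
        name.to_string()
    } else {
        format!("{name}.fifo")
    }
}

/// Send parameters derived from a runtime message (hypothetical shape).
#[derive(Debug, PartialEq, Eq)]
pub struct FifoSendParams {
    pub message_group_id: String,
    /// `None` leaves deduplication to the queue's content-based setting.
    pub message_deduplication_id: Option<String>,
}

/// Map a runtime session ID onto the SQS message group ID.
pub fn fifo_params(session_id: Option<&str>, dedup_id: Option<&str>) -> FifoSendParams {
    FifoSendParams {
        // Assumption: messages without a session share one fallback group.
        message_group_id: session_id.unwrap_or("default").to_string(),
        message_deduplication_id: dedup_id.map(str::to_string),
    }
}
```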

### Performance Characteristics


**Throughput and Scaling**:

- Standard queues: Unlimited throughput, at-least-once delivery
- FIFO queues: 3000 messages/second with batching
- Visibility timeout for message processing windows
- Long polling for efficient message retrieval

**Batch Operations**:

- Send up to 10 messages per batch request
- Receive up to 10 messages per request
- Delete multiple messages in single request
- Batch operations reduce API calls and improve performance
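
Since each batch request carries at most 10 entries, a larger send has to be split into request-sized chunks. A minimal sketch of that split follows; `SQS_MAX_BATCH` and `into_batches` are hypothetical names used only for illustration.

```rust
/// Per-request entry limit for SQS batch send, receive, and delete.
pub const SQS_MAX_BATCH: usize = 10;

/// Split items into request-sized batches, preserving their order.
pub fn into_batches<T: Clone>(items: &[T]) -> Vec<Vec<T>> {
    items
        .chunks(SQS_MAX_BATCH)
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

Sending 25 messages this way takes three requests (10, 10, and 5 entries) instead of 25 individual calls.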

### AWS-Specific Features


**Extended Message Support**:

- S3 payload storage for large messages
- Automatic message offloading and retrieval
- Cost optimization for infrequent access patterns

**Dead Letter Queue Configuration**:

- Redrive policy with source queue configuration
- Message retention and replay capabilities
- Cross-account DLQ access patterns

```rust
// AWS SQS specific configuration (actual implementation)
pub struct AwsSqsConfig {
    pub region: String,
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub use_fifo_queues: bool,
}

// Authentication Methods Supported:
// 1. Explicit credentials (access_key_id + secret_access_key)
// 2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN)
// 3. ECS task metadata (AWS_CONTAINER_CREDENTIALS_RELATIVE_URI)
// 4. EC2 instance metadata (IMDSv2)
```

## In-Memory Implementation


### Development and Testing


**Simplified Behavior**:

- Thread-safe in-memory queue storage
- Simulated session behavior with thread affinity
- Configurable delivery delays and failures
- Full message lifecycle simulation

**Testing Capabilities**:

- Message inspection and verification
- Controlled failure injection
- Deterministic ordering and timing
- Reset and cleanup operations
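
Controlled failure injection and deterministic behavior can be combined by replacing random sampling with an accumulator. The sketch below is one possible approach, not the provider's implementation: `FailureInjector` is a hypothetical type that fails a fixed fraction of operations in a repeatable pattern.

```rust
/// Deterministic failure injector: fails a fixed fraction of operations
/// without randomness, so test runs are repeatable.
pub struct FailureInjector {
    rate: f64,
    accumulated: f64,
}

impl FailureInjector {
    /// `rate` is the fraction of operations that should fail, clamped to 0.0..=1.0.
    pub fn new(rate: f64) -> Self {
        Self {
            rate: rate.clamp(0.0, 1.0),
            accumulated: 0.0,
        }
    }

    /// Returns true when the current operation should fail.
    /// Each call adds `rate`; a failure fires whenever the sum reaches 1.0.
    pub fn should_fail(&mut self) -> bool {
        self.accumulated += self.rate;
        if self.accumulated >= 1.0 {
            self.accumulated -= 1.0;
            true
        } else {
            false
        }
    }
}
```

With a rate of 0.25, every fourth operation fails, which makes assertions on retry behavior straightforward.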

```rust
// In-Memory specific configuration
pub struct InMemoryConfig {
    pub max_queue_size: usize,
    pub enable_persistence: bool,
    pub persistence_path: Option<PathBuf>,
    pub simulate_delays: bool,
    pub default_visibility_timeout: Duration,
    pub failure_rate: f64, // For chaos testing
}
```

## RabbitMQ Implementation


**Status**: ✅ **Implemented** — AMQP 0-9-1 via the `lapin` crate.

### Session Management


**Emulated via Per-Session Sub-Queues**:

- Sessions are emulated by routing messages with a `session_id` to a dedicated AMQP queue named `{queue}.session.{session_id}`
- An exclusive consumer on the session queue provides mutual exclusion
- Session lock is tracked in-memory and renewable via `renew_session_lock`
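
The naming scheme and in-memory lock tracking can be sketched as follows. The queue name format comes from the description above; `SessionLocks` and its methods are hypothetical and only illustrate how an expiring, renewable lock table might behave.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Build the per-session queue name: `{queue}.session.{session_id}`.
pub fn session_queue_name(queue: &str, session_id: &str) -> String {
    format!("{queue}.session.{session_id}")
}

/// In-memory lock table mapping session ID to lock expiry (hypothetical type).
pub struct SessionLocks {
    duration: Duration,
    expiry: HashMap<String, Instant>,
}

impl SessionLocks {
    pub fn new(duration: Duration) -> Self {
        Self { duration, expiry: HashMap::new() }
    }

    /// Take the lock unless an unexpired lock already exists at `now`.
    pub fn try_acquire(&mut self, session_id: &str, now: Instant) -> bool {
        match self.expiry.get(session_id) {
            Some(&until) if until > now => false,
            _ => {
                self.expiry.insert(session_id.to_string(), now + self.duration);
                true
            }
        }
    }

    /// Extend a live lock; fails if the lock expired or was never taken.
    pub fn renew(&mut self, session_id: &str, now: Instant) -> bool {
        match self.expiry.get_mut(session_id) {
            Some(until) if *until > now => {
                *until = now + self.duration;
                true
            }
            _ => false,
        }
    }
}
```

Passing `now` explicitly keeps the lock logic testable without sleeping.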

### Dead Letter Support


- Set `enable_dead_letter = true` and specify a `dead_letter_exchange` to enable automatic DLX binding
- Messages dead-lettered via `dead_letter_message` are nacked without requeue; RabbitMQ routes them through the DLX
- **Note**: The dead-letter reason string is logged but **not forwarded** to the DLX because the AMQP `basic_nack` command carries no metadata

### Key Characteristics


- Maximum message size: **128 MB** (the broker-side cap is RabbitMQ's `max_message_size` setting; the AMQP frame size does not limit message size)
- Delivery mode 2 (persistent) for all messages
- QoS prefetch configurable via `prefetch_count`
- `send_messages` sends sequentially (no native batch-publish in AMQP 0-9-1)

### Known Limitation: `delivery_count` on Classic Queues


RabbitMQ **Classic Queues** do not increment a per-message delivery counter on AMQP nack + requeue.  The broker only sets the `redelivered` boolean flag in the delivery frame.  After the first abandon the library returns `delivery_count = 2` (using the `redelivered` flag as a fallback), but subsequent abandons cannot be distinguished — `delivery_count` stays at 2 indefinitely.

**Impact**: Callers that implement custom back-off or poison-message detection based on `delivery_count ≥ 3` will not get reliable results with Classic Queues beyond the first redelivery.

**Workaround**: Use **Quorum Queues** instead of Classic Queues.  Quorum Queues populate the `x-delivery-count` AMQP header on every redelivery, which the provider reads and adds 1 to, giving an accurate monotonically-increasing count across all nack/requeue cycles.
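
The fallback logic described in this section can be summarised in one function. This is a sketch of the documented behavior rather than the provider's source; the function name and signature are assumptions.

```rust
/// Derive `delivery_count` from AMQP delivery metadata.
///
/// `x_delivery_count` is the `x-delivery-count` header value when present
/// (Quorum Queues); `redelivered` is the flag from the delivery frame.
pub fn delivery_count(x_delivery_count: Option<u32>, redelivered: bool) -> u32 {
    match x_delivery_count {
        // Quorum Queues: the header counts redeliveries, so add the first delivery.
        Some(redeliveries) => redeliveries.saturating_add(1),
        // Classic Queues: only a boolean is available, so the count caps at 2.
        None if redelivered => 2,
        None => 1,
    }
}
```

The second arm is why poison-message detection based on `delivery_count ≥ 3` never triggers on Classic Queues.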

```rust
// RabbitMQ configuration
pub struct RabbitMqConfig {
    pub url: String,                         // amqp://user:pass@host:5672/vhost
    pub virtual_host: String,                // defaults to "/"
    pub prefetch_count: u16,                 // 0 = unlimited
    pub session_lock_duration: Duration,
    pub message_ttl: Option<Duration>,
    pub enable_dead_letter: bool,
    pub dead_letter_exchange: Option<String>,
}
```

## NATS JetStream Implementation


**Status**: ✅ **Implemented** — JetStream via the `async-nats` crate.

### Session Management


**Emulated via Per-Session Filter Subjects**:

- One JetStream stream per queue with `WorkQueue` retention policy
- Sessions use a dedicated filter subject: `{prefix}.{queue}.session.{session_id}`
- A per-session pull consumer filtered to the session subject provides ordered, exclusive delivery
- Session lock is tracked in-memory and renewable via `renew_session_lock`
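
Building the session filter subject is a string-formatting step, but NATS gives `.`, `*`, `>`, and whitespace special meaning inside subjects. The sketch below formats the subject and rejects tokens containing those characters; the validation is an assumption added for illustration and may not match what the provider does.

```rust
/// A subject token must be non-empty and free of characters that NATS treats
/// specially: `.` separates tokens, `*` and `>` are wildcards, and whitespace
/// is not allowed.
fn is_valid_token(token: &str) -> bool {
    !token.is_empty()
        && !token
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}

/// Build the filter subject `{prefix}.{queue}.session.{session_id}`.
/// The prefix is not validated here because it may span several tokens.
pub fn session_subject(prefix: &str, queue: &str, session_id: &str) -> Result<String, String> {
    for (name, token) in [("queue", queue), ("session_id", session_id)] {
        if !is_valid_token(token) {
            return Err(format!("invalid {name}: {token:?}"));
        }
    }
    Ok(format!("{prefix}.{queue}.session.{session_id}"))
}
```

Rejecting wildcards matters because a session ID of `*` would otherwise produce a filter that matches every session on the queue.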

### Dead Letter Support


- Requires `enable_dead_letter = true` and `dead_letter_subject_prefix` to be set
- Dead-lettered messages receive `ack_with(Term)` (stops JetStream redelivery) and are republished to `{prefix}.{queue}` on a separate DLQ stream
- The dead-letter reason is forwarded as an `x-dead-letter-reason` NATS header

### Key Characteristics


- Maximum message size: **1 MB** (NATS server default; configurable on the server)
- `WorkQueue` stream retention delivers each message to a single consumer and removes it from the stream once it is acknowledged
- `send_messages` sends sequentially (no multi-message JetStream publish API)

```rust
// NATS JetStream configuration
pub struct NatsConfig {
    pub url: String,                              // nats://host:4222
    pub stream_prefix: String,                    // defaults to "queue-runtime"
    pub max_deliver: Option<i64>,                 // None = unlimited
    pub ack_wait: Duration,                       // visibility-timeout analog
    pub session_lock_duration: Duration,
    pub enable_dead_letter: bool,
    pub dead_letter_subject_prefix: Option<String>,
    pub credentials_path: Option<String>,         // path to .creds file
}
```

## Provider Selection Guidance


### Use Azure Service Bus When


1. **Rich Session Management**: Need complex session state and lifecycle management
2. **Enterprise Integration**: Existing Azure ecosystem and managed identity integration
3. **Advanced Messaging**: Require message scheduling, peeking, and rich metadata
4. **Strict Ordering**: Need guaranteed FIFO with session affinity
5. **Large Messages**: Messages approaching 1MB size limit

### Use AWS SQS When


1. **High Throughput**: Standard queues with unlimited throughput requirements
2. **Cost Optimization**: Pay-per-use pricing model preferred
3. **AWS Ecosystem**: Existing AWS infrastructure and IAM integration
4. **Simple Ordering**: FIFO requirements with multiple processing groups
5. **Global Scale**: Multi-region deployment with consistent behavior

### Use RabbitMQ When


1. **On-Premises / Self-Hosted**: Full control over broker without cloud lock-in
2. **AMQP Ecosystem**: Existing AMQP infrastructure or AMQP-native clients
3. **Large Messages**: Messages exceeding SQS/Service Bus size limits (up to 128 MB)
4. **Complex Routing**: Need topic exchanges, fanout, or DLX routing topologies
5. **Mixed Workloads**: Integrating with non-Rust AMQP consumers on the same broker

### Use NATS JetStream When


1. **High Throughput / Low Latency**: Sub-millisecond delivery for real-time workloads
2. **Edge / IoT Deployments**: Lightweight broker suitable for constrained environments
3. **Multi-Cloud / Hybrid**: NATS clusters span cloud and on-premises seamlessly
4. **Dead-Letter Reason Propagation**: Among the self-hosted brokers, only NATS forwards the reason string to the DLQ (RabbitMQ does not)
5. **Modern Messaging Patterns**: Streams, key-value, and object store in one system

### Use In-Memory When


1. **Local Development**: Fast local development and testing cycles
2. **Unit Testing**: Predictable behavior and message inspection
3. **Integration Testing**: Controlled message flow and failure scenarios
4. **Prototyping**: Rapid prototyping without cloud dependencies

## Configuration Patterns


### Environment-Based Selection


```rust
use queue_runtime::*;
use std::time::Duration;

// `async` because each client constructor below is awaited.
pub async fn create_queue_client() -> Result<Box<dyn QueueClient>, Box<dyn std::error::Error>> {
    let provider = std::env::var("QUEUE_PROVIDER").unwrap_or_default();

    match provider.as_str() {
        "azure" => {
            let config = AzureServiceBusConfig {
                namespace: std::env::var("SERVICE_BUS_NAMESPACE")?,
                managed_identity_client_id: std::env::var("AZURE_CLIENT_ID").ok(),
                max_concurrent_calls: 16,
                prefetch_count: 10,
                max_auto_lock_renewal_duration: Duration::from_secs(5 * 60), // 5 minutes
                transport_type: TransportType::Amqp,
                ..Default::default()
            };
            Ok(Box::new(AzureServiceBusClient::new(config).await?))
        },

        "aws" => {
            // Assumes `AwsSqsConfig` exposes polling and retention fields in
            // addition to the four fields listed under AWS SQS Implementation.
            let config = AwsSqsConfig {
                region: std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
                queue_url: std::env::var("SQS_QUEUE_URL")?,
                max_number_of_messages: 10,
                wait_time_seconds: Some(Duration::from_secs(20)),
                visibility_timeout: Some(Duration::from_secs(30)),
                message_retention_period: Duration::from_secs(14 * 24 * 60 * 60), // 14 days
                ..Default::default()
            };
            Ok(Box::new(AwsSqsClient::new(config).await?))
        },

        // "memory" and any unrecognized value fall back to the in-memory provider.
        _ => {
            let config = InMemoryConfig {
                max_queue_size: 1000,
                enable_persistence: false,
                simulate_delays: false,
                default_visibility_timeout: Duration::from_secs(30),
                failure_rate: 0.0,
                ..Default::default()
            };
            Ok(Box::new(InMemoryClient::new(config).await?))
        }
    }
}
```

### Feature-Based Selection


```rust
pub struct ProviderRequirements {
    pub strict_ordering: bool,
    pub high_throughput: bool,
    pub large_messages: bool,
    pub session_state: bool,
    pub cost_optimization: bool,
    pub existing_ecosystem: CloudProvider,
}

pub enum CloudProvider {
    Azure,
    Aws,
    MultiCloud,
    OnPremises,
}

pub fn recommend_provider(requirements: &ProviderRequirements) -> QueueProvider {
    match requirements {
        // Azure Service Bus preferred
        ProviderRequirements {
            session_state: true,
            large_messages: true,
            existing_ecosystem: CloudProvider::Azure,
            ..
        } => QueueProvider::AzureServiceBus,

        // AWS SQS preferred
        ProviderRequirements {
            high_throughput: true,
            cost_optimization: true,
            existing_ecosystem: CloudProvider::Aws,
            ..
        } => QueueProvider::AwsSqs,

        // Default to in-memory for development
        _ => QueueProvider::InMemory,
    }
}
```

## Migration Considerations


### Azure to AWS Migration


**Configuration Mapping**:

- Service Bus sessions → SQS message groups
- Message properties → Message attributes
- Connection strings → Queue URLs and IAM roles
- Session state → External state store (DynamoDB, Redis)

**Behavioral Changes**:

- Session exclusive locking → Message group ordering
- Message peeking → Additional SQS operations
- Rich message metadata → Simplified attributes
- Transaction support → Application-level compensation

### AWS to Azure Migration


**Configuration Mapping**:

- Message groups → Service Bus sessions
- Queue URLs → Connection strings and queue names
- IAM roles → Managed identity and RBAC
- Visibility timeout → Lock duration

**Feature Enhancements**:

- Message group ordering → Session exclusive processing
- Simple attributes → Rich message properties
- Redrive policies → Native dead letter handling
- Long polling → Prefetch and concurrent processing

## Performance Optimization


### Provider-Specific Tuning


**Azure Service Bus**:

- Increase prefetch count for high-throughput scenarios
- Use Premium tier for consistent performance
- Configure appropriate max concurrent calls
- Enable connection pooling and multiplexing

**AWS SQS**:

- Use batch operations to reduce API calls
- Tune visibility timeout for processing patterns
- Enable long polling to reduce empty receives
- Consider FIFO vs Standard queues based on requirements

**In-Memory**:

- Configure appropriate queue sizes for memory usage
- Enable persistence for durable local testing
- Use controlled delays for realistic testing scenarios

### Cross-Provider Patterns


```rust
use std::time::Duration;

pub trait QueueOptimization {
    // Default no-op bodies let each provider override only the profiles it supports.
    fn configure_for_throughput(&mut self) -> Result<(), ConfigError> {
        Ok(())
    }
    fn configure_for_latency(&mut self) -> Result<(), ConfigError> {
        Ok(())
    }
    fn configure_for_cost(&mut self) -> Result<(), ConfigError> {
        Ok(())
    }
}

impl QueueOptimization for AzureServiceBusConfig {
    fn configure_for_throughput(&mut self) -> Result<(), ConfigError> {
        self.prefetch_count = 50;
        self.max_concurrent_calls = 32;
        self.transport_type = TransportType::Amqp;
        Ok(())
    }

    fn configure_for_latency(&mut self) -> Result<(), ConfigError> {
        self.prefetch_count = 1;
        self.max_concurrent_calls = 1;
        self.transport_type = TransportType::AmqpWebSockets;
        Ok(())
    }
}

// Assumes `AwsSqsConfig` exposes polling and retention fields in addition to
// the four fields listed under AWS SQS Implementation.
impl QueueOptimization for AwsSqsConfig {
    fn configure_for_throughput(&mut self) -> Result<(), ConfigError> {
        self.max_number_of_messages = 10;
        self.wait_time_seconds = Some(Duration::from_secs(20));
        self.visibility_timeout = Some(Duration::from_secs(60));
        Ok(())
    }

    fn configure_for_cost(&mut self) -> Result<(), ConfigError> {
        self.wait_time_seconds = Some(Duration::from_secs(20)); // Long polling
        self.message_retention_period = Duration::from_secs(24 * 60 * 60); // 1 day: shorter retention
        Ok(())
    }
}
```

## Behavioral Assertions


The following assertions define expected provider-specific behaviors:

### Azure Service Bus Assertions


1. **Session Exclusivity**: Only one consumer can process messages from a session at a time
2. **Session Ordering**: Messages within a session MUST be processed in FIFO order
3. **Lock Renewal**: Session locks MUST be renewable before expiration
4. **Message Scheduling**: Scheduled messages MUST NOT be visible until the scheduled time
5. **Peek Operations**: Peek MUST NOT affect message visibility or lock state

### AWS SQS Assertions


6. **Message Group Ordering**: Messages in the same group MUST be processed in order
7. **FIFO Deduplication**: Duplicate messages MUST be rejected within deduplication window
8. **Visibility Timeout**: Messages MUST be invisible during processing window
9. **Long Polling**: Empty receives MUST wait for configured duration
10. **Batch Efficiency**: Batch operations MUST be more efficient than individual operations

### Cross-Provider Assertions


11. **API Consistency**: All providers MUST implement the same QueueClient trait
12. **Error Mapping**: Provider-specific errors MUST map to consistent error types
13. **Configuration Validation**: Invalid configurations MUST be rejected at startup
14. **Graceful Degradation**: Provider failures MUST NOT cause application crashes
15. **Resource Cleanup**: All providers MUST properly clean up connections and resources
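
The configuration validation assertion can be illustrated with a startup check. This sketch is hypothetical: `InMemorySettings` mirrors a subset of the `InMemoryConfig` fields shown earlier, and `ValidationError` and `validate` are names invented for the example rather than the crate's `ConfigError` API.

```rust
use std::time::Duration;

/// Subset of the in-memory settings that can be checked without I/O.
pub struct InMemorySettings {
    pub max_queue_size: usize,
    pub default_visibility_timeout: Duration,
    pub failure_rate: f64,
}

/// Reasons a configuration is rejected at startup (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum ValidationError {
    ZeroQueueSize,
    ZeroVisibilityTimeout,
    FailureRateOutOfRange(f64),
}

/// Reject invalid settings before any client is constructed.
pub fn validate(settings: &InMemorySettings) -> Result<(), ValidationError> {
    if settings.max_queue_size == 0 {
        return Err(ValidationError::ZeroQueueSize);
    }
    if settings.default_visibility_timeout.is_zero() {
        return Err(ValidationError::ZeroVisibilityTimeout);
    }
    // `contains` is false for NaN, so NaN is rejected as well.
    if !(0.0..=1.0).contains(&settings.failure_rate) {
        return Err(ValidationError::FailureRateOutOfRange(settings.failure_rate));
    }
    Ok(())
}
```

Running a check like this before constructing a client surfaces a bad `failure_rate` or an empty queue limit as a typed error at startup instead of as a runtime surprise.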