queue-runtime 0.2.0

Multi-provider queue runtime for Queue-Keeper
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
//! Tests for AWS SQS provider HTTP implementation.
//!
//! These tests verify the HTTP-based AWS SQS provider implementation without
//! requiring real AWS infrastructure. They follow the Azure provider test pattern:
//! - Provider construction with test credentials
//! - Unit tests for signature generation and XML parsing
//! - Operation tests expect authentication errors with test credentials
//!
//! For integration tests with LocalStack, see the integration test suite.

use super::*;
use crate::client::QueueProvider;
use crate::message::{Message, QueueName, SessionId};
use crate::provider::{AwsSqsConfig, ProviderType, SessionSupport};
use bytes::Bytes;
use chrono::Duration;

// ============================================================================
// Test Helper Functions
// ============================================================================

/// Builds an [`AwsSqsConfig`] for the `us-east-1` region carrying the
/// example-style access key pair shared by the tests in this file.
///
/// `use_fifo` is copied unchanged into the `use_fifo_queues` field. The
/// credentials are placeholders; they are not expected to be accepted by a
/// real AWS endpoint.
fn create_test_provider_config(use_fifo: bool) -> AwsSqsConfig {
    let access_key_id = Some(String::from("AKIAIOSFODNN7EXAMPLE"));
    let secret_access_key = Some(String::from("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"));

    AwsSqsConfig {
        region: String::from("us-east-1"),
        access_key_id,
        secret_access_key,
        use_fifo_queues: use_fifo,
    }
}

/// Wraps `body` in a [`Message`] by copying the text into an owned [`Bytes`] payload.
fn create_test_message(body: &str) -> Message {
    let payload = Bytes::from(body.to_owned());
    Message::new(payload)
}

/// Builds a [`Message`] from `body` and tags it with `session_id` through
/// `Message::with_session_id`.
fn create_test_message_with_session(body: &str, session_id: SessionId) -> Message {
    let payload = Bytes::from(body.to_owned());
    let untagged = Message::new(payload);
    untagged.with_session_id(session_id)
}

// ============================================================================
// Configuration Tests
// ============================================================================

mod configuration_tests {
    use super::*;

    /// Constructing a provider from the shared test credentials succeeds and
    /// the resulting provider identifies itself as `ProviderType::AwsSqs`.
    #[tokio::test]
    async fn test_provider_creation_with_credentials() {
        let outcome = AwsSqsProvider::new(create_test_provider_config(false)).await;

        assert!(
            outcome.is_ok(),
            "Provider creation should succeed with test credentials"
        );

        let sqs = outcome.unwrap();
        assert_eq!(sqs.provider_type(), ProviderType::AwsSqs);
    }

    /// Constructing a provider with both credential fields set to `None`
    /// succeeds (the IAM-role style configuration).
    #[tokio::test]
    async fn test_provider_creation_without_credentials() {
        let credential_free = AwsSqsConfig {
            use_fifo_queues: false,
            secret_access_key: None,
            access_key_id: None,
            region: "us-east-1".to_string(),
        };

        let outcome = AwsSqsProvider::new(credential_free).await;

        assert!(
            outcome.is_ok(),
            "Provider creation should succeed without credentials (IAM role)"
        );
    }

    /// A provider built with `use_fifo_queues = true` reports emulated
    /// session support.
    #[tokio::test]
    async fn test_fifo_queue_configuration() {
        let outcome = AwsSqsProvider::new(create_test_provider_config(true)).await;
        assert!(outcome.is_ok());

        let sqs = outcome.unwrap();
        assert_eq!(sqs.supports_sessions(), SessionSupport::Emulated);
    }

    /// Checks every capability accessor exposed by the provider: type,
    /// batching flag, maximum batch size and session support level.
    #[tokio::test]
    async fn test_provider_capabilities() {
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        assert_eq!(sqs.provider_type(), ProviderType::AwsSqs);
        assert!(sqs.supports_batching(), "AWS SQS supports batching");
        assert_eq!(sqs.max_batch_size(), 10, "AWS SQS max batch is 10");
        assert_eq!(sqs.supports_sessions(), SessionSupport::Emulated);
    }
}

// ============================================================================
// AWS Signature V4 Tests
// ============================================================================

mod signature_tests {
    use super::*;

    /// Smoke test for the signing prerequisites.
    ///
    /// Despite its name, this test does not compute or compare a signature:
    /// it only constructs a provider from the static test credentials and
    /// checks the reported provider type.
    #[tokio::test]
    async fn test_signature_generation() {
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        // Successful construction is the only behaviour exercised here.
        assert_eq!(sqs.provider_type(), ProviderType::AwsSqs);
    }
}

// ============================================================================
// XML Parsing Tests
// ============================================================================

mod xml_parsing_tests {
    use super::*;

    /// A `GetQueueUrlResponse` document parses successfully and the parsed
    /// value contains the queue name from the `<QueueUrl>` element.
    #[tokio::test]
    async fn test_parse_queue_url_response() {
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        let document = r#"
            <GetQueueUrlResponse>
                <GetQueueUrlResult>
                    <QueueUrl>https://sqs.us-east-1.amazonaws.com/123456789012/test-queue</QueueUrl>
                </GetQueueUrlResult>
            </GetQueueUrlResponse>
        "#;

        let parsed = sqs.parse_queue_url_response(document);
        assert!(parsed.is_ok(), "QueueUrl parsing should succeed");
        assert!(parsed.unwrap().contains("test-queue"));
    }

    /// A `SendMessageResponse` document carrying a `<MessageId>` parses
    /// without error. The parsed value itself is not inspected.
    #[tokio::test]
    async fn test_parse_send_message_response() {
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        let document = r#"
            <SendMessageResponse>
                <SendMessageResult>
                    <MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
                </SendMessageResult>
            </SendMessageResponse>
        "#;

        let parsed = sqs.parse_send_message_response(document);
        assert!(
            parsed.is_ok(),
            "SendMessage response parsing should succeed"
        );
    }

    /// An `ErrorResponse` document paired with HTTP status 400 is mapped to
    /// `AwsError::ServiceError`.
    #[tokio::test]
    async fn test_parse_error_response() {
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        let document = r#"
            <ErrorResponse>
                <Error>
                    <Type>Sender</Type>
                    <Code>InvalidParameterValue</Code>
                    <Message>Invalid queue name</Message>
                </Error>
            </ErrorResponse>
        "#;

        let mapped = sqs.parse_error_response(document, 400);
        assert!(
            matches!(mapped, AwsError::ServiceError(_)),
            "Error response should be parsed as AwsError"
        );
    }
}

// ============================================================================
// Error Handling Tests
// ============================================================================

mod error_handling_tests {
    use super::*;

    /// `is_transient` is true for network and service errors and false for
    /// authentication errors.
    #[test]
    fn test_error_transient_classification() {
        let network = AwsError::NetworkError(String::from("Connection timeout"));
        let service = AwsError::ServiceError(String::from("Internal error"));
        let auth = AwsError::Authentication(String::from("Invalid credentials"));

        assert!(network.is_transient(), "Network errors should be transient");
        assert!(service.is_transient(), "Service errors should be transient");
        assert!(!auth.is_transient(), "Auth errors should not be transient");
    }

    /// `AwsError::InvalidReceipt` converts to `QueueError::InvalidReceipt`.
    #[test]
    fn test_error_to_queue_error_mapping() {
        let converted = AwsError::InvalidReceipt(String::from("bad-handle")).to_queue_error();

        let is_invalid_receipt = matches!(
            converted,
            crate::error::QueueError::InvalidReceipt { .. }
        );
        assert!(is_invalid_receipt);
    }
}

// ============================================================================
// Operation Tests (expect errors with test credentials)
// ============================================================================

mod operation_tests {
    use super::*;

    /// `send_message` with the placeholder test credentials returns an error.
    ///
    /// Only `is_err()` is asserted; the concrete error variant is not checked.
    #[tokio::test]
    async fn test_send_message_with_test_credentials() {
        let config = create_test_provider_config(false);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let queue = QueueName::new("test-queue".to_string()).unwrap();
        let message = create_test_message("test body");

        // Expected to fail (an auth error is anticipated with test credentials).
        let result = provider.send_message(&queue, &message).await;
        assert!(result.is_err(), "Should fail with test credentials");
    }

    /// `receive_message` with the placeholder test credentials returns an error.
    ///
    /// Only `is_err()` is asserted; the concrete error variant is not checked.
    #[tokio::test]
    async fn test_receive_message_with_test_credentials() {
        let config = create_test_provider_config(false);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let queue = QueueName::new("test-queue".to_string()).unwrap();

        // Expected to fail (an auth error is anticipated with test credentials).
        let result = provider.receive_message(&queue, Duration::seconds(1)).await;
        assert!(result.is_err(), "Should fail with test credentials");
    }

    /// `complete_message` rejects a receipt whose handle is the bare string
    /// `"invalid"`.
    #[tokio::test]
    async fn test_complete_message_invalid_receipt() {
        use crate::message::ReceiptHandle;

        let config = create_test_provider_config(false);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let receipt = ReceiptHandle::new(
            "invalid".to_string(),
            crate::message::Timestamp::now(),
            ProviderType::AwsSqs,
        );

        let result = provider.complete_message(&receipt).await;
        assert!(result.is_err(), "Should fail with invalid receipt format");
    }

    /// `abandon_message` rejects a receipt whose handle is the bare string
    /// `"invalid"`.
    #[tokio::test]
    async fn test_abandon_message_invalid_receipt() {
        use crate::message::ReceiptHandle;

        let config = create_test_provider_config(false);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let receipt = ReceiptHandle::new(
            "invalid".to_string(),
            crate::message::Timestamp::now(),
            ProviderType::AwsSqs,
        );

        let result = provider.abandon_message(&receipt).await;
        assert!(result.is_err(), "Should fail with invalid receipt format");
    }

    /// A FIFO-enabled provider still refuses to create a session client for a
    /// queue whose name lacks the `.fifo` suffix.
    ///
    /// The queue name used here, `"test-queue-fifo"`, ends in `-fifo` rather
    /// than `.fifo`, so the request is expected to be rejected.
    #[tokio::test]
    async fn test_fifo_queue_session_support() {
        let config = create_test_provider_config(true);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let queue = QueueName::new("test-queue-fifo".to_string()).unwrap();
        let session_id = SessionId::new("session-1".to_string()).unwrap();

        // Should fail because queue name doesn't end with .fifo
        let result = provider
            .create_session_client(&queue, Some(session_id))
            .await;
        assert!(
            result.is_err(),
            "Queue name without .fifo suffix should reject session requests \
             even when FIFO queues are enabled"
        );
    }

    /// A provider configured for standard (non-FIFO) queues refuses to create
    /// a session client.
    #[tokio::test]
    async fn test_standard_queue_rejects_sessions() {
        let config = create_test_provider_config(false);
        let provider = AwsSqsProvider::new(config).await.unwrap();

        let queue = QueueName::new("test-queue".to_string()).unwrap();
        let session_id = SessionId::new("session-1".to_string()).unwrap();

        let result = provider
            .create_session_client(&queue, Some(session_id))
            .await;
        assert!(
            result.is_err(),
            "Standard queue should reject session requests"
        );
    }
}

// ============================================================================
// Receipt Handle Format Tests
// ============================================================================

mod receipt_handle_tests {

    /// Documents the `"{queue_name}|{receipt_token}"` encoding by splitting a
    /// sample handle on `'|'` and checking the segment count and queue part.
    ///
    /// This exercises a string literal only; no provider code is called.
    #[test]
    fn test_receipt_handle_format() {
        let encoded = "test-queue|AQEBwJxS8...token...";

        let segment_count = encoded.split('|').count();
        assert_eq!(
            segment_count, 2,
            "Receipt handle should have queue and token"
        );

        // The queue name is the segment before the first separator.
        let queue_segment = encoded.split('|').next();
        assert_eq!(queue_segment, Some("test-queue"));
    }
}

// ============================================================================
// FIFO Queue Tests
// ============================================================================

mod fifo_tests {
    use super::*;

    /// `is_fifo_queue` returns false for names that do not end in `.fifo`.
    ///
    /// Per the original author's note, `QueueName` validation does not allow
    /// dots, so a genuine `.fifo` name cannot be constructed here; both
    /// inputs are hyphenated names and both must be classified as non-FIFO.
    #[test]
    fn test_fifo_queue_detection() {
        let hyphenated = QueueName::new("test-fifo".to_string()).unwrap();
        let standard = QueueName::new("test-queue".to_string()).unwrap();

        // "-fifo" is not the ".fifo" suffix, so this is not a FIFO name.
        let hyphenated_is_fifo = AwsSqsProvider::is_fifo_queue(&hyphenated);
        assert!(
            !hyphenated_is_fifo,
            "Queue name without .fifo suffix should not be detected as FIFO"
        );

        let standard_is_fifo = AwsSqsProvider::is_fifo_queue(&standard);
        assert!(
            !standard_is_fifo,
            "Standard queue should not be detected as FIFO"
        );
    }
}

// ============================================================================
// Session Message Tests
// ============================================================================

mod session_message_tests {
    use super::*;

    /// A message built with a session ID exposes that same ID in its
    /// `session_id` field.
    #[test]
    fn test_session_message_carries_session_id() {
        let expected = SessionId::new("order-42".to_string()).unwrap();
        let tagged = create_test_message_with_session("payload", expected.clone());

        assert_eq!(
            tagged.session_id.as_ref(),
            Some(&expected),
            "Message should carry the provided session ID"
        );
    }

    /// A message built without a session ID has `session_id == None`.
    #[test]
    fn test_regular_message_has_no_session_id() {
        let plain = create_test_message("payload");
        let has_no_session = plain.session_id.is_none();

        assert!(has_no_session, "Regular message should have no session ID");
    }

    /// `send_message` returns an error when a session-tagged message is sent
    /// through a provider configured for standard (non-FIFO) queues.
    #[tokio::test]
    async fn test_send_session_message_to_standard_queue_fails() {
        let tagged = create_test_message_with_session(
            "payload",
            SessionId::new("order-42".to_string()).unwrap(),
        );
        let target = QueueName::new("test-queue".to_string()).unwrap();

        // `false` selects the standard (non-FIFO) configuration.
        let sqs = AwsSqsProvider::new(create_test_provider_config(false))
            .await
            .unwrap();

        let outcome = sqs.send_message(&target, &tagged).await;

        assert!(
            outcome.is_err(),
            "Sending a session message to a standard queue should fail"
        );
    }
}

// ============================================================================
// Security Tests — credential redaction in Debug output
// ============================================================================

mod security_tests {
    use super::*;

    /// The `Debug` rendering of `AwsSqsConfig` must not contain the secret
    /// access key.
    ///
    /// A derived `Debug` impl would print the field verbatim, so this test
    /// fails unless the type redacts it (for example via a manual
    /// `fmt::Debug` impl).
    ///
    /// NOTE(review): the original comment says `secret_access_key` is also
    /// `#[serde(skip)]`; that attribute is not visible from this file.
    #[test]
    fn aws_config_debug_does_not_expose_secret_key() {
        let access_key_id = Some("AKIAIOSFODNN7EXAMPLE".to_string());
        let secret_access_key = Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string());
        let config = AwsSqsConfig {
            region: "us-east-1".to_string(),
            access_key_id,
            secret_access_key,
            use_fifo_queues: false,
        };

        let rendered = format!("{:?}", config);
        // A prefix of the secret is enough to detect a leak.
        let leaked = rendered.contains("wJalrXUtnFEMI");

        assert!(
            !leaked,
            "Debug output must not contain the raw secret key value. \
             Add a custom Debug impl that emits \"<REDACTED>\" for secret_access_key. \
             Got: {}",
            rendered
        );
    }
}