Module aws

AWS SQS provider implementation using HTTP REST API.

This module provides production-ready AWS SQS integration using direct HTTP calls instead of the AWS SDK. This approach enables proper unit testing with mocked HTTP responses, similar to the Azure provider implementation.

§Key Features

  • HTTP REST API: Direct calls to AWS SQS REST API endpoints
  • AWS Signature V4: Manual request signing for authentication
  • Standard queues: High-throughput scenarios (near-unlimited throughput)
  • FIFO queues: Strict message ordering (3000 msgs/sec with batching)
  • Session support: Via FIFO message groups
  • Dead letter queues: Native AWS SQS DLQ integration
  • Batch operations: Up to 10 messages per batch
  • Queue URL caching: Performance optimization
  • Test-friendly: Mock HTTP responses in unit tests
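
Because SQS caps a batch at 10 entries, a caller holding a longer list has to split it before sending. A minimal sketch of that split using only the standard library; `split_into_batches` and `MAX_BATCH_SIZE` are illustrative names and are not taken from this module's API:

```rust
/// SQS accepts at most 10 entries per batch request.
const MAX_BATCH_SIZE: usize = 10;

/// Split a slice of messages into groups that each fit in one batch call.
/// (Hypothetical helper, shown for illustration only.)
fn split_into_batches<T: Clone>(messages: &[T]) -> Vec<Vec<T>> {
    messages
        .chunks(MAX_BATCH_SIZE) // the last chunk may hold fewer than 10 items
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

With 25 messages this yields three batches of 10, 10, and 5 entries; an empty input yields no batches.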

§Authentication

Requests are signed with AWS Signature Version 4. Credentials are resolved through an automatic credential chain:

  1. Explicit Credentials: Access key and secret key from configuration
  2. Environment Variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
  3. ECS Task Metadata: Via AWS_CONTAINER_CREDENTIALS_RELATIVE_URI (for ECS/Fargate)
  4. EC2 Instance Metadata: Via IMDSv2 at 169.254.169.254 (for EC2 instances)

Temporary credentials are automatically cached and refreshed before expiration.
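
The first two steps of the chain can be sketched without any network access. The code below is an assumption-laden illustration, not the module's implementation: the `Credentials` struct and `resolve_credentials` function are hypothetical, and environment lookup is passed in as a closure so the logic can be exercised without touching the real process environment.

```rust
/// Static or temporary AWS credentials (illustrative type, not the module's own).
#[derive(Debug, Clone, PartialEq)]
struct Credentials {
    access_key_id: String,
    secret_access_key: String,
    session_token: Option<String>,
}

/// Steps 1 and 2 of the chain: explicit configuration first, then environment.
/// In production, `env` could be `|k| std::env::var(k).ok()`.
fn resolve_credentials(
    explicit: Option<(&str, &str)>,
    env: impl Fn(&str) -> Option<String>,
) -> Option<Credentials> {
    // Step 1: explicit credentials from configuration win.
    if let Some((key, secret)) = explicit {
        return Some(Credentials {
            access_key_id: key.to_string(),
            secret_access_key: secret.to_string(),
            session_token: None,
        });
    }
    // Step 2: both key variables must be set; the session token is optional
    // and only present for temporary credentials.
    // Steps 3 and 4 (ECS and EC2 metadata) need HTTP calls and are omitted,
    // so this sketch returns `None` where a full chain would keep going.
    let access_key_id = env("AWS_ACCESS_KEY_ID")?;
    let secret_access_key = env("AWS_SECRET_ACCESS_KEY")?;
    Some(Credentials {
        access_key_id,
        secret_access_key,
        session_token: env("AWS_SESSION_TOKEN"),
    })
}
```

Injecting the lookup function keeps the resolution order testable in the same spirit as the mocked HTTP layer.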

§Queue Types

§Standard Queues

  • Near-unlimited throughput
  • At-least-once delivery
  • Best-effort ordering
  • Use for high-throughput scenarios

§FIFO Queues

  • Strict message ordering within message groups
  • Exactly-once processing with deduplication
  • Up to 3000 messages/second with batching
  • Requires .fifo suffix in queue name
  • Use for ordered processing requirements
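
The naming rule can be checked before any request is made. The sketch below follows the SQS naming convention (up to 80 characters; letters, digits, hyphens, and underscores; a `.fifo` suffix for FIFO queues). `QueueKind` and `classify_queue_name` are hypothetical names, and this module's own `QueueName` validation may apply different rules.

```rust
/// Queue flavor inferred from the name (illustrative type).
#[derive(Debug, PartialEq)]
enum QueueKind {
    Standard,
    Fifo,
}

/// Classify a queue name, rejecting names SQS would not accept.
fn classify_queue_name(name: &str) -> Result<QueueKind, String> {
    // A FIFO queue is recognized purely by its `.fifo` suffix.
    let (base, kind) = match name.strip_suffix(".fifo") {
        Some(base) => (base, QueueKind::Fifo),
        None => (name, QueueKind::Standard),
    };
    // The 80-character limit includes the suffix.
    if base.is_empty() || name.len() > 80 {
        return Err(format!("queue name must be 1-80 characters: {name:?}"));
    }
    if !base
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
    {
        return Err(format!("invalid character in queue name: {name:?}"));
    }
    Ok(kind)
}
```

Note that a name such as `orders-fifo` classifies as a standard queue here: only the literal `.fifo` suffix marks a FIFO queue.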

§Session Support

SQS has no native session concept, so this provider emulates sessions using FIFO queue message groups:

  • SessionId maps to MessageGroupId
  • Messages in the same group are processed in order
  • Different groups can be processed concurrently
  • Standard queues do not support sessions
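
The mapping above might look like the following when building the parameters of a `SendMessage` call. This is a sketch under assumptions: `send_message_params` is a hypothetical function, and rejecting a session ID on a standard queue with an error is one possible policy, not necessarily what the provider does.

```rust
/// Build the form parameters for an SQS `SendMessage` call.
/// On FIFO queues the session ID is sent as `MessageGroupId`.
fn send_message_params(
    body: &str,
    session_id: Option<&str>,
    is_fifo: bool,
) -> Result<Vec<(String, String)>, String> {
    let mut params = vec![
        ("Action".to_string(), "SendMessage".to_string()),
        ("MessageBody".to_string(), body.to_string()),
    ];
    match (is_fifo, session_id) {
        // FIFO queue with a session: the session becomes the message group.
        (true, Some(id)) => params.push(("MessageGroupId".to_string(), id.to_string())),
        // SQS rejects FIFO sends that lack a message group.
        (true, None) => return Err("FIFO queues require a session ID".to_string()),
        // Assumed policy: surface the mismatch instead of dropping the session.
        (false, Some(_)) => return Err("standard queues do not support sessions".to_string()),
        (false, None) => {}
    }
    Ok(params)
}
```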

§Benefits of HTTP Approach

  1. Testable: Mock HTTP responses in unit tests
  2. Transparent: Full control over request/response handling
  3. Lightweight: No heavy SDK dependencies
  4. Consistent: Matches Azure provider pattern

§AWS Signature V4 Process

All requests are signed using AWS Signature Version 4:

  1. Canonical Request: Standardized request format

    • HTTP method (POST for all SQS operations)
    • Canonical URI (/)
    • Canonical query string (sorted parameters)
    • Canonical headers (host, x-amz-date)
    • Hashed payload (SHA-256)
  2. String to Sign: Combines algorithm, timestamp, scope, and hashed canonical request

  3. Signing Key Derivation: 4-level HMAC chain

    • kSecret = AWS secret access key
    • kDate = HMAC-SHA256("AWS4" + kSecret, date), with date as YYYYMMDD
    • kRegion = HMAC-SHA256(kDate, region)
    • kService = HMAC-SHA256(kRegion, "sqs")
    • kSigning = HMAC-SHA256(kService, "aws4_request")
  4. Authorization Header: Includes access key, credential scope, signed headers, and signature
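
Three pieces of this process can be sketched with the standard library alone: the canonical query string, the signing key chain, and the final header. The function names are hypothetical and this is not the module's code. To stay dependency-free, `derive_signing_key` takes the HMAC primitive as a parameter; real use must pass HMAC-SHA256 (for example from the `hmac` and `sha2` crates).

```rust
/// Percent-encode per SigV4 rules: keep RFC 3986 unreserved characters,
/// encode every other byte as %XX with uppercase hex digits.
fn uri_encode(input: &str) -> String {
    let mut out = String::new();
    for b in input.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Canonical query string: encode names and values, sort, join with `&`.
fn canonical_query_string(params: &[(&str, &str)]) -> String {
    let mut encoded: Vec<(String, String)> = params
        .iter()
        .map(|(k, v)| (uri_encode(k), uri_encode(v)))
        .collect();
    encoded.sort();
    encoded
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join("&")
}

/// Signing key chain. `hmac(key, data)` must be HMAC-SHA256 in real use.
/// `date` is the request date in YYYYMMDD form.
fn derive_signing_key(
    hmac: impl Fn(&[u8], &[u8]) -> Vec<u8>,
    secret_access_key: &str,
    date: &str,
    region: &str,
    service: &str,
) -> Vec<u8> {
    // SigV4 prefixes the secret with the literal "AWS4" for the first step.
    let k_secret = format!("AWS4{secret_access_key}");
    let k_date = hmac(k_secret.as_bytes(), date.as_bytes());
    let k_region = hmac(k_date.as_slice(), region.as_bytes());
    let k_service = hmac(k_region.as_slice(), service.as_bytes());
    hmac(k_service.as_slice(), "aws4_request".as_bytes())
}

/// Assemble the Authorization header from a hex-encoded signature.
/// The signed header list here assumes only `host` and `x-amz-date` are signed.
fn authorization_header(
    access_key_id: &str,
    date: &str,
    region: &str,
    service: &str,
    signature_hex: &str,
) -> String {
    format!(
        "AWS4-HMAC-SHA256 Credential={access_key_id}/{date}/{region}/{service}/aws4_request, \
         SignedHeaders=host;x-amz-date, Signature={signature_hex}"
    )
}
```

Passing the HMAC function as an argument also makes the chain order easy to verify in a unit test with a stand-in function, without pulling in a crypto crate.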

§Testing

The HTTP-based approach enables comprehensive unit testing:

use queue_runtime::providers::AwsSqsProvider;
use queue_runtime::AwsSqsConfig;

// Create provider with test credentials
let config = AwsSqsConfig {
    region: "us-east-1".to_string(),
    access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()),
    secret_access_key: Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string()),
    use_fifo_queues: false,
};

let provider = AwsSqsProvider::new(config).await.unwrap();

// Operations will fail with test credentials (expected)
// This tests the logic without requiring real AWS infrastructure
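
The mocking idea itself can be illustrated with a small transport abstraction. Everything in this sketch is hypothetical (`HttpTransport`, `MockTransport`, `parse_message_id`, and this `send_message` are not the module's API); it assumes a synchronous transport and an XML response body containing a `<MessageId>` element, as the SQS query protocol returns.

```rust
/// Minimal transport abstraction (hypothetical; the module's real seam may differ).
trait HttpTransport {
    fn post(&self, url: &str, body: &str) -> Result<String, String>;
}

/// Test double that ignores the request and returns a canned response.
struct MockTransport {
    response: String,
}

impl HttpTransport for MockTransport {
    fn post(&self, _url: &str, _body: &str) -> Result<String, String> {
        Ok(self.response.clone())
    }
}

/// Extract the text between `<MessageId>` and `</MessageId>`.
fn parse_message_id(xml: &str) -> Option<&str> {
    let start = xml.find("<MessageId>")? + "<MessageId>".len();
    let end = xml[start..].find("</MessageId>")? + start;
    Some(&xml[start..end])
}

/// Send a message through any transport and return the message ID.
/// The body is not URL-encoded here; a real request must encode it.
fn send_message(
    transport: &impl HttpTransport,
    queue_url: &str,
    body: &str,
) -> Result<String, String> {
    let form = format!("Action=SendMessage&MessageBody={body}");
    let response = transport.post(queue_url, &form)?;
    parse_message_id(&response)
        .map(str::to_string)
        .ok_or_else(|| "missing MessageId".to_string())
}
```

A test can then construct `MockTransport` with a canned response and assert on the parsed result, with no network or AWS account involved.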

For integration tests with LocalStack:

# Start LocalStack with SQS
docker run -d -p 4566:4566 -e SERVICES=sqs localstack/localstack

# Run integration tests
cargo test --package queue-runtime-integration-tests

§Example

§Using Explicit Credentials

use queue_runtime::{QueueClientFactory, QueueConfig, ProviderConfig, AwsSqsConfig};
use queue_runtime::{Message, QueueName};
use bytes::Bytes;

// Configure AWS SQS provider with explicit credentials
let config = QueueConfig {
    provider: ProviderConfig::AwsSqs(AwsSqsConfig {
        region: "us-east-1".to_string(),
        access_key_id: Some("your-access-key".to_string()),
        secret_access_key: Some("your-secret-key".to_string()),
        use_fifo_queues: true,
    }),
    ..Default::default()
};

// Create client
let client = QueueClientFactory::create_client(config).await?;

// Send a message
let queue = QueueName::new("my-queue".to_string())?;
let message = Message::new(Bytes::from("Hello, SQS!"));
let message_id = client.send_message(&queue, message).await?;

§Using IAM Roles (EC2/ECS)

use queue_runtime::{QueueClientFactory, QueueConfig, ProviderConfig, AwsSqsConfig};

// No explicit credentials needed - will use instance/task role
let config = QueueConfig {
    provider: ProviderConfig::AwsSqs(AwsSqsConfig {
        region: "us-east-1".to_string(),
        access_key_id: None,
        secret_access_key: None,
        use_fifo_queues: false,
    }),
    ..Default::default()
};

let client = QueueClientFactory::create_client(config).await?;
// Credentials will be fetched automatically from instance/task metadata

§Additional Examples

use chrono::Duration;

// Receive messages
if let Some(received) = client.receive_message(&queue, Duration::seconds(10)).await? {
    println!("Received: {:?}", received.body);

    // Complete the message
    client.complete_message(received.receipt_handle).await?;
}

§FIFO Queue Example

use queue_runtime::{Message, QueueName, SessionId};
use bytes::Bytes;

// FIFO queues require session IDs for ordering
let queue = QueueName::new("my-queue-fifo".to_string())?;  // Note: .fifo suffix
let session_id = SessionId::new("order-12345".to_string())?;

// Messages with same session ID are processed in order
let msg1 = Message::new(Bytes::from("First")).with_session_id(session_id.clone());
let msg2 = Message::new(Bytes::from("Second")).with_session_id(session_id.clone());

client.send_message(&queue, msg1).await?;
client.send_message(&queue, msg2).await?;

Structs§

AwsSessionProvider
AWS SQS session provider for ordered message processing via FIFO queues
AwsSqsProvider
AWS SQS queue provider implementation

Enums§

AwsError
AWS SQS specific errors