queue-runtime 0.1.0

Multi-provider queue runtime for Queue-Keeper
Documentation

queue-runtime

CI codecov Crates.io Documentation License Rust Version

A provider-agnostic queue abstraction for Rust, enabling reliable message processing across cloud platforms with session-based ordering, automatic retries, and dead letter queue handling.

Overview

queue-runtime provides a unified API for working with cloud message queues, currently supporting Azure Service Bus with AWS SQS support planned. Built with a hexagonal architecture, it allows applications to switch between providers without code changes while maintaining consistent behavior for session management, error handling, and message processing.

Designed for: GitHub bot applications and webhook processors that need reliable, ordered event processing with flexible deployment options.

Why queue-runtime?

  • Write Once, Deploy Anywhere: Single codebase works with Azure Service Bus or AWS SQS
  • Session-Based Ordering: Process related messages in order (e.g., all events for one pull request)
  • Resilient by Default: Automatic retries with exponential backoff, circuit breakers, and dead letter queues
  • Type-Safe: Compile-time guarantees for message handling with serde serialization
  • Production-Ready: Built-in observability with tracing and metrics, comprehensive error handling

Architecture

queue-runtime uses hexagonal architecture (ports and adapters):

  • Core Business Logic: Provider-agnostic queue operations, session management, retry logic
  • Port Interfaces: Abstract traits defining queue and session operations
  • Provider Adapters: Azure Service Bus implementation (AWS SQS coming soon)
  • In-Memory Provider: For testing without external dependencies

This design ensures your business logic never depends directly on cloud provider SDKs, making testing easier and provider migration seamless.

Supported Platforms

  • Rust: 1.90 or later
  • Cloud Providers:
    • ✅ Azure Service Bus (native session support)
    • 🚧 AWS SQS (planned - emulated sessions)
    • ✅ In-Memory (for testing)
  • Operating Systems: Linux, Windows, macOS (x86_64 and ARM64)
  • Async Runtime: Tokio 1.x

Features

  • Provider Agnostic - Unified QueueClient API works identically across Azure and AWS
  • Session Management - Ordered FIFO processing with native or emulated session support
  • Retry Logic - Configurable exponential backoff with jitter and circuit breakers
  • Dead Letter Queues - Automatic DLQ routing for poison messages and exceeded retries
  • Type Safe - Strongly-typed message handling with serde serialization/deserialization
  • Observable - Integrated structured logging with tracing and metrics collection
  • Secure - Credential management through Azure Identity SDK with managed identity support
  • Testable - In-memory provider for unit tests with contract tests ensuring consistency

Installation

Add to your Cargo.toml:

[dependencies]

queue-runtime = "0.1.0"

Quick Start

Basic Message Sending and Receiving

use queue_runtime::{QueueClientFactory, QueueConfig, ProviderConfig, InMemoryConfig};
use queue_runtime::{Message, QueueName};
use bytes::Bytes;
use chrono::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client with in-memory provider
    let config = QueueConfig {
        provider: ProviderConfig::InMemory(InMemoryConfig::default()),
        ..Default::default()
    };
    let client = QueueClientFactory::create_client(config).await?;

    // Create queue name
    let queue = QueueName::new("my-queue".to_string())?;

    // Send a message
    let message = Message::new(Bytes::from("Hello, Queue!"));
    let message_id = client.send_message(&queue, message).await?;
    println!("Sent message: {}", message_id.as_str());

    // Receive a message
    let timeout = Duration::seconds(30);
    if let Some(received) = client.receive_message(&queue, timeout).await? {
        println!("Received: {:?}", String::from_utf8(received.body.to_vec()));

        // Mark as completed
        client.complete_message(received.receipt_handle).await?;
    }

    Ok(())
}

Session-Based Ordered Processing

use queue_runtime::{QueueClientFactory, QueueConfig, Message, QueueName, SessionId};
use bytes::Bytes;
use chrono::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = QueueClientFactory::create_test_client();
    let queue = QueueName::new("orders-queue".to_string())?;

    // Send messages with session ID for ordering
    let session_id = SessionId::new("order-12345".to_string())?;
    for i in 1..=5 {
        let mut message = Message::new(Bytes::from(format!("Order step {}", i)));
        message.session_id = Some(session_id.clone());
        client.send_message(&queue, message).await?;
    }

    // Accept session for ordered processing
    let session = client.accept_session(&queue, Some(session_id.clone())).await?;

    // Process messages in order
    while let Some(msg) = session.receive_message(Duration::seconds(5)).await? {
        println!("Processing: {:?}", String::from_utf8(msg.body.to_vec()));
        session.complete_message(msg.receipt_handle).await?;
    }

    Ok(())
}

Azure Service Bus Example

use queue_runtime::{QueueClientFactory, QueueConfig, ProviderConfig, AzureServiceBusConfig};
use queue_runtime::{Message, QueueName};
use bytes::Bytes;
use chrono::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Azure Service Bus
    let azure_config = AzureServiceBusConfig {
        connection_string: Some(std::env::var("AZURE_SERVICEBUS_CONNECTION_STRING")?),
        namespace: None,
        auth_method: queue_runtime::providers::AzureAuthMethod::ConnectionString,
        use_sessions: true,
        session_timeout: Duration::minutes(5),
    };

    let config = QueueConfig {
        provider: ProviderConfig::AzureServiceBus(azure_config),
        default_timeout: Duration::seconds(30),
        max_retry_attempts: 3,
        retry_base_delay: Duration::seconds(2),
        enable_dead_letter: true,
    };

    let client = QueueClientFactory::create_client(config).await?;
    let queue = QueueName::new("production-queue".to_string())?;

    // Send message with custom attributes
    let mut message = Message::new(Bytes::from(r#"{"event": "webhook"}"#));
    message.attributes.insert("source".to_string(), "github".to_string());
    message.attributes.insert("event_type".to_string(), "pull_request".to_string());

    client.send_message(&queue, message).await?;

    Ok(())
}

Error Handling and Dead Letter Queues

use queue_runtime::{QueueClientFactory, QueueError, Message, QueueName};
use bytes::Bytes;
use chrono::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = QueueClientFactory::create_test_client();
    let queue = QueueName::new("processing-queue".to_string())?;

    // Receive and process with error handling
    let timeout = Duration::seconds(30);
    if let Some(received) = client.receive_message(&queue, timeout).await? {
        match process_message(&received.body).await {
            Ok(_) => {
                // Success - complete the message
                client.complete_message(received.receipt_handle).await?;
            }
            Err(e) if is_retryable(&e) => {
                // Transient error - abandon for retry
                println!("Transient error, will retry: {}", e);
                client.abandon_message(received.receipt_handle).await?;
            }
            Err(e) => {
                // Permanent error - send to dead letter queue
                println!("Permanent error, moving to DLQ: {}", e);
                client.dead_letter_message(
                    received.receipt_handle,
                    format!("Processing failed: {}", e)
                ).await?;
            }
        }
    }

    Ok(())
}

async fn process_message(body: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    // Your processing logic here
    Ok(())
}

fn is_retryable(error: &Box<dyn std::error::Error>) -> bool {
    // Determine if error is transient
    error.to_string().contains("timeout") || error.to_string().contains("unavailable")
}

Documentation

API Reference

Complete API documentation is available at docs.rs/queue-runtime.

Core Types

Configuration Types

Error Types

Specifications

Module Documentation

Configuration

Azure Service Bus Configuration

Connection String Authentication

The simplest way to connect to Azure Service Bus is using a connection string:

use queue_runtime::{QueueConfig, ProviderConfig, AzureServiceBusConfig};
use queue_runtime::providers::AzureAuthMethod;
use chrono::Duration;

let config = QueueConfig {
    provider: ProviderConfig::AzureServiceBus(AzureServiceBusConfig {
        connection_string: Some("Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY".to_string()),
        namespace: None,
        auth_method: AzureAuthMethod::ConnectionString,
        use_sessions: true,
        session_timeout: Duration::minutes(5),
    }),
    default_timeout: Duration::seconds(30),
    max_retry_attempts: 3,
    retry_base_delay: Duration::seconds(2),
    enable_dead_letter: true,
};

Environment Variable Setup:

export AZURE_SERVICEBUS_CONNECTION_STRING="Endpoint=sb://...;SharedAccessKey=..."

Then in your code:

let connection_string = std::env::var("AZURE_SERVICEBUS_CONNECTION_STRING")?;
let azure_config = AzureServiceBusConfig {
    connection_string: Some(connection_string),
    namespace: None,
    auth_method: AzureAuthMethod::ConnectionString,
    use_sessions: true,
    session_timeout: Duration::minutes(5),
};

Managed Identity Authentication (Recommended for Production)

For production deployments in Azure, use Managed Identity for passwordless authentication:

use queue_runtime::{AzureServiceBusConfig, AzureAuthMethod};
use chrono::Duration;

let azure_config = AzureServiceBusConfig {
    connection_string: None,
    namespace: Some("your-namespace".to_string()),
    auth_method: AzureAuthMethod::ManagedIdentity,
    use_sessions: true,
    session_timeout: Duration::minutes(5),
};

Environment Variable Setup:

# Only namespace required for managed identity

export AZURE_SERVICEBUS_NAMESPACE="your-namespace"

Service Principal Authentication

For development or CI/CD pipelines, use a service principal:

use queue_runtime::{AzureServiceBusConfig, AzureAuthMethod};
use chrono::Duration;

let azure_config = AzureServiceBusConfig {
    connection_string: None,
    namespace: Some(std::env::var("AZURE_SERVICEBUS_NAMESPACE")?),
    auth_method: AzureAuthMethod::ClientSecret {
        tenant_id: std::env::var("AZURE_TENANT_ID")?,
        client_id: std::env::var("AZURE_CLIENT_ID")?,
        client_secret: std::env::var("AZURE_CLIENT_SECRET")?,
    },
    use_sessions: true,
    session_timeout: Duration::minutes(5),
};

Environment Variable Setup:

export AZURE_TENANT_ID="your-tenant-id"

export AZURE_CLIENT_ID="your-client-id"

export AZURE_CLIENT_SECRET="your-client-secret"

export AZURE_SERVICEBUS_NAMESPACE="your-namespace"

AWS SQS Configuration (Planned)

AWS SQS support is planned for a future release. The configuration will support:

use queue_runtime::{QueueConfig, ProviderConfig, AwsSqsConfig};

let config = QueueConfig {
    provider: ProviderConfig::AwsSqs(AwsSqsConfig {
        region: "us-east-1".to_string(),
        access_key_id: Some(std::env::var("AWS_ACCESS_KEY_ID").ok()),
        secret_access_key: Some(std::env::var("AWS_SECRET_ACCESS_KEY").ok()),
        use_fifo_queues: true,
    }),
    default_timeout: Duration::seconds(30),
    max_retry_attempts: 3,
    retry_base_delay: Duration::seconds(2),
    enable_dead_letter: true,
};

In-Memory Provider Configuration

For testing and development, use the in-memory provider:

use queue_runtime::{QueueConfig, ProviderConfig, InMemoryConfig};
use chrono::Duration;

let config = QueueConfig {
    provider: ProviderConfig::InMemory(InMemoryConfig {
        max_queue_size: 10000,
        enable_persistence: false,
        max_delivery_count: 3,
        default_message_ttl: Some(Duration::hours(24)),
        enable_dead_letter_queue: true,
        session_lock_duration: Duration::minutes(5),
    }),
    default_timeout: Duration::seconds(30),
    max_retry_attempts: 3,
    retry_base_delay: Duration::seconds(1),
    enable_dead_letter: true,
};

Or use the test factory for quick setup:

let client = QueueClientFactory::create_test_client();

Retry Configuration

Configure automatic retry behavior for transient failures:

let config = QueueConfig {
    provider: /* your provider config */,
    default_timeout: Duration::seconds(30),
    max_retry_attempts: 5,              // Retry up to 5 times
    retry_base_delay: Duration::seconds(2),  // Start with 2s delay
    enable_dead_letter: true,
};

The retry mechanism uses exponential backoff with jitter:

  • First retry: ~2 seconds
  • Second retry: ~4 seconds
  • Third retry: ~8 seconds
  • And so on...

Messages that exceed max_retry_attempts are automatically moved to the dead letter queue if enable_dead_letter is true.

Dead Letter Queue Configuration

Dead letter queues (DLQ) capture messages that cannot be processed successfully:

let config = QueueConfig {
    provider: /* your provider config */,
    enable_dead_letter: true,  // Enable automatic DLQ routing
    max_retry_attempts: 3,     // Move to DLQ after 3 failed attempts
    /* other settings */
};

Manually sending to DLQ:

if let Some(msg) = client.receive_message(&queue, timeout).await? {
    match process_message(&msg).await {
        Ok(_) => client.complete_message(msg.receipt_handle).await?,
        Err(e) => {
            // Send to DLQ with error reason
            client.dead_letter_message(
                msg.receipt_handle,
                format!("Processing failed: {}", e)
            ).await?;
        }
    }
}

Session Configuration

Configure session-based ordered processing:

// Azure Service Bus (native session support)
let azure_config = AzureServiceBusConfig {
    connection_string: Some(connection_string),
    namespace: None,
    auth_method: AzureAuthMethod::ConnectionString,
    use_sessions: true,               // Enable session support
    session_timeout: Duration::minutes(5),  // Session lock timeout
};

// In-memory provider (for testing)
let memory_config = InMemoryConfig {
    session_lock_duration: Duration::minutes(5),  // Session lock timeout
    ..Default::default()
};

Environment-Based Configuration

Create environment-specific configurations:

use queue_runtime::*;

pub fn create_queue_config() -> Result<QueueConfig, Box<dyn std::error::Error>> {
    let env = std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string());

    match env.as_str() {
        "production" => Ok(QueueConfig {
            provider: ProviderConfig::AzureServiceBus(AzureServiceBusConfig {
                connection_string: None,
                namespace: Some(std::env::var("AZURE_SERVICEBUS_NAMESPACE")?),
                auth_method: AzureAuthMethod::ManagedIdentity,
                use_sessions: true,
                session_timeout: Duration::minutes(5),
            }),
            default_timeout: Duration::seconds(60),
            max_retry_attempts: 5,
            retry_base_delay: Duration::seconds(2),
            enable_dead_letter: true,
        }),

        "staging" => Ok(QueueConfig {
            provider: ProviderConfig::AzureServiceBus(AzureServiceBusConfig {
                connection_string: Some(std::env::var("AZURE_SERVICEBUS_CONNECTION_STRING")?),
                namespace: None,
                auth_method: AzureAuthMethod::ConnectionString,
                use_sessions: true,
                session_timeout: Duration::minutes(5),
            }),
            default_timeout: Duration::seconds(30),
            max_retry_attempts: 3,
            retry_base_delay: Duration::seconds(2),
            enable_dead_letter: true,
        }),

        _ => Ok(QueueConfig {
            provider: ProviderConfig::InMemory(InMemoryConfig::default()),
            default_timeout: Duration::seconds(10),
            max_retry_attempts: 2,
            retry_base_delay: Duration::seconds(1),
            enable_dead_letter: false,
        }),
    }
}

Configuration Best Practices

  1. Use Managed Identity in Production: Avoid storing connection strings in code or environment variables for production deployments on Azure.

  2. Set Appropriate Timeouts: Match default_timeout to your expected message processing time. Too short causes unnecessary retries; too long delays error detection.

  3. Configure Retry Limits: Set max_retry_attempts based on your failure rate tolerance. More retries = higher success rate but longer delays.

  4. Enable Dead Letter Queues: Always enable DLQ in production to prevent message loss and allow debugging failed messages.

  5. Session Timeout Balance: Set session_timeout long enough for message processing but short enough to recover from failures quickly (typically 5-10 minutes).

  6. Environment-Specific Settings: Use different retry and timeout values for development, staging, and production environments.

Examples

See the examples/ directory for complete working examples.

Contributing

Contributions are welcome! Please read CONTRIBUTING.md for guidelines.

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.