Crate rust_rabbit

Crate rust_rabbit 

Source
Expand description

§RustRabbit 🐰

A high-performance, production-ready RabbitMQ client library for Rust with zero-configuration simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.

§Features

  • 🚀 Smart Automation: One-line setup — RetryPolicy::fast() configures retries, backoff, and dead-lettering for you
  • 🔄 Advanced Retry System: Multiple presets, exponential backoff, dead letter integration
  • 🏗️ Enterprise Patterns: Request-Response, Saga, Event Sourcing, Priority Queues
  • 🔍 Production Observability: Prometheus metrics, health monitoring, circuit breaker
  • 🛡️ Reliability: Connection pooling, graceful shutdown, error recovery

§Quick Start

use rust_rabbit::{
    config::RabbitConfig,
    connection::ConnectionManager,
    consumer::{Consumer, ConsumerOptions},
    retry::RetryPolicy,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Connection
    let config = RabbitConfig::builder()
        .connection_string("amqp://user:pass@localhost:5672/vhost")
        .build();
    let connection = ConnectionManager::new(config).await?;

    // 2. Consumer options with retry (the retry policy itself is 1 line!)
    let options = ConsumerOptions {
        auto_ack: false,
        retry_policy: Some(RetryPolicy::fast()),
        ..Default::default()
    };

    // 3. Create consumer (ready to use)
    let _consumer = Consumer::new(connection, options).await?;
     
    // Consumer is ready! See examples/ for usage patterns
    Ok(())
}

What RetryPolicy::fast() creates automatically:

  • 5 retries with 1.5x backoff: 200ms → 300ms → 450ms → 675ms → ~1s (each delay capped at 10s)
  • Dead Letter Queue: Automatic DLX/DLQ setup for failed messages
  • Backoff + Jitter: Exponentially growing delays with random jitter added to each retry
  • Production Ready: Optimal settings for most use cases

§Retry Patterns

use rust_rabbit::retry::RetryPolicy;
use std::time::Duration;

// Quick presets for common scenarios
let fast = RetryPolicy::fast();               // 5 retries, 200ms→10s, 1.5x backoff
let slow = RetryPolicy::slow();               // 3 retries, 1s→1min, 2.0x backoff
let linear = RetryPolicy::linear(Duration::from_millis(500), 3); // Fixed 500ms intervals

// Custom with builder
let custom = RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_millis(100))
    .backoff_multiplier(2.0)
    .jitter(0.1)
    .dead_letter_exchange("my.dlx")
    .build();

§Advanced Patterns

§Request-Response (RPC)

use rust_rabbit::patterns::request_response::*;
use std::time::Duration;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Simple example - actual usage requires proper message types
    let client = RequestResponseClient::new(Duration::from_secs(30));
     
    // In real usage, you would send actual request messages
    // let response = client.send_request("queue", request_data, None).await?;
    Ok(())
}

§Event Sourcing (CQRS)

use rust_rabbit::patterns::event_sourcing::*;
use std::sync::Arc;

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let event_store = Arc::new(InMemoryEventStore::new());
     
    // Example - actual usage requires implementing AggregateRoot trait
    // let repository = EventSourcingRepository::<MyAggregate>::new(event_store);
    Ok(())
}

§Production Features

§Health Monitoring

use rust_rabbit::{health::HealthChecker, connection::ConnectionManager, config::RabbitConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = RabbitConfig::builder()
        .connection_string("amqp://localhost:5672")
        .build();
    let connection_manager = ConnectionManager::new(config).await?;
    let health_checker = HealthChecker::new(connection_manager.clone());
     
    match health_checker.check_health().await {
        Ok(status) => println!("Connection healthy: {:?}", status),
        Err(e) => println!("Connection issues: {}", e),
    }
    Ok(())
}

§Prometheus Metrics

use rust_rabbit::metrics::RustRabbitMetrics;

let metrics = RustRabbitMetrics::new();
// Metrics automatically collected:
// - rust_rabbit_messages_published_total
// - rust_rabbit_messages_consumed_total  
// - rust_rabbit_message_processing_duration_seconds
// - rust_rabbit_connection_health

Re-exports§

pub use batching::BatchConfig;
pub use batching::BatchConfigBuilder;
pub use batching::MessageBatcher;
pub use circuit_breaker::CircuitBreaker;
pub use circuit_breaker::CircuitBreakerConfig;
pub use circuit_breaker::CircuitBreakerStats;
pub use circuit_breaker::CircuitState;
pub use config::HealthCheckConfig;
pub use config::HealthCheckConfigBuilder;
pub use config::PoolConfig;
pub use config::PoolConfigBuilder;
pub use config::RabbitConfig;
pub use config::RabbitConfigBuilder;
pub use config::RetryConfig;
pub use config::RetryConfigBuilder;
pub use connection::Connection;
pub use connection::ConnectionManager;
pub use connection::ConnectionStats;
pub use consumer::Consumer;
pub use consumer::ConsumerOptions;
pub use consumer::ConsumerOptionsBuilder;
pub use consumer::MessageHandler;
pub use error::RabbitError;
pub use error::Result;
pub use error::RustRabbitError;
pub use health::ConnectionStatus;
pub use health::HealthChecker;
pub use metrics::MetricsTimer;
pub use metrics::RustRabbitMetrics;
pub use patterns::deduplication::ContentHash;
pub use patterns::deduplication::DeduplicatedMessage;
pub use patterns::deduplication::DeduplicationConfig;
pub use patterns::deduplication::DeduplicationManager;
pub use patterns::deduplication::DeduplicationResult;
pub use patterns::deduplication::DeduplicationStrategy;
pub use patterns::deduplication::DuplicateInfo;
pub use patterns::deduplication::MessageId;
pub use patterns::event_sourcing::AggregateId;
pub use patterns::event_sourcing::AggregateRoot;
pub use patterns::event_sourcing::AggregateSnapshot;
pub use patterns::event_sourcing::DomainEvent;
pub use patterns::event_sourcing::EventReplayService;
pub use patterns::event_sourcing::EventSequence;
pub use patterns::event_sourcing::EventSourcingRepository;
pub use patterns::event_sourcing::EventStore;
pub use patterns::event_sourcing::InMemoryEventStore;
pub use patterns::priority::Priority;
pub use patterns::priority::PriorityConsumer;
pub use patterns::priority::PriorityMessage;
pub use patterns::priority::PriorityQueue;
pub use patterns::priority::PriorityQueueConfig;
pub use patterns::priority::PriorityRouter;
pub use patterns::request_response::CorrelationId;
pub use patterns::request_response::RequestHandler;
pub use patterns::request_response::RequestMessage;
pub use patterns::request_response::RequestResponseClient;
pub use patterns::request_response::RequestResponseServer;
pub use patterns::request_response::ResponseMessage;
pub use patterns::saga::SagaAction;
pub use patterns::saga::SagaCoordinator;
pub use patterns::saga::SagaId;
pub use patterns::saga::SagaInstance;
pub use patterns::saga::SagaStatus;
pub use patterns::saga::SagaStep;
pub use patterns::saga::SagaStepExecutor;
pub use patterns::saga::StepResult;
pub use patterns::saga::StepStatus;
pub use publisher::CustomExchangeDeclareOptions;
pub use publisher::CustomQueueDeclareOptions;
pub use publisher::PublishOptions;
pub use publisher::PublishOptionsBuilder;
pub use publisher::Publisher;
pub use retry::DelayedMessageExchange;
pub use retry::RetryPolicy;
pub use shutdown::setup_signal_handling;
pub use shutdown::ShutdownConfig;
pub use shutdown::ShutdownHandler;
pub use shutdown::ShutdownManager;
pub use shutdown::ShutdownSignal;

Modules§

batching
Message batching implementation for high-throughput scenarios
circuit_breaker
Circuit breaker implementation for connection resilience
config
connection
consumer
error
health
metrics
Metrics and observability module for RustRabbit
patterns
publisher
retry
shutdown
Graceful shutdown handling for RustRabbit

Structs§

RustRabbit
Main facade for the rust-rabbit library