Crate rust_rabbit

§rust-rabbit

A simple, reliable RabbitMQ client library for Rust that focuses on core functionality with minimal configuration.

§Features

  • Simple API: Just Publisher and Consumer with essential methods
  • Flexible Retry: Exponential, linear, or custom retry mechanisms
  • Auto-Setup: Automatic queue/exchange declaration and binding
  • Built-in Reliability: Default ACK behavior with error handling

§Quick Start

§Publisher

use rust_rabbit::{Connection, Publisher};
use serde::Serialize;

#[derive(Serialize)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    let publisher = Publisher::new(connection);
     
    let order = Order { id: 123, amount: 99.99 };
     
    // Publish to exchange
    publisher.publish_to_exchange("orders", "new.order", &order, None).await?;
     
    // Publish directly to queue
    publisher.publish_to_queue("order_queue", &order, None).await?;
     
    Ok(())
}

§Consumer with Retry

use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
     
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default()) // 1s -> 2s -> 4s -> 8s -> 16s
        .bind_to_exchange("orders", "order.*")
        .with_prefetch(5)
        .build();
     
    consumer.consume(|msg: rust_rabbit::MessageEnvelope<Order>| async move {
        println!("Processing order {}: ${}", msg.payload.id, msg.payload.amount);
        // Your business logic here
        Ok(()) // ACK message
    }).await?;
     
    Ok(())
}
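
The binding pattern "order.*" above follows AMQP topic-exchange conventions, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is a std-only illustration of those rules, assuming "orders" is declared as a topic exchange (the exchange type is not stated here). `topic_matches` and `words_match` are hypothetical helpers written for this example and are not part of rust_rabbit; the broker performs the real matching.

```rust
/// Hypothetical helper (not a rust_rabbit API): returns true if `key`
/// matches `pattern` under AMQP topic-exchange rules.
fn topic_matches(pattern: &str, key: &str) -> bool {
    let pattern_words: Vec<&str> = pattern.split('.').collect();
    let key_words: Vec<&str> = key.split('.').collect();
    words_match(&pattern_words, &key_words)
}

/// Recursive word-by-word comparison.
fn words_match(pattern: &[&str], key: &[&str]) -> bool {
    // An exhausted pattern matches only an exhausted key.
    let Some((first, rest)) = pattern.split_first() else {
        return key.is_empty();
    };
    match *first {
        // `#` may consume zero or more words, so try every possible split.
        "#" => (0..=key.len()).any(|skip| words_match(rest, &key[skip..])),
        // `*` must consume exactly one word.
        "*" => !key.is_empty() && words_match(rest, &key[1..]),
        // A literal word must be equal to the next key word.
        word => key.first() == Some(&word) && words_match(rest, &key[1..]),
    }
}

fn main() {
    for key in ["order.new", "new.order", "order.new.eu"] {
        println!("order.* vs {key}: {}", topic_matches("order.*", key));
    }
}
```

Under these rules the routing key "new.order" used in the Publisher example would not match "order.*", while a key such as "order.new" would, so it is worth checking that the publisher's routing keys and the consumer's binding pattern agree.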

§Retry Configurations

use rust_rabbit::RetryConfig;
use std::time::Duration;

// Exponential: 1s -> 2s -> 4s -> 8s -> 16s (5 retries)
let exponential = RetryConfig::exponential_default();

// Custom exponential: 2s -> 4s -> 8s -> 16s -> 32s (5 retries, each delay capped at 60s)
let custom_exp = RetryConfig::exponential(5, Duration::from_secs(2), Duration::from_secs(60));

// Linear: 10s -> 10s -> 10s (3 retries)  
let linear = RetryConfig::linear(3, Duration::from_secs(10));

// Custom delays: 1s -> 5s -> 30s
let custom = RetryConfig::custom(vec![
    Duration::from_secs(1),
    Duration::from_secs(5),
    Duration::from_secs(30),
]);

// No retries
let no_retry = RetryConfig::no_retry();
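
To make the schedules in the comments above concrete, here is a minimal std-only sketch of how such delay sequences can be computed. It assumes the exponential arguments mean (retry count, base delay, maximum delay), as the comments suggest. `exponential_delays` and `linear_delays` are hypothetical helpers for illustration, not rust_rabbit functions, and the crate's internal calculation may differ.

```rust
use std::time::Duration;

/// Hypothetical helper (not a rust_rabbit API): delays that double from
/// `base` on every retry and never exceed `cap`.
fn exponential_delays(max_retries: u32, base: Duration, cap: Duration) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| {
            // 2^attempt, falling back to u32::MAX if the power overflows.
            let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
            base.saturating_mul(factor).min(cap)
        })
        .collect()
}

/// Hypothetical helper: the same delay repeated `max_retries` times.
fn linear_delays(max_retries: u32, delay: Duration) -> Vec<Duration> {
    vec![delay; max_retries as usize]
}

fn main() {
    // Convert to whole seconds for compact printing.
    let secs = |delays: Vec<Duration>| -> Vec<u64> {
        delays.iter().map(Duration::as_secs).collect()
    };

    let one = Duration::from_secs(1);
    let two = Duration::from_secs(2);
    let ten = Duration::from_secs(10);
    let sixty = Duration::from_secs(60);

    println!("{:?}", secs(exponential_delays(5, one, sixty))); // [1, 2, 4, 8, 16]
    println!("{:?}", secs(exponential_delays(5, two, sixty))); // [2, 4, 8, 16, 32]
    println!("{:?}", secs(exponential_delays(5, two, ten)));   // [2, 4, 8, 10, 10]
    println!("{:?}", secs(linear_delays(3, ten)));             // [10, 10, 10]
}
```

The third call shows the cap taking effect: once the doubled delay would exceed the maximum, every later retry waits for the maximum instead.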

§MessageEnvelope System

For advanced retry tracking and error handling, use the MessageEnvelope system:

use rust_rabbit::{Connection, Publisher, Consumer, MessageEnvelope, RetryConfig};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
     
    // Publisher with envelope
    let publisher = Publisher::new(connection.clone());
    let order = Order { id: 123, amount: 99.99 };
    let envelope = MessageEnvelope::new(order, "order_queue")
        .with_max_retries(3);
     
    publisher.publish_envelope_to_queue("order_queue", &envelope, None).await?;
     
    // Consumer with envelope processing
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default())
        .build();
     
    consumer.consume_envelopes(|envelope: MessageEnvelope<Order>| async move {
        println!("Processing order {} (attempt {})",
                 envelope.payload.id,
                 envelope.metadata.retry_attempt + 1);
         
        // Access retry metadata
        if !envelope.is_first_attempt() {
            println!("This is a retry. Last error: {:?}", envelope.last_error());
        }
         
        // Your business logic here
        Ok(())
    }).await?;
     
    Ok(())
}
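
The retry metadata used above (an attempt counter, a first-attempt check, and the last recorded error) can be pictured with a small std-only model. This is a sketch under stated assumptions: `SketchEnvelope` and `record_failure` are hypothetical names invented for this illustration, and the real MessageEnvelope type may store and update its metadata differently.

```rust
/// Hypothetical stand-in for an envelope; NOT the rust_rabbit type.
#[derive(Debug, Clone)]
struct SketchEnvelope<T> {
    payload: T,
    retry_attempt: u32,  // 0 on the first delivery
    max_retries: u32,    // retries allowed after the first delivery
    errors: Vec<String>, // one entry per failed attempt
}

impl<T> SketchEnvelope<T> {
    fn new(payload: T, max_retries: u32) -> Self {
        Self { payload, retry_attempt: 0, max_retries, errors: Vec::new() }
    }

    fn is_first_attempt(&self) -> bool {
        self.retry_attempt == 0
    }

    fn last_error(&self) -> Option<&str> {
        self.errors.last().map(String::as_str)
    }

    /// Records a failed attempt. Returns the envelope to redeliver, or
    /// `None` once the retry budget is exhausted.
    fn record_failure(mut self, error: &str) -> Option<Self> {
        self.errors.push(error.to_string());
        if self.retry_attempt >= self.max_retries {
            return None;
        }
        self.retry_attempt += 1;
        Some(self)
    }
}

fn main() {
    // Simulate a handler that fails on every delivery.
    let mut pending = Some(SketchEnvelope::new("order-123", 3));
    while let Some(envelope) = pending.take() {
        println!(
            "{}: attempt {} (first: {}, last error: {:?})",
            envelope.payload,
            envelope.retry_attempt + 1,
            envelope.is_first_attempt(),
            envelope.last_error()
        );
        pending = envelope.record_failure("payment gateway timeout");
    }
}
```

With a budget of 3 retries the loop runs four times: the first delivery plus three retries, after which the message would typically be dropped or dead-lettered.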

Modules§

prelude
Prelude module for convenient imports

Structs§

Connection
Simple RabbitMQ connection wrapper
Consumer
Simplified Consumer for message consumption
ConsumerBuilder
Consumer configuration builder
ErrorRecord
Record of an error that occurred during message processing
MassTransitEnvelope
MassTransit message envelope format (C# camelCase JSON)
MassTransitOptions
MassTransit-specific options for message publishing
MessageEnvelope
Message envelope that wraps the actual payload with metadata
MessageMetadata
Metadata associated with a message
MessageSource
Information about where the message came from
PublishOptions
Publish options builder
Publisher
Simplified Publisher for message publishing
RetryConfig
Simple retry configuration
WireMessage
Simple wire message format for basic publish/consume

Enums§

DelayStrategy
Strategy for delaying retry messages
ErrorType
Classification of error types for better handling
RetryMechanism
Retry mechanism configuration
RustRabbitError
Main error type for rust-rabbit library

Functions§

init_tracing
Initialize tracing with recommended defaults for rust-rabbit.

Type Aliases§

Result
Result type alias for convenience