mockforge-amqp 0.3.129

AMQP protocol support for MockForge
docs.rs failed to build mockforge-amqp-0.3.129
Please check the build logs for more information.
See Builds for ideas on how to fix a failed build, or Metadata for how to configure docs.rs builds.
If you believe this is docs.rs' fault, open an issue.
Visit the last successful build: mockforge-amqp-0.3.126

MockForge AMQP

AMQP 0.9.1 protocol support for MockForge, enabling testing of message queue patterns, pub/sub, and enterprise messaging scenarios.

This crate provides a RabbitMQ-compatible AMQP broker implementation that allows you to mock message queue interactions, test pub/sub patterns, and simulate enterprise messaging scenarios in your applications.

Features

  • AMQP 0.9.1 Protocol: Full RabbitMQ-compatible protocol support
  • Mock Broker: Simulate message broker behavior for testing
  • Exchange Management: Direct, topic, headers, and fanout exchanges
  • Queue Management: Dynamic queue creation and management
  • Message Routing: Flexible message routing and binding rules
  • Consumer Simulation: Mock consumer behavior and acknowledgments
  • Fixture Support: Pre-configured broker states and message flows

Quick Start

Basic AMQP Broker

use mockforge_amqp::AmqpBroker;
use mockforge_core::config::AmqpConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create AMQP configuration
    let config = AmqpConfig {
        host: "127.0.0.1".to_string(),
        port: 5672,
        ..Default::default()
    };

    // Create broker instance
    let spec_registry = Arc::new(AmqpSpecRegistry::new());
    let broker = AmqpBroker::new(config, spec_registry);

    // Start the broker
    broker.start().await?;

    Ok(())
}

Core Components

AmqpBroker

The main broker implementation that handles AMQP connections and protocol operations:

use mockforge_amqp::AmqpBroker;
use mockforge_core::config::AmqpConfig;

let config = AmqpConfig::default();
let spec_registry = Arc::new(AmqpSpecRegistry::new());
let broker = AmqpBroker::new(config, spec_registry);

// The broker handles:
// - Connection establishment and authentication
// - Channel management
// - Exchange and queue operations
// - Message publishing and consumption

Exchange Management

Support for different AMQP exchange types:

use mockforge_amqp::exchanges::{ExchangeManager, ExchangeType};

// Create exchange manager
let mut exchange_manager = ExchangeManager::new();

// Declare exchanges
exchange_manager.declare_exchange(
    "direct_exchange",
    ExchangeType::Direct,
    true, // durable
    false, // auto_delete
    HashMap::new(), // arguments
)?;

// Bind queues to exchanges
exchange_manager.bind_queue(
    "my_queue",
    "direct_exchange",
    "routing_key",
)?;

Queue Management

Dynamic queue creation and management:

use mockforge_amqp::queues::QueueManager;

// Create queue manager
let mut queue_manager = QueueManager::new();

// Declare a queue
queue_manager.declare_queue(
    "my_queue",
    true, // durable
    false, // exclusive
    false, // auto_delete
    HashMap::new(), // arguments
)?;

// Publish messages to queue
queue_manager.publish_message(
    "my_queue",
    "Hello, AMQP!".as_bytes(),
    HashMap::new(), // properties
)?;

Message Handling

Publishing Messages

use mockforge_amqp::messages::AmqpMessage;

// Create a message
let message = AmqpMessage {
    body: "Hello, World!".as_bytes().to_vec(),
    properties: HashMap::new(),
    exchange: "my_exchange".to_string(),
    routing_key: "my.routing.key".to_string(),
    mandatory: false,
    immediate: false,
};

// Publish through broker
broker.publish_message(message).await?;

Consumer Simulation

use mockforge_amqp::consumers::ConsumerManager;

// Create consumer manager
let consumer_manager = ConsumerManager::new();

// Register a consumer
consumer_manager.register_consumer(
    "consumer_tag",
    "queue_name",
    Box::new(|message| {
        println!("Received: {:?}", message);
        // Process message
        Ok(())
    }),
)?;

Fixture System

Define broker configurations and message flows using fixtures:

# amqp-fixture.yaml
exchanges:
  - name: "orders_exchange"
    type: "topic"
    durable: true
    bindings:
      - queue: "order_processing"
        routing_key: "orders.#"

queues:
  - name: "order_processing"
    durable: true

messages:
  - exchange: "orders_exchange"
    routing_key: "orders.created"
    body: '{"order_id": "12345", "amount": 99.99}'
    properties:
      content_type: "application/json"

Loading Fixtures

use mockforge_amqp::AmqpSpecRegistry;

// Create spec registry
let registry = AmqpSpecRegistry::new();

// Load fixture from file
registry.load_fixture_from_file("amqp-fixture.yaml").await?;

// Or create fixture programmatically
use mockforge_amqp::fixtures::AmqpFixture;

let fixture = AmqpFixture {
    exchanges: vec![/* ... */],
    queues: vec![/* ... */],
    messages: vec![/* ... */],
};

registry.add_fixture(fixture)?;

Protocol Support

MockForge AMQP supports the full AMQP 0.9.1 protocol including:

  • Connection Operations: Open, close, authentication
  • Channel Management: Multiple channels per connection
  • Exchange Operations: Declare, delete, bind, unbind
  • Queue Operations: Declare, delete, bind, unbind, purge
  • Message Operations: Publish, consume, acknowledge, reject
  • Transaction Support: Basic transaction semantics
  • QoS Settings: Prefetch and flow control

Testing AMQP Clients

Use MockForge AMQP to test AMQP client applications:

use lapin::{Connection, ConnectionProperties};
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Start AMQP broker in background
    let config = AmqpConfig {
        host: "127.0.0.1".to_string(),
        port: 5672,
        ..Default::default()
    };

    let spec_registry = Arc::new(AmqpSpecRegistry::new());
    let broker = AmqpBroker::new(config.clone(), spec_registry);
    task::spawn(async move {
        broker.start().await.unwrap();
    });

    // Give broker time to start
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    // Test AMQP client
    let addr = format!("amqp://{}:{}", config.host, config.port);
    let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;

    let channel = conn.create_channel().await?;

    // Declare exchange and queue
    channel.exchange_declare(
        "test_exchange",
        lapin::ExchangeKind::Direct,
        lapin::options::ExchangeDeclareOptions::default(),
        lapin::types::FieldTable::default(),
    ).await?;

    channel.queue_declare(
        "test_queue",
        lapin::options::QueueDeclareOptions::default(),
        lapin::types::FieldTable::default(),
    ).await?;

    // Bind queue to exchange
    channel.queue_bind(
        "test_queue",
        "test_exchange",
        "test_key",
        lapin::options::QueueBindOptions::default(),
        lapin::types::FieldTable::default(),
    ).await?;

    // Publish a message
    channel.basic_publish(
        "test_exchange",
        "test_key",
        lapin::options::BasicPublishOptions::default(),
        b"Hello from MockForge!",
        lapin::types::BasicProperties::default(),
    ).await?;

    conn.close(0, "").await?;

    Ok(())
}

Configuration

AmqpConfig

use mockforge_core::config::AmqpConfig;

let config = AmqpConfig {
    host: "0.0.0.0".to_string(),        // Bind address
    port: 5672,                         // AMQP port
    max_connections: 1000,              // Connection limit
    heartbeat_interval: 60,             // Heartbeat interval in seconds
    ..Default::default()
};

Performance

MockForge AMQP is optimized for testing scenarios:

  • In-Memory Operations: Fast message routing without persistence
  • Concurrent Connections: Handle multiple simultaneous AMQP clients
  • Low Latency: Minimal overhead for message operations
  • Scalable: Support for high-throughput testing scenarios

Integration with MockForge

MockForge AMQP integrates seamlessly with the MockForge ecosystem:

  • MockForge Core: Shared configuration and logging
  • MockForge CLI: Command-line interface for AMQP broker management
  • MockForge Plugins: Extend AMQP functionality with custom plugins

Examples

See the examples directory for complete working examples including:

  • Basic AMQP broker setup
  • Message routing scenarios
  • Consumer testing patterns
  • Integration testing with real AMQP clients

Related Crates

License

Licensed under MIT OR Apache-2.0