Crate amqp_manager[][src]

Expand description

Lapin wrapper that encapsulates the connections/channels handling making it easier to use and less error prone.

Here is an example using the tokio runtime:

use amqp_manager::prelude::*;
use deadpool_lapin::{Config, Runtime};
use futures::FutureExt;

#[tokio::main]
async fn main() {
    let pool = Config {
       url: Some("amqp://guest:guest@127.0.0.1:5672//".to_string()),
        ..Default::default()
    }
    .create_pool(Some(Runtime::Tokio1))
    .expect("Should create DeadPool instance");
    let manager = AmqpManager::new(pool);
    let session = manager
        .create_session_with_confirm_select()
        .await
        .expect("Should create AmqpSession instance");

    let queue_name = "queue-name";
    let create_queue_op = CreateQueue {
        queue_name: &queue_name,
        options: QueueDeclareOptions {
            auto_delete: false,
            ..Default::default()
        },
        ..Default::default()
    };
    session.create_queue(create_queue_op.clone()).await.expect("create_queue");

    session
        .publish_to_routing_key(PublishToRoutingKey {
            routing_key: &queue_name,
            payload: "Hello World".as_bytes(),
            ..Default::default()
        })
        .await
        .expect("publish_to_queue");

    session
        .create_consumer_with_delegate(
            CreateConsumer {
                queue_name: &queue_name,
                consumer_name: "consumer-name",
                ..Default::default()
            },
            |delivery: DeliveryResult| async {
                if let Ok(Some((channel, delivery))) = delivery {
                   let payload = std::str::from_utf8(&delivery.data).unwrap();
                    assert_eq!(payload, "Hello World");
                    channel
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                        .await;
                }
            },
        )
        .await
        .expect("create_consumer");

    let queue = session.create_queue(create_queue_op.clone()).await.expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}

Modules

Structs

This struct contains a connection pool. Since a connection can be used to create multiple channels, it’s recommended to use a pool with a low number of connections. Refer to the RabbitMQ channels docs for more information.

Enums

Type Definitions