Expand description
A Lapin wrapper that encapsulates connection and channel handling, making the library easier to use and less error-prone.
Here is an example using the tokio runtime:
use amqp_manager::prelude::*;
use deadpool_lapin::{Config, Runtime};
use futures::FutureExt;
// End-to-end example: declare a queue, publish one message to it, consume
// that message, then re-declare the queue and check that it is empty.
//
// Every fallible step uses `expect`, so the example panics with the given
// message if the broker at the configured URL cannot be reached or any
// operation fails.
#[tokio::main]
async fn main() {
    // Build the connection pool on the Tokio runtime.
    let pool = Config {
        url: Some("amqp://guest:guest@127.0.0.1:5672//".to_string()),
        ..Default::default()
    }
    .create_pool(Some(Runtime::Tokio1))
    .expect("Should create DeadPool instance");

    let manager = AmqpManager::new(pool);
    let session = manager
        .create_session_with_confirm_select()
        .await
        .expect("Should create AmqpSession instance");

    // `queue_name` is already a `&str`, so it is passed as-is below rather
    // than being borrowed a second time.
    let queue_name = "queue-name";
    let create_queue_op = CreateQueue {
        queue_name,
        options: QueueDeclareOptions {
            auto_delete: false,
            ..Default::default()
        },
        ..Default::default()
    };

    // The operation is cloned here because it is reused at the end of the
    // example to read the queue's message count.
    session
        .create_queue(create_queue_op.clone())
        .await
        .expect("create_queue");

    // Publish a single message using the queue name as the routing key.
    session
        .publish_to_routing_key(PublishToRoutingKey {
            routing_key: queue_name,
            payload: "Hello World".as_bytes(),
            ..Default::default()
        })
        .await
        .expect("publish_to_queue");

    session
        .create_consumer_with_delegate(
            CreateConsumer {
                queue_name,
                consumer_name: "consumer-name",
                ..Default::default()
            },
            |delivery: DeliveryResult| async {
                // Only successful deliveries that carry a message are handled;
                // errors and `None` are ignored in this example.
                if let Ok(Some((channel, delivery))) = delivery {
                    let payload = std::str::from_utf8(&delivery.data).unwrap();
                    assert_eq!(payload, "Hello World");
                    // The ack result is deliberately discarded via `map` so the
                    // delegate's future resolves to `()`.
                    channel
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                        .await;
                }
            },
        )
        .await
        .expect("create_consumer");

    // Declare the queue again to obtain its current state. This is the last
    // use of `create_queue_op`, so it is moved instead of cloned.
    let queue = session
        .create_queue(create_queue_op)
        .await
        .expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}
Modules§
Structs§
- AmqpManager - This struct contains a connection pool. Since a single connection can be used to create multiple channels, it’s recommended to use a pool with a low number of connections. Refer to the RabbitMQ channels docs for more information.