A Lapin wrapper that encapsulates the handling of connections and channels and provides
helper methods that make it easier to use and less error-prone.
Usage
use amqp_manager::prelude::*;
use futures::FutureExt;
use tokio_amqp::LapinTokioExt;
use serde::{Deserialize, Serialize};
/// Example message payload: a newtype wrapping a `String` that derives
/// serde's `Deserialize` and `Serialize`.
#[derive(Deserialize, Serialize)]
struct SimpleDelivery(String);
// Example flow: open a session, declare a queue, publish one message to it,
// attach a consumer that acks each delivery, then check the queue's message count.
#[tokio::main]
async fn main() {
    // Pooled connections: the manager is handed to a mobc pool, which backs the AmqpManager.
    let connection_manager = RMQConnectionManager::new(
        "amqp://guest:guest@127.0.0.1:5672//".to_string(),
        ConnectionProperties::default().with_tokio(),
    );
    let connection_pool = mobc::Pool::builder().build(connection_manager);
    let manager = AmqpManager::new(connection_pool).expect("Should create AmqpManager instance");
    let session = manager
        .get_session()
        .await
        .expect("Should create AmqpSession instance");

    // Describe the queue once; the same description is reused for the final check below.
    let queue_name = "queue-name";
    let queue_options = QueueDeclareOptions {
        auto_delete: false,
        ..Default::default()
    };
    let create_queue_op = CreateQueue {
        queue_name: queue_name.to_string(),
        options: queue_options,
        ..Default::default()
    };
    session
        .create_queue(create_queue_op.clone())
        .await
        .expect("create_queue");

    // Publish a single message, using the queue name as the routing key.
    let publish_op = PublishToRoutingKey {
        routing_key: queue_name.to_string(),
        payload: Payload::new(&SimpleDelivery("Hello World".to_string())).unwrap(),
        ..Default::default()
    };
    session
        .publish_to_routing_key(publish_op)
        .await
        .expect("publish_to_queue");

    // Register a consumer whose delegate acknowledges every delivery it receives.
    let consumer_op = CreateConsumer {
        queue_name: queue_name.to_string(),
        consumer_name: "consumer-name".to_string(),
        ..Default::default()
    };
    let ack_delegate = |result: DeliveryResult| async {
        // Anything other than a successful delivery is ignored.
        let (channel, message) = match result {
            Ok(Some(received)) => received,
            _ => return,
        };
        // The outcome of the ack is intentionally discarded via `map(|_| ())`.
        channel
            .basic_ack(message.delivery_tag, BasicAckOptions::default())
            .map(|_| ())
            .await;
    };
    session
        .create_consumer_with_delegate(consumer_op, ack_delegate)
        .await
        .expect("create_consumer");

    // Call create_queue again with the same description to read the message count.
    let queue = session
        .create_queue(create_queue_op.clone())
        .await
        .expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}
Build-time Requirements
The crate is tested on ubuntu-latest
against the following Rust versions: nightly, beta, stable, and 1.45.0.
It may work with older versions as well, but this is not tested.
Please see the lapin crate for details about its requirements.