Crate amqp_manager[−][src]
A Lapin wrapper that encapsulates connection and channel handling, making Lapin easier to use and less error-prone.
Here is an example using the tokio runtime:
use amqp_manager::prelude::*;
use futures::FutureExt;
use tokio_amqp::LapinTokioExt;

/// Example walk-through: declare a queue, publish one message to it, then
/// consume and acknowledge that message through a delegate closure.
#[tokio::main]
async fn main() {
    let queue_name = "queue-name";

    // Connection pool configured for the tokio runtime.
    let amqp_uri = "amqp://guest:guest@127.0.0.1:5672//".to_string();
    let connection_properties = ConnectionProperties::default().with_tokio();
    let manager = AmqpConnectionManager::new(amqp_uri, connection_properties);
    let pool = mobc::Pool::builder().build(manager);

    // The manager wraps the pool; a session is what the operations below run on.
    let amqp_manager = AmqpManager::new(pool).expect("Should create AmqpManager instance");
    let amqp_session = amqp_manager
        .get_session()
        .await
        .expect("Should create AmqpSession instance");

    // The queue declaration is kept around because it is issued twice:
    // once up front and once at the end to read the message count.
    let declare_options = QueueDeclareOptions {
        auto_delete: false,
        ..Default::default()
    };
    let create_queue_op = CreateQueue {
        queue_name: &queue_name,
        options: declare_options,
        ..Default::default()
    };
    amqp_session
        .create_queue(create_queue_op.clone())
        .await
        .expect("create_queue");

    // Publish a single message using the queue name as the routing key.
    let publish_op = PublishToRoutingKey {
        routing_key: &queue_name,
        payload: "Hello World".as_bytes(),
        ..Default::default()
    };
    amqp_session
        .publish_to_routing_key(publish_op)
        .await
        .expect("publish_to_queue");

    // Register a consumer whose delegate checks the payload and acks the delivery.
    let consumer_op = CreateConsumer {
        queue_name: &queue_name,
        consumer_name: "consumer-name",
        ..Default::default()
    };
    amqp_session
        .create_consumer_with_delegate(consumer_op, |result: DeliveryResult| async {
            if let Ok(Some((channel, message))) = result {
                let text = std::str::from_utf8(&message.data).unwrap();
                assert_eq!(text, "Hello World");
                // The outcome of the ack is discarded by mapping it to `()`.
                channel
                    .basic_ack(message.delivery_tag, BasicAckOptions::default())
                    .map(|_| ())
                    .await;
            }
        })
        .await
        .expect("create_consumer");

    // Declare the queue again and check the message count it reports.
    let queue = amqp_session
        .create_queue(create_queue_op.clone())
        .await
        .expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}
Modules
prelude |
Structs
AmqpConnectionManager | |
AmqpManager | The struct that handles the connection pool. |
Type Definitions
AmqpResult | A type alias of lapin’s `Result` type. |