1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#![forbid(unsafe_code)]
//! [Lapin](https://github.com/CleverCloud/lapin) wrapper that encapsulates the connections/channels
//! handling making it easier to use and less error prone.
//!
//! Here is an example using the tokio runtime:
//! ```no_run
//! use amqp_manager::prelude::*;
//! use deadpool_lapin::{Config, Runtime};
//! use futures::FutureExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let pool = Config {
//!        url: Some("amqp://guest:guest@127.0.0.1:5672//".to_string()),
//!         ..Default::default()
//!     }
//!     .create_pool(Some(Runtime::Tokio1))
//!     .expect("Should create DeadPool instance");
//!     let manager = AmqpManager::new(pool);
//!     let session = manager
//!         .create_session_with_confirm_select()
//!         .await
//!         .expect("Should create AmqpSession instance");
//!
//!     let queue_name = "queue-name";
//!     let create_queue_op = CreateQueue {
//!         queue_name: &queue_name,
//!         options: QueueDeclareOptions {
//!             auto_delete: false,
//!             ..Default::default()
//!         },
//!         ..Default::default()
//!     };
//!     session.create_queue(create_queue_op.clone()).await.expect("create_queue");
//!
//!     session
//!         .publish_to_routing_key(PublishToRoutingKey {
//!             routing_key: &queue_name,
//!             payload: "Hello World".as_bytes(),
//!             ..Default::default()
//!         })
//!         .await
//!         .expect("publish_to_queue");
//!
//!     session
//!         .create_consumer_with_delegate(
//!             CreateConsumer {
//!                 queue_name: &queue_name,
//!                 consumer_name: "consumer-name",
//!                 ..Default::default()
//!             },
//!             |delivery: DeliveryResult| async {
//!                 if let Ok(Some((channel, delivery))) = delivery {
//!                    let payload = std::str::from_utf8(&delivery.data).unwrap();
//!                     assert_eq!(payload, "Hello World");
//!                     channel
//!                         .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
//!                         .map(|_| ())
//!                         .await;
//!                 }
//!             },
//!         )
//!         .await
//!         .expect("create_consumer");
//!
//!     let queue = session.create_queue(create_queue_op.clone()).await.expect("create_queue");
//!     assert_eq!(queue.message_count(), 0, "Messages has been consumed");
//! }
//! ```

use thiserror::Error;

use crate::prelude::lapin::options::ConfirmSelectOptions;
use crate::prelude::lapin::types::{LongString, ShortString};
use crate::prelude::{AMQPValue, FieldTable};
use crate::session::AmqpSession;

pub mod prelude;

mod ops;
mod session;

pub type AmqpResult<T> = Result<T, AmqpError>;

#[derive(Error, Debug)]
#[non_exhaustive]
pub enum AmqpError {
    #[error(transparent)]
    Lapin(#[from] lapin::Error),
    #[error(transparent)]
    Pool(#[from] deadpool_lapin::PoolError),
}

/// This struct contains a connection pool. Since a connection can be used to create multiple channels,
/// it's recommended to use a pool with a low number of connections.
/// Refer to the [RabbitMQ channels docs](https://www.rabbitmq.com/channels.html#basics) for more information.
#[derive(Debug, Clone)]
pub struct AmqpManager {
    pool: deadpool_lapin::Pool,
}

impl AmqpManager {
    pub fn new(pool: deadpool_lapin::Pool) -> Self {
        Self { pool }
    }

    /// Creates a new channel using a connection from the pool.
    /// The channel will be closed when dropping the `AmqpSession` instance.
    /// Creating a new connection is slower than creating a channel, so it's better
    /// to reuse the connection and create as many channels as needed out of that a connection.
    pub async fn create_session(&self) -> AmqpResult<AmqpSession> {
        let conn = self.pool.get().await?;
        let channel = conn.create_channel().await?;
        Ok(AmqpSession::new(channel))
    }

    /// Creates a new channel using a connection from the pool. This channel can be awaited to receive confirms.
    pub async fn create_session_with_confirm_select(&self) -> AmqpResult<AmqpSession> {
        let conn = self.pool.get().await?;
        let channel = conn.create_channel().await?;
        channel.confirm_select(ConfirmSelectOptions::default()).await?;
        Ok(AmqpSession::new(channel))
    }

    /// Helper method to create a `FieldTable` instance with the dead-letter argument.
    pub fn dead_letter_args(args: FieldTable, dead_letter_exchange_name: &str) -> FieldTable {
        let mut args = args;
        args.insert(
            ShortString::from("x-dead-letter-exchange"),
            AMQPValue::LongString(LongString::from(dead_letter_exchange_name)),
        );
        args
    }
}