1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#![deny(missing_docs)]
#![forbid(unsafe_code)]

//! Lapin support for the r2d2 connection pool.

use futures_executor::block_on;
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::ShortString;
use lapin::{Connection, ConnectionProperties, ConnectionState, Error};

/// An `r2d2::ManageConnection` for `lapin::Connection`s.
///
/// # Example
/// ```no_run
/// use lapin::ConnectionProperties;
/// use r2d2_lapin::LapinConnectionManager;
/// use std::thread;
///
/// fn main() {
///     
/// let manager = LapinConnectionManager::new("amqp://guest:guest@127.0.0.1:5672//", &ConnectionProperties::default());
///     let pool = r2d2::Pool::builder()
///         .max_size(15)
///         .build(manager)
///         .unwrap();
///
///     for _ in 0..20 {
///         let pool = pool.clone();
///         thread::spawn(move || {
///             let conn = pool.get().unwrap();
///             // use the connection
///             // it will be returned to the pool when it falls out of scope.
///         });
///     }
/// }
/// ```
#[derive(Debug)]
pub struct LapinConnectionManager {
    amqp_address: String,
    conn_properties: ConnectionProperties,
}

impl LapinConnectionManager {
    /// Initialise the connection manager with the data needed to create new connections.
    /// Refer to the documentation of `lapin::ConnectionProperties` for further details on the parameters.
    ///
    /// # Example
    /// ```
    /// # use r2d2_oracle::OracleConnectionManager;
    /// let manager = OracleConnectionManager::new("user", "password", "localhost");
    /// ```
    pub fn new(amqp_address: &str, conn_properties: &ConnectionProperties) -> Self {
        Self {
            amqp_address: amqp_address.to_string(),
            conn_properties: conn_properties.clone(),
        }
    }

    async fn async_connect(amqp_address: &str, conn_properties: ConnectionProperties) -> Result<Connection, Error> {
        lapin::Connection::connect(amqp_address, conn_properties).await
    }
}

impl r2d2::ManageConnection for LapinConnectionManager {
    type Connection = Connection;
    type Error = Error;

    fn connect(&self) -> Result<Self::Connection, Self::Error> {
        block_on(Self::async_connect(&self.amqp_address, self.conn_properties.clone()))
    }

    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        let valid_states = vec![ConnectionState::Initial, ConnectionState::Connecting, ConnectionState::Connected];
        if valid_states.contains(&conn.status().state()) {
            Ok(())
        } else {
            Err(Self::Error::ProtocolError(AMQPError::new(
                AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
                ShortString::from("Invalid connection"),
            )))
        }
    }

    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        let broken_states = vec![ConnectionState::Closed, ConnectionState::Error];
        broken_states.contains(&conn.status().state())
    }
}

#[cfg(test)]
mod tests {
    use crate::LapinConnectionManager;
    use lapin::ConnectionProperties;
    use r2d2::Pool;
    use std::sync::Arc;
    use tokio_amqp::LapinTokioExt;

    lazy_static::lazy_static! {
        static ref AMQP_URL: String = {
            dotenv::dotenv().ok();
            std::env::var("TEST_AMQP_URL").unwrap_or_else(|_| "amqp://guest:guest@127.0.0.1:5672//".to_string())
        };
    }

    #[tokio::test]
    async fn can_connect() {
        let manager = LapinConnectionManager::new(&AMQP_URL, &ConnectionProperties::default().with_tokio());
        let pool = Arc::new(Pool::builder().max_size(2).test_on_check_out(true).build(manager).unwrap());
        let n_tasks = 100;
        for i in 0..n_tasks {
            let pool = pool.clone();
            tokio::spawn(async move {
                let delay_ms = i - n_tasks;
                tokio::time::delay_for(tokio::time::Duration::from_millis(delay_ms as u64)).await;
                let conn = pool.get().expect("Should get connection");
                conn.create_channel().await.expect("Should create lapin channel");
            });
        }
    }
}