1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#![deny(missing_docs)]
#![forbid(unsafe_code)]

//! Lapin support for the r2d2 connection pool.

#[allow(missing_docs)]
pub mod prelude;

use futures_executor::block_on;
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::ShortString;
use lapin::{Connection, ConnectionProperties, ConnectionState, Error};

/// An `r2d2::ManageConnection` for `lapin::Connection`s.
///
/// # Example
/// ```no_run
/// use lapin::ConnectionProperties;
/// use r2d2_lapin::prelude::*;
/// use std::thread;
///
/// let manager = LapinConnectionManager::new("amqp://guest:guest@127.0.0.1:5672//", &ConnectionProperties::default());
/// let pool = r2d2::Pool::builder()
///     .max_size(15)
///     .build(manager)
///     .unwrap();
///
/// for _ in 0..20 {
///     let pool = pool.clone();
///     thread::spawn(move || {
///         let conn = pool.get().unwrap();
///         // use the connection
///         // it will be returned to the pool when it falls out of scope.
///     });
/// }
/// ```
#[derive(Debug)]
pub struct LapinConnectionManager {
    amqp_address: String,
    conn_properties: ConnectionProperties,
}

impl LapinConnectionManager {
    /// Initialise the connection manager with the data needed to create new connections.
    /// Refer to the documentation of [`lapin::ConnectionProperties`](https://docs.rs/lapin/1.2.8/lapin/struct.ConnectionProperties.html) for further details on the parameters.
    ///
    /// # Example
    /// ```
    /// let manager = r2d2_lapin::LapinConnectionManager::new("amqp://guest:guest@127.0.0.1:5672//", &lapin::ConnectionProperties::default());
    /// ```
    pub fn new(amqp_address: &str, conn_properties: &ConnectionProperties) -> Self {
        Self {
            amqp_address: amqp_address.to_string(),
            conn_properties: conn_properties.clone(),
        }
    }

    async fn async_connect(amqp_address: &str, conn_properties: ConnectionProperties) -> Result<Connection, Error> {
        lapin::Connection::connect(amqp_address, conn_properties).await
    }
}

impl r2d2::ManageConnection for LapinConnectionManager {
    type Connection = Connection;
    type Error = Error;

    fn connect(&self) -> Result<Self::Connection, Self::Error> {
        block_on(Self::async_connect(&self.amqp_address, self.conn_properties.clone()))
    }

    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        let valid_states = vec![ConnectionState::Initial, ConnectionState::Connecting, ConnectionState::Connected];
        if valid_states.contains(&conn.status().state()) {
            Ok(())
        } else {
            Err(Self::Error::ProtocolError(AMQPError::new(
                AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
                ShortString::from("Invalid connection"),
            )))
        }
    }

    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        let broken_states = vec![ConnectionState::Closed, ConnectionState::Error];
        broken_states.contains(&conn.status().state())
    }
}