bb8_lapin/
lib.rs

1//! Lapin support for the `bb8` connection pool.
2#![deny(missing_docs, missing_debug_implementations)]
3
4pub use bb8;
5pub use lapin;
6
7/// Basic types to create a `LapinConnectionManager` instance.
8pub mod prelude;
9
10use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
11use lapin::types::ShortString;
12use lapin::{ConnectionProperties, ConnectionState};
13use std::fmt;
14
15/// A `bb8::ManageConnection` implementation for `lapin::Connection`s.
16///
17/// # Example
18/// ```no_run
19/// use bb8_lapin::prelude::*;
20///
21/// async fn example() {
22///     let manager = LapinConnectionManager::new("amqp://guest:guest@127.0.0.1:5672//", ConnectionProperties::default());
23///     let pool = bb8::Pool::builder()
24///         .max_size(15)
25///         .build(manager)
26///         .await
27///         .unwrap();
28///
29///     for _ in 0..20 {
30///         let pool = pool.clone();
31///         tokio::spawn(async move {
32///             let conn = pool.get().await.unwrap();
33///             // use the connection
34///             // it will be returned to the pool when it falls out of scope.
35///         });
36///     }
37/// }
38/// ```
39pub struct LapinConnectionManager {
40    amqp_address: String,
41    conn_properties: ConnectionProperties,
42}
43
44impl LapinConnectionManager {
45    /// Initialize the connection manager with the data needed to create new connections.
46    /// Refer to the documentation of [`lapin::ConnectionProperties`](https://docs.rs/lapin/1.2.8/lapin/struct.ConnectionProperties.html)
47    /// for further details on the available connection parameters.
48    ///
49    /// # Example
50    /// ```
51    /// let manager = bb8_lapin::LapinConnectionManager::new("amqp://guest:guest@127.0.0.1:5672//", lapin::ConnectionProperties::default());
52    /// ```
53    pub fn new(amqp_address: &str, conn_properties: ConnectionProperties) -> Self {
54        Self {
55            amqp_address: amqp_address.to_string(),
56            conn_properties,
57        }
58    }
59}
60
61impl bb8::ManageConnection for LapinConnectionManager {
62    type Connection = lapin::Connection;
63    type Error = lapin::ErrorKind;
64
65    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
66        lapin::Connection::connect(&self.amqp_address, self.conn_properties.clone())
67            .await
68            .map_err(|e| e.kind().to_owned())
69    }
70
71    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
72        let valid_states = [ConnectionState::Initial, ConnectionState::Connecting, ConnectionState::Connected];
73        if valid_states.contains(&conn.status().state()) {
74            Ok(())
75        } else {
76            Err(lapin::ErrorKind::ProtocolError(AMQPError::new(
77                AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
78                ShortString::from("Invalid connection"),
79            )))
80        }
81    }
82
83    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
84        let broken_states = [ConnectionState::Closed, ConnectionState::Error];
85        broken_states.contains(&conn.status().state())
86    }
87}
88
89impl fmt::Debug for LapinConnectionManager {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.debug_struct("LapinConnectionManager")
92            .field("amqp_address", &self.amqp_address)
93            .finish()
94    }
95}