deadpool_lapin/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24mod config;
25
26use deadpool::managed;
27use lapin::{ConnectionProperties, Error};
28
29pub use lapin;
30
31pub use self::config::{Config, ConfigError};
32
33pub use deadpool::managed::reexports::*;
34deadpool::managed_reexports!(
35    "lapin",
36    Manager,
37    managed::Object<Manager>,
38    Error,
39    ConfigError
40);
41
42/// Type alias for ['Object']
43pub type Connection = managed::Object<Manager>;
44
45type RecycleResult = managed::RecycleResult<Error>;
46type RecycleError = managed::RecycleError<Error>;
47
48/// [`Manager`] for creating and recycling [`lapin::Connection`].
49///
50/// [`Manager`]: managed::Manager
51pub struct Manager {
52    addr: String,
53    connection_properties: ConnectionProperties,
54}
55
56impl std::fmt::Debug for Manager {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Manager")
59            .field("addr", &self.addr)
60            .field(
61                "connection_properties",
62                &config::ConnProps(&self.connection_properties),
63            )
64            .finish()
65    }
66}
67
68impl Manager {
69    /// Creates a new [`Manager`] using the given AMQP address and
70    /// [`lapin::ConnectionProperties`].
71    #[must_use]
72    pub fn new<S: Into<String>>(addr: S, connection_properties: ConnectionProperties) -> Self {
73        Self {
74            addr: addr.into(),
75            connection_properties,
76        }
77    }
78}
79
80impl managed::Manager for Manager {
81    type Type = lapin::Connection;
82    type Error = Error;
83
84    async fn create(&self) -> Result<lapin::Connection, Error> {
85        let conn =
86            lapin::Connection::connect(self.addr.as_str(), self.connection_properties.clone())
87                .await?;
88        Ok(conn)
89    }
90
91    async fn recycle(&self, conn: &mut lapin::Connection, _: &Metrics) -> RecycleResult {
92        match conn.status().state() {
93            lapin::ConnectionState::Connected => Ok(()),
94            other_state => Err(RecycleError::message(format!(
95                "lapin connection is in state: {:?}",
96                other_state
97            ))),
98        }
99    }
100}