deadpool_lapin/
config.rs

1use std::convert::Infallible;
2
3use crate::{CreatePoolError, Manager, Pool, PoolBuilder, PoolConfig, Runtime};
4
5/// Configuration object.
6///
7/// # Example (from environment)
8///
9/// By enabling the `serde` feature you can read the configuration using the
10/// [`config`](https://crates.io/crates/config) crate as following:
11/// ```env
12/// AMQP__URL=amqp://127.0.0.1:5672/%2f
13/// AMQP__POOL__MAX_SIZE=16
14/// AMQP__POOL__TIMEOUTS__WAIT__SECS=2
15/// AMQP__POOL__TIMEOUTS__WAIT__NANOS=0
16/// ```
17/// ```rust
18/// #[derive(serde::Deserialize)]
19/// struct Config {
20///     amqp: deadpool_lapin::Config,
21/// }
22///
23/// impl Config {
24///     pub fn from_env() -> Result<Self, config::ConfigError> {
25///         let mut cfg = config::Config::builder()
26///            .add_source(config::Environment::default().separator("__"))
27///            .build()?;
28///            cfg.try_deserialize()
29///     }
30/// }
31/// ```
32#[derive(Clone, Default)]
33#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
34pub struct Config {
35    /// AMQP server URL.
36    pub url: Option<String>,
37
38    /// [`Pool`] configuration.
39    pub pool: Option<PoolConfig>,
40
41    /// Connection properties.
42    #[cfg_attr(feature = "serde", serde(skip))]
43    pub connection_properties: lapin::ConnectionProperties,
44}
45
46pub(crate) struct ConnProps<'a>(pub(crate) &'a lapin::ConnectionProperties);
47impl std::fmt::Debug for ConnProps<'_> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("ConnectionProperties")
50            .field("locale", &self.0.locale)
51            .field("client_properties", &self.0.client_properties)
52            .finish_non_exhaustive()
53    }
54}
55
56impl std::fmt::Debug for Config {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Config")
59            .field("url", &self.url)
60            .field("pool", &self.pool)
61            .field(
62                "connection_properties",
63                &ConnProps(&self.connection_properties),
64            )
65            .finish()
66    }
67}
68
69impl Config {
70    /// Creates a new [`Pool`] using this [`Config`].
71    ///
72    /// # Errors
73    ///
74    /// See [`CreatePoolError`] for details.
75    pub fn create_pool(&self, runtime: Option<Runtime>) -> Result<Pool, CreatePoolError> {
76        self.builder(runtime)
77            .build()
78            .map_err(CreatePoolError::Build)
79    }
80
81    /// Creates a new [`PoolBuilder`] using this [`Config`].
82    pub fn builder(&self, runtime: Option<Runtime>) -> PoolBuilder {
83        let url = self.get_url().to_string();
84        let pool_config = self.get_pool_config();
85
86        let conn_props = self.connection_properties.clone();
87        let conn_props = match runtime {
88            None => conn_props,
89            #[cfg(feature = "rt_tokio_1")]
90            Some(Runtime::Tokio1) => {
91                #[cfg(not(windows))]
92                let conn_props = conn_props.with_reactor(tokio_reactor_trait::Tokio::current());
93                conn_props.with_executor(tokio_executor_trait::Tokio::current())
94            }
95            #[cfg(feature = "rt_async-std_1")]
96            Some(Runtime::AsyncStd1) => conn_props
97                .with_executor(async_executor_trait::AsyncStd)
98                .with_reactor(async_reactor_trait::AsyncIo),
99            #[allow(unreachable_patterns)]
100            _ => unreachable!(),
101        };
102
103        let mut builder = Pool::builder(Manager::new(url, conn_props)).config(pool_config);
104
105        if let Some(runtime) = runtime {
106            builder = builder.runtime(runtime)
107        }
108
109        builder
110    }
111
112    /// Returns URL which can be used to connect to the database.
113    pub fn get_url(&self) -> &str {
114        self.url.as_deref().unwrap_or("amqp://127.0.0.1:5672/%2f")
115    }
116
117    /// Returns [`deadpool::managed::PoolConfig`] which can be used to construct
118    /// a [`deadpool::managed::Pool`] instance.
119    #[must_use]
120    pub fn get_pool_config(&self) -> PoolConfig {
121        self.pool.unwrap_or_default()
122    }
123}
124
/// Error type reserved for problems with the lapin configuration.
///
/// For now this is merely an alias for [`Infallible`]: the configuration
/// phase performs no validation, so no value of this type is ever produced.
pub type ConfigError = Infallible;