reool/pools/single_pool/
mod.rs

//! A connection pool for connecting to a single node.
2use std::sync::Arc;
3use std::time::Instant;
4
5use future::BoxFuture;
6use futures::prelude::*;
7use log::info;
8
9use crate::config::Config;
10use crate::connection_factory::ConnectionFactory;
11use crate::error::{Error, InitializationResult};
12use crate::executor_flavour::ExecutorFlavour;
13use crate::instrumentation::{InstrumentationFlavour, PoolId};
14
15use crate::{CheckoutError, Ping, PoolState, Poolable};
16
17use super::{CanCheckout, CheckoutConstraint};
18
19use super::pool_internal::{
20    instrumentation::PoolInstrumentation, Config as PoolConfig, Managed, PoolInternal,
21};
22
23/// A connection pool that maintains multiple connections
24/// to one node only.
25///
26/// The pool is cloneable and all clones share their connections.
27/// Once the last instance drops the shared connections will be dropped.
28pub(crate) struct SinglePool<T: Poolable> {
29    pool: Arc<PoolInternal<T>>,
30}
31
32impl<T: Poolable> SinglePool<T> {
33    pub fn new<F, CF>(
34        mut config: Config,
35        create_connection_factory: F,
36        executor_flavour: ExecutorFlavour,
37        instrumentation: InstrumentationFlavour,
38    ) -> InitializationResult<SinglePool<T>>
39    where
40        CF: ConnectionFactory<Connection = T> + Send + Sync + 'static,
41        F: Fn(String) -> InitializationResult<CF>,
42    {
43        if config.desired_pool_size == 0 {
44            return Err(Error::message("'desired_pool_size' must be at least one"));
45        }
46
47        let pool_conf = PoolConfig {
48            desired_pool_size: config.desired_pool_size,
49            backoff_strategy: config.backoff_strategy,
50            reservation_limit: config.reservation_limit,
51            activation_order: config.activation_order,
52            checkout_queue_size: config.checkout_queue_size,
53        };
54
55        let connection_factory = if config.connect_to_nodes.len() == 1 {
56            let connect_to = config.connect_to_nodes.pop().unwrap();
57            info!(
58                "Creating pool for '{}' with {} connections",
59                connect_to, config.desired_pool_size,
60            );
61
62            create_connection_factory(connect_to)?
63        } else {
64            return Err(Error::message(format!(
65                "there must be exactly 1 connection string given - found {}",
66                config.connect_to_nodes.len()
67            )));
68        };
69
70        let pool = PoolInternal::new(
71            pool_conf,
72            connection_factory,
73            executor_flavour,
74            PoolInstrumentation::new(instrumentation, PoolId::new(0)),
75        );
76
77        Ok(SinglePool {
78            pool: Arc::new(pool),
79        })
80    }
81
82    pub fn connected_to(&self) -> &str {
83        self.pool.connected_to()
84    }
85
86    pub fn state(&self) -> PoolState {
87        self.pool.state()
88    }
89
90    pub fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
91        self.pool.ping(timeout)
92    }
93}
94
95impl<T: Poolable> CanCheckout<T> for SinglePool<T> {
96    fn check_out<'a, M: Into<CheckoutConstraint> + Send + 'static>(
97        &'a self,
98        constraint: M,
99    ) -> BoxFuture<'a, Result<Managed<T>, CheckoutError>> {
100        self.pool
101            .check_out(constraint)
102            .map_err(|error_package| error_package.error_kind.into())
103            .boxed()
104    }
105}
106
107impl<T: Poolable> Clone for SinglePool<T> {
108    fn clone(&self) -> Self {
109        Self {
110            pool: self.pool.clone(),
111        }
112    }
113}