reool/pools/single_pool/
mod.rs1use std::sync::Arc;
3use std::time::Instant;
4
5use future::BoxFuture;
6use futures::prelude::*;
7use log::info;
8
9use crate::config::Config;
10use crate::connection_factory::ConnectionFactory;
11use crate::error::{Error, InitializationResult};
12use crate::executor_flavour::ExecutorFlavour;
13use crate::instrumentation::{InstrumentationFlavour, PoolId};
14
15use crate::{CheckoutError, Ping, PoolState, Poolable};
16
17use super::{CanCheckout, CheckoutConstraint};
18
19use super::pool_internal::{
20 instrumentation::PoolInstrumentation, Config as PoolConfig, Managed, PoolInternal,
21};
22
23pub(crate) struct SinglePool<T: Poolable> {
29 pool: Arc<PoolInternal<T>>,
30}
31
32impl<T: Poolable> SinglePool<T> {
33 pub fn new<F, CF>(
34 mut config: Config,
35 create_connection_factory: F,
36 executor_flavour: ExecutorFlavour,
37 instrumentation: InstrumentationFlavour,
38 ) -> InitializationResult<SinglePool<T>>
39 where
40 CF: ConnectionFactory<Connection = T> + Send + Sync + 'static,
41 F: Fn(String) -> InitializationResult<CF>,
42 {
43 if config.desired_pool_size == 0 {
44 return Err(Error::message("'desired_pool_size' must be at least one"));
45 }
46
47 let pool_conf = PoolConfig {
48 desired_pool_size: config.desired_pool_size,
49 backoff_strategy: config.backoff_strategy,
50 reservation_limit: config.reservation_limit,
51 activation_order: config.activation_order,
52 checkout_queue_size: config.checkout_queue_size,
53 };
54
55 let connection_factory = if config.connect_to_nodes.len() == 1 {
56 let connect_to = config.connect_to_nodes.pop().unwrap();
57 info!(
58 "Creating pool for '{}' with {} connections",
59 connect_to, config.desired_pool_size,
60 );
61
62 create_connection_factory(connect_to)?
63 } else {
64 return Err(Error::message(format!(
65 "there must be exactly 1 connection string given - found {}",
66 config.connect_to_nodes.len()
67 )));
68 };
69
70 let pool = PoolInternal::new(
71 pool_conf,
72 connection_factory,
73 executor_flavour,
74 PoolInstrumentation::new(instrumentation, PoolId::new(0)),
75 );
76
77 Ok(SinglePool {
78 pool: Arc::new(pool),
79 })
80 }
81
82 pub fn connected_to(&self) -> &str {
83 self.pool.connected_to()
84 }
85
86 pub fn state(&self) -> PoolState {
87 self.pool.state()
88 }
89
90 pub fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
91 self.pool.ping(timeout)
92 }
93}
94
95impl<T: Poolable> CanCheckout<T> for SinglePool<T> {
96 fn check_out<'a, M: Into<CheckoutConstraint> + Send + 'static>(
97 &'a self,
98 constraint: M,
99 ) -> BoxFuture<'a, Result<Managed<T>, CheckoutError>> {
100 self.pool
101 .check_out(constraint)
102 .map_err(|error_package| error_package.error_kind.into())
103 .boxed()
104 }
105}
106
107impl<T: Poolable> Clone for SinglePool<T> {
108 fn clone(&self) -> Self {
109 Self {
110 pool: self.pool.clone(),
111 }
112 }
113}