tk_pool/
config.rs

1//! A number of traits to configure connection pool and their implementatons
2//!
3//! Usually you should start with ``pool_for`` and use methods to configure
4//! connection pool instead of poking at these types.
5//!
6use std::time::Duration;
7
8use abstract_ns::Address;
9use futures::{Future, Stream, Sink};
10use tokio_core::reactor::Handle;
11use void::Void;
12
13use error_log::{ErrorLog, WarnLogger};
14use connect::Connect;
15use metrics::{self, Collect};
16use uniform::LazyUniform;
17
18/// A constructor for metrics collector object used for connection pool
19pub trait NewMetrics {
20    type Collect: Collect;
21    fn construct(self) -> Self::Collect;
22}
23
24/// A constructor for queue
25///
26/// This trait is currently *sealed*, we will unseal it once it stabilized
27pub trait NewQueue<I, M>: private::NewQueue<I, M> {
28    /// Connection pool instance type
29    type Pool;
30}
31
32impl<I, M, T: private::NewQueue<I, M>> NewQueue<I, M> for T {
33    type Pool = T::Pool;
34}
35
36/// A constructor for multiplexer
37///
38/// This trait is currently *sealed*, we will unseal it once it stabilized
39pub trait NewMux<A, C, E, M>: private::NewMux<A, C, E, M>
40    where A: Stream<Item=Address, Error=Void>,
41          C: Connect + 'static,
42          <<C as Connect>::Future as Future>::Item: Sink,
43          E: ErrorLog<
44            ConnectionError=<C::Future as Future>::Error,
45            SinkError=<<C::Future as Future>::Item as Sink>::SinkError,
46            >,
47          E: 'static,
48          M: Collect + 'static,
49{}
50
51pub(crate) mod private {
52    use futures::{Stream, Future, Sink};
53    use void::Void;
54    use connect::Connect;
55    use metrics::Collect;
56    use error_log::ErrorLog;
57    use abstract_ns::Address;
58    use tokio_core::reactor::Handle;
59
60    pub struct Done;
61
62    pub trait NewMux<A, C, E, M>
63        where A: Stream<Item=Address, Error=Void>,
64              C: Connect + 'static,
65              <<C as Connect>::Future as Future>::Item: Sink,
66              E: ErrorLog<
67                ConnectionError=<C::Future as Future>::Error,
68                SinkError=<<C::Future as Future>::Item as Sink>::SinkError,
69                >,
70              E: 'static,
71              M: Collect + 'static,
72    {
73        type Sink: Sink<
74            SinkItem=<<C::Future as Future>::Item as Sink>::SinkItem,
75            SinkError=Done,
76        >;
77        fn construct(self,
78            h: &Handle, address: A, connector: C, errors: E, metrics: M)
79            -> Self::Sink;
80    }
81
82    pub trait NewQueue<I, M> {
83        type Pool;
84        fn spawn_on<S, E>(self, pool: S, e: E, metrics: M, handle: &Handle)
85            -> Self::Pool
86            where S: Sink<SinkItem=I, SinkError=Done> + 'static,
87                  E: ErrorLog + 'static,
88                  M: Collect + 'static;
89    }
90
91}
92
93/// A constructor for error log
94pub trait NewErrorLog<C, S> {
95    type ErrorLog: ErrorLog<ConnectionError=C, SinkError=S>;
96    fn construct(self) -> Self::ErrorLog;
97}
98
99/// A configuration builder that holds onto `Connect` object
100#[derive(Debug)]
101pub struct PartialConfig<C> {
102    pub(crate) connector: C,
103}
104
105/// A fully configured pool but you might override some defaults
106pub struct PoolConfig<C, A, X, Q, E, M> {
107    pub(crate) connector: C,
108    pub(crate) address: A,
109    pub(crate) mux: X,
110    pub(crate) queue: Q,
111    pub(crate) errors: E,
112    pub(crate) metrics: M,
113}
114
115/// A constructor for a default multiplexer
116pub struct DefaultMux;
117
118/// A constructor for a default queue
119pub struct DefaultQueue;
120
121/// A constructor for a fixed-size dumb queue
122pub struct Queue(pub(crate) usize);
123
124/// A constructor for a default (no-op) metrics collector
125pub struct NoopMetrics;
126
127impl NewMetrics for NoopMetrics {
128    type Collect = metrics::Noop;
129    fn construct(self) -> metrics::Noop {
130        metrics::Noop
131    }
132}
133
134impl<C> PartialConfig<C> {
135    /// Create a configuration by adding an address stream
136    pub fn connect_to<A>(self, address_stream: A)
137        -> PoolConfig<C, A, DefaultMux, DefaultQueue, WarnLogger, NoopMetrics>
138        where A: Stream<Item=Address, Error=Void>,
139    {
140        PoolConfig {
141            address: address_stream,
142            connector: self.connector,
143            mux: DefaultMux,
144            errors: WarnLogger,
145            queue: DefaultQueue,
146            metrics: NoopMetrics,
147        }
148    }
149}
150
151impl<C, A, X, Q, E, M> PoolConfig<C, A, X, Q, E, M> {
152    /// Spawn a connection pool on the main loop specified by handle
153    pub fn spawn_on(self, h: &Handle)
154        -> <Q as NewQueue<
155                <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
156                <M as NewMetrics>::Collect,
157           >>::Pool
158        where A: Stream<Item=Address, Error=Void>,
159              C: Connect + 'static,
160              <<C as Connect>::Future as Future>::Item: Sink,
161              M: NewMetrics,
162              M::Collect: 'static,
163              X: NewMux<A, C, E::ErrorLog, M::Collect>,
164              <X as private::NewMux<A, C, E::ErrorLog, M::Collect>>::Sink: 'static,
165              E: NewErrorLog<
166                <<C as Connect>::Future as Future>::Error,
167                <<<C as Connect>::Future as Future>::Item as Sink>::SinkError,
168              >,
169              E::ErrorLog: Clone + 'static,
170              Q: NewQueue<
171                <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
172                <M as NewMetrics>::Collect,
173                Pool=<Q as private::NewQueue<
174                    <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
175                    <M as NewMetrics>::Collect,
176                >>::Pool
177              >,
178
179    {
180        let m = self.metrics.construct();
181        let e = self.errors.construct();
182        let p = self.mux.construct(h,
183            self.address, self.connector, e.clone(), m.clone());
184        self.queue.spawn_on(p, e, m, h)
185    }
186
187    /// Configure a uniform connection pool with specified number of
188    /// per-host connections crated lazily (i.e. when there are requests)
189    pub fn lazy_uniform_connections(self, num: u32)
190        -> PoolConfig<C, A, LazyUniform, Q, E, M>
191    {
192        PoolConfig {
193            mux: LazyUniform {
194                conn_limit: num,
195                reconnect_timeout: Duration::from_millis(100),
196            },
197            address: self.address,
198            connector: self.connector,
199            errors: self.errors,
200            queue: self.queue,
201            metrics: self.metrics,
202        }
203    }
204
205    /// Add a queue of size num used when no connection can accept a message
206    pub fn with_queue_size(self, num: usize)
207        -> PoolConfig<C, A, X, Queue, E, M>
208    {
209        PoolConfig {
210            queue: Queue(num),
211            address: self.address,
212            connector: self.connector,
213            mux: self.mux,
214            errors: self.errors,
215            metrics: self.metrics,
216        }
217    }
218
219    /// Override metrics reporter
220    pub fn metrics<NM>(self, metrics: NM)
221        -> PoolConfig<C, A, X, Q, E, NM>
222        where NM: NewMetrics,
223    {
224        PoolConfig {
225            queue: self.queue,
226            address: self.address,
227            connector: self.connector,
228            mux: self.mux,
229            errors: self.errors,
230            metrics: metrics,
231        }
232    }
233
234    /// Override error reporter
235    pub fn errors<NE>(self, errors: NE)
236        -> PoolConfig<C, A, X, Q, NE, M>
237        where NE: ErrorLog,
238    {
239        PoolConfig {
240            queue: self.queue,
241            address: self.address,
242            connector: self.connector,
243            mux: self.mux,
244            errors: errors,
245            metrics: self.metrics,
246        }
247    }
248}
249
250