1use std::time::Duration;
7
8use abstract_ns::Address;
9use futures::{Future, Stream, Sink};
10use tokio_core::reactor::Handle;
11use void::Void;
12
13use error_log::{ErrorLog, WarnLogger};
14use connect::Connect;
15use metrics::{self, Collect};
16use uniform::LazyUniform;
17
18pub trait NewMetrics {
20 type Collect: Collect;
21 fn construct(self) -> Self::Collect;
22}
23
24pub trait NewQueue<I, M>: private::NewQueue<I, M> {
28 type Pool;
30}
31
32impl<I, M, T: private::NewQueue<I, M>> NewQueue<I, M> for T {
33 type Pool = T::Pool;
34}
35
36pub trait NewMux<A, C, E, M>: private::NewMux<A, C, E, M>
40 where A: Stream<Item=Address, Error=Void>,
41 C: Connect + 'static,
42 <<C as Connect>::Future as Future>::Item: Sink,
43 E: ErrorLog<
44 ConnectionError=<C::Future as Future>::Error,
45 SinkError=<<C::Future as Future>::Item as Sink>::SinkError,
46 >,
47 E: 'static,
48 M: Collect + 'static,
49{}
50
51pub(crate) mod private {
52 use futures::{Stream, Future, Sink};
53 use void::Void;
54 use connect::Connect;
55 use metrics::Collect;
56 use error_log::ErrorLog;
57 use abstract_ns::Address;
58 use tokio_core::reactor::Handle;
59
60 pub struct Done;
61
62 pub trait NewMux<A, C, E, M>
63 where A: Stream<Item=Address, Error=Void>,
64 C: Connect + 'static,
65 <<C as Connect>::Future as Future>::Item: Sink,
66 E: ErrorLog<
67 ConnectionError=<C::Future as Future>::Error,
68 SinkError=<<C::Future as Future>::Item as Sink>::SinkError,
69 >,
70 E: 'static,
71 M: Collect + 'static,
72 {
73 type Sink: Sink<
74 SinkItem=<<C::Future as Future>::Item as Sink>::SinkItem,
75 SinkError=Done,
76 >;
77 fn construct(self,
78 h: &Handle, address: A, connector: C, errors: E, metrics: M)
79 -> Self::Sink;
80 }
81
82 pub trait NewQueue<I, M> {
83 type Pool;
84 fn spawn_on<S, E>(self, pool: S, e: E, metrics: M, handle: &Handle)
85 -> Self::Pool
86 where S: Sink<SinkItem=I, SinkError=Done> + 'static,
87 E: ErrorLog + 'static,
88 M: Collect + 'static;
89 }
90
91}
92
93pub trait NewErrorLog<C, S> {
95 type ErrorLog: ErrorLog<ConnectionError=C, SinkError=S>;
96 fn construct(self) -> Self::ErrorLog;
97}
98
99#[derive(Debug)]
101pub struct PartialConfig<C> {
102 pub(crate) connector: C,
103}
104
105pub struct PoolConfig<C, A, X, Q, E, M> {
107 pub(crate) connector: C,
108 pub(crate) address: A,
109 pub(crate) mux: X,
110 pub(crate) queue: Q,
111 pub(crate) errors: E,
112 pub(crate) metrics: M,
113}
114
115pub struct DefaultMux;
117
118pub struct DefaultQueue;
120
121pub struct Queue(pub(crate) usize);
123
124pub struct NoopMetrics;
126
127impl NewMetrics for NoopMetrics {
128 type Collect = metrics::Noop;
129 fn construct(self) -> metrics::Noop {
130 metrics::Noop
131 }
132}
133
134impl<C> PartialConfig<C> {
135 pub fn connect_to<A>(self, address_stream: A)
137 -> PoolConfig<C, A, DefaultMux, DefaultQueue, WarnLogger, NoopMetrics>
138 where A: Stream<Item=Address, Error=Void>,
139 {
140 PoolConfig {
141 address: address_stream,
142 connector: self.connector,
143 mux: DefaultMux,
144 errors: WarnLogger,
145 queue: DefaultQueue,
146 metrics: NoopMetrics,
147 }
148 }
149}
150
151impl<C, A, X, Q, E, M> PoolConfig<C, A, X, Q, E, M> {
152 pub fn spawn_on(self, h: &Handle)
154 -> <Q as NewQueue<
155 <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
156 <M as NewMetrics>::Collect,
157 >>::Pool
158 where A: Stream<Item=Address, Error=Void>,
159 C: Connect + 'static,
160 <<C as Connect>::Future as Future>::Item: Sink,
161 M: NewMetrics,
162 M::Collect: 'static,
163 X: NewMux<A, C, E::ErrorLog, M::Collect>,
164 <X as private::NewMux<A, C, E::ErrorLog, M::Collect>>::Sink: 'static,
165 E: NewErrorLog<
166 <<C as Connect>::Future as Future>::Error,
167 <<<C as Connect>::Future as Future>::Item as Sink>::SinkError,
168 >,
169 E::ErrorLog: Clone + 'static,
170 Q: NewQueue<
171 <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
172 <M as NewMetrics>::Collect,
173 Pool=<Q as private::NewQueue<
174 <<<C as Connect>::Future as Future>::Item as Sink>::SinkItem,
175 <M as NewMetrics>::Collect,
176 >>::Pool
177 >,
178
179 {
180 let m = self.metrics.construct();
181 let e = self.errors.construct();
182 let p = self.mux.construct(h,
183 self.address, self.connector, e.clone(), m.clone());
184 self.queue.spawn_on(p, e, m, h)
185 }
186
187 pub fn lazy_uniform_connections(self, num: u32)
190 -> PoolConfig<C, A, LazyUniform, Q, E, M>
191 {
192 PoolConfig {
193 mux: LazyUniform {
194 conn_limit: num,
195 reconnect_timeout: Duration::from_millis(100),
196 },
197 address: self.address,
198 connector: self.connector,
199 errors: self.errors,
200 queue: self.queue,
201 metrics: self.metrics,
202 }
203 }
204
205 pub fn with_queue_size(self, num: usize)
207 -> PoolConfig<C, A, X, Queue, E, M>
208 {
209 PoolConfig {
210 queue: Queue(num),
211 address: self.address,
212 connector: self.connector,
213 mux: self.mux,
214 errors: self.errors,
215 metrics: self.metrics,
216 }
217 }
218
219 pub fn metrics<NM>(self, metrics: NM)
221 -> PoolConfig<C, A, X, Q, E, NM>
222 where NM: NewMetrics,
223 {
224 PoolConfig {
225 queue: self.queue,
226 address: self.address,
227 connector: self.connector,
228 mux: self.mux,
229 errors: self.errors,
230 metrics: metrics,
231 }
232 }
233
234 pub fn errors<NE>(self, errors: NE)
236 -> PoolConfig<C, A, X, Q, NE, M>
237 where NE: ErrorLog,
238 {
239 PoolConfig {
240 queue: self.queue,
241 address: self.address,
242 connector: self.connector,
243 mux: self.mux,
244 errors: errors,
245 metrics: self.metrics,
246 }
247 }
248}
249
250