//! This module provides a uniform connection pool implementation:
//! we create a fixed number of connections for each IP/Port pair
//! and distribute requests between them by round-robin.
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::collections::VecDeque;

use rand::{thread_rng, Rng};
use abstract_ns::{Address};
use tokio_core::reactor::Handle;
use futures::{StartSend, AsyncSink, Async, Future, Poll};
use futures::sink::{Sink};
use futures::stream::Stream;

use {Connect};


/// A simple uniform connection pool
///
/// This pool keeps a fixed number of connections to each IP/Port pair
/// (if they are available) and distributes requests between them
/// by round-robin.
///
/// Note that the pool has neither a buffer of its own nor any internal
/// tasks, so you are expected to wrap it in `Sink::buffer` and call
/// `poll_complete` on every wake-up.
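///
/// A rough usage sketch (assumes a `tokio_core` reactor `handle` plus a
/// hypothetical `my_connector` implementing `Connect` and a hypothetical
/// `name_stream` yielding `Address` items; only `Config` and `UniformMx`
/// come from this module):
///
/// ```ignore
/// let config = Config::new().connections_per_address(2).done();
/// // buffer the sink yourself, as the pool has no buffer of its own
/// let mut pool = UniformMx::new(&handle, &config, name_stream, my_connector)
///     .buffer(100);
/// ```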
pub struct UniformMx<S, E, A> {
    address: A,
    connect: Box<Connect<Sink=S, Error=E>>,
    cur_address: Option<Address>,
    active: VecDeque<(SocketAddr, S)>,
    pending: VecDeque<(SocketAddr, Box<Future<Item=S, Error=E>>)>,
    candidates: Vec<SocketAddr>,
    retired: VecDeque<S>,
    config: Arc<Config>,
    handle: Handle,
}


/// Configuration of the connection pool
///
/// Note that the default configuration doesn't make sense for most cases.
/// Please tweak at least `Config::connections_per_address`.
///
/// Also make sure to use `eager_connections()` if you care about performance.
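///
/// A minimal construction sketch using the builder methods below:
///
/// ```ignore
/// let config = Config::new()
///     .connections_per_address(5)
///     .eager_connections()
///     .done();
/// ```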
#[derive(Clone, Debug)]
pub struct Config {
    connections_per_address: usize,
    lazy_connections: bool,
}

impl Config {
    /// Create a new config with default configuration
    ///
    /// The default configuration has `connections_per_address: 1`, which is
    /// only useful if you have synchronous workers and only one of them is
    /// bound to every socket, or if there are virtually unlimited resources
    /// on the other side (compared to the number of requests we are going to
    /// make) and good pipelining support.
    pub fn new() -> Config {
        Config {
            connections_per_address: 1,
            lazy_connections: true,
        }
    }

    /// Establish connections eagerly and keep them open even if there are
    /// no requests.
    ///
    /// Lazy connections are nicer when you have a mostly idle connection
    /// pool and don't need sub-millisecond latency: a connection is
    /// established only when no other connection can serve a request.
    ///
    /// Lazy connections are enabled by default because that is what makes
    /// the most sense if you have a `HashMap<Hostname, UniformMx>`, and it
    /// is how most connection pools work in other languages.
    ///
    /// Note that a pool with lazy connections will return `NotReady` when
    /// there are no free connections, and will start new ones asynchronously.
    /// Also, it will not establish new connections while there is no
    /// backpressure on the existing ones, even if not all peers are connected
    /// yet. So you may get a skew in cluster load, especially if you allow
    /// many pipelined requests on a single connection.
    pub fn eager_connections(&mut self) -> &mut Self {
        self.lazy_connections = false;
        self
    }
    /// Set the number of connections per address
    ///
    /// This kind of limit may look awkward for a connection pool. You may be
    /// used to opening a fixed number of connections per connection pool
    /// rather than per backend address. But consider that there are lots of
    /// workers (say 100 or 1000), and every worker has a limited amount of
    /// resources. So you want the number of connections to be proportional
    /// to the actual number of workers rather than some arbitrarily chosen
    /// number.
    ///
    /// Of course, this doesn't work well for every case. That is why we have
    /// multiple multiplexer implementations.
    pub fn connections_per_address(&mut self, n: usize) -> &mut Self {
        self.connections_per_address = n;
        self
    }
    /// Create an Arc'd config clone to pass to the constructor
    ///
    /// This is just a convenience method.
    pub fn done(&mut self) -> Arc<Config> {
        Arc::new(self.clone())
    }
}

impl<S, E, A> UniformMx<S, E, A>
    where S: Sink<SinkError=E>,
          A: Stream<Item=Address>,
          E: From<A::Error> + fmt::Display,
{
    /// Create a connection pool
    ///
    /// This doesn't establish any connections even in eager mode. You need
    /// to call `poll_complete` to start.
    pub fn new<C>(handle: &Handle, config: &Arc<Config>,
           address: A, connect: C)
        -> UniformMx<S, E, A>
        where C: Connect<Sink=S, Error=E> + 'static
    {
        UniformMx {
            address: address,
            connect: Box::new(connect),
            active: VecDeque::new(),
            pending: VecDeque::new(),
            config: config.clone(),
            handle: handle.clone(),
            cur_address: None,
            candidates: Vec::new(),
            retired: VecDeque::new(),
        }
    }

    fn try_send(&mut self, mut item: S::SinkItem)
        -> Result<AsyncSink<S::SinkItem>, ()>
    {
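        // Round-robin: walk the active connections at most once, popping each
        // from the front and pushing it back to the back, so the connection
        // that accepts this item will be tried last next time.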
        for _ in 0..self.active.len() {
            let (a, mut c) = self.active.pop_front().unwrap();
            item = match c.start_send(item) {
                Ok(AsyncSink::NotReady(item)) => {
                    self.active.push_back((a, c));
                    item
                }
                Ok(AsyncSink::Ready) => {
                    self.active.push_back((a, c));
                    return Ok(AsyncSink::Ready);
                }
                Err(e) => {
                    error!("Pool error: start_send to {} failed with {}. \
                        Dropping connection.", a, e);
                    // candidates are popped from the back, so inserting at
                    // the front actually puts this address at the end of
                    // the queue
                    self.candidates.insert(0, a);
                    return Err(());
                }
            };
        }
        Ok(AsyncSink::NotReady(item))
    }
    // This only fails if there is an error in the address stream,
    // which shouldn't happen for any sane stream.
    fn do_poll(&mut self) -> Result<(), E> {
        match self.address.poll() {
            Ok(Async::Ready(Some(new_addr))) => {
                if let Some(ref mut old_addr) = self.cur_address {
                    if old_addr != &new_addr {
                        // Retire connections immediately, but connect later
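                        // (assumption: `compare_addresses` yields the
                        // addresses that disappeared (`old`) and the ones
                        // that were added (`new`))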
                        let (old, new) = old_addr.at(0)
                                       .compare_addresses(&new_addr.at(0));
                        debug!("New addresss, to be retired {:?}, \
                                to be connected {:?}", old, new);
                        for _ in 0..self.pending.len() {
                            let (addr, c) = self.pending.pop_front().unwrap();
                            // Drop pending connections to non-existing
                            // addresses
                            if !old.contains(&addr) {
                                self.pending.push_back((addr, c));
                            } else {
                                debug!("Dropped pending {}", addr);
                            }
                        }
                        for _ in 0..self.active.len() {
                            let (addr, c) = self.active.pop_front().unwrap();
                            // Connections to removed addresses are retired:
                            // they are kept until they flush and become idle
                            if old.contains(&addr) {
                                debug!("Retiring {}", addr);
                                self.retired.push_back(c);
                            } else {
                                self.active.push_back((addr, c));
                            }
                        }
                        // New addresses go to the front of the list but
                        // we randomize their order
                        for _ in 0..self.config.connections_per_address {
                            let off = self.candidates.len();
                            for &a in &new {
                                self.candidates.push(a);
                            }
                            thread_rng().shuffle(&mut self.candidates[off..]);
                        }
                        *old_addr = new_addr;
                    }
                } else {
                    // We randomize the order of the candidates, but make sure
                    // that no address is connected to twice unless all others
                    // are connected to at least once
                    for _ in 0..self.config.connections_per_address {
                        let off = self.candidates.len();
                        for a in new_addr.at(0).addresses() {
                            self.candidates.push(a);
                        }
                        thread_rng().shuffle(&mut self.candidates[off..]);
                    }
                    self.cur_address = Some(new_addr);
                }
            },
            Ok(Async::NotReady) => {}
            Ok(Async::Ready(None)) => {
                panic!("Address stream must be infinite");
            }
            Err(e) => {
                // TODO(tailhook) poll crashes on address error?
                return Err(e.into());
            }
        }
        for _ in 0..self.retired.len() {
            let mut c = self.retired.pop_front().unwrap();
            match c.poll_complete() {
                Ok(Async::Ready(())) => {}
                Ok(Async::NotReady) => {
                    self.retired.push_back(c);
                }
                Err(_) => {}  // TODO(tailhook) maybe log errors?
            }
        }
        for _ in 0..self.active.len() {
            let (a, mut c) = self.active.pop_front().unwrap();
            match c.poll_complete() {
                Ok(_) => self.active.push_back((a, c)),
                Err(e) => {
                    info!("Connection to {:?} has been closed: {}", a, e);
                    // Add to the end of the list
                    self.candidates.insert(0, a);
                }
            }
        }
        self.poll_pending();
        Ok(())
    }

    fn poll_pending(&mut self) -> bool {
        let mut added = false;
        for _ in 0..self.pending.len() {
            let (a, mut c) = self.pending.pop_front().unwrap();
            match c.poll() {
                Ok(Async::Ready(c)) => {
                    // Can use it immediately
                    debug!("Connected {}", a);
                    self.active.push_front((a, c));
                    added = true;
                }
                Ok(Async::NotReady) => {
                    self.pending.push_back((a, c));
                }
                Err(e) => {
                    info!("Can't establish connection to {:?}: {}", a, e);
                    // Add to the end of the list
                    self.candidates.insert(0, a);
                }
            }
        }
        added
    }
    /// Initiate new connection(s)
    fn do_connect(&mut self) {
        // TODO(tailhook) implement some timeouts for failing connections
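        // In lazy mode at most one new connection is opened per call;
        // in eager mode we connect to every remaining candidate at once.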
        if self.config.lazy_connections {
            if let Some(addr) = self.candidates.pop() {
                debug!("Connecting to {} (lazy mode)", addr);
                self.pending.push_back((addr, self.connect.connect(addr)));
                // we need to poll here, basically to schedule a wakeup
                self.poll_pending();
            }
        } else {
            while let Some(addr) = self.candidates.pop() {
                debug!("Connecting to {}", addr);
                self.pending.push_back((addr, self.connect.connect(addr)));
            }
            // we need to poll here, basically to schedule wakeups
            self.poll_pending();
        }
        self.candidates.shrink_to_fit();
    }
}


impl<S, E, A> Sink for UniformMx<S, E, A>
    where S: Sink<SinkError=E>,
          A: Stream<Item=Address>,
          E: From<A::Error> + fmt::Display,
{
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;
    fn start_send(&mut self, item: Self::SinkItem)
        -> StartSend<Self::SinkItem, Self::SinkError>
    {
        self.do_poll()?;
        let item = match self.try_send(item) {
            Ok(AsyncSink::NotReady(item)) => item,
            Ok(AsyncSink::Ready) => return Ok(AsyncSink::Ready),
            Err(()) => {
                // This means there was an active connection and we tried
                // to put an item on it. In the process of doing that we got
                // an error, which means the item was lost.
                //
                // We can't return an error here, because that would mean the
                // sink can't be used any more. So we behave as if the request
                // was pushed successfully but lost in transit (i.e. the
                // request was sent, but the connection closed afterwards).
                return Ok(AsyncSink::Ready);
            }
        };
        self.do_connect();
        Ok(AsyncSink::NotReady(item))
    }
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError>
    {
        if !self.config.lazy_connections {
            self.do_connect();
        }
        self.do_poll()?;
        // Basically we're never ready
        Ok(Async::NotReady)
    }
}