tk_carbon/
pool.rs

1use std::collections::VecDeque;
2use std::io;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Instant, Duration};
6
7use abstract_ns::Address;
8use futures::{Future, Async, Stream};
9use rand::{thread_rng, Rng};
10use tk_bufstream::IoBuf;
11use tokio_io::{AsyncWrite};
12use tokio_core::net::TcpStream;
13use tokio_core::reactor::{Handle, Timeout};
14use void::{Void, unreachable};
15
16use channel::Receiver;
17use {Init, Config};
18
19
20struct Pool<A> {
21    address_stream: A,
22    channel: Receiver,
23    config: Arc<Config>,
24    handle: Handle,
25    deadline: Instant,
26    timeo: Timeout,
27
28    cur_address: Option<Address>,
29    normal: VecDeque<(SocketAddr, Conn<TcpStream>)>,
30    crowded: VecDeque<(SocketAddr, Conn<TcpStream>)>,
31    pending: VecDeque<(SocketAddr,
32                       Box<Future<Item=Conn<TcpStream>, Error=io::Error>>)>,
33    retired: VecDeque<Conn<TcpStream>>,
34    failed: VecDeque<(SocketAddr, Instant)>,
35}
36
37struct Conn<T> {
38    io: IoBuf<T>,
39    deadline: Instant,
40}
41
42
43impl Init {
44    /// Establishes connections to all the hosts
45    ///
46    /// This method spawns a future (or futures) in the loop represented
47    /// by handle. The future exits when all references to API (`Carbon`
48    /// structure) are dropped and all buffers are flushed.
49    pub fn connect_to<S>(self, address_stream: S, handle: &Handle)
50        where S: Stream<Item=Address, Error=Void> + 'static,
51    {
52        handle.spawn(Pool {
53            address_stream: address_stream,
54            channel: self.chan,
55            handle: handle.clone(),
56            deadline: Instant::now() + self.config.write_timeout,
57            timeo: Timeout::new(self.config.write_timeout, &handle)
58                .expect("can always set a timeout"),
59            config: self.config,
60
61            cur_address: None,
62            normal: VecDeque::new(),
63            crowded: VecDeque::new(),
64            pending: VecDeque::new(),
65            retired: VecDeque::new(),
66            failed: VecDeque::new(),
67        });
68    }
69}
70
71impl<S: Stream<Item=Address, Error=Void>> Future for Pool<S> {
72    type Item = ();
73    type Error = ();
74    fn poll(&mut self) -> Result<Async<()>, ()> {
75        loop {
76            match self.update_addresses() {
77                Async::Ready(()) => {
78                    info!("Eof on address stream, shutting down");
79                    return Ok(Async::Ready(()));
80                }
81                Async::NotReady => {}
82            }
83            self.reconnect_failed();
84            self.check_pending();
85            self.read_check();
86            self.push_crowded();
87            self.new_metrics();
88            self.flush_metrics();
89            let ndeadline = self.calc_deadline();
90            if ndeadline != self.deadline {
91                self.deadline = ndeadline;
92                self.timeo = Timeout::new_at(self.deadline, &self.handle)
93                    .expect("can always set a timeout");
94                let res = self.timeo.poll()
95                    .expect("timeout future never fails");
96                match res {
97                    Async::Ready(()) => continue,
98                    Async::NotReady => break,
99                }
100            } else {
101                break;
102            }
103        }
104        Ok(Async::NotReady)
105    }
106}
107
108impl<S: Stream<Item=Address, Error=Void>> Pool<S> {
109    fn update_addresses(&mut self) -> Async<()> {
110        loop {
111            let new_addr = match self.address_stream.poll() {
112                Ok(Async::Ready(Some(new_addr))) => new_addr,
113                Ok(Async::NotReady) => return Async::NotReady,
114                Ok(Async::Ready(None)) => return Async::Ready(()),
115                Err(void) => unreachable(void),
116            };
117            if let Some(ref mut old_addr) = self.cur_address {
118                if old_addr != &new_addr {
119                    let (old, new) = old_addr.at(0)
120                                   .compare_addresses(&new_addr.at(0));
121                    debug!("New addresss, to be retired {:?}, \
122                            to be connected {:?}", old, new);
123                    for _ in 0..self.pending.len() {
124                        let (addr, c) = self.pending.pop_front().unwrap();
125                        // Drop pending connections to non-existing
126                        // addresses
127                        if !old.contains(&addr) {
128                            self.pending.push_back((addr, c));
129                        } else {
130                            debug!("Dropped pending {}", addr);
131                        }
132                    }
133                    for _ in 0..self.normal.len() {
134                        let (addr, c) = self.normal.pop_front().unwrap();
135                        // Active connections are waiting to become idle
136                        if old.contains(&addr) {
137                            debug!("Retiring {}", addr);
138                            self.retired.push_back(c);
139                        } else {
140                            self.normal.push_back((addr, c));
141                        }
142                    }
143                    for _ in 0..self.crowded.len() {
144                        let (addr, c) = self.crowded.pop_front().unwrap();
145                        // Active connections are waiting to become idle
146                        if old.contains(&addr) {
147                            debug!("Retiring {}", addr);
148                            self.retired.push_back(c);
149                        } else {
150                            self.crowded.push_back((addr, c));
151                        }
152                    }
153                    for addr in new {
154                        self.pending.push_back((addr, Box::new(
155                            // TODO(tailhook) timeout on connect
156                            TcpStream::connect(&addr, &self.handle)
157                            .map(|sock | Conn {
158                                io: IoBuf::new(sock),
159                                deadline: Instant::now()
160                                    // no data yet
161                                    + Duration::new(86400, 0),
162                            })
163                        )));
164                    }
165                }
166            } else {
167                for addr in new_addr.at(0).addresses() {
168                    self.pending.push_back((addr, Box::new(
169                        // TODO(tailhook) timeout on connect
170                        TcpStream::connect(&addr, &self.handle)
171                        .map(|sock | Conn {
172                            io: IoBuf::new(sock),
173                            deadline: Instant::now()
174                                // no data yet
175                                + Duration::new(86400, 0),
176                        })
177                    )));
178                }
179            }
180        }
181    }
182    fn check_pending(&mut self) {
183        for _ in 0..self.pending.len() {
184            let (a, mut c) = self.pending.pop_front().unwrap();
185            match c.poll() {
186                Ok(Async::Ready(c)) => {
187                    // Can use it immediately
188                    debug!("Connected {}", a);
189                    self.normal.push_front((a, c));
190                }
191                Ok(Async::NotReady) => {
192                    self.pending.push_back((a, c));
193                }
194                Err(e) => {
195                    warn!("Can't establish connection to {}: {}", a, e);
196                    // TODO(tailhook) set timer to reconnect
197                    // Add to the end of the list
198                    self.reconnect(a);
199                }
200            }
201        }
202    }
203    fn read_check(&mut self) {
204        for _ in 0..self.normal.len() {
205            let (a, mut c) = self.normal.pop_front().unwrap();
206            if let Err(e) = c.io.read() {
207                warn!("Read error from {}: {}", a, e);
208                self.reconnect(a);
209            } else if c.io.in_buf.len() > 0 {
210                warn!("Input data in carbon socket from {} (protocol error)",
211                    a);
212                self.reconnect(a);
213            } else if c.io.done() {
214                warn!("Connection from {} closed by peer", a);
215                self.reconnect(a);
216            } else {
217                self.normal.push_back((a, c));
218            }
219        }
220        for _ in 0..self.crowded.len() {
221            let (a, mut c) = self.crowded.pop_front().unwrap();
222            if let Err(e) = c.io.read() {
223                warn!("Read error from {}: {}", a, e);
224                self.reconnect(a);
225            } else if c.io.in_buf.len() > 0 {
226                warn!("Input data in carbon socket from {} (protocol error)",
227                    a);
228                self.reconnect(a);
229            } else if c.io.done() {
230                warn!("Connection from {} closed by peer", a);
231                self.reconnect(a);
232            } else {
233                self.crowded.push_back((a, c));
234            }
235        }
236    }
237    fn reconnect(&mut self, addr: SocketAddr) {
238        let (min, max) = self.config.reconnect_delay;
239        let ms = thread_rng().gen_range(min, max);
240        self.failed.push_back((
241            addr,
242            Instant::now() + Duration::from_millis(ms),
243        ));
244    }
245    fn push_crowded(&mut self) {
246        for _ in 0..self.crowded.len() {
247            let (a, mut c) = self.crowded.pop_front().unwrap();
248            if let Err(e) = c.flush(&*self.config) {
249                warn!("Write error for {}: {}", a, e);
250                self.reconnect(a);
251            } else if c.io.out_buf.len() < self.config.watermarks.0 {
252                self.normal.push_back((a, c));
253            } else {
254                self.crowded.push_back((a, c));
255            }
256        }
257    }
258    fn new_metrics(&mut self) {
259        if self.normal.len() == 0 {
260            // do not accept new metrics
261            return;
262        }
263        while let Ok(Async::Ready(Some(metric))) = self.channel.poll() {
264            for &mut (_, ref mut c) in self.normal.iter_mut()
265                .chain(&mut self.crowded)
266            {
267                c.io.out_buf.extend(&metric.0);
268            }
269        }
270    }
271    fn flush_metrics(&mut self) {
272        // we're flushing only normal metrics, because crowded have already
273        // been flushed at the start of poll
274        for _ in 0..self.normal.len() {
275            let (a, mut c) = self.normal.pop_front().unwrap();
276            if let Err(e) = c.flush(&*self.config) {
277                warn!("Write error for {}: {}", a, e);
278                self.reconnect(a);
279            } else if c.io.out_buf.len() > self.config.watermarks.1 {
280                warn!("Buffer overflow for {}: {}/{}. \
281                    Dropping buffer and reconnecting... ", a,
282                    c.io.out_buf.len(), self.config.watermarks.1);
283                self.reconnect(a);
284            } else if c.io.out_buf.len() < self.config.watermarks.0 {
285                self.normal.push_back((a, c));
286            } else {
287                self.crowded.push_back((a, c));
288            }
289        }
290    }
291    fn reconnect_failed(&mut self) {
292        let now = Instant::now();
293        for _ in 0..self.failed.len() {
294            let (addr, time) = self.failed.pop_front().unwrap();
295            if time <= now {
296                self.pending.push_back((addr, Box::new(
297                    // TODO(tailhook) timeout on connect
298                    TcpStream::connect(&addr, &self.handle)
299                    .map(move |sock| Conn {
300                        io: IoBuf::new(sock),
301                        deadline: now
302                            // no data yet
303                            + Duration::new(86400, 0),
304                    })
305                )));
306            } else {
307                self.failed.push_back((addr, time));
308            }
309        }
310    }
311    fn calc_deadline(&mut self) -> Instant {
312        // We assume that there are only few connections at any point in
313        // time, so iterating is faster than keeping a heap of timers
314        self.failed.iter().map(|&(_, dline)| dline)
315        .chain(self.normal.iter().map(|&(_, ref c)| c.deadline))
316        .chain(self.crowded.iter().map(|&(_, ref c)| c.deadline))
317        // TODO(tailhook) make timeouts for pending connections
318        .min()
319        // We can have all the queues empty, when we're waiting for address
320        // to be resolved
321        .unwrap_or_else(|| Instant::now() +  Duration::new(86400, 0))
322    }
323}
324
325impl<S: AsyncWrite> Conn<S> {
326    fn flush(&mut self, cfg: &Config) -> Result<(), io::Error> {
327        let old_out = self.io.out_buf.len();
328        if old_out > 0 {
329            self.io.flush()?;
330            let new_out = self.io.out_buf.len();
331            if new_out != old_out {
332                self.deadline = Instant::now() + cfg.write_timeout;
333            } else {
334                if self.deadline < Instant::now() {
335                    return Err(io::ErrorKind::TimedOut.into());
336                }
337            }
338        }
339        Ok(())
340    }
341}