tk_listen/
bind.rs

1use std::collections::HashMap;
2use std::io;
3use std::mem;
4use std::net::SocketAddr;
5use std::time::Duration;
6
7use futures::{Future, Stream, Async};
8use tokio::net::{TcpListener, Incoming, TcpStream};
9use tokio::clock;
10use tokio::timer::Delay;
11
12
13
14/// This stream replaces ``tokio_core::net::Incoming`` and listens many sockets
15///
16/// It receives a stream of lists of addresses as an input.
17/// When a new value received on a stream it adapts:
18///
19/// 1. Removes sockets not in set we're already received (already established
20///    connections aren't interfered in any way)
21/// 2. Adds sockets to set which wasn't listened before
22///
23/// Instead of failing on bind error it logs the error and retries in a
24/// second (you can change the delay using `BindMany::retry_interval`)
25///
26/// It's good idea to pass a stream with a **Void** error, because on receiving
27/// error `BindMany` will log a message (that doesn't contain an error) and
28/// will shutdown. It's better to log specific error and send end-of-stream
29/// instead, but that is user's responsibility.
30///
31/// Note: we track identity of the sockets by `SocketAddr` used to bind it,
32/// this means `0.0.0.0` and `127.0.0.1` for example can be bound/unbound
33/// independently despite the fact that `0.0.0.0` can accept connections for
34/// `127.0.0.1`.
35///
36///  # Example
37///
38///  Simple example:
39///
40///  ```rust,ignore
41///    lp.run(
42///        BindMany::new(address_stream)
43///        .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
44///        .map(move |(mut socket, _addr)| {
45///             // Your future is here:
46///             Proto::new(socket)
47///             // Errors should not pass silently
48///             // common idea is to log them
49///             .map_err(|e| error!("Protocol error: {}", e))
50///        })
51///        .listen(MAX_SIMULTANEOUS_CONNECTIONS)
52///    ).unwrap(); // stream doesn't end in this case
53///  ```
54///
55///  Example using name resolution via abstract-ns + ns-env-config:
56///
57///  ```rust,ignore
58///      extern crate ns_env_config;
59///
60///      let mut lp = Core::new().unwrap();
61///      let ns = ns_env_config::init(&lp.handle()).unwrap();
62///      lp.run(
63///          BindMany::new(ns.resolve_auto("localhost", 8080)
64///             .map(|addr| addr.addresses_at(0)))
65///          .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
66///          .map(move |(mut socket, _addr)| {
67///               // Your future is here:
68///               Proto::new(socket)
69///               // Errors should not pass silently
70///               // common idea is to log them
71///               .map_err(|e| eprintln!("Protocol error: {}", e))
72///          })
73///          .listen(MAX_SIMULTANEOUS_CONNECTIONS)
74///      ).unwrap(); // stream doesn't end in this case
75///  ```
76///
77///
78pub struct BindMany<S> {
79    addresses: S,
80    retry_interval: Duration,
81    retry_timer: Option<(Delay, Vec<SocketAddr>)>,
82    inputs: HashMap<SocketAddr, Incoming>,
83}
84
85impl<S> BindMany<S> {
86    /// Create a new instance
87    pub fn new(s: S) -> BindMany<S>
88    {
89        BindMany {
90            addresses: s,
91            retry_interval: Duration::new(1, 0),
92            retry_timer: None,
93            inputs: HashMap::new(),
94        }
95    }
96
97    /// Sets the retry interval
98    ///
99    /// Each time binding socket fails (including he first one on start) istead
100    /// of immediately failing we log the error and sleep this interval to
101    /// retry (by default 1 second).
102    ///
103    /// This behavior is important because if your configuration changes
104    /// number of listening sockets, and one of them is either invalid or
105    /// just DNS is temporarily down, you still need to serve other addresses.
106    ///
107    /// This also helps if you have failover IP which can only be listened
108    /// at when IP attached to the host, but server must be ready to listen
109    /// it anyway (this one might be better achieved by non-local bind though).
110    pub fn retry_interval(&mut self, interval: Duration) -> &mut Self {
111        self.retry_interval = interval;
112        self
113    }
114}
115
116impl<S> Stream for BindMany<S>
117    where S: Stream,
118        S::Item: IntoIterator<Item=SocketAddr>,
119{
120    type Item = TcpStream;
121    type Error = io::Error;
122    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, io::Error> {
123        loop {
124            match self.addresses.poll() {
125                Ok(Async::Ready(None)) => {
126                    info!("Listening stream reached end-of-stream condition");
127                    return Ok(Async::Ready(None));
128                }
129                Ok(Async::Ready(Some(new))) => {
130                    let mut old = mem::replace(&mut self.inputs,
131                                               HashMap::new());
132                    let mut backlog = Vec::new();
133                    for addr in new {
134                        if let Some(listener) = old.remove(&addr) {
135                            self.inputs.insert(addr, listener);
136                        } else {
137                            match TcpListener::bind(&addr) {
138                                Ok(l) =>  {
139                                    self.inputs.insert(addr, l.incoming());
140                                }
141                                Err(e) => {
142                                    backlog.push(addr);
143                                    error!("Error binding {:?}: {}, \
144                                        will retry in {:?}",
145                                        addr, e, self.retry_interval);
146                                }
147                            }
148                        }
149                    }
150                    if backlog.len() > 0 {
151                        self.retry_timer = Some((
152                            Delay::new(clock::now() + self.retry_interval),
153                            backlog));
154                    } else {
155                        self.retry_timer = None;
156                    }
157                }
158                Ok(Async::NotReady) => break,
159                Err(_) => {
160                    error!("Error in address stream");
161                    return Ok(Async::Ready(None));
162                }
163            }
164        }
165        loop {
166            if let Some((ref mut timer, ref mut backlog)) = self.retry_timer {
167                match timer.poll().expect("deadline never fails") {
168                    Async::Ready(()) => {
169                        for addr in mem::replace(backlog, Vec::new()) {
170                            match TcpListener::bind(&addr) {
171                                Ok(l) =>  {
172                                    self.inputs.insert(addr, l.incoming());
173                                }
174                                Err(e) => {
175                                    backlog.push(addr);
176                                    // Lower level on retry
177                                    debug!("Error binding {:?}: {}, \
178                                        will retry in {:?}",
179                                        addr, e, self.retry_interval);
180                                }
181                            }
182                        }
183                        if backlog.len() > 0 {
184                            *timer = Delay::new(
185                                clock::now() + self.retry_interval
186                            );
187                            continue;  // need to poll timer
188                        }
189                        // fallthrough to cleaning timer
190                    }
191                    Async::NotReady => break,
192                }
193            }
194            self.retry_timer = None;
195            break;
196        }
197        for inp in self.inputs.values_mut() {
198            loop {
199                match inp.poll() {
200                    Ok(Async::Ready(pair)) => {
201                        return Ok(Async::Ready(pair));
202                    }
203                    Ok(Async::NotReady) => break,
204                    Err(e) => return Err(e),
205                }
206            }
207        }
208        return Ok(Async::NotReady);
209    }
210}