tk_listen/bind.rs
1use std::collections::HashMap;
2use std::io;
3use std::mem;
4use std::net::SocketAddr;
5use std::time::Duration;
6
7use futures::{Future, Stream, Async};
8use tokio::net::{TcpListener, Incoming, TcpStream};
9use tokio::clock;
10use tokio::timer::Delay;
11
12
13
14/// This stream replaces ``tokio_core::net::Incoming`` and listens many sockets
15///
16/// It receives a stream of lists of addresses as an input.
17/// When a new value received on a stream it adapts:
18///
19/// 1. Removes sockets not in set we're already received (already established
20/// connections aren't interfered in any way)
21/// 2. Adds sockets to set which wasn't listened before
22///
23/// Instead of failing on bind error it logs the error and retries in a
24/// second (you can change the delay using `BindMany::retry_interval`)
25///
26/// It's good idea to pass a stream with a **Void** error, because on receiving
27/// error `BindMany` will log a message (that doesn't contain an error) and
28/// will shutdown. It's better to log specific error and send end-of-stream
29/// instead, but that is user's responsibility.
30///
31/// Note: we track identity of the sockets by `SocketAddr` used to bind it,
32/// this means `0.0.0.0` and `127.0.0.1` for example can be bound/unbound
33/// independently despite the fact that `0.0.0.0` can accept connections for
34/// `127.0.0.1`.
35///
36/// # Example
37///
38/// Simple example:
39///
40/// ```rust,ignore
41/// lp.run(
42/// BindMany::new(address_stream)
43/// .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
44/// .map(move |(mut socket, _addr)| {
45/// // Your future is here:
46/// Proto::new(socket)
47/// // Errors should not pass silently
48/// // common idea is to log them
49/// .map_err(|e| error!("Protocol error: {}", e))
50/// })
51/// .listen(MAX_SIMULTANEOUS_CONNECTIONS)
52/// ).unwrap(); // stream doesn't end in this case
53/// ```
54///
55/// Example using name resolution via abstract-ns + ns-env-config:
56///
57/// ```rust,ignore
58/// extern crate ns_env_config;
59///
60/// let mut lp = Core::new().unwrap();
61/// let ns = ns_env_config::init(&lp.handle()).unwrap();
62/// lp.run(
63/// BindMany::new(ns.resolve_auto("localhost", 8080)
64/// .map(|addr| addr.addresses_at(0)))
65/// .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
66/// .map(move |(mut socket, _addr)| {
67/// // Your future is here:
68/// Proto::new(socket)
69/// // Errors should not pass silently
70/// // common idea is to log them
71/// .map_err(|e| eprintln!("Protocol error: {}", e))
72/// })
73/// .listen(MAX_SIMULTANEOUS_CONNECTIONS)
74/// ).unwrap(); // stream doesn't end in this case
75/// ```
76///
77///
78pub struct BindMany<S> {
79 addresses: S,
80 retry_interval: Duration,
81 retry_timer: Option<(Delay, Vec<SocketAddr>)>,
82 inputs: HashMap<SocketAddr, Incoming>,
83}
84
85impl<S> BindMany<S> {
86 /// Create a new instance
87 pub fn new(s: S) -> BindMany<S>
88 {
89 BindMany {
90 addresses: s,
91 retry_interval: Duration::new(1, 0),
92 retry_timer: None,
93 inputs: HashMap::new(),
94 }
95 }
96
97 /// Sets the retry interval
98 ///
99 /// Each time binding socket fails (including he first one on start) istead
100 /// of immediately failing we log the error and sleep this interval to
101 /// retry (by default 1 second).
102 ///
103 /// This behavior is important because if your configuration changes
104 /// number of listening sockets, and one of them is either invalid or
105 /// just DNS is temporarily down, you still need to serve other addresses.
106 ///
107 /// This also helps if you have failover IP which can only be listened
108 /// at when IP attached to the host, but server must be ready to listen
109 /// it anyway (this one might be better achieved by non-local bind though).
110 pub fn retry_interval(&mut self, interval: Duration) -> &mut Self {
111 self.retry_interval = interval;
112 self
113 }
114}
115
116impl<S> Stream for BindMany<S>
117 where S: Stream,
118 S::Item: IntoIterator<Item=SocketAddr>,
119{
120 type Item = TcpStream;
121 type Error = io::Error;
122 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, io::Error> {
123 loop {
124 match self.addresses.poll() {
125 Ok(Async::Ready(None)) => {
126 info!("Listening stream reached end-of-stream condition");
127 return Ok(Async::Ready(None));
128 }
129 Ok(Async::Ready(Some(new))) => {
130 let mut old = mem::replace(&mut self.inputs,
131 HashMap::new());
132 let mut backlog = Vec::new();
133 for addr in new {
134 if let Some(listener) = old.remove(&addr) {
135 self.inputs.insert(addr, listener);
136 } else {
137 match TcpListener::bind(&addr) {
138 Ok(l) => {
139 self.inputs.insert(addr, l.incoming());
140 }
141 Err(e) => {
142 backlog.push(addr);
143 error!("Error binding {:?}: {}, \
144 will retry in {:?}",
145 addr, e, self.retry_interval);
146 }
147 }
148 }
149 }
150 if backlog.len() > 0 {
151 self.retry_timer = Some((
152 Delay::new(clock::now() + self.retry_interval),
153 backlog));
154 } else {
155 self.retry_timer = None;
156 }
157 }
158 Ok(Async::NotReady) => break,
159 Err(_) => {
160 error!("Error in address stream");
161 return Ok(Async::Ready(None));
162 }
163 }
164 }
165 loop {
166 if let Some((ref mut timer, ref mut backlog)) = self.retry_timer {
167 match timer.poll().expect("deadline never fails") {
168 Async::Ready(()) => {
169 for addr in mem::replace(backlog, Vec::new()) {
170 match TcpListener::bind(&addr) {
171 Ok(l) => {
172 self.inputs.insert(addr, l.incoming());
173 }
174 Err(e) => {
175 backlog.push(addr);
176 // Lower level on retry
177 debug!("Error binding {:?}: {}, \
178 will retry in {:?}",
179 addr, e, self.retry_interval);
180 }
181 }
182 }
183 if backlog.len() > 0 {
184 *timer = Delay::new(
185 clock::now() + self.retry_interval
186 );
187 continue; // need to poll timer
188 }
189 // fallthrough to cleaning timer
190 }
191 Async::NotReady => break,
192 }
193 }
194 self.retry_timer = None;
195 break;
196 }
197 for inp in self.inputs.values_mut() {
198 loop {
199 match inp.poll() {
200 Ok(Async::Ready(pair)) => {
201 return Ok(Async::Ready(pair));
202 }
203 Ok(Async::NotReady) => break,
204 Err(e) => return Err(e),
205 }
206 }
207 }
208 return Ok(Async::NotReady);
209 }
210}