1use std::collections::VecDeque;
2use std::io;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Instant, Duration};
6
7use abstract_ns::Address;
8use futures::{Future, Async, Stream};
9use rand::{thread_rng, Rng};
10use tk_bufstream::IoBuf;
11use tokio_io::{AsyncWrite};
12use tokio_core::net::TcpStream;
13use tokio_core::reactor::{Handle, Timeout};
14use void::{Void, unreachable};
15
16use channel::Receiver;
17use {Init, Config};
18
19
20struct Pool<A> {
21 address_stream: A,
22 channel: Receiver,
23 config: Arc<Config>,
24 handle: Handle,
25 deadline: Instant,
26 timeo: Timeout,
27
28 cur_address: Option<Address>,
29 normal: VecDeque<(SocketAddr, Conn<TcpStream>)>,
30 crowded: VecDeque<(SocketAddr, Conn<TcpStream>)>,
31 pending: VecDeque<(SocketAddr,
32 Box<Future<Item=Conn<TcpStream>, Error=io::Error>>)>,
33 retired: VecDeque<Conn<TcpStream>>,
34 failed: VecDeque<(SocketAddr, Instant)>,
35}
36
37struct Conn<T> {
38 io: IoBuf<T>,
39 deadline: Instant,
40}
41
42
43impl Init {
44 pub fn connect_to<S>(self, address_stream: S, handle: &Handle)
50 where S: Stream<Item=Address, Error=Void> + 'static,
51 {
52 handle.spawn(Pool {
53 address_stream: address_stream,
54 channel: self.chan,
55 handle: handle.clone(),
56 deadline: Instant::now() + self.config.write_timeout,
57 timeo: Timeout::new(self.config.write_timeout, &handle)
58 .expect("can always set a timeout"),
59 config: self.config,
60
61 cur_address: None,
62 normal: VecDeque::new(),
63 crowded: VecDeque::new(),
64 pending: VecDeque::new(),
65 retired: VecDeque::new(),
66 failed: VecDeque::new(),
67 });
68 }
69}
70
71impl<S: Stream<Item=Address, Error=Void>> Future for Pool<S> {
72 type Item = ();
73 type Error = ();
74 fn poll(&mut self) -> Result<Async<()>, ()> {
75 loop {
76 match self.update_addresses() {
77 Async::Ready(()) => {
78 info!("Eof on address stream, shutting down");
79 return Ok(Async::Ready(()));
80 }
81 Async::NotReady => {}
82 }
83 self.reconnect_failed();
84 self.check_pending();
85 self.read_check();
86 self.push_crowded();
87 self.new_metrics();
88 self.flush_metrics();
89 let ndeadline = self.calc_deadline();
90 if ndeadline != self.deadline {
91 self.deadline = ndeadline;
92 self.timeo = Timeout::new_at(self.deadline, &self.handle)
93 .expect("can always set a timeout");
94 let res = self.timeo.poll()
95 .expect("timeout future never fails");
96 match res {
97 Async::Ready(()) => continue,
98 Async::NotReady => break,
99 }
100 } else {
101 break;
102 }
103 }
104 Ok(Async::NotReady)
105 }
106}
107
108impl<S: Stream<Item=Address, Error=Void>> Pool<S> {
109 fn update_addresses(&mut self) -> Async<()> {
110 loop {
111 let new_addr = match self.address_stream.poll() {
112 Ok(Async::Ready(Some(new_addr))) => new_addr,
113 Ok(Async::NotReady) => return Async::NotReady,
114 Ok(Async::Ready(None)) => return Async::Ready(()),
115 Err(void) => unreachable(void),
116 };
117 if let Some(ref mut old_addr) = self.cur_address {
118 if old_addr != &new_addr {
119 let (old, new) = old_addr.at(0)
120 .compare_addresses(&new_addr.at(0));
121 debug!("New addresss, to be retired {:?}, \
122 to be connected {:?}", old, new);
123 for _ in 0..self.pending.len() {
124 let (addr, c) = self.pending.pop_front().unwrap();
125 if !old.contains(&addr) {
128 self.pending.push_back((addr, c));
129 } else {
130 debug!("Dropped pending {}", addr);
131 }
132 }
133 for _ in 0..self.normal.len() {
134 let (addr, c) = self.normal.pop_front().unwrap();
135 if old.contains(&addr) {
137 debug!("Retiring {}", addr);
138 self.retired.push_back(c);
139 } else {
140 self.normal.push_back((addr, c));
141 }
142 }
143 for _ in 0..self.crowded.len() {
144 let (addr, c) = self.crowded.pop_front().unwrap();
145 if old.contains(&addr) {
147 debug!("Retiring {}", addr);
148 self.retired.push_back(c);
149 } else {
150 self.crowded.push_back((addr, c));
151 }
152 }
153 for addr in new {
154 self.pending.push_back((addr, Box::new(
155 TcpStream::connect(&addr, &self.handle)
157 .map(|sock | Conn {
158 io: IoBuf::new(sock),
159 deadline: Instant::now()
160 + Duration::new(86400, 0),
162 })
163 )));
164 }
165 }
166 } else {
167 for addr in new_addr.at(0).addresses() {
168 self.pending.push_back((addr, Box::new(
169 TcpStream::connect(&addr, &self.handle)
171 .map(|sock | Conn {
172 io: IoBuf::new(sock),
173 deadline: Instant::now()
174 + Duration::new(86400, 0),
176 })
177 )));
178 }
179 }
180 }
181 }
182 fn check_pending(&mut self) {
183 for _ in 0..self.pending.len() {
184 let (a, mut c) = self.pending.pop_front().unwrap();
185 match c.poll() {
186 Ok(Async::Ready(c)) => {
187 debug!("Connected {}", a);
189 self.normal.push_front((a, c));
190 }
191 Ok(Async::NotReady) => {
192 self.pending.push_back((a, c));
193 }
194 Err(e) => {
195 warn!("Can't establish connection to {}: {}", a, e);
196 self.reconnect(a);
199 }
200 }
201 }
202 }
203 fn read_check(&mut self) {
204 for _ in 0..self.normal.len() {
205 let (a, mut c) = self.normal.pop_front().unwrap();
206 if let Err(e) = c.io.read() {
207 warn!("Read error from {}: {}", a, e);
208 self.reconnect(a);
209 } else if c.io.in_buf.len() > 0 {
210 warn!("Input data in carbon socket from {} (protocol error)",
211 a);
212 self.reconnect(a);
213 } else if c.io.done() {
214 warn!("Connection from {} closed by peer", a);
215 self.reconnect(a);
216 } else {
217 self.normal.push_back((a, c));
218 }
219 }
220 for _ in 0..self.crowded.len() {
221 let (a, mut c) = self.crowded.pop_front().unwrap();
222 if let Err(e) = c.io.read() {
223 warn!("Read error from {}: {}", a, e);
224 self.reconnect(a);
225 } else if c.io.in_buf.len() > 0 {
226 warn!("Input data in carbon socket from {} (protocol error)",
227 a);
228 self.reconnect(a);
229 } else if c.io.done() {
230 warn!("Connection from {} closed by peer", a);
231 self.reconnect(a);
232 } else {
233 self.crowded.push_back((a, c));
234 }
235 }
236 }
237 fn reconnect(&mut self, addr: SocketAddr) {
238 let (min, max) = self.config.reconnect_delay;
239 let ms = thread_rng().gen_range(min, max);
240 self.failed.push_back((
241 addr,
242 Instant::now() + Duration::from_millis(ms),
243 ));
244 }
245 fn push_crowded(&mut self) {
246 for _ in 0..self.crowded.len() {
247 let (a, mut c) = self.crowded.pop_front().unwrap();
248 if let Err(e) = c.flush(&*self.config) {
249 warn!("Write error for {}: {}", a, e);
250 self.reconnect(a);
251 } else if c.io.out_buf.len() < self.config.watermarks.0 {
252 self.normal.push_back((a, c));
253 } else {
254 self.crowded.push_back((a, c));
255 }
256 }
257 }
258 fn new_metrics(&mut self) {
259 if self.normal.len() == 0 {
260 return;
262 }
263 while let Ok(Async::Ready(Some(metric))) = self.channel.poll() {
264 for &mut (_, ref mut c) in self.normal.iter_mut()
265 .chain(&mut self.crowded)
266 {
267 c.io.out_buf.extend(&metric.0);
268 }
269 }
270 }
271 fn flush_metrics(&mut self) {
272 for _ in 0..self.normal.len() {
275 let (a, mut c) = self.normal.pop_front().unwrap();
276 if let Err(e) = c.flush(&*self.config) {
277 warn!("Write error for {}: {}", a, e);
278 self.reconnect(a);
279 } else if c.io.out_buf.len() > self.config.watermarks.1 {
280 warn!("Buffer overflow for {}: {}/{}. \
281 Dropping buffer and reconnecting... ", a,
282 c.io.out_buf.len(), self.config.watermarks.1);
283 self.reconnect(a);
284 } else if c.io.out_buf.len() < self.config.watermarks.0 {
285 self.normal.push_back((a, c));
286 } else {
287 self.crowded.push_back((a, c));
288 }
289 }
290 }
291 fn reconnect_failed(&mut self) {
292 let now = Instant::now();
293 for _ in 0..self.failed.len() {
294 let (addr, time) = self.failed.pop_front().unwrap();
295 if time <= now {
296 self.pending.push_back((addr, Box::new(
297 TcpStream::connect(&addr, &self.handle)
299 .map(move |sock| Conn {
300 io: IoBuf::new(sock),
301 deadline: now
302 + Duration::new(86400, 0),
304 })
305 )));
306 } else {
307 self.failed.push_back((addr, time));
308 }
309 }
310 }
311 fn calc_deadline(&mut self) -> Instant {
312 self.failed.iter().map(|&(_, dline)| dline)
315 .chain(self.normal.iter().map(|&(_, ref c)| c.deadline))
316 .chain(self.crowded.iter().map(|&(_, ref c)| c.deadline))
317 .min()
319 .unwrap_or_else(|| Instant::now() + Duration::new(86400, 0))
322 }
323}
324
325impl<S: AsyncWrite> Conn<S> {
326 fn flush(&mut self, cfg: &Config) -> Result<(), io::Error> {
327 let old_out = self.io.out_buf.len();
328 if old_out > 0 {
329 self.io.flush()?;
330 let new_out = self.io.out_buf.len();
331 if new_out != old_out {
332 self.deadline = Instant::now() + cfg.write_timeout;
333 } else {
334 if self.deadline < Instant::now() {
335 return Err(io::ErrorKind::TimedOut.into());
336 }
337 }
338 }
339 Ok(())
340 }
341}