susydev_jsonrpc_server_utils/
suspendable_stream.rs

1use std::io;
2use std::time::{Duration, Instant};
3use tokio::prelude::*;
4use tokio::timer::Delay;
5
6/// `Incoming` is a stream of incoming sockets
7/// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit)
8/// we use for_each combinator which:
9/// 1. Runs for every Ok(socket)
10/// 2. Stops on the FIRST Err()
11/// So any temporary io::Error will cause the entire server to terminate.
12/// This wrapper type for tokio::Incoming stops accepting new connections
13/// for a specified amount of time once an io::Error is encountered
14pub struct SuspendableStream<S> {
15	stream: S,
16	next_delay: Duration,
17	initial_delay: Duration,
18	max_delay: Duration,
19	timeout: Option<Delay>,
20}
21
22impl<S> SuspendableStream<S> {
23	/// construct a new Suspendable stream, given tokio::Incoming
24	/// and the amount of time to pause for.
25	pub fn new(stream: S) -> Self {
26		SuspendableStream {
27			stream,
28			next_delay: Duration::from_millis(20),
29			initial_delay: Duration::from_millis(10),
30			max_delay: Duration::from_secs(5),
31			timeout: None,
32		}
33	}
34}
35
36impl<S, I> Stream for SuspendableStream<S>
37where
38	S: Stream<Item = I, Error = io::Error>,
39{
40	type Item = I;
41	type Error = ();
42
43	fn poll(&mut self) -> Result<Async<Option<Self::Item>>, ()> {
44		loop {
45			if let Some(mut timeout) = self.timeout.take() {
46				match timeout.poll() {
47					Ok(Async::Ready(_)) => {}
48					Ok(Async::NotReady) => {
49						self.timeout = Some(timeout);
50						return Ok(Async::NotReady);
51					}
52					Err(err) => {
53						warn!("Timeout error {:?}", err);
54						task::current().notify();
55						return Ok(Async::NotReady);
56					}
57				}
58			}
59
60			match self.stream.poll() {
61				Ok(item) => {
62					if self.next_delay > self.initial_delay {
63						self.next_delay = self.initial_delay;
64					}
65					return Ok(item);
66				}
67				Err(ref err) => {
68					if connection_error(err) {
69						warn!("Connection Error: {:?}", err);
70						continue;
71					}
72					self.next_delay = if self.next_delay < self.max_delay {
73						self.next_delay * 2
74					} else {
75						self.next_delay
76					};
77					warn!("Error accepting connection: {}", err);
78					warn!("The server will stop accepting connections for {:?}", self.next_delay);
79					self.timeout = Some(Delay::new(Instant::now() + self.next_delay));
80				}
81			}
82		}
83	}
84}
85
86/// assert that the error was a connection error
87fn connection_error(e: &io::Error) -> bool {
88	e.kind() == io::ErrorKind::ConnectionRefused
89		|| e.kind() == io::ErrorKind::ConnectionAborted
90		|| e.kind() == io::ErrorKind::ConnectionReset
91}