susydev_jsonrpc_server_utils/
suspendable_stream.rs1use std::io;
2use std::time::{Duration, Instant};
3use tokio::prelude::*;
4use tokio::timer::Delay;
5
6pub struct SuspendableStream<S> {
15 stream: S,
16 next_delay: Duration,
17 initial_delay: Duration,
18 max_delay: Duration,
19 timeout: Option<Delay>,
20}
21
22impl<S> SuspendableStream<S> {
23 pub fn new(stream: S) -> Self {
26 SuspendableStream {
27 stream,
28 next_delay: Duration::from_millis(20),
29 initial_delay: Duration::from_millis(10),
30 max_delay: Duration::from_secs(5),
31 timeout: None,
32 }
33 }
34}
35
36impl<S, I> Stream for SuspendableStream<S>
37where
38 S: Stream<Item = I, Error = io::Error>,
39{
40 type Item = I;
41 type Error = ();
42
43 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, ()> {
44 loop {
45 if let Some(mut timeout) = self.timeout.take() {
46 match timeout.poll() {
47 Ok(Async::Ready(_)) => {}
48 Ok(Async::NotReady) => {
49 self.timeout = Some(timeout);
50 return Ok(Async::NotReady);
51 }
52 Err(err) => {
53 warn!("Timeout error {:?}", err);
54 task::current().notify();
55 return Ok(Async::NotReady);
56 }
57 }
58 }
59
60 match self.stream.poll() {
61 Ok(item) => {
62 if self.next_delay > self.initial_delay {
63 self.next_delay = self.initial_delay;
64 }
65 return Ok(item);
66 }
67 Err(ref err) => {
68 if connection_error(err) {
69 warn!("Connection Error: {:?}", err);
70 continue;
71 }
72 self.next_delay = if self.next_delay < self.max_delay {
73 self.next_delay * 2
74 } else {
75 self.next_delay
76 };
77 warn!("Error accepting connection: {}", err);
78 warn!("The server will stop accepting connections for {:?}", self.next_delay);
79 self.timeout = Some(Delay::new(Instant::now() + self.next_delay));
80 }
81 }
82 }
83 }
84}
85
/// Returns `true` when `e` is a connection refused, connection aborted or
/// connection reset error, and `false` for every other error kind.
fn connection_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => true,
        _ => false,
    }
}