use std::io;
use std::time::Duration;
use tokio_timer::{Timer, TimeoutError, Sleep};
use futures::{Poll, Async, Future};
use futures::stream::{Stream, Fuse};
pub enum PersistentError {
Disconnect,
Timeout,
IoError(io::Error)
}
impl<T> From<TimeoutError<T>> for PersistentError {
fn from(error: TimeoutError<T>) -> PersistentError {
match error {
TimeoutError::Timer(_, err) => panic!("bip_peer: Timer Error In Peer Stream, Timer Capacity Is Probably Too Small: {}", err),
TimeoutError::TimedOut(_) => PersistentError::Timeout
}
}
}
pub struct PersistentStream<S> {
stream: Fuse<S>
}
impl<S> PersistentStream<S> where S: Stream {
pub fn new(stream: S) -> PersistentStream<S> {
PersistentStream{ stream: stream.fuse() }
}
}
impl<S> Stream for PersistentStream<S>
where S: Stream<Error=io::Error> {
type Item = S::Item;
type Error = PersistentError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.stream.poll()
.map_err(|error| PersistentError::IoError(error))
.and_then(|item| {
match item {
Async::Ready(None) => Err(PersistentError::Disconnect),
other @ _ => Ok(other)
}
})
}
}
pub enum RecurringTimeoutError {
Disconnect,
Timeout
}
pub struct RecurringTimeoutStream<S> {
dur: Duration,
timer: Timer,
sleep: Sleep,
stream: S
}
impl<S> RecurringTimeoutStream<S> {
pub fn new(stream: S, timer: Timer, dur: Duration) -> RecurringTimeoutStream<S> {
let sleep = timer.sleep(dur);
RecurringTimeoutStream{ dur: dur, timer: timer, sleep: sleep, stream: stream }
}
}
impl<S> Stream for RecurringTimeoutStream<S>
where S: Stream
{
type Item = S::Item;
type Error = RecurringTimeoutError;
fn poll(&mut self) -> Poll<Option<S::Item>, RecurringTimeoutError> {
match self.stream.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(Some(v))) => {
self.sleep = self.timer.sleep(self.dur);
return Ok(Async::Ready(Some(v)));
},
Ok(Async::Ready(None)) => { return Ok(Async::Ready(None)) },
Err(_) => { return Err(RecurringTimeoutError::Disconnect) }
}
match self.sleep.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
self.sleep = self.timer.sleep(self.dur);
Err(RecurringTimeoutError::Timeout)
}
Err(_) => panic!("bip_peer: Timer Error In Manager Stream, Timer Capacity Is Probably Too Small...")
}
}
}