1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use super::{
    futures, std, AsyncRead, AsyncWrite, BoxedNewPeerFuture, BoxedNewPeerStream, L2rUser, Peer,
    PeerConstructor, Rc,
};
use super::{Future, Stream};

pub fn wouldblock<T>() -> std::io::Result<T> {
    Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, ""))
}
pub fn brokenpipe<T>() -> std::io::Result<T> {
    Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, ""))
}
pub fn io_other_error<E: std::error::Error + Send + Sync + 'static>(e: E) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::Other, e)
}

#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure))]
impl PeerConstructor {
    pub fn map<F: 'static>(self, func: F) -> Self
    where
        F: Fn(Peer, L2rUser) -> BoxedNewPeerFuture,
    {
        let f = Rc::new(func);
        use PeerConstructor::*;
        match self {
            ServeOnce(x) => Overlay1(x, f),
            ServeMultipleTimes(s) => OverlayM(s, f),
            Overlay1(x, mapper) => Overlay1(
                x,
                Rc::new(move |p, l2r| {
                    let ff = f.clone();
                    let l2rc = l2r.clone();
                    Box::new(mapper(p, l2r).and_then(move |x| ff(x, l2rc)))
                }),
            ),
            OverlayM(x, mapper) => OverlayM(
                x,
                Rc::new(move |p, l2r| {
                    let ff = f.clone();
                    let l2rc = l2r.clone();
                    Box::new(mapper(p, l2r).and_then(move |x| ff(x, l2rc)))
                }),
            ), // This implementation (without Overlay{1,M} cases)
               // causes task to be spawned too late (before establishing ws upgrade)
               // when serving clients:

               //ServeOnce(x) => ServeOnce(Box::new(x.and_then(f)) as BoxedNewPeerFuture),
               //ServeMultipleTimes(s) => {
               //    ServeMultipleTimes(Box::new(s.and_then(f)) as BoxedNewPeerStream)
               //}
        }
    }

    pub fn get_only_first_conn(self, l2r: L2rUser) -> BoxedNewPeerFuture {
        use PeerConstructor::*;
        match self {
            ServeMultipleTimes(stre) => Box::new(
                stre.into_future()
                    .map(move |(std_peer, _)| std_peer.expect("Nowhere to connect it"))
                    .map_err(|(e, _)| e),
            ) as BoxedNewPeerFuture,
            ServeOnce(futur) => futur,
            Overlay1(futur, mapper) => {
                Box::new(futur.and_then(move |p| mapper(p, l2r))) as BoxedNewPeerFuture
            }
            OverlayM(stre, mapper) => Box::new(
                stre.into_future()
                    .map(move |(std_peer, _)| std_peer.expect("Nowhere to connect it"))
                    .map_err(|(e, _)| e)
                    .and_then(move |p| mapper(p, l2r)),
            ) as BoxedNewPeerFuture,
        }
    }
}

pub fn once(x: BoxedNewPeerFuture) -> PeerConstructor {
    PeerConstructor::ServeOnce(x)
}
pub fn multi(x: BoxedNewPeerStream) -> PeerConstructor {
    PeerConstructor::ServeMultipleTimes(x)
}

pub fn peer_err<E: std::error::Error + 'static>(e: E) -> BoxedNewPeerFuture {
    Box::new(futures::future::err(Box::new(e) as Box<std::error::Error>)) as BoxedNewPeerFuture
}
pub fn peer_err_s<E: std::error::Error + 'static>(e: E) -> BoxedNewPeerStream {
    Box::new(futures::stream::iter_result(vec![Err(
        Box::new(e) as Box<std::error::Error>
    )])) as BoxedNewPeerStream
}
pub fn peer_strerr(e: &str) -> BoxedNewPeerFuture {
    let q: Box<std::error::Error> = From::from(e);
    Box::new(futures::future::err(q)) as BoxedNewPeerFuture
}
pub fn simple_err(e: String) -> std::io::Error {
    let e1: Box<std::error::Error + Send + Sync> = e.into();
    ::std::io::Error::new(::std::io::ErrorKind::Other, e1)
}
pub fn simple_err2(e: &'static str) -> Box<std::error::Error> {
    let e1: Box<std::error::Error + Send + Sync> = e.to_string().into();
    e1 as Box<std::error::Error>
}
pub fn box_up_err<E: std::error::Error + 'static>(e: E) -> Box<std::error::Error> {
    Box::new(e) as Box<std::error::Error>
}

impl Peer {
    pub fn new<R: AsyncRead + 'static, W: AsyncWrite + 'static>(r: R, w: W) -> Self {
        Peer(
            Box::new(r) as Box<AsyncRead>,
            Box::new(w) as Box<AsyncWrite>,
        )
    }
}