1use super::{
2 futures, AsyncRead, AsyncWrite, BoxedNewPeerFuture, BoxedNewPeerStream, L2rUser, Peer,
3 PeerConstructor, Rc, HupToken,
4};
5use super::{Future, Stream};
6
/// Returns `Err` carrying an I/O error of kind `WouldBlock`.
///
/// The error is built directly from the `ErrorKind`, so it carries no boxed
/// payload and displays the standard description of the kind instead of an
/// empty string.
pub fn wouldblock<T>() -> std::io::Result<T> {
    Err(std::io::ErrorKind::WouldBlock.into())
}
/// Returns `Err` carrying an I/O error of kind `BrokenPipe`.
///
/// The error is built directly from the `ErrorKind`, so it carries no boxed
/// payload and displays the standard description of the kind instead of an
/// empty string.
pub fn brokenpipe<T>() -> std::io::Result<T> {
    Err(std::io::ErrorKind::BrokenPipe.into())
}
/// Wraps an arbitrary error value into a `std::io::Error` of kind `Other`.
///
/// The original error is kept as the payload of the returned I/O error.
pub fn io_other_error<E>(e: E) -> std::io::Error
where
    E: std::error::Error + Send + Sync + 'static,
{
    let kind = std::io::ErrorKind::Other;
    let payload: Box<dyn std::error::Error + Send + Sync> = Box::new(e);
    std::io::Error::new(kind, payload)
}
16
17#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure))]
18impl PeerConstructor {
19 pub fn map<F: 'static>(self, func: F) -> Self
20 where
21 F: Fn(Peer, L2rUser) -> BoxedNewPeerFuture,
22 {
23 let f = Rc::new(func);
24 use crate::PeerConstructor::*;
25 match self {
26 Error(e) => Error(e),
27 ServeOnce(x) => Overlay1(x, f),
28 ServeMultipleTimes(s) => OverlayM(s, f),
29 Overlay1(x, mapper) => Overlay1(
30 x,
31 Rc::new(move |p, l2r| {
32 let ff = f.clone();
33 let l2rc = l2r.clone();
34 Box::new(mapper(p, l2r).and_then(move |x| ff(x, l2rc)))
35 }),
36 ),
37 OverlayM(x, mapper) => OverlayM(
38 x,
39 Rc::new(move |p, l2r| {
40 let ff = f.clone();
41 let l2rc = l2r.clone();
42 Box::new(mapper(p, l2r).and_then(move |x| ff(x, l2rc)))
43 }),
44 ), }
53 }
54
55 pub fn get_only_first_conn(self, l2r: L2rUser) -> BoxedNewPeerFuture {
56 use crate::PeerConstructor::*;
57 match self {
58 Error(e) => Box::new(futures::future::err(e)) as BoxedNewPeerFuture,
59 ServeMultipleTimes(stre) => Box::new(
60 stre.into_future()
61 .map(move |(std_peer, _)| std_peer.expect("Nowhere to connect it"))
62 .map_err(|(e, _)| e),
63 ) as BoxedNewPeerFuture,
64 ServeOnce(futur) => futur,
65 Overlay1(futur, mapper) => {
66 Box::new(futur.and_then(move |p| mapper(p, l2r))) as BoxedNewPeerFuture
67 }
68 OverlayM(stre, mapper) => Box::new(
69 stre.into_future()
70 .map(move |(std_peer, _)| std_peer.expect("Nowhere to connect it"))
71 .map_err(|(e, _)| e)
72 .and_then(move |p| mapper(p, l2r)),
73 ) as BoxedNewPeerFuture,
74 }
75 }
76}
77
78pub fn once(x: BoxedNewPeerFuture) -> PeerConstructor {
79 PeerConstructor::ServeOnce(x)
80}
81pub fn multi(x: BoxedNewPeerStream) -> PeerConstructor {
82 PeerConstructor::ServeMultipleTimes(x)
83}
84
85pub fn peer_err<E: std::error::Error + 'static>(e: E) -> BoxedNewPeerFuture {
86 Box::new(futures::future::err(
87 Box::new(e) as Box<dyn std::error::Error>
88 )) as BoxedNewPeerFuture
89}
90pub fn peer_err2(e: Box<dyn std::error::Error>) -> BoxedNewPeerFuture {
91 Box::new(futures::future::err(
92 e
93 )) as BoxedNewPeerFuture
94}
95pub fn peer_err_s<E: std::error::Error + 'static>(e: E) -> BoxedNewPeerStream {
96 Box::new(futures::stream::iter_result(vec![Err(
97 Box::new(e) as Box<dyn std::error::Error>
98 )])) as BoxedNewPeerStream
99}
100pub fn peer_err_sb(e: Box<dyn std::error::Error + 'static>) -> BoxedNewPeerStream {
101 Box::new(futures::stream::iter_result(vec![Err(
102 e
103 )])) as BoxedNewPeerStream
104}
105pub fn peer_strerr(e: &str) -> BoxedNewPeerFuture {
106 let q: Box<dyn std::error::Error> = From::from(e);
107 Box::new(futures::future::err(q)) as BoxedNewPeerFuture
108}
/// Turns a message into a `std::io::Error` of kind `Other`.
///
/// The message becomes the payload of the error, so it is what the error
/// displays.
pub fn simple_err(e: String) -> std::io::Error {
    let kind = std::io::ErrorKind::Other;
    // `Error::new` accepts anything convertible into a boxed
    // `Error + Send + Sync`, which a `String` is.
    std::io::Error::new(kind, e)
}
/// Turns a message into a boxed `std::error::Error`.
///
/// The text is copied into the returned error, so the argument only needs to
/// live for the duration of the call; `'static` strings are accepted as
/// before.
pub fn simple_err2(e: &str) -> Box<dyn std::error::Error> {
    Box::<dyn std::error::Error>::from(e)
}
/// Boxes a concrete error value into `Box<dyn std::error::Error>`.
pub fn box_up_err<E>(e: E) -> Box<dyn std::error::Error>
where
    E: std::error::Error + 'static,
{
    Box::<dyn std::error::Error>::from(e)
}
120
121impl Peer {
122 pub fn new<R: AsyncRead + 'static, W: AsyncWrite + 'static>(r: R, w: W, hup: Option<HupToken>) -> Self {
123 Peer(
124 Box::new(r) as Box<dyn AsyncRead>,
125 Box::new(w) as Box<dyn AsyncWrite>,
126 hup,
127 )
128 }
129}