mwc_libp2p_tcp/provider/
async_io.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::{Provider, IfEvent, Incoming};
22
23use async_io_crate::Async;
24use futures::{
25    future::{BoxFuture, FutureExt},
26    prelude::*,
27};
28use std::io;
29use std::task::{Poll, Context};
30use std::net;
31
/// Marker type selecting the `async-io` backed TCP provider.
///
/// The enum has no variants, so no value of it can ever be constructed; it is
/// only used at the type level as the implementor of the provider trait.
#[derive(Clone, Copy)]
pub enum Tcp {}
34
35impl Provider for Tcp {
36    type Stream = Async<net::TcpStream>;
37    type Listener = Async<net::TcpListener>;
38    type IfWatcher = if_watch::IfWatcher;
39
40    fn if_watcher() -> BoxFuture<'static, io::Result<Self::IfWatcher>> {
41        if_watch::IfWatcher::new().boxed()
42    }
43
44    fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
45        Async::new(l)
46    }
47
48    fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
49        async move {
50            let stream = Async::new(s)?;
51            stream.writable().await?;
52            Ok(stream)
53        }.boxed()
54    }
55
56    fn poll_accept(l: &mut Self::Listener, cx: &mut Context<'_>) -> Poll<io::Result<Incoming<Self::Stream>>> {
57        let (stream, remote_addr) = loop {
58            match l.poll_readable(cx) {
59                Poll::Pending => return Poll::Pending,
60                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
61                Poll::Ready(Ok(())) => match l.accept().now_or_never() {
62                    Some(Err(e)) => return Poll::Ready(Err(e)),
63                    Some(Ok(res)) => break res,
64                    None => {
65                        // Since it doesn't do any harm, account for false positives of
66                        // `poll_readable` just in case, i.e. try again.
67                    }
68                }
69            }
70        };
71
72        let local_addr = stream.get_ref().local_addr()?;
73
74        Poll::Ready(Ok(Incoming { stream, local_addr, remote_addr }))
75    }
76
77    fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll<io::Result<IfEvent>> {
78        w.next().map_ok(|e| match e {
79            if_watch::IfEvent::Up(a) => IfEvent::Up(a),
80            if_watch::IfEvent::Down(a) => IfEvent::Down(a),
81        }).boxed().poll_unpin(cx)
82    }
83}