mwc_libp2p_tcp/provider/
async_io.rs1use super::{Provider, IfEvent, Incoming};
22
23use async_io_crate::Async;
24use futures::{
25 future::{BoxFuture, FutureExt},
26 prelude::*,
27};
28use std::io;
29use std::task::{Poll, Context};
30use std::net;
31
/// Marker type for the `async-io` backed TCP [`Provider`] implementation.
///
/// The enum has no variants, so it can never be instantiated; it is only
/// used at the type level to select the associated types and functions
/// supplied by its `Provider` impl.
#[derive(Clone, Copy)]
pub enum Tcp {}
34
35impl Provider for Tcp {
36 type Stream = Async<net::TcpStream>;
37 type Listener = Async<net::TcpListener>;
38 type IfWatcher = if_watch::IfWatcher;
39
40 fn if_watcher() -> BoxFuture<'static, io::Result<Self::IfWatcher>> {
41 if_watch::IfWatcher::new().boxed()
42 }
43
44 fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
45 Async::new(l)
46 }
47
48 fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
49 async move {
50 let stream = Async::new(s)?;
51 stream.writable().await?;
52 Ok(stream)
53 }.boxed()
54 }
55
56 fn poll_accept(l: &mut Self::Listener, cx: &mut Context<'_>) -> Poll<io::Result<Incoming<Self::Stream>>> {
57 let (stream, remote_addr) = loop {
58 match l.poll_readable(cx) {
59 Poll::Pending => return Poll::Pending,
60 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
61 Poll::Ready(Ok(())) => match l.accept().now_or_never() {
62 Some(Err(e)) => return Poll::Ready(Err(e)),
63 Some(Ok(res)) => break res,
64 None => {
65 }
68 }
69 }
70 };
71
72 let local_addr = stream.get_ref().local_addr()?;
73
74 Poll::Ready(Ok(Incoming { stream, local_addr, remote_addr }))
75 }
76
77 fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll<io::Result<IfEvent>> {
78 w.next().map_ok(|e| match e {
79 if_watch::IfEvent::Up(a) => IfEvent::Up(a),
80 if_watch::IfEvent::Down(a) => IfEvent::Down(a),
81 }).boxed().poll_unpin(cx)
82 }
83}