mod net {
use crate::traits;
use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
use async_trait::async_trait;
use futures::future::Future;
use futures::stream::Stream;
use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A `Stream` of incoming TCP connections, built from a `TcpListener`
/// via `IncomingStreams::from_listener`.
pub struct IncomingStreams {
/// Current state of the stream. It is an `Option` so that `poll_next`
/// can move the state out with `take()` and store a fresh one before
/// it returns.
state: Option<IncomingStreamsState>,
}
/// Output of the future built by `take_and_poll`: the result of one
/// `accept()` call, paired with the listener it was called on.
type FResult = (IoResult<(TcpStream, SocketAddr)>, TcpListener);

/// Takes ownership of `listener`, awaits a single `accept()` on it, and
/// hands the listener back together with the outcome so the caller can
/// reuse it for the next accept.
async fn take_and_poll(listener: TcpListener) -> FResult {
    let accepted = listener.accept().await;
    (accepted, listener)
}
/// The two states an `IncomingStreams` moves between.
enum IncomingStreamsState {
/// No accept is in flight; the listener is held directly.
Ready(TcpListener),
/// An accept is in flight. The boxed future resolves to an `FResult`,
/// which carries the listener back along with the accept result.
Accepting(Pin<Box<dyn Future<Output = FResult> + Send>>),
}
impl IncomingStreams {
    /// Builds an `IncomingStreams` that owns `lis`.
    ///
    /// The stream starts in the `Ready` state, so no accept is started
    /// until the stream is first polled.
    pub fn from_listener(lis: TcpListener) -> IncomingStreams {
        let initial = IncomingStreamsState::Ready(lis);
        Self {
            state: Some(initial),
        }
    }
}
impl Stream for IncomingStreams {
type Item = IoResult<(TcpStream, SocketAddr)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use IncomingStreamsState as St;
let state = self.state.take().expect("No valid state!");
let mut future = match state {
St::Ready(lis) => Box::pin(take_and_poll(lis)),
St::Accepting(fut) => fut,
};
match future.as_mut().poll(cx) {
Poll::Ready((val, lis)) => {
self.state = Some(St::Ready(lis));
Poll::Ready(Some(val))
}
Poll::Pending => {
self.state = Some(St::Accepting(future));
Poll::Pending
}
}
}
}
/// Implements `traits::TcpListener` for the `TcpListener` type imported
/// at the top of this module, by delegating to that type's own methods.
#[async_trait]
impl traits::TcpListener for TcpListener {
/// Stream type produced for each accepted connection.
type TcpStream = TcpStream;
/// Stream-of-connections type returned by `incoming`.
type Incoming = IncomingStreams;
/// Accepts one connection by delegating to `TcpListener::accept`,
/// returning the stream and the `SocketAddr` reported with it.
async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
// NOTE(review): the type-qualified path presumably selects the
// listener's inherent `accept` rather than recursing into this trait
// method — confirm before changing the call form.
TcpListener::accept(self).await
}
/// Consumes the listener and wraps it in an `IncomingStreams`.
fn incoming(self) -> IncomingStreams {
IncomingStreams::from_listener(self)
}
/// Returns the result of the listener's `local_addr` call.
fn local_addr(&self) -> IoResult<SocketAddr> {
TcpListener::local_addr(self)
}
}
/// Implements `traits::TcpProvider` for `async_executors::AsyncStd`
/// using the `TcpStream` and `TcpListener` types imported by this module.
/// Neither method reads anything from `self`.
#[async_trait]
impl traits::TcpProvider for async_executors::AsyncStd {
/// Stream type returned by `connect`.
type TcpStream = TcpStream;
/// Listener type returned by `listen`.
type TcpListener = TcpListener;
/// Opens a TCP connection to `addr` via `TcpStream::connect`.
async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::TcpStream> {
TcpStream::connect(addr).await
}
/// Binds a new `TcpListener` to `addr` via `TcpListener::bind`.
async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::TcpListener> {
// `bind` is given the address by value, hence the dereference.
TcpListener::bind(*addr).await
}
}
/// Implements `traits::UdpProvider` for `async_executors::AsyncStd`,
/// producing this module's `UdpSocket` wrapper.
#[async_trait]
impl traits::UdpProvider for async_executors::AsyncStd {
    type UdpSocket = UdpSocket;

    /// Binds a UDP socket to `addr` and wraps it in this module's
    /// `UdpSocket`. Any bind error is returned unchanged.
    async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
        let socket = StdUdpSocket::bind(*addr).await?;
        Ok(UdpSocket { socket })
    }
}
/// Wrapper around the UDP socket type imported as `StdUdpSocket`, used
/// as the `traits::UdpSocket` implementation for this backend.
pub struct UdpSocket {
/// The wrapped socket that every operation is forwarded to.
socket: StdUdpSocket,
}
/// Implements `traits::UdpSocket` by forwarding each call to the wrapped
/// socket.
#[async_trait]
impl traits::UdpSocket for UdpSocket {
/// Receives into `buf` via the inner socket's `recv_from` and returns
/// the `(usize, SocketAddr)` pair it yields.
async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
self.socket.recv_from(buf).await
}
/// Sends `buf` to `target` via the inner socket's `send_to` and
/// returns the `usize` it reports.
async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
self.socket.send_to(buf, target).await
}
/// Returns the result of the inner socket's `local_addr` call.
fn local_addr(&self) -> IoResult<SocketAddr> {
self.socket.local_addr()
}
}
}
use futures::{Future, FutureExt};
use std::pin::Pin;
use std::time::Duration;
use crate::traits::*;
/// Creates a new `async_executors::AsyncStd` value, the type this file
/// implements the runtime traits (`SleepProvider`, `BlockOn`, and the
/// network providers) for.
pub fn create_runtime() -> async_executors::AsyncStd {
async_executors::AsyncStd::new()
}
impl SleepProvider for async_executors::AsyncStd {
    type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Returns a boxed future backed by an `async_io::Timer` set for
    /// `duration`. The timer's own output is discarded so the future
    /// resolves to `()`.
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
        let timer = async_io::Timer::after(duration);
        let unit_future = timer.map(|_| ());
        Box::pin(unit_future)
    }
}
impl BlockOn for async_executors::AsyncStd {
/// Hands `f` to `async_executors::AsyncStd::block_on` and returns the
/// future's output. The call goes through the type rather than `self`,
/// so nothing is read from this instance.
fn block_on<F: Future>(&self, f: F) -> F::Output {
async_executors::AsyncStd::block_on(f)
}
}