pub(crate) mod net {
use crate::traits;
use async_trait::async_trait;
pub(crate) use tokio_crate::net::{
TcpListener as TokioTcpListener, TcpStream as TokioTcpStream, UdpSocket as TokioUdpSocket,
};
use futures::io::{AsyncRead, AsyncWrite};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt as _};
use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
/// TCP stream type exposed by this module.
///
/// It holds a Tokio TCP stream behind `tokio_util`'s `Compat` adapter; the
/// `futures::io::AsyncRead` / `AsyncWrite` impls for this type forward every
/// poll to that adapter.
pub struct TcpStream {
/// The Tokio stream, wrapped in the `Compat` adapter.
s: Compat<TokioTcpStream>,
}
impl From<TokioTcpStream> for TcpStream {
fn from(s: TokioTcpStream) -> TcpStream {
let s = s.compat();
TcpStream { s }
}
}
/// Reads are forwarded to the wrapped `Compat` stream.
impl AsyncRead for TcpStream {
    /// Polls the inner stream for data, filling `buf`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // Obtain a plain mutable reference to the wrapper, then re-pin the
        // inner field so that its own `poll_read` can be invoked.
        let this = self.get_mut();
        let inner = Pin::new(&mut this.s);
        inner.poll_read(cx, buf)
    }
}
/// Writes, flushes and closes are forwarded to the wrapped `Compat` stream.
impl AsyncWrite for TcpStream {
    /// Polls the inner stream to write bytes from `buf`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.s);
        inner.poll_write(cx, buf)
    }

    /// Polls the inner stream to flush.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.s);
        inner.poll_flush(cx)
    }

    /// Polls the inner stream to close.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.s);
        inner.poll_close(cx)
    }
}
/// TCP listener type exposed by this module; a thin wrapper around a Tokio
/// TCP listener.
pub struct TcpListener {
/// The wrapped Tokio listener. Visible to the parent module, which builds
/// this struct directly after binding.
pub(super) lis: TokioTcpListener,
}
/// Stream of accepted connections, produced by consuming a [`TcpListener`]
/// via its `incoming()` method.
pub struct IncomingTcpStreams {
/// The Tokio listener that is polled for new connections.
pub(super) lis: TokioTcpListener,
}
impl futures::stream::Stream for IncomingTcpStreams {
type Item = IoResult<(TcpStream, SocketAddr)>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.lis.poll_accept(cx) {
Poll::Ready(Ok((s, a))) => Poll::Ready(Some(Ok((s.into(), a)))),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}
#[async_trait]
impl traits::TcpListener for TcpListener {
type TcpStream = TcpStream;
type Incoming = IncomingTcpStreams;
async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
let (stream, addr) = self.lis.accept().await?;
Ok((stream.into(), addr))
}
fn incoming(self) -> Self::Incoming {
IncomingTcpStreams { lis: self.lis }
}
fn local_addr(&self) -> IoResult<SocketAddr> {
self.lis.local_addr()
}
}
/// UDP socket type exposed by this module; a thin wrapper around a Tokio
/// UDP socket.
pub struct UdpSocket {
/// The wrapped Tokio socket.
socket: TokioUdpSocket,
}
impl UdpSocket {
    /// Binds a new Tokio UDP socket to `addr` and wraps it.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the underlying Tokio `bind` reports.
    pub async fn bind(addr: SocketAddr) -> IoResult<Self> {
        let socket = TokioUdpSocket::bind(addr).await?;
        Ok(UdpSocket { socket })
    }
}
#[async_trait]
impl traits::UdpSocket for UdpSocket {
async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
self.socket.recv_from(buf).await
}
async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
self.socket.send_to(buf, target).await
}
fn local_addr(&self) -> IoResult<SocketAddr> {
self.socket.local_addr()
}
}
}
use crate::traits::*;
use async_trait::async_trait;
use futures::Future;
use std::io::Result as IoResult;
use std::time::Duration;
/// Sleeping is delegated to Tokio's timer.
impl SleepProvider for TokioRuntimeHandle {
    type SleepFuture = tokio_crate::time::Sleep;

    /// Returns Tokio's sleep future for `duration`.
    ///
    /// `self` is not consulted here; the future comes straight from the
    /// free function `tokio::time::sleep`.
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
        use tokio_crate::time::sleep as tokio_sleep;
        tokio_sleep(duration)
    }
}
#[async_trait]
impl crate::traits::TcpProvider for TokioRuntimeHandle {
    type TcpStream = net::TcpStream;
    type TcpListener = net::TcpListener;

    /// Opens a Tokio TCP connection to `addr` and wraps the resulting stream.
    async fn connect(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpStream> {
        net::TokioTcpStream::connect(addr)
            .await
            .map(net::TcpStream::from)
    }

    /// Binds a Tokio TCP listener to `addr` and wraps it.
    async fn listen(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpListener> {
        net::TokioTcpListener::bind(*addr)
            .await
            .map(|lis| net::TcpListener { lis })
    }
}
#[async_trait]
impl crate::traits::UdpProvider for TokioRuntimeHandle {
    type UdpSocket = net::UdpSocket;

    /// Binds a UDP socket to `addr` through `net::UdpSocket::bind`.
    async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
        // `SocketAddr` is `Copy`; the callee takes it by value.
        let local = *addr;
        net::UdpSocket::bind(local).await
    }
}
pub(crate) fn create_runtime() -> IoResult<TokioRuntimeHandle> {
let mut builder = async_executors::TokioTpBuilder::new();
builder.tokio_builder().enable_all();
let owned = builder.build()?;
Ok(owned.into())
}
/// Handle to a Tokio runtime, used as the receiver for this file's
/// sleep, TCP, UDP, `BlockOn` and `Spawn` implementations.
///
/// A value may additionally carry the executor it was built from; see
/// `is_owned`.
#[derive(Clone, Debug)]
pub struct TokioRuntimeHandle {
/// The executor this handle was created from, if any. It is `Some` when
/// the value was converted from an `async_executors::TokioTp`, and `None`
/// when it was converted from a bare Tokio `Handle`.
owned: Option<async_executors::TokioTp>,
/// The Tokio runtime handle that `block_on` and `spawn_obj` are run on.
handle: tokio_crate::runtime::Handle,
}
impl TokioRuntimeHandle {
    /// Wraps an existing Tokio `Handle` without taking ownership of any
    /// executor (equivalent to the `From<Handle>` conversion).
    pub(crate) fn new(handle: tokio_crate::runtime::Handle) -> Self {
        Self::from(handle)
    }

    /// Returns `true` if this value carries the executor it was created from.
    pub fn is_owned(&self) -> bool {
        let owned = &self.owned;
        owned.is_some()
    }
}
impl From<tokio_crate::runtime::Handle> for TokioRuntimeHandle {
fn from(handle: tokio_crate::runtime::Handle) -> Self {
Self {
owned: None,
handle,
}
}
}
/// Wraps an `async_executors::TokioTp`, keeping the executor alongside a
/// Tokio `Handle` obtained from it.
impl From<async_executors::TokioTp> for TokioRuntimeHandle {
    fn from(owner: async_executors::TokioTp) -> TokioRuntimeHandle {
        use tokio_crate::runtime::Handle;

        // `Handle::current()` is evaluated inside a future run by `owner`,
        // so the handle must be fetched before `owner` is moved into the struct.
        let handle = owner.block_on(async { Handle::current() });
        let owned = Some(owner);
        Self { owned, handle }
    }
}
impl BlockOn for TokioRuntimeHandle {
fn block_on<F: Future>(&self, f: F) -> F::Output {
self.handle.block_on(f)
}
}
/// Spawning is delegated to the inner Tokio `Handle`.
impl futures::task::Spawn for TokioRuntimeHandle {
    /// Spawns `future` on the inner handle.
    ///
    /// This implementation always returns `Ok(())`.
    fn spawn_obj(
        &self,
        future: futures::task::FutureObj<'static, ()>,
    ) -> Result<(), futures::task::SpawnError> {
        // The join handle is discarded immediately; per Tokio's documentation,
        // dropping a `JoinHandle` detaches the task rather than cancelling it.
        drop(self.handle.spawn(future));
        Ok(())
    }
}