use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use futures_core::ready;
use once_cell::sync::Lazy;
use pin_project_lite::pin_project;
pub trait CompatExt {
fn compat(self) -> Compat<Self>
where
Self: Sized;
fn compat_ref(&self) -> Compat<&Self>;
fn compat_mut(&mut self) -> Compat<&mut Self>;
}
impl<T> CompatExt for T {
fn compat(self) -> Compat<Self>
where
Self: Sized,
{
Compat::new(self)
}
fn compat_ref(&self) -> Compat<&Self> {
Compat::new(self)
}
fn compat_mut(&mut self) -> Compat<&mut Self> {
Compat::new(self)
}
}
pin_project! {
pub struct Compat<T> {
#[pin]
inner: T,
seek_pos: Option<io::SeekFrom>,
}
}
impl<T> Compat<T> {
pub fn new(t: T) -> Compat<T> {
Compat {
inner: t,
seek_pos: None,
}
}
pub fn get_ref(&self) -> &T {
&self.inner
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T: Future> Future for Compat<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _guard = TOKIO1.enter();
self.project().inner.poll(cx)
}
}
impl<T: tokio::io::AsyncRead> futures_io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut buf = tokio::io::ReadBuf::new(buf);
ready!(self.project().inner.poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let unfilled = buf.initialize_unfilled();
let poll = self.project().inner.poll_read(cx, unfilled);
if let Poll::Ready(Ok(num)) = &poll {
buf.advance(*num);
}
poll.map_ok(|_| ())
}
}
impl<T: tokio::io::AsyncBufRead> futures_io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
}
}
impl<T: futures_io::AsyncBufRead> tokio::io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
}
}
impl<T: tokio::io::AsyncWrite> futures_io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
}
impl<T: futures_io::AsyncWrite> tokio::io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}
impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
if self.seek_pos != Some(pos) {
self.as_mut().project().inner.start_seek(pos)?;
*self.as_mut().project().seek_pos = Some(pos);
}
let res = ready!(self.as_mut().project().inner.poll_complete(cx));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res.map(|p| p as u64))
}
}
impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
*self.as_mut().project().seek_pos = Some(pos);
Ok(())
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
let pos = match self.seek_pos {
None => {
return Poll::Ready(Ok(0));
}
Some(pos) => pos,
};
let res = ready!(self.as_mut().project().inner.poll_seek(cx, pos));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res.map(|p| p as u64))
}
}
static TOKIO1: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-compat/tokio-1".to_string())
.spawn(move || TOKIO1.block_on(Pending))
.unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime")
});
struct Pending;
impl Future for Pending {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}