use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use futures_core::ready;
use once_cell::sync::Lazy;
use pin_project_lite::pin_project;
pub trait CompatExt {
fn compat(self) -> Compat<Self>
where
Self: Sized;
fn compat_ref(&self) -> Compat<&Self>;
fn compat_mut(&mut self) -> Compat<&mut Self>;
}
impl<T> CompatExt for T {
fn compat(self) -> Compat<Self>
where
Self: Sized,
{
Compat::new(self)
}
fn compat_ref(&self) -> Compat<&Self> {
Compat::new(self)
}
fn compat_mut(&mut self) -> Compat<&mut Self> {
Compat::new(self)
}
}
pin_project! {
pub struct Compat<T> {
#[pin]
inner: T,
seek_pos: Option<io::SeekFrom>,
seek_res: Option<io::Result<u64>>,
}
}
impl<T> Compat<T> {
pub fn new(t: T) -> Compat<T> {
Compat {
inner: t,
seek_pos: None,
seek_res: None,
}
}
pub fn get_ref(&self) -> &T {
&self.inner
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T: Future> Future for Compat<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
TOKIO.enter(|| self.project().inner.poll(cx))
}
}
impl<T: tokio::io::AsyncRead> futures_io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_read(cx, buf)
}
}
impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_read(cx, buf)
}
}
impl<T: tokio::io::AsyncBufRead> futures_io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
}
}
impl<T: futures_io::AsyncBufRead> tokio::io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
}
}
impl<T: tokio::io::AsyncWrite> futures_io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
}
impl<T: futures_io::AsyncWrite> tokio::io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}
impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
if self.seek_pos != Some(pos) {
ready!(self.as_mut().project().inner.start_seek(cx, pos))?;
*self.as_mut().project().seek_pos = Some(pos);
}
let res = ready!(self.as_mut().project().inner.poll_complete(cx));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res.map(|p| p as u64))
}
}
impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context,
pos: io::SeekFrom,
) -> Poll<io::Result<()>> {
let p = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
*self.project().seek_res = Some(Ok(p));
Poll::Ready(Ok(()))
}
fn poll_complete(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<u64>> {
Poll::Ready(
self.project()
.seek_res
.take()
.unwrap_or(Err(io::ErrorKind::Other.into())),
)
}
}
static TOKIO: Lazy<tokio::runtime::Handle> = Lazy::new(|| {
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
thread::Builder::new()
.name("async-compat/tokio".to_string())
.spawn(move || rt.block_on(Pending))
.unwrap();
handle
});
struct Pending;
impl Future for Pending {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}