#![allow(clippy::needless_doctest_main)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use futures_core::ready;
use once_cell::sync::Lazy;
use pin_project_lite::pin_project;
pub trait CompatExt {
fn compat(self) -> Compat<Self>
where
Self: Sized;
fn compat_ref(&self) -> Compat<&Self>;
fn compat_mut(&mut self) -> Compat<&mut Self>;
}
impl<T> CompatExt for T {
fn compat(self) -> Compat<Self>
where
Self: Sized,
{
Compat::new(self)
}
fn compat_ref(&self) -> Compat<&Self> {
Compat::new(self)
}
fn compat_mut(&mut self) -> Compat<&mut Self> {
Compat::new(self)
}
}
pin_project! {
#[derive(Clone)]
pub struct Compat<T> {
#[pin]
inner: Option<T>,
seek_pos: Option<io::SeekFrom>,
}
impl<T> PinnedDrop for Compat<T> {
fn drop(this: Pin<&mut Self>) {
if this.inner.is_some() {
let _guard = get_runtime_handle().enter();
this.project().inner.set(None);
}
}
}
}
impl<T> Compat<T> {
pub fn new(t: T) -> Compat<T> {
Compat {
inner: Some(t),
seek_pos: None,
}
}
pub fn get_ref(&self) -> &T {
self.inner
.as_ref()
.expect("inner is only None when Compat is about to drop")
}
pub fn get_mut(&mut self) -> &mut T {
self.inner
.as_mut()
.expect("inner is only None when Compat is about to drop")
}
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project()
.inner
.as_pin_mut()
.expect("inner is only None when Compat is about to drop")
}
pub fn into_inner(mut self) -> T {
self.inner
.take()
.expect("inner is only None when Compat is about to drop")
}
}
impl<T: Future> Future for Compat<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _guard = get_runtime_handle().enter();
self.get_pin_mut().poll(cx)
}
}
impl<T: tokio::io::AsyncRead> futures_io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut buf = tokio::io::ReadBuf::new(buf);
ready!(self.get_pin_mut().poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let unfilled = buf.initialize_unfilled();
let poll = self.get_pin_mut().poll_read(cx, unfilled);
if let Poll::Ready(Ok(num)) = &poll {
buf.advance(*num);
}
poll.map_ok(|_| ())
}
}
impl<T: tokio::io::AsyncBufRead> futures_io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.get_pin_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_pin_mut().consume(amt)
}
}
impl<T: futures_io::AsyncBufRead> tokio::io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.get_pin_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_pin_mut().consume(amt)
}
}
impl<T: tokio::io::AsyncWrite> futures_io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_shutdown(cx)
}
}
impl<T: futures_io::AsyncWrite> tokio::io::AsyncWrite for Compat<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_close(cx)
}
}
impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
if self.seek_pos != Some(pos) {
self.as_mut().get_pin_mut().start_seek(pos)?;
*self.as_mut().project().seek_pos = Some(pos);
}
let res = ready!(self.as_mut().get_pin_mut().poll_complete(cx));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res)
}
}
impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
*self.as_mut().project().seek_pos = Some(pos);
Ok(())
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
let pos = match self.seek_pos {
None => {
return Poll::Ready(Ok(0));
}
Some(pos) => pos,
};
let res = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res)
}
}
fn get_runtime_handle() -> tokio::runtime::Handle {
tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO1.handle().clone())
}
static TOKIO1: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-compat/tokio-1".into())
.spawn(|| TOKIO1.block_on(Pending))
.unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime")
});
struct Pending;
impl Future for Pending {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::Lazy;
use crate::{CompatExt, TOKIO1};
#[test]
fn fallback_runtime_is_created_if_and_only_if_outside_tokio_context() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(use_tokio().compat());
assert!(Lazy::get(&TOKIO1).is_none());
futures::executor::block_on(use_tokio().compat());
assert!(Lazy::get(&TOKIO1).is_some());
}
async fn use_tokio() {
tokio::time::sleep(std::time::Duration::from_micros(1)).await
}
}