use futures::Stream;
use pin_project::pin_project;
use std::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
/// Marker trait for values that are `Send` and contain no non-`'static` borrows.
///
/// It has no methods; it only bundles the two bounds under a single name.
pub trait Sendable: Send + 'static {}

/// Blanket implementation: every `Send + 'static` type is automatically `Sendable`.
impl<T: Send + 'static> Sendable for T {}
/// Newtype wrapper around a single value of type `T`.
///
/// The only field is marked `#[pin]`, so `pin_project` generates a pinned
/// projection for it; the `Future` and `Stream` impls in this file use that
/// projection to poll the wrapped value.
#[derive(Debug)]
#[pin_project]
pub struct SendableWrapper<T>(#[pin] T);
impl<T> SendableWrapper<T>
where
T: Sendable,
{
pub fn new(inner: T) -> Self {
Self(inner)
}
pub fn take(self) -> T {
self.0
}
}
impl<T: Sendable> Deref for SendableWrapper<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: Sendable> DerefMut for SendableWrapper<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Sendable + Clone> Clone for SendableWrapper<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// Forwards `Future` to the wrapped value, yielding the same output type.
impl<T: SendableFuture> Future for SendableWrapper<T> {
    type Output = <T as Future>::Output;

    /// Polls the wrapped future through the pinned projection of the field.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        let inner: Pin<&mut T> = projected.0;
        inner.poll(cx)
    }
}
/// Forwards `Stream` to the wrapped value, yielding the same item type.
impl<T: SendableStream> Stream for SendableWrapper<T> {
    type Item = <T as Stream>::Item;

    /// Polls the wrapped stream through the pinned projection of the field.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let projected = self.project();
        let inner: Pin<&mut T> = projected.0;
        inner.poll_next(cx)
    }
}
// The `tokio` and `async_std` backend modules in this file define the same
// item names (`spawn_task`, `Sleep`, `sleep_for`, `sleep_until`) and both are
// glob re-exported, so enabling both features at once is rejected at compile
// time.
#[cfg(all(feature = "tokio", feature = "async-std"))]
compile_error!("You can not enable both the 'tokio' and 'async-std' features. This leads to namespace collisions");
#[cfg(feature = "tokio")]
pub use tokio::*;
#[cfg(feature = "async-std")]
pub use async_std::*;
use super::{SendableFuture, SendableStream};
#[cfg(feature = "tokio")]
mod tokio {
    //! Task spawning and sleeping implemented on top of the `tokio` crate.

    use crate::compat::SendableFuture;
    use instant::{Duration, Instant};
    use pin_project::pin_project;
    use std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    };

    use super::Sendable;

    /// Hands `fut` to `tokio::spawn` and discards the returned join handle,
    /// so the caller cannot await the task's result through this function.
    pub fn spawn_task<F, T>(fut: F)
    where
        F: SendableFuture<Output = T>,
        T: Sendable,
    {
        let handle = tokio::spawn(fut);
        drop(handle);
    }

    /// Future returned by [`sleep_for`] and [`sleep_until`]; a pinned wrapper
    /// around `tokio::time::Sleep`.
    #[pin_project]
    pub struct Sleep(#[pin] tokio::time::Sleep);

    impl Future for Sleep {
        type Output = ();

        /// Polls the wrapped tokio timer through its pinned projection.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let projected = self.project();
            projected.0.poll(cx)
        }
    }

    /// Creates a [`Sleep`] from `tokio::time::sleep(dur)`.
    pub fn sleep_for(dur: Duration) -> Sleep {
        let timer = tokio::time::sleep(dur);
        Sleep(timer)
    }

    /// Creates a [`Sleep`] from `tokio::time::sleep_until`, converting
    /// `deadline` into the instant type tokio expects.
    pub fn sleep_until(deadline: Instant) -> Sleep {
        let timer = tokio::time::sleep_until(deadline.into());
        Sleep(timer)
    }
}
#[cfg(feature = "async-std")]
mod async_std {
    //! Task spawning and sleeping implemented on top of the `async_std` crate.

    use crate::compat::SendableFuture;
    use futures::FutureExt;
    use instant::{Duration, Instant};
    use std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    };

    use super::Sendable;

    /// Hands `fut` to `async_std::task::spawn` and discards the returned
    /// handle, so the caller cannot await the task's result through this
    /// function.
    pub fn spawn_task<F, T>(fut: F)
    where
        F: SendableFuture<Output = T>,
        T: Sendable,
    {
        drop(async_std::task::spawn(fut));
    }

    /// Future returned by [`sleep_for`] and [`sleep_until`]; holds the
    /// underlying sleep future as a boxed, pinned trait object.
    pub struct Sleep(Pin<Box<dyn 'static + Send + Future<Output = ()>>>);

    impl Future for Sleep {
        type Output = ();

        /// Polls the boxed sleep future.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The field is already `Pin<Box<_>>`, which is `Unpin`, so it can
            // be polled directly through a mutable reference.
            self.0.poll_unpin(cx)
        }
    }

    /// Creates a [`Sleep`] from `async_std::task::sleep(dur)`.
    pub fn sleep_for(dur: Duration) -> Sleep {
        Sleep(Box::pin(async_std::task::sleep(dur)))
    }

    /// Creates a [`Sleep`] that lasts for the time remaining until `deadline`,
    /// measured when this function is called.
    ///
    /// A `deadline` that has already passed yields a zero-length sleep.
    pub fn sleep_until(deadline: Instant) -> Sleep {
        // Plain `deadline - Instant::now()` is allowed to panic when the
        // deadline lies in the past; the saturating form clamps to zero.
        let remaining = deadline.saturating_duration_since(Instant::now());
        sleep_for(remaining)
    }
}