use std::future::Future;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll, Waker};
/// Lazily-built, process-wide multi-threaded tokio runtime.
///
/// Within this file it serves as the fallback runtime for `in_tokio` and
/// `with_ambient_tokio_runtime` when the calling thread has no current tokio
/// runtime handle. Both the time and I/O drivers are enabled.
///
/// # Panics
///
/// The first access panics if the runtime cannot be constructed.
pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .enable_io()
        .build()
        // Nothing in this file can work without the fallback runtime, so a
        // construction failure is treated as unrecoverable; name the failure
        // instead of surfacing a bare `unwrap` panic.
        .expect("failed to build the fallback tokio runtime")
});
/// Newtype around [`tokio::task::JoinHandle`] whose `Drop` implementation
/// calls `abort()` on the wrapped handle, so the task is not left detached
/// when the handle goes out of scope.
///
/// Awaiting the wrapper yields the task's output directly (see its `Future`
/// implementation) rather than a `Result`.
#[derive(Debug)]
pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
impl<T> AbortOnDropJoinHandle<T> {
    /// Requests an abort of the task and then waits for it to finish.
    ///
    /// # Returns
    ///
    /// * `Some(value)` if the join still produced the task's output.
    /// * `None` if the join error reports that the task was cancelled.
    ///
    /// # Panics
    ///
    /// Any other join error is converted into its panic payload and the
    /// panic is resumed on the caller.
    pub async fn cancel(mut self) -> Option<T> {
        let task = &mut self.0;
        task.abort();

        // Await by mutable reference: `self` implements `Drop`, so the inner
        // handle cannot be moved out of it.
        let join_error = match task.await {
            Ok(output) => return Some(output),
            Err(join_error) => join_error,
        };

        if join_error.is_cancelled() {
            None
        } else {
            std::panic::resume_unwind(join_error.into_panic())
        }
    }
}
impl<T> Drop for AbortOnDropJoinHandle<T> {
fn drop(&mut self) {
self.0.abort()
}
}
impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
type Target = tokio::task::JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
&mut self.0
}
}
impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
fn from(jh: tokio::task::JoinHandle<T>) -> Self {
AbortOnDropJoinHandle(jh)
}
}
impl<T> Future for AbortOnDropJoinHandle<T> {
    type Output = T;

    /// Polls the wrapped join handle and unwraps its result.
    ///
    /// # Panics
    ///
    /// Panics with the message `"child task panicked"` if the join handle
    /// resolves to an error.
    // NOTE(review): a join error may presumably also be a cancellation (e.g.
    // after `abort()` was called through `DerefMut`), in which case the panic
    // message is misleading — confirm against the tokio `JoinError` docs.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner)
            .poll(cx)
            .map(|joined| joined.expect("child task panicked"))
    }
}
/// Spawns `f` as a tokio task and returns a handle that aborts the task when
/// dropped.
///
/// The spawn is performed through `with_ambient_tokio_runtime`, so it uses the
/// caller's current tokio runtime when there is one and the fallback
/// `RUNTIME` otherwise.
pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    AbortOnDropJoinHandle(with_ambient_tokio_runtime(|| tokio::task::spawn(f)))
}
/// Runs the closure `f` via `tokio::task::spawn_blocking` and returns a handle
/// that calls `abort()` on the task when dropped.
///
/// The spawn is performed through `with_ambient_tokio_runtime`, so it uses the
/// caller's current tokio runtime when there is one and the fallback
/// `RUNTIME` otherwise.
// NOTE(review): tokio documents that blocking tasks which have already started
// running are not stopped by `abort()` — confirm callers do not rely on
// drop to interrupt a running closure.
pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    AbortOnDropJoinHandle(with_ambient_tokio_runtime(|| {
        tokio::task::spawn_blocking(f)
    }))
}
/// Drives `f` to completion on a tokio runtime, blocking the calling thread,
/// and returns its output.
///
/// If the calling thread already has a current tokio runtime handle, that
/// handle is entered and used to block on `f`. Otherwise the fallback
/// `RUNTIME` is entered and used instead.
// NOTE(review): tokio documents that `block_on` panics when invoked from
// within an asynchronous execution context — confirm callers only use this
// from synchronous code.
pub fn in_tokio<F: Future>(f: F) -> F::Output {
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        let _guard = handle.enter();
        return handle.block_on(f);
    }

    let _guard = RUNTIME.enter();
    RUNTIME.block_on(async move {
        // The fallback path yields to the scheduler once before polling `f`.
        // NOTE(review): the reason for this yield is not visible in this
        // file — keep it unless the rationale is confirmed to be obsolete.
        tokio::task::yield_now().await;
        f.await
    })
}
/// Invokes `f` with a tokio runtime context available and returns its result.
///
/// When the calling thread already has a current tokio runtime handle, `f` is
/// called as-is. Otherwise the fallback `RUNTIME` is entered for the duration
/// of the call.
pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
    if tokio::runtime::Handle::try_current().is_ok() {
        return f();
    }

    // Keep the enter guard alive until `f` has returned.
    let _guard = RUNTIME.enter();
    f()
}
/// Polls `future` exactly once with a no-op waker.
///
/// Returns `Some(output)` if the future completed during that poll and `None`
/// if it is still pending. Because the waker does nothing, no wake-up is
/// delivered for a pending future; the caller has to poll again itself.
pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
where
    F: Future,
{
    let mut cx = Context::from_waker(Waker::noop());
    if let Poll::Ready(output) = future.poll(&mut cx) {
        Some(output)
    } else {
        None
    }
}