use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use std::task::ready;
use futures::FutureExt;
use tracing::Instrument;
use vortex_error::vortex_panic;
use crate::runtime::AbortHandleRef;
use crate::runtime::Executor;
/// A cloneable handle used to spawn work onto an [`Executor`].
///
/// The handle only holds a [`Weak`] reference, so it does not by itself keep
/// the executor alive; spawning through a handle whose executor has already
/// been dropped panics.
#[derive(Clone)]
pub struct Handle {
// Weak reference to the executor; upgraded on every spawn call.
runtime: Weak<dyn Executor>,
}
impl Handle {
    /// Creates a handle from a weak reference to an [`Executor`].
    pub fn new(runtime: Weak<dyn Executor>) -> Self {
        Self { runtime }
    }

    /// Upgrades the weak executor reference to a strong one.
    ///
    /// # Panics
    ///
    /// Panics if the executor has already been dropped.
    fn runtime(&self) -> Arc<dyn Executor> {
        match self.runtime.upgrade() {
            Some(executor) => executor,
            None => vortex_panic!("Attempted to use a Handle after its runtime was dropped"),
        }
    }

    /// Tries to locate a handle for the ambient runtime.
    ///
    /// With the `tokio` feature enabled, this returns the handle produced by
    /// `TokioRuntime::current()` when `tokio::runtime::Handle::try_current()`
    /// succeeds. In every other case it returns `None`.
    pub fn find() -> Option<Self> {
        #[cfg(feature = "tokio")]
        {
            use tokio::runtime::Handle as TokioHandle;

            let inside_tokio = TokioHandle::try_current().is_ok();
            if inside_tokio {
                return Some(crate::runtime::tokio::TokioRuntime::current());
            }
        }

        None
    }

    /// Spawns a future onto the executor and returns a [`Task`] for its output.
    ///
    /// The tracing span that is current at the time of this call instruments
    /// the spawned future. The output is delivered over a oneshot channel; a
    /// failed send is deliberately ignored.
    ///
    /// # Panics
    ///
    /// Panics if the executor has already been dropped.
    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
    where
        Fut: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let parent_span = tracing::Span::current();
        let executor = self.runtime();

        // Wrap the user future so that its output is forwarded to the `Task`.
        let job = async move {
            drop(tx.send(f.await));
        }
        .instrument(parent_span)
        .boxed();
        let abort_handle = executor.spawn(job);

        Task {
            recv: rx.into_future(),
            abort_handle: Some(abort_handle),
        }
    }

    /// Builds a future from a clone of this handle and spawns it.
    ///
    /// `f` is invoked synchronously, on the calling thread, to construct the
    /// future; only the resulting future is handed to [`Handle::spawn`].
    ///
    /// # Panics
    ///
    /// Panics if the executor has already been dropped.
    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
    where
        F: FnOnce(Handle) -> Fut,
        Fut: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        let nested = self.clone();
        self.spawn(f(nested))
    }

    /// Submits a closure through the executor's `spawn_cpu` entry point.
    ///
    /// The closure runs inside the tracing span that is current at the time of
    /// this call. If the sending half reports `is_closed()` when the closure
    /// starts, `f` is not invoked at all. A failed send is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the executor has already been dropped.
    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let parent_span = tracing::Span::current();
        let executor = self.runtime();

        let abort_handle = executor.spawn_cpu(Box::new(move || {
            let _entered = parent_span.enter();
            // Skip the work entirely when the channel is already closed.
            if tx.is_closed() {
                return;
            }
            drop(tx.send(f()));
        }));

        Task {
            recv: rx.into_future(),
            abort_handle: Some(abort_handle),
        }
    }

    /// Submits a closure through the executor's `spawn_blocking_io` entry point.
    ///
    /// The closure runs inside the tracing span that is current at the time of
    /// this call. If the sending half reports `is_closed()` when the closure
    /// starts, `f` is not invoked at all. A failed send is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the executor has already been dropped.
    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let parent_span = tracing::Span::current();
        let executor = self.runtime();

        let abort_handle = executor.spawn_blocking_io(Box::new(move || {
            let _entered = parent_span.enter();
            // Skip the work entirely when the channel is already closed.
            if tx.is_closed() {
                return;
            }
            drop(tx.send(f()));
        }));

        Task {
            recv: rx.into_future(),
            abort_handle: Some(abort_handle),
        }
    }
}
/// The pending result of work spawned through a `Handle`.
///
/// Awaiting the task yields the output of the spawned work. Dropping the task
/// calls `abort` on its abort handle, unless the handle was released first via
/// `detach`.
#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
pub struct Task<T> {
// Receiving half of the oneshot channel that carries the output.
recv: oneshot::AsyncReceiver<T>,
// Abort handle for the spawned work; `None` once detached or already aborted.
abort_handle: Option<AbortHandleRef>,
}
impl<T> Task<T> {
    /// Detaches the task so that dropping it no longer aborts the spawned work.
    ///
    /// The abort handle is released (dropped) without calling `abort`, which
    /// leaves the `Drop` implementation with nothing to cancel. The result
    /// receiver is still dropped together with `self`, so the output can no
    /// longer be observed.
    ///
    /// NOTE(review): `Handle::spawn_cpu` and `Handle::spawn_blocking` skip their
    /// closure when the sender reports `is_closed()`; presumably a task that is
    /// detached before it starts may therefore never run — confirm this is
    /// intended.
    pub fn detach(mut self) {
        // Clearing the slot drops the handle here; `Drop` then finds `None`.
        self.abort_handle = None;
    }
}
impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the result channel and resolves to the output of the spawned work.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports an error instead of a value, i.e. the
    /// sending side went away without delivering a result.
    #[expect(clippy::panic)]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `ready!` returns `Poll::Pending` early while no outcome is available.
        let outcome = ready!(self.recv.poll_unpin(cx));
        let Ok(value) = outcome else {
            panic!("Runtime dropped task without completing it, likely it panicked")
        };
        Poll::Ready(value)
    }
}
impl<T> Drop for Task<T> {
    /// Aborts the spawned work if the task still owns its abort handle.
    ///
    /// After `detach` the slot is empty and dropping the task does nothing here.
    fn drop(&mut self) {
        let Some(handle) = self.abort_handle.take() else {
            return;
        };
        handle.abort();
    }
}