use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, ready};
use futures::channel::mpsc;
use futures::{FutureExt, StreamExt};
use vortex_error::{VortexResult, vortex_panic};
use vortex_metrics::VortexMetrics;
use crate::file::{FileRead, IntoReadSource, IoRequestStream};
use crate::runtime::{AbortHandleRef, Executor, IoTask};
/// A cloneable handle used to submit work to a runtime [`Executor`].
///
/// The handle stores only a [`Weak`] pointer to the executor, so holding a `Handle` does not by
/// itself keep the executor alive.
#[derive(Clone)]
pub struct Handle {
    /// Weak pointer to the executor that runs work submitted through this handle.
    runtime: Weak<dyn Executor>,
}
impl Handle {
pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
Self { runtime }
}
fn runtime(&self) -> Arc<dyn Executor> {
self.runtime.upgrade().unwrap_or_else(|| {
vortex_panic!("Attempted to use a Handle after its runtime was dropped")
})
}
pub fn find() -> Option<Self> {
#[cfg(feature = "tokio")]
{
use tokio::runtime::Handle as TokioHandle;
if TokioHandle::try_current().is_ok() {
return Some(crate::runtime::tokio::TokioRuntime::current());
}
}
None
}
pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
where
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (send, recv) = oneshot::channel();
let abort_handle = self.runtime().spawn(
async move {
drop(send.send(f.await));
}
.boxed(),
);
Task {
recv,
abort_handle: Some(abort_handle),
}
}
pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
where
F: FnOnce(Handle) -> Fut,
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
self.spawn(f(Handle::new(self.runtime.clone())))
}
pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (send, recv) = oneshot::channel();
let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
if !send.is_closed() {
drop(send.send(f()));
}
}));
Task {
recv,
abort_handle: Some(abort_handle),
}
}
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (send, recv) = oneshot::channel();
let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
if !send.is_closed() {
drop(send.send(f()));
}
}));
Task {
recv,
abort_handle: Some(abort_handle),
}
}
pub fn open_read<S: IntoReadSource>(
&self,
source: S,
metrics: VortexMetrics,
) -> VortexResult<FileRead> {
let source = source.into_read_source(self.clone())?;
let (send, recv) = mpsc::unbounded();
let read = FileRead::new(source.uri().clone(), source.size(), send);
let stream =
IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed();
self.runtime().spawn_io(IoTask::new(source, stream));
Ok(read)
}
}
/// The caller-side end of work spawned through a `Handle`.
///
/// Awaiting a `Task` yields the value produced by the spawned work. When a `Task` is dropped while
/// it still holds its abort handle, `abort` is called on that handle.
#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
pub struct Task<T> {
    /// Channel end on which the spawned work delivers its output.
    recv: oneshot::Receiver<T>,
    /// Handle used to abort the work on drop; `None` once it has been taken.
    abort_handle: Option<AbortHandleRef>,
}
impl<T> Task<T> {
    /// Consumes the task without calling `abort` on the spawned work.
    ///
    /// The abort handle is discarded first, so the `Drop` implementation that runs when `self`
    /// goes out of scope finds nothing to abort. The result receiver is dropped together with
    /// the task, so any output the work produces is discarded.
    ///
    /// NOTE(review): this assumes that dropping an `AbortHandleRef` does not itself abort the
    /// work. Verify against the `AbortHandleRef` implementation.
    pub fn detach(mut self) {
        // Overwriting the slot drops the handle in place, without ever calling `abort`.
        self.abort_handle = None;
    }
}
impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the result channel and resolves to the output of the spawned work.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports an error instead of a value, which happens when the sending
    /// side went away without delivering a result.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let outcome = ready!(self.recv.poll_unpin(cx));
        let value = outcome.unwrap_or_else(|recv_err| {
            vortex_panic!(
                "Runtime dropped task without completing it, likely it panicked: {recv_err}"
            )
        });
        Poll::Ready(value)
    }
}
impl<T> Drop for Task<T> {
    /// Calls `abort` on the spawned work, unless the abort handle has already been taken.
    fn drop(&mut self) {
        let Some(abort_handle) = self.abort_handle.take() else {
            // Nothing left to abort: the handle was removed earlier (e.g. by `detach`).
            return;
        };
        abort_handle.abort();
    }
}