use std::future::IntoFuture;
use qubit_executor::task::TaskHandleFuture;
use qubit_executor::{
CancelResult,
TaskResult,
TaskStatus,
TrackedTask,
TryGet,
task::spi::{
TaskResultHandle,
TrackedTaskHandle,
},
};
use tokio::task::AbortHandle;
/// Boxed, thread-safe callback that [`TokioBlockingTaskHandle::cancel`] invokes
/// when the tracked task reports a successful cancellation.
///
/// NOTE(review): judging by the name, the callback presumably removes a task
/// that is still queued from the executor's bookkeeping — confirm at the site
/// where the handle is constructed.
type CancelQueuedTask = Box<dyn Fn() + Send + Sync + 'static>;
/// Handle that bundles a [`TrackedTask`] with the pieces needed to cancel it:
/// a Tokio [`AbortHandle`] and a [`CancelQueuedTask`] callback.
///
/// `R` and `E` are forwarded unchanged to `TrackedTask<R, E>` and show up in
/// the `TaskResult<R, E>` produced by the result accessors.
///
/// NOTE(review): the name suggests the task runs on Tokio's blocking pool;
/// nothing in this type enforces that — confirm with the code that builds it.
pub struct TokioBlockingTaskHandle<R, E> {
/// Tracked task that provides status, result retrieval and the cancellation verdict.
handle: TrackedTask<R, E>,
/// Tokio abort handle; `abort()` is called on it after a successful cancellation.
abort_handle: AbortHandle,
/// Callback run after a successful cancellation, just before the abort request.
cancel_queued_task: CancelQueuedTask,
}
impl<R, E> TokioBlockingTaskHandle<R, E> {
#[inline]
pub(crate) fn new<F>(
handle: TrackedTask<R, E>,
abort_handle: AbortHandle,
cancel_queued_task: F,
) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Self {
handle,
abort_handle,
cancel_queued_task: Box::new(cancel_queued_task),
}
}
#[inline]
pub fn get(self) -> TaskResult<R, E>
where
R: Send,
E: Send,
{
<Self as TaskResultHandle<R, E>>::get(self)
}
#[inline]
pub fn try_get(self) -> TryGet<Self, R, E>
where
R: Send,
E: Send,
{
<Self as TaskResultHandle<R, E>>::try_get(self)
}
#[inline]
pub fn is_done(&self) -> bool
where
R: Send,
E: Send,
{
<Self as TaskResultHandle<R, E>>::is_done(self)
}
#[inline]
pub fn status(&self) -> TaskStatus {
self.handle.status()
}
#[must_use]
#[inline]
pub fn cancel(&self) -> CancelResult {
let result = self.handle.cancel();
if result == CancelResult::Cancelled {
(self.cancel_queued_task)();
self.abort_handle.abort();
}
result
}
}
impl<R, E> TaskResultHandle<R, E> for TokioBlockingTaskHandle<R, E>
where
    R: Send,
    E: Send,
{
    /// Forwards to the tracked task's `is_done`.
    #[inline]
    fn is_done(&self) -> bool {
        self.handle.is_done()
    }

    /// Consumes the handle and forwards to the tracked task's `get`.
    ///
    /// The abort handle and the cancellation callback are dropped unused.
    #[inline]
    fn get(self) -> TaskResult<R, E> {
        self.handle.get()
    }

    /// Forwards to the tracked task's `try_get`.
    ///
    /// A ready result is passed through as-is. When the tracked task is handed
    /// back as pending, the handle is rebuilt around it with the original
    /// abort handle and cancellation callback, so the returned handle can
    /// still be cancelled.
    #[inline]
    fn try_get(self) -> TryGet<Self, R, E> {
        let Self {
            handle,
            abort_handle,
            cancel_queued_task,
        } = self;

        let pending = match handle.try_get() {
            TryGet::Ready(result) => return TryGet::Ready(result),
            TryGet::Pending(pending) => pending,
        };

        TryGet::Pending(Self {
            handle: pending,
            abort_handle,
            cancel_queued_task,
        })
    }
}
impl<R, E> TrackedTaskHandle<R, E> for TokioBlockingTaskHandle<R, E>
where
R: Send,
E: Send,
{
#[inline]
fn status(&self) -> TaskStatus {
self.handle.status()
}
#[inline]
fn cancel(&self) -> CancelResult {
Self::cancel(self)
}
}
impl<R, E> IntoFuture for TokioBlockingTaskHandle<R, E> {
    type Output = TaskResult<R, E>;
    type IntoFuture = TaskHandleFuture<R, E>;

    /// Converts the handle into the future produced by the tracked task.
    ///
    /// Only the tracked task takes part in the conversion; the abort handle
    /// and the cancellation callback are dropped, so the returned future
    /// carries no way to trigger this type's cancellation side effects.
    #[inline]
    fn into_future(self) -> Self::IntoFuture {
        let Self { handle, .. } = self;
        handle.into_future()
    }
}