use std::{
panic::{
AssertUnwindSafe,
catch_unwind,
},
sync::Mutex,
};
use qubit_executor::task::spi::{
TaskRunner,
TaskSlot,
};
use qubit_function::{
Callable,
Runnable,
};
/// Type-erased unit of work that `PoolJob` stores behind a `Box<dyn PoolTask>`.
///
/// `accept` only borrows the task, whereas `run` and `cancel` consume the box,
/// so a given task can be run or cancelled at most once.
trait PoolTask: Send + 'static {
/// Acceptance hook, forwarded from `PoolJob::accept`.
///
/// NOTE(review): presumably invoked once the pool has taken ownership of the
/// job; confirm against the call sites.
fn accept(&self);
/// Executes the task, consuming it.
fn run(self: Box<Self>);
/// Cancels the task instead of running it, consuming it.
fn cancel(self: Box<Self>);
}
/// Pairs a task with the `TaskSlot` that is handed to the runner when the task
/// executes, or cancelled when it does not.
struct CompletablePoolTask<C, R, E> {
// The work itself; passed to `TaskRunner::new` when the task is run.
task: C,
// Completion slot; receives `accept`, is passed to `TaskRunner::run`, or is
// cancelled through `cancel_unstarted`.
completion: TaskSlot<R, E>,
}
// `PoolTask` adapter for a callable that reports through a `TaskSlot`.
impl<C, R, E> PoolTask for CompletablePoolTask<C, R, E>
where
    C: Callable<R, E> + Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
{
    /// Forwards the acceptance notification to the completion slot.
    fn accept(&self) {
        let slot = &self.completion;
        slot.accept();
    }

    /// Unpacks the task and hands the callable plus its slot to a `TaskRunner`.
    fn run(self: Box<Self>) {
        let Self {
            task: callable,
            completion: slot,
        } = *self;
        TaskRunner::new(callable).run(slot);
    }

    /// Calls `cancel_unstarted` on the completion slot without running the callable.
    ///
    /// The value returned by `cancel_unstarted` is deliberately discarded.
    fn cancel(self: Box<Self>) {
        // Only the slot is moved out; the callable stays in the box and is dropped with it.
        let Self {
            completion: slot, ..
        } = *self;
        let _cancelled = slot.cancel_unstarted();
    }
}
/// A pool task assembled from three caller-supplied one-shot closures.
struct CustomPoolTask {
// Acceptance callback. `PoolTask::accept` only receives `&self`, so the
// `FnOnce` is kept in a `Mutex<Option<..>>` from which it can be taken once.
accept: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
// Callback invoked when the task is run.
run: Box<dyn FnOnce() + Send + 'static>,
// Callback invoked when the task is cancelled.
cancel: Box<dyn FnOnce() + Send + 'static>,
}
// `PoolTask` adapter over three boxed one-shot closures.
impl PoolTask for CustomPoolTask {
    /// Invokes the stored accept callback the first time this is called.
    ///
    /// The callback is removed from its slot before it runs, so later calls
    /// find `None` and do nothing.
    ///
    /// # Panics
    ///
    /// Panics if the accept lock is poisoned, and propagates any panic raised
    /// by the callback itself.
    fn accept(&self) {
        // Take the callback in its own statement: the guard is a temporary of
        // this `let`, so the lock is released before the callback runs. Calling
        // it inside the `if let` scrutinee would hold the lock for the whole
        // call, and a panicking callback would poison the mutex.
        let callback = self
            .accept
            .lock()
            .expect("custom pool job accept lock should not be poisoned")
            .take();
        if let Some(callback) = callback {
            callback();
        }
    }

    /// Consumes the task and invokes its run callback.
    fn run(self: Box<Self>) {
        (self.run)();
    }

    /// Consumes the task and invokes its cancel callback.
    fn cancel(self: Box<Self>) {
        (self.cancel)();
    }
}
/// A unit of work that can be accepted, run, or cancelled.
///
/// Built either from caller-supplied closures (`new`, `with_accept`) or, inside
/// the crate, from a task plus completion slot (`from_task`) or a
/// fire-and-forget runnable (`detached`).
pub struct PoolJob {
// Concrete representation; kept private so the variants can change freely.
inner: PoolJobInner,
}
/// Internal representation of a `PoolJob`.
enum PoolJobInner {
// Job with only a run closure; `PoolJob::accept` and `PoolJob::cancel` do
// nothing for this variant.
Detached {
run: Box<dyn FnOnce() + Send + 'static>,
},
// Job backed by a type-erased task that receives accept, run and cancel.
Completable(Box<dyn PoolTask>),
}
impl PoolJob {
pub fn new(
run: Box<dyn FnOnce() + Send + 'static>,
cancel: Box<dyn FnOnce() + Send + 'static>,
) -> Self {
Self::with_accept(Box::new(|| {}), run, cancel)
}
pub fn with_accept(
accept: Box<dyn FnOnce() + Send + 'static>,
run: Box<dyn FnOnce() + Send + 'static>,
cancel: Box<dyn FnOnce() + Send + 'static>,
) -> Self {
Self {
inner: PoolJobInner::Completable(Box::new(CustomPoolTask {
accept: Mutex::new(Some(accept)),
run,
cancel,
})),
}
}
pub(crate) fn from_task<C, R, E>(task: C, completion: TaskSlot<R, E>) -> Self
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
Self {
inner: PoolJobInner::Completable(Box::new(CompletablePoolTask { task, completion })),
}
}
pub(crate) fn detached<T, E>(task: T) -> Self
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
Self {
inner: PoolJobInner::Detached {
run: Box::new(move || {
let mut task = task;
let _ignored = catch_unwind(AssertUnwindSafe(|| task.run()));
}),
},
}
}
pub(crate) fn accept(&self) {
if let PoolJobInner::Completable(task) = &self.inner {
task.accept();
}
}
pub(crate) fn run(self) {
match self.inner {
PoolJobInner::Detached { run } => run(),
PoolJobInner::Completable(task) => task.run(),
}
}
pub(crate) fn cancel(self) {
if let PoolJobInner::Completable(task) = self.inner {
task.cancel();
}
}
}