use std::sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
};
use qubit_function::Callable;
use crate::task::spi::{
RunningTaskSlot,
TaskRunner,
TaskSlot,
};
use super::scheduled_task_entry::{
ScheduledTaskEntry,
StartedScheduledTask,
};
pub(crate) struct CompletableScheduledTask<R, E> {
task: Box<dyn FnOnce(RunningTaskSlot<R, E>) + Send + 'static>,
slot: TaskSlot<R, E>,
cancelled: Arc<AtomicBool>,
}
impl<R, E> CompletableScheduledTask<R, E> {
pub(crate) fn new<C>(task: C, slot: TaskSlot<R, E>, cancelled: Arc<AtomicBool>) -> Self
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
Self {
task: Box::new(move |running_slot| {
TaskRunner::new(task).run_started(running_slot);
}),
slot,
cancelled,
}
}
}
impl<R, E> ScheduledTaskEntry for CompletableScheduledTask<R, E>
where
R: Send + 'static,
E: Send + 'static,
{
#[inline]
fn accept(&self) {
self.slot.accept();
}
#[inline]
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Acquire)
}
fn start(self: Box<Self>) -> Option<StartedScheduledTask> {
let Self {
task,
slot,
cancelled,
} = *self;
match slot.try_start() {
Ok(running_slot) => Some(Box::new(move || {
task(running_slot);
})),
Err(_) => {
cancelled.store(true, Ordering::Release);
None
}
}
}
#[inline]
fn cancel(self: Box<Self>) -> bool {
let Self {
slot, cancelled, ..
} = *self;
if slot.cancel_unstarted() {
cancelled.store(true, Ordering::Release);
true
} else {
false
}
}
}