use std::{
sync::Arc,
thread,
time::Duration,
};
use qubit_function::Callable;
use crate::{
TrackedTask,
hook::{
TaskHook,
notify_rejected_optional,
},
service::SubmissionError,
task::{
spi::TaskEndpointPair,
task_admission_gate::TaskAdmissionGate,
},
};
use super::{
Executor,
thread_spawn_config::ThreadSpawnConfig,
};
/// Boxed one-shot closure handed to `DelayExecutor::spawn_worker`.
///
/// The `Send + 'static` bounds allow the closure to be moved to another
/// thread and to outlive the call that created it.
type Worker = Box<dyn FnOnce() + Send + 'static>;
/// Executor that runs each submitted task on its own spawned worker after
/// waiting for a fixed delay.
///
/// Cloning copies the delay and stack size and shares the same hook `Arc`.
#[derive(Clone)]
pub struct DelayExecutor {
/// Time the worker sleeps before running the task; a zero delay skips the sleep.
delay: Duration,
/// Optional hook passed to the task endpoints and notified when spawning the
/// worker is rejected.
hook: Option<Arc<dyn TaskHook>>,
/// Optional stack size forwarded to `ThreadSpawnConfig::new`
/// (presumably in bytes — TODO confirm against `ThreadSpawnConfig`).
stack_size: Option<usize>,
}
impl DelayExecutor {
#[inline]
pub fn new(delay: Duration) -> Self {
Self {
delay,
hook: None,
stack_size: None,
}
}
#[inline]
pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
self.hook = Some(hook);
self
}
#[inline]
pub fn with_stack_size(mut self, stack_size: usize) -> Self {
self.stack_size = Some(stack_size);
self
}
#[inline]
pub const fn delay(&self) -> Duration {
self.delay
}
fn spawn_worker(&self, worker: Worker) -> Result<(), SubmissionError> {
ThreadSpawnConfig::new(self.stack_size).spawn(worker)
}
}
impl Executor for DelayExecutor {
fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
let (handle, slot) =
TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
let delay = self.delay;
let gate = TaskAdmissionGate::new(self.hook.is_some());
let worker_gate = gate.clone();
let hook = self.hook.clone();
self.spawn_worker(Box::new(move || {
worker_gate.wait();
if !delay.is_zero() {
thread::sleep(delay);
}
slot.run(task);
}))
.inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
handle.accept();
gate.open();
Ok(handle)
}
}