Skip to main content

qubit_executor/executor/
delay_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    thread,
13    time::Duration,
14};
15
16use qubit_function::Callable;
17
18use crate::{
19    TrackedTask,
20    hook::{
21        TaskHook,
22        notify_rejected_optional,
23    },
24    service::SubmissionError,
25    task::{
26        spi::TaskEndpointPair,
27        task_admission_gate::TaskAdmissionGate,
28    },
29};
30
31use super::{
32    Executor,
33    thread_spawn_config::ThreadSpawnConfig,
34};
35
/// Boxed, run-once closure executed on a helper thread.
///
/// The closure takes no arguments and returns nothing; `Send + 'static` lets
/// it be moved onto a thread that may outlive the submitting call.
type Worker = Box<dyn FnOnce() + Send + 'static>;
37
/// Executor that starts each task after a fixed delay.
///
/// `DelayExecutor` is a small non-blocking submission strategy for cases where
/// one task should run later while the caller must return immediately. It
/// models a delayed start, not a minimum execution duration.
///
/// Each accepted task gets its own helper OS thread. The helper thread sleeps
/// for the configured delay and then runs the task. This keeps the submitting
/// thread unblocked without requiring a shared timer, queue, runtime, or
/// scheduler worker. It is intentionally not a general-purpose delayed task
/// scheduler and does not coalesce timers across tasks.
///
/// The returned [`TrackedTask`] is created immediately. Dropping the handle does
/// not cancel the helper thread; use [`TrackedTask::cancel`] before the helper
/// thread starts the task when pre-start cancellation is needed.
///
/// Cloning the executor copies the delay and stack size and shares the same
/// hook instance through its `Arc`.
#[derive(Clone)]
pub struct DelayExecutor {
    /// Duration to sleep before each submitted task starts.
    delay: Duration,
    /// Hook notified about accepted task lifecycle events; `None` when no hook
    /// has been configured.
    hook: Option<Arc<dyn TaskHook>>,
    /// Optional stack size in bytes for each helper thread; `None` when no
    /// explicit size has been requested.
    stack_size: Option<usize>,
}
63
64impl DelayExecutor {
65    /// Creates an executor that delays task start by the supplied duration.
66    ///
67    /// # Parameters
68    ///
69    /// * `delay` - Duration to wait before running each task.
70    ///
71    /// # Returns
72    ///
73    /// A delay executor using the supplied delay.
74    #[inline]
75    pub fn new(delay: Duration) -> Self {
76        Self {
77            delay,
78            hook: None,
79            stack_size: None,
80        }
81    }
82
83    /// Returns a copy of this executor using the supplied task hook.
84    ///
85    /// # Parameters
86    ///
87    /// * `hook` - Hook notified about accepted task lifecycle events.
88    ///
89    /// # Returns
90    ///
91    /// This executor configured with `hook`.
92    #[inline]
93    pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
94        self.hook = Some(hook);
95        self
96    }
97
98    /// Returns a copy of this executor using the supplied helper thread stack size.
99    ///
100    /// # Parameters
101    ///
102    /// * `stack_size` - Stack size in bytes for each helper thread.
103    ///
104    /// # Returns
105    ///
106    /// This executor configured with `stack_size`.
107    #[inline]
108    pub fn with_stack_size(mut self, stack_size: usize) -> Self {
109        self.stack_size = Some(stack_size);
110        self
111    }
112
113    /// Returns the configured delay.
114    ///
115    /// # Returns
116    ///
117    /// The duration waited before each task starts.
118    #[inline]
119    pub const fn delay(&self) -> Duration {
120        self.delay
121    }
122
123    /// Spawns one delayed worker thread.
124    ///
125    /// # Parameters
126    ///
127    /// * `worker` - Closure to run on the helper OS thread.
128    ///
129    /// # Returns
130    ///
131    /// `Ok(())` if the helper was spawned.
132    ///
133    /// # Errors
134    ///
135    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
136    /// refuses to create the helper thread.
137    fn spawn_worker(&self, worker: Worker) -> Result<(), SubmissionError> {
138        ThreadSpawnConfig::new(self.stack_size).spawn(worker)
139    }
140}
141
142impl Executor for DelayExecutor {
143    /// Starts a helper thread that waits and then runs the callable.
144    ///
145    /// This method returns after the helper thread has been spawned; it does
146    /// not wait for the configured delay or for task completion.
147    ///
148    /// # Parameters
149    ///
150    /// * `task` - Callable to run after the configured delay.
151    ///
152    /// # Returns
153    ///
154    /// A [`TrackedTask`] for the delayed task.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the helper thread
159    /// cannot be created.
160    fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
161    where
162        C: Callable<R, E> + Send + 'static,
163        R: Send + 'static,
164        E: Send + 'static,
165    {
166        let (handle, slot) =
167            TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
168        let delay = self.delay;
169        let gate = TaskAdmissionGate::new(self.hook.is_some());
170        let worker_gate = gate.clone();
171        let hook = self.hook.clone();
172        self.spawn_worker(Box::new(move || {
173            worker_gate.wait();
174            if !delay.is_zero() {
175                thread::sleep(delay);
176            }
177            slot.run(task);
178        }))
179        .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
180        handle.accept();
181        gate.open();
182        Ok(handle)
183    }
184}