qubit_executor/executor/delay_executor.rs
/*******************************************************************************
 *
 * Copyright (c) 2025 - 2026 Haixing Hu.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11 sync::Arc,
12 thread,
13 time::Duration,
14};
15
16use qubit_function::Callable;
17
18use crate::{
19 TrackedTask,
20 hook::{
21 TaskHook,
22 notify_rejected_optional,
23 },
24 service::SubmissionError,
25 task::{
26 spi::TaskEndpointPair,
27 task_admission_gate::TaskAdmissionGate,
28 },
29};
30
31use super::{
32 Executor,
33 thread_spawn_config::ThreadSpawnConfig,
34};
35
/// Boxed closure that is run exactly once on a helper OS thread.
///
/// The `Send + 'static` bounds let the closure be moved onto the spawned
/// thread.
type Worker = Box<dyn FnOnce() + Send + 'static>;
37
38/// Executor that starts each task after a fixed delay.
39///
40/// `DelayExecutor` is a small non-blocking submission strategy for cases where
41/// one task should run later while the caller must return immediately. It
42/// models delayed start, not minimum execution duration.
43///
44/// Each accepted task gets its own helper OS thread. The helper thread sleeps
45/// for the configured delay and then runs the task. This keeps the submitting
46/// thread unblocked without requiring a shared timer, queue, runtime, or
47/// scheduler worker. It is intentionally not a general-purpose delayed task
48/// scheduler and does not coalesce timers across tasks.
49///
50/// The returned [`TrackedTask`] is created immediately. Dropping the handle does
51/// not cancel the helper thread; use [`TrackedTask::cancel`] before the helper
52/// thread starts the task when pre-start cancellation is needed.
53///
54#[derive(Clone)]
55pub struct DelayExecutor {
56 /// Duration to sleep before each submitted task starts.
57 delay: Duration,
58 /// Hook notified about accepted task lifecycle events.
59 hook: Option<Arc<dyn TaskHook>>,
60 /// Optional stack size for each helper thread.
61 stack_size: Option<usize>,
62}
63
64impl DelayExecutor {
65 /// Creates an executor that delays task start by the supplied duration.
66 ///
67 /// # Parameters
68 ///
69 /// * `delay` - Duration to wait before running each task.
70 ///
71 /// # Returns
72 ///
73 /// A delay executor using the supplied delay.
74 #[inline]
75 pub fn new(delay: Duration) -> Self {
76 Self {
77 delay,
78 hook: None,
79 stack_size: None,
80 }
81 }
82
83 /// Returns a copy of this executor using the supplied task hook.
84 ///
85 /// # Parameters
86 ///
87 /// * `hook` - Hook notified about accepted task lifecycle events.
88 ///
89 /// # Returns
90 ///
91 /// This executor configured with `hook`.
92 #[inline]
93 pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
94 self.hook = Some(hook);
95 self
96 }
97
98 /// Returns a copy of this executor using the supplied helper thread stack size.
99 ///
100 /// # Parameters
101 ///
102 /// * `stack_size` - Stack size in bytes for each helper thread.
103 ///
104 /// # Returns
105 ///
106 /// This executor configured with `stack_size`.
107 #[inline]
108 pub fn with_stack_size(mut self, stack_size: usize) -> Self {
109 self.stack_size = Some(stack_size);
110 self
111 }
112
113 /// Returns the configured delay.
114 ///
115 /// # Returns
116 ///
117 /// The duration waited before each task starts.
118 #[inline]
119 pub const fn delay(&self) -> Duration {
120 self.delay
121 }
122
123 /// Spawns one delayed worker thread.
124 ///
125 /// # Parameters
126 ///
127 /// * `worker` - Closure to run on the helper OS thread.
128 ///
129 /// # Returns
130 ///
131 /// `Ok(())` if the helper was spawned.
132 ///
133 /// # Errors
134 ///
135 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
136 /// refuses to create the helper thread.
137 fn spawn_worker(&self, worker: Worker) -> Result<(), SubmissionError> {
138 ThreadSpawnConfig::new(self.stack_size).spawn(worker)
139 }
140}
141
142impl Executor for DelayExecutor {
143 /// Starts a helper thread that waits and then runs the callable.
144 ///
145 /// This method returns after the helper thread has been spawned; it does
146 /// not wait for the configured delay or for task completion.
147 ///
148 /// # Parameters
149 ///
150 /// * `task` - Callable to run after the configured delay.
151 ///
152 /// # Returns
153 ///
154 /// A [`TrackedTask`] for the delayed task.
155 ///
156 /// # Errors
157 ///
158 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the helper thread
159 /// cannot be created.
160 fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
161 where
162 C: Callable<R, E> + Send + 'static,
163 R: Send + 'static,
164 E: Send + 'static,
165 {
166 let (handle, slot) =
167 TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
168 let delay = self.delay;
169 let gate = TaskAdmissionGate::new(self.hook.is_some());
170 let worker_gate = gate.clone();
171 let hook = self.hook.clone();
172 self.spawn_worker(Box::new(move || {
173 worker_gate.wait();
174 if !delay.is_zero() {
175 thread::sleep(delay);
176 }
177 slot.run(task);
178 }))
179 .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
180 handle.accept();
181 gate.open();
182 Ok(handle)
183 }
184}