qubit_thread_pool/delayed/delayed_task_scheduler.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::{
12 Arc,
13 atomic::{
14 AtomicU8,
15 Ordering,
16 },
17 },
18 thread,
19 time::{
20 Duration,
21 Instant,
22 },
23};
24
25use qubit_executor::service::{
26 ExecutorServiceLifecycle,
27 StopReport,
28 SubmissionError,
29};
30
31use super::delayed_task_handle::DelayedTaskHandle;
32use super::delayed_task_scheduler_inner::DelayedTaskSchedulerInner;
33use super::delayed_task_scheduler_worker::DelayedTaskSchedulerWorker;
34use super::delayed_task_state::TASK_PENDING;
35use super::scheduled_task::ScheduledTask;
36use crate::ExecutorServiceBuilderError;
37
38/// Single-threaded scheduler for cancellable delayed tasks.
39///
40/// The scheduler only owns delay timing. Scheduled closures should stay small;
41/// submit longer work to an executor service from the closure.
42pub struct DelayedTaskScheduler {
43 /// Shared scheduler state.
44 inner: Arc<DelayedTaskSchedulerInner>,
45}
46
47impl DelayedTaskScheduler {
48 /// Starts a new delayed task scheduler.
49 ///
50 /// # Parameters
51 ///
52 /// * `thread_name` - Name for the scheduler thread.
53 ///
54 /// # Returns
55 ///
56 /// A started delayed task scheduler.
57 ///
58 /// # Errors
59 ///
60 /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler thread
61 /// cannot be created.
62 pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
63 Self::with_stack_size(thread_name, None)
64 }
65
66 /// Starts a new delayed task scheduler with an optional thread stack size.
67 ///
68 /// # Parameters
69 ///
70 /// * `thread_name` - Name for the scheduler thread.
71 /// * `stack_size` - Optional stack size for the scheduler thread.
72 ///
73 /// # Returns
74 ///
75 /// A started delayed task scheduler.
76 ///
77 /// # Errors
78 ///
79 /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler thread
80 /// cannot be created.
81 pub fn with_stack_size(
82 thread_name: &str,
83 stack_size: Option<usize>,
84 ) -> Result<Self, ExecutorServiceBuilderError> {
85 let inner = Arc::new(DelayedTaskSchedulerInner::new());
86 let worker_inner = Arc::clone(&inner);
87 let mut builder = thread::Builder::new().name(thread_name.to_string());
88 if let Some(stack_size) = stack_size {
89 builder = builder.stack_size(stack_size);
90 }
91 let worker = builder.spawn(move || DelayedTaskSchedulerWorker::run(worker_inner));
92 if let Err(source) = worker {
93 return Err(ExecutorServiceBuilderError::SpawnWorker {
94 index: Some(0),
95 source,
96 });
97 }
98 Ok(Self { inner })
99 }
100
101 /// Schedules a task to run after the given delay.
102 ///
103 /// # Parameters
104 ///
105 /// * `delay` - Minimum delay before the task becomes runnable.
106 /// * `task` - Action to run on the scheduler thread after the delay.
107 ///
108 /// # Returns
109 ///
110 /// A handle that can cancel the task before it starts.
111 ///
112 /// # Errors
113 ///
114 /// Returns [`SubmissionError::Shutdown`] after shutdown starts.
115 pub fn schedule<F>(
116 &self,
117 delay: Duration,
118 task: F,
119 ) -> Result<DelayedTaskHandle, SubmissionError>
120 where
121 F: FnOnce() + Send + 'static,
122 {
123 let task_state = Arc::new(AtomicU8::new(TASK_PENDING));
124 let inner_for_cancel = Arc::downgrade(&self.inner);
125 let handle = DelayedTaskHandle::new(
126 Arc::clone(&task_state),
127 Arc::new(move || {
128 if let Some(inner) = inner_for_cancel.upgrade() {
129 inner.finish_queued_cancellation();
130 }
131 }),
132 );
133 let deadline = Instant::now() + delay;
134 let mut state = self.inner.state.lock();
135 if state.lifecycle != ExecutorServiceLifecycle::Running {
136 return Err(SubmissionError::Shutdown);
137 }
138 let sequence = state.next_sequence;
139 state.next_sequence = state.next_sequence.wrapping_add(1);
140 state.tasks.push(ScheduledTask::new(
141 deadline,
142 sequence,
143 task_state,
144 Box::new(task),
145 ));
146 self.inner.queued_task_count.fetch_add(1, Ordering::AcqRel);
147 self.inner.state.notify_all();
148 Ok(handle)
149 }
150
151 /// Requests graceful shutdown.
152 pub fn shutdown(&self) {
153 self.inner.shutdown();
154 }
155
156 /// Requests immediate shutdown and cancels pending delayed tasks.
157 ///
158 /// # Returns
159 ///
160 /// Count-based shutdown report.
161 pub fn stop(&self) -> StopReport {
162 self.inner.stop()
163 }
164
165 /// Returns the current lifecycle state.
166 ///
167 /// # Returns
168 ///
169 /// [`ExecutorServiceLifecycle::Terminated`] after the scheduler thread has
170 /// exited, otherwise the stored lifecycle state.
171 pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
172 self.inner.lifecycle()
173 }
174
175 /// Returns whether this scheduler still accepts delayed tasks.
176 ///
177 /// # Returns
178 ///
179 /// `true` only while the lifecycle is [`ExecutorServiceLifecycle::Running`].
180 pub fn is_running(&self) -> bool {
181 self.lifecycle() == ExecutorServiceLifecycle::Running
182 }
183
184 /// Returns whether graceful shutdown is in progress.
185 ///
186 /// # Returns
187 ///
188 /// `true` only while the lifecycle is
189 /// [`ExecutorServiceLifecycle::ShuttingDown`].
190 pub fn is_shutting_down(&self) -> bool {
191 self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
192 }
193
194 /// Returns whether abrupt stop is in progress.
195 ///
196 /// # Returns
197 ///
198 /// `true` only while the lifecycle is [`ExecutorServiceLifecycle::Stopping`].
199 pub fn is_stopping(&self) -> bool {
200 self.lifecycle() == ExecutorServiceLifecycle::Stopping
201 }
202
203 /// Returns whether shutdown has started.
204 ///
205 /// # Returns
206 ///
207 /// `true` if this scheduler rejects new tasks.
208 pub fn is_not_running(&self) -> bool {
209 self.inner.is_not_running()
210 }
211
212 /// Returns whether the scheduler thread has exited.
213 ///
214 /// # Returns
215 ///
216 /// `true` after shutdown and termination.
217 pub fn is_terminated(&self) -> bool {
218 self.inner.is_terminated()
219 }
220
221 /// Returns the number of pending delayed tasks.
222 ///
223 /// # Returns
224 ///
225 /// Number of accepted delayed tasks that have not started or been
226 /// cancelled.
227 pub fn queued_count(&self) -> usize {
228 self.inner.queued_count()
229 }
230
231 /// Blocks until the scheduler thread has terminated.
232 pub fn wait_termination(&self) {
233 self.inner.wait_for_termination();
234 }
235}
236
237impl Drop for DelayedTaskScheduler {
238 fn drop(&mut self) {
239 self.inner.shutdown();
240 }
241}