Skip to main content

qubit_thread_pool/delayed/
delayed_task_scheduler.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11    sync::{
12        Arc,
13        atomic::{
14            AtomicU8,
15            Ordering,
16        },
17    },
18    thread,
19    time::{
20        Duration,
21        Instant,
22    },
23};
24
25use qubit_executor::service::{
26    ExecutorServiceLifecycle,
27    StopReport,
28    SubmissionError,
29};
30
31use super::delayed_task_handle::DelayedTaskHandle;
32use super::delayed_task_scheduler_inner::DelayedTaskSchedulerInner;
33use super::delayed_task_scheduler_worker::DelayedTaskSchedulerWorker;
34use super::delayed_task_state::TASK_PENDING;
35use super::scheduled_task::ScheduledTask;
36use crate::ExecutorServiceBuilderError;
37
38/// Single-threaded scheduler for cancellable delayed tasks.
39///
40/// The scheduler only owns delay timing. Scheduled closures should stay small;
41/// submit longer work to an executor service from the closure.
42pub struct DelayedTaskScheduler {
43    /// Shared scheduler state.
44    inner: Arc<DelayedTaskSchedulerInner>,
45}
46
47impl DelayedTaskScheduler {
48    /// Starts a new delayed task scheduler.
49    ///
50    /// # Parameters
51    ///
52    /// * `thread_name` - Name for the scheduler thread.
53    ///
54    /// # Returns
55    ///
56    /// A started delayed task scheduler.
57    ///
58    /// # Errors
59    ///
60    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler thread
61    /// cannot be created.
62    pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
63        Self::with_stack_size(thread_name, None)
64    }
65
66    /// Starts a new delayed task scheduler with an optional thread stack size.
67    ///
68    /// # Parameters
69    ///
70    /// * `thread_name` - Name for the scheduler thread.
71    /// * `stack_size` - Optional stack size for the scheduler thread.
72    ///
73    /// # Returns
74    ///
75    /// A started delayed task scheduler.
76    ///
77    /// # Errors
78    ///
79    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler thread
80    /// cannot be created.
81    pub fn with_stack_size(
82        thread_name: &str,
83        stack_size: Option<usize>,
84    ) -> Result<Self, ExecutorServiceBuilderError> {
85        let inner = Arc::new(DelayedTaskSchedulerInner::new());
86        let worker_inner = Arc::clone(&inner);
87        let mut builder = thread::Builder::new().name(thread_name.to_string());
88        if let Some(stack_size) = stack_size {
89            builder = builder.stack_size(stack_size);
90        }
91        let worker = builder.spawn(move || DelayedTaskSchedulerWorker::run(worker_inner));
92        if let Err(source) = worker {
93            return Err(ExecutorServiceBuilderError::SpawnWorker {
94                index: Some(0),
95                source,
96            });
97        }
98        Ok(Self { inner })
99    }
100
101    /// Schedules a task to run after the given delay.
102    ///
103    /// # Parameters
104    ///
105    /// * `delay` - Minimum delay before the task becomes runnable.
106    /// * `task` - Action to run on the scheduler thread after the delay.
107    ///
108    /// # Returns
109    ///
110    /// A handle that can cancel the task before it starts.
111    ///
112    /// # Errors
113    ///
114    /// Returns [`SubmissionError::Shutdown`] after shutdown starts.
115    pub fn schedule<F>(
116        &self,
117        delay: Duration,
118        task: F,
119    ) -> Result<DelayedTaskHandle, SubmissionError>
120    where
121        F: FnOnce() + Send + 'static,
122    {
123        let task_state = Arc::new(AtomicU8::new(TASK_PENDING));
124        let inner_for_cancel = Arc::downgrade(&self.inner);
125        let handle = DelayedTaskHandle::new(
126            Arc::clone(&task_state),
127            Arc::new(move || {
128                if let Some(inner) = inner_for_cancel.upgrade() {
129                    inner.finish_queued_cancellation();
130                }
131            }),
132        );
133        let deadline = Instant::now() + delay;
134        let mut state = self.inner.state.lock();
135        if state.lifecycle != ExecutorServiceLifecycle::Running {
136            return Err(SubmissionError::Shutdown);
137        }
138        let sequence = state.next_sequence;
139        state.next_sequence = state.next_sequence.wrapping_add(1);
140        state.tasks.push(ScheduledTask::new(
141            deadline,
142            sequence,
143            task_state,
144            Box::new(task),
145        ));
146        self.inner.queued_task_count.fetch_add(1, Ordering::AcqRel);
147        self.inner.state.notify_all();
148        Ok(handle)
149    }
150
151    /// Requests graceful shutdown.
152    pub fn shutdown(&self) {
153        self.inner.shutdown();
154    }
155
156    /// Requests immediate shutdown and cancels pending delayed tasks.
157    ///
158    /// # Returns
159    ///
160    /// Count-based shutdown report.
161    pub fn stop(&self) -> StopReport {
162        self.inner.stop()
163    }
164
165    /// Returns the current lifecycle state.
166    ///
167    /// # Returns
168    ///
169    /// [`ExecutorServiceLifecycle::Terminated`] after the scheduler thread has
170    /// exited, otherwise the stored lifecycle state.
171    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
172        self.inner.lifecycle()
173    }
174
175    /// Returns whether this scheduler still accepts delayed tasks.
176    ///
177    /// # Returns
178    ///
179    /// `true` only while the lifecycle is [`ExecutorServiceLifecycle::Running`].
180    pub fn is_running(&self) -> bool {
181        self.lifecycle() == ExecutorServiceLifecycle::Running
182    }
183
184    /// Returns whether graceful shutdown is in progress.
185    ///
186    /// # Returns
187    ///
188    /// `true` only while the lifecycle is
189    /// [`ExecutorServiceLifecycle::ShuttingDown`].
190    pub fn is_shutting_down(&self) -> bool {
191        self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
192    }
193
194    /// Returns whether abrupt stop is in progress.
195    ///
196    /// # Returns
197    ///
198    /// `true` only while the lifecycle is [`ExecutorServiceLifecycle::Stopping`].
199    pub fn is_stopping(&self) -> bool {
200        self.lifecycle() == ExecutorServiceLifecycle::Stopping
201    }
202
203    /// Returns whether shutdown has started.
204    ///
205    /// # Returns
206    ///
207    /// `true` if this scheduler rejects new tasks.
208    pub fn is_not_running(&self) -> bool {
209        self.inner.is_not_running()
210    }
211
212    /// Returns whether the scheduler thread has exited.
213    ///
214    /// # Returns
215    ///
216    /// `true` after shutdown and termination.
217    pub fn is_terminated(&self) -> bool {
218        self.inner.is_terminated()
219    }
220
221    /// Returns the number of pending delayed tasks.
222    ///
223    /// # Returns
224    ///
225    /// Number of accepted delayed tasks that have not started or been
226    /// cancelled.
227    pub fn queued_count(&self) -> usize {
228        self.inner.queued_count()
229    }
230
231    /// Blocks until the scheduler thread has terminated.
232    pub fn wait_termination(&self) {
233        self.inner.wait_for_termination();
234    }
235}
236
237impl Drop for DelayedTaskScheduler {
238    fn drop(&mut self) {
239        self.inner.shutdown();
240    }
241}