Skip to main content

qubit_executor/schedule/
single_thread_scheduled_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::{
12        Arc,
13        Weak,
14    },
15    thread,
16    time::Instant,
17};
18
19use qubit_atomic::Atomic;
20use qubit_function::{
21    Callable,
22    Runnable,
23};
24
25use crate::{
26    TaskHandle,
27    service::{
28        ExecutorService,
29        ExecutorServiceBuilderError,
30        ExecutorServiceLifecycle,
31        StopReport,
32        SubmissionError,
33    },
34    task::spi::TaskEndpointPair,
35};
36
37use super::{
38    completable_scheduled_task::CompletableScheduledTask,
39    scheduled_executor_service::ScheduledExecutorService,
40    scheduled_task::ScheduledTask,
41    scheduled_task_entry::ScheduledTaskEntry,
42    scheduled_task_handle::ScheduledTaskHandle,
43    scheduled_worker::ScheduledWorker,
44    single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
45};
46
47/// Single-threaded scheduled executor service.
48///
49/// The service owns one scheduler OS thread. It accepts tasks into a deadline
50/// heap, waits until the earliest task is due, and then runs that task directly
51/// on the scheduler thread. Scheduled tasks should therefore stay short; submit
52/// heavier work to another executor service from the scheduled task body.
53pub struct SingleThreadScheduledExecutorService {
54    /// Shared scheduler state.
55    inner: Arc<SingleThreadScheduledExecutorServiceInner>,
56}
57
58impl SingleThreadScheduledExecutorService {
59    /// Starts a new single-thread scheduled executor service.
60    ///
61    /// # Parameters
62    ///
63    /// * `thread_name` - Name for the scheduler thread.
64    ///
65    /// # Returns
66    ///
67    /// A started scheduled executor service.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler
72    /// thread cannot be created.
73    #[inline]
74    pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
75        Self::with_stack_size(thread_name, None)
76    }
77
78    /// Starts a new scheduled service with an optional scheduler thread stack size.
79    ///
80    /// # Parameters
81    ///
82    /// * `thread_name` - Name for the scheduler thread.
83    /// * `stack_size` - Optional stack size for the scheduler thread.
84    ///
85    /// # Returns
86    ///
87    /// A started scheduled executor service.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler
92    /// thread cannot be created.
93    pub fn with_stack_size(thread_name: &str, stack_size: Option<usize>) -> Result<Self, ExecutorServiceBuilderError> {
94        let inner = Arc::new(SingleThreadScheduledExecutorServiceInner::new());
95        let worker_inner = Arc::clone(&inner);
96        let mut builder = thread::Builder::new().name(thread_name.to_string());
97        if let Some(stack_size) = stack_size {
98            builder = builder.stack_size(stack_size);
99        }
100        if let Err(source) = builder.spawn(move || ScheduledWorker::run(worker_inner)) {
101            return Err(ExecutorServiceBuilderError::SpawnWorker { index: Some(0), source });
102        }
103        Ok(Self { inner })
104    }
105
106    /// Returns the number of queued scheduled tasks.
107    ///
108    /// # Returns
109    ///
110    /// Number of accepted scheduled tasks that have not started or been cancelled.
111    #[inline]
112    pub fn queued_count(&self) -> usize {
113        self.inner.queued_count()
114    }
115
116    /// Returns the number of currently running tasks.
117    ///
118    /// # Returns
119    ///
120    /// `1` when the scheduler thread is running a task, otherwise `0`.
121    #[inline]
122    pub fn running_count(&self) -> usize {
123        self.inner.running_count()
124    }
125
126    /// Creates a cancellation callback for handles returned by this service.
127    ///
128    /// # Returns
129    ///
130    /// Callback that decrements queued accounting and wakes the scheduler when a
131    /// scheduled handle cancels a pending task.
132    fn cancellation_callback(&self) -> Arc<dyn Fn() + Send + Sync + 'static> {
133        let inner = Arc::downgrade(&self.inner);
134        Arc::new(move || finish_queued_cancellation(&inner))
135    }
136
137    /// Accepts a type-erased task into the deadline heap.
138    ///
139    /// # Parameters
140    ///
141    /// * `deadline` - Monotonic instant when the task becomes runnable.
142    /// * `entry` - Type-erased scheduled task entry.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`SubmissionError::Shutdown`] after shutdown or stop starts.
147    fn schedule_entry(&self, deadline: Instant, entry: Box<dyn ScheduledTaskEntry>) -> Result<(), SubmissionError> {
148        let mut state = self.inner.state.lock();
149        if state.lifecycle != ExecutorServiceLifecycle::Running {
150            return Err(SubmissionError::Shutdown);
151        }
152        entry.accept();
153        let sequence = state.next_sequence;
154        state.next_sequence = state.next_sequence.wrapping_add(1);
155        state.tasks.push(ScheduledTask::new(deadline, sequence, entry));
156        self.inner.add_queued_task();
157        self.inner.state.notify_all();
158        Ok(())
159    }
160
161    /// Schedules a callable and returns a standard result handle.
162    ///
163    /// # Parameters
164    ///
165    /// * `deadline` - Monotonic instant when the task becomes runnable.
166    /// * `task` - Callable task to execute.
167    ///
168    /// # Returns
169    ///
170    /// A task result handle for the accepted task.
171    ///
172    /// # Errors
173    ///
174    /// Returns [`SubmissionError::Shutdown`] after shutdown or stop starts.
175    fn schedule_result_handle<C, R, E>(&self, deadline: Instant, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
176    where
177        C: Callable<R, E> + Send + 'static,
178        R: Send + 'static,
179        E: Send + 'static,
180    {
181        let (handle, slot) = TaskEndpointPair::new().into_parts();
182        let cancelled = Arc::new(Atomic::new(false));
183        let entry = CompletableScheduledTask::new(task, slot, cancelled);
184        self.schedule_entry(deadline, Box::new(entry))?;
185        Ok(handle)
186    }
187}
188
189impl Drop for SingleThreadScheduledExecutorService {
190    /// Requests graceful shutdown when the service handle is dropped.
191    fn drop(&mut self) {
192        self.inner.shutdown();
193    }
194}
195
196impl ExecutorService for SingleThreadScheduledExecutorService {
197    type ResultHandle<R, E>
198        = TaskHandle<R, E>
199    where
200        R: Send + 'static,
201        E: Send + 'static;
202
203    type TrackedHandle<R, E>
204        = ScheduledTaskHandle<R, E>
205    where
206        R: Send + 'static,
207        E: Send + 'static;
208
209    /// Accepts a runnable for immediate execution on the scheduler thread.
210    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
211    where
212        T: Runnable<E> + Send + 'static,
213        E: Send + 'static,
214    {
215        let mut task = task;
216        let handle = self.submit_callable(move || task.run())?;
217        drop(handle);
218        Ok(())
219    }
220
221    /// Accepts a callable for immediate execution on the scheduler thread.
222    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
223    where
224        C: Callable<R, E> + Send + 'static,
225        R: Send + 'static,
226        E: Send + 'static,
227    {
228        self.schedule_result_handle(Instant::now(), task)
229    }
230
231    /// Accepts a callable for immediate execution with a scheduled task handle.
232    fn submit_tracked_callable<C, R, E>(&self, task: C) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
233    where
234        C: Callable<R, E> + Send + 'static,
235        R: Send + 'static,
236        E: Send + 'static,
237    {
238        self.schedule_callable_at(Instant::now(), task)
239    }
240
241    /// Requests graceful shutdown and drains accepted scheduled work.
242    #[inline]
243    fn shutdown(&self) {
244        self.inner.shutdown();
245    }
246
247    /// Requests immediate shutdown and cancels tasks that have not started.
248    #[inline]
249    fn stop(&self) -> StopReport {
250        self.inner.stop()
251    }
252
253    /// Returns the current lifecycle state.
254    #[inline]
255    fn lifecycle(&self) -> ExecutorServiceLifecycle {
256        self.inner.lifecycle()
257    }
258
259    /// Returns whether shutdown has started.
260    #[inline]
261    fn is_not_running(&self) -> bool {
262        self.inner.is_not_running()
263    }
264
265    /// Returns whether the scheduler thread has exited.
266    #[inline]
267    fn is_terminated(&self) -> bool {
268        self.inner.is_terminated()
269    }
270
271    /// Blocks until this scheduled service has terminated.
272    #[inline]
273    fn wait_termination(&self) {
274        self.inner.wait_for_termination();
275    }
276}
277
278impl ScheduledExecutorService for SingleThreadScheduledExecutorService {
279    /// Schedules a callable task to start at a monotonic instant.
280    fn schedule_callable_at<C, R, E>(
281        &self,
282        instant: Instant,
283        task: C,
284    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
285    where
286        C: Callable<R, E> + Send + 'static,
287        R: Send + 'static,
288        E: Send + 'static,
289    {
290        let (tracked, slot) = TaskEndpointPair::new().into_tracked_parts();
291        let cancellation_marker = Arc::new(Atomic::new(false));
292        let entry = CompletableScheduledTask::new(task, slot, Arc::clone(&cancellation_marker));
293        self.schedule_entry(instant, Box::new(entry))?;
294        Ok(ScheduledTaskHandle::new(
295            tracked,
296            cancellation_marker,
297            self.cancellation_callback(),
298        ))
299    }
300}
301
302/// Records queued cancellation if the scheduled service still exists.
303///
304/// # Parameters
305///
306/// * `inner` - Weak reference to shared scheduled service state.
307fn finish_queued_cancellation(inner: &Weak<SingleThreadScheduledExecutorServiceInner>) {
308    if let Some(inner) = inner.upgrade() {
309        inner.finish_queued_cancellation();
310    }
311}