Skip to main content

qubit_executor/schedule/
single_thread_scheduled_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::{
12        Arc,
13        Weak,
14    },
15    thread,
16    time::Instant,
17};
18
19use qubit_atomic::Atomic;
20use qubit_function::{
21    Callable,
22    Runnable,
23};
24
25use crate::{
26    TaskHandle,
27    service::{
28        ExecutorService,
29        ExecutorServiceBuilderError,
30        ExecutorServiceLifecycle,
31        StopReport,
32        SubmissionError,
33    },
34    task::spi::TaskEndpointPair,
35};
36
37use super::{
38    completable_scheduled_task::CompletableScheduledTask,
39    scheduled_executor_service::ScheduledExecutorService,
40    scheduled_task::ScheduledTask,
41    scheduled_task_entry::ScheduledTaskEntry,
42    scheduled_task_handle::ScheduledTaskHandle,
43    scheduled_worker::ScheduledWorker,
44    single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
45};
46
47/// Single-threaded scheduled executor service.
48///
49/// The service owns one scheduler OS thread. It accepts tasks into a deadline
50/// heap, waits until the earliest task is due, and then runs that task directly
51/// on the scheduler thread. Scheduled tasks should therefore stay short; submit
52/// heavier work to another executor service from the scheduled task body.
53pub struct SingleThreadScheduledExecutorService {
54    /// Shared scheduler state.
55    inner: Arc<SingleThreadScheduledExecutorServiceInner>,
56}
57
58impl SingleThreadScheduledExecutorService {
59    /// Starts a new single-thread scheduled executor service.
60    ///
61    /// # Parameters
62    ///
63    /// * `thread_name` - Name for the scheduler thread.
64    ///
65    /// # Returns
66    ///
67    /// A started scheduled executor service.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler
72    /// thread cannot be created.
73    #[inline]
74    pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
75        Self::with_stack_size(thread_name, None)
76    }
77
78    /// Starts a new scheduled service with an optional scheduler thread stack size.
79    ///
80    /// # Parameters
81    ///
82    /// * `thread_name` - Name for the scheduler thread.
83    /// * `stack_size` - Optional stack size for the scheduler thread.
84    ///
85    /// # Returns
86    ///
87    /// A started scheduled executor service.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] if the scheduler
92    /// thread cannot be created.
93    pub fn with_stack_size(
94        thread_name: &str,
95        stack_size: Option<usize>,
96    ) -> Result<Self, ExecutorServiceBuilderError> {
97        let inner = Arc::new(SingleThreadScheduledExecutorServiceInner::new());
98        let worker_inner = Arc::clone(&inner);
99        let mut builder = thread::Builder::new().name(thread_name.to_string());
100        if let Some(stack_size) = stack_size {
101            builder = builder.stack_size(stack_size);
102        }
103        if let Err(source) = builder.spawn(move || ScheduledWorker::run(worker_inner)) {
104            return Err(ExecutorServiceBuilderError::SpawnWorker {
105                index: Some(0),
106                source,
107            });
108        }
109        Ok(Self { inner })
110    }
111
112    /// Returns the number of queued scheduled tasks.
113    ///
114    /// # Returns
115    ///
116    /// Number of accepted scheduled tasks that have not started or been cancelled.
117    #[inline]
118    pub fn queued_count(&self) -> usize {
119        self.inner.queued_count()
120    }
121
122    /// Returns the number of currently running tasks.
123    ///
124    /// # Returns
125    ///
126    /// `1` when the scheduler thread is running a task, otherwise `0`.
127    #[inline]
128    pub fn running_count(&self) -> usize {
129        self.inner.running_count()
130    }
131
132    /// Creates a cancellation callback for handles returned by this service.
133    ///
134    /// # Returns
135    ///
136    /// Callback that decrements queued accounting and wakes the scheduler when a
137    /// scheduled handle cancels a pending task.
138    fn cancellation_callback(&self) -> Arc<dyn Fn() + Send + Sync + 'static> {
139        let inner = Arc::downgrade(&self.inner);
140        Arc::new(move || finish_queued_cancellation(&inner))
141    }
142
143    /// Accepts a type-erased task into the deadline heap.
144    ///
145    /// # Parameters
146    ///
147    /// * `deadline` - Monotonic instant when the task becomes runnable.
148    /// * `entry` - Type-erased scheduled task entry.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`SubmissionError::Shutdown`] after shutdown or stop starts.
153    fn schedule_entry(
154        &self,
155        deadline: Instant,
156        entry: Box<dyn ScheduledTaskEntry>,
157    ) -> Result<(), SubmissionError> {
158        let mut state = self.inner.state.lock();
159        if state.lifecycle != ExecutorServiceLifecycle::Running {
160            return Err(SubmissionError::Shutdown);
161        }
162        entry.accept();
163        let sequence = state.next_sequence;
164        state.next_sequence = state.next_sequence.wrapping_add(1);
165        state
166            .tasks
167            .push(ScheduledTask::new(deadline, sequence, entry));
168        self.inner.add_queued_task();
169        self.inner.state.notify_all();
170        Ok(())
171    }
172
173    /// Schedules a callable and returns a standard result handle.
174    ///
175    /// # Parameters
176    ///
177    /// * `deadline` - Monotonic instant when the task becomes runnable.
178    /// * `task` - Callable task to execute.
179    ///
180    /// # Returns
181    ///
182    /// A task result handle for the accepted task.
183    ///
184    /// # Errors
185    ///
186    /// Returns [`SubmissionError::Shutdown`] after shutdown or stop starts.
187    fn schedule_result_handle<C, R, E>(
188        &self,
189        deadline: Instant,
190        task: C,
191    ) -> Result<TaskHandle<R, E>, SubmissionError>
192    where
193        C: Callable<R, E> + Send + 'static,
194        R: Send + 'static,
195        E: Send + 'static,
196    {
197        let (handle, slot) = TaskEndpointPair::new().into_parts();
198        let cancelled = Arc::new(Atomic::new(false));
199        let entry = CompletableScheduledTask::new(task, slot, cancelled);
200        self.schedule_entry(deadline, Box::new(entry))?;
201        Ok(handle)
202    }
203}
204
205impl Drop for SingleThreadScheduledExecutorService {
206    /// Requests graceful shutdown when the service handle is dropped.
207    fn drop(&mut self) {
208        self.inner.shutdown();
209    }
210}
211
212impl ExecutorService for SingleThreadScheduledExecutorService {
213    type ResultHandle<R, E>
214        = TaskHandle<R, E>
215    where
216        R: Send + 'static,
217        E: Send + 'static;
218
219    type TrackedHandle<R, E>
220        = ScheduledTaskHandle<R, E>
221    where
222        R: Send + 'static,
223        E: Send + 'static;
224
225    /// Accepts a runnable for immediate execution on the scheduler thread.
226    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
227    where
228        T: Runnable<E> + Send + 'static,
229        E: Send + 'static,
230    {
231        let mut task = task;
232        let handle = self.submit_callable(move || task.run())?;
233        drop(handle);
234        Ok(())
235    }
236
237    /// Accepts a callable for immediate execution on the scheduler thread.
238    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
239    where
240        C: Callable<R, E> + Send + 'static,
241        R: Send + 'static,
242        E: Send + 'static,
243    {
244        self.schedule_result_handle(Instant::now(), task)
245    }
246
247    /// Accepts a callable for immediate execution with a scheduled task handle.
248    fn submit_tracked_callable<C, R, E>(
249        &self,
250        task: C,
251    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
252    where
253        C: Callable<R, E> + Send + 'static,
254        R: Send + 'static,
255        E: Send + 'static,
256    {
257        self.schedule_callable_at(Instant::now(), task)
258    }
259
260    /// Requests graceful shutdown and drains accepted scheduled work.
261    #[inline]
262    fn shutdown(&self) {
263        self.inner.shutdown();
264    }
265
266    /// Requests immediate shutdown and cancels tasks that have not started.
267    #[inline]
268    fn stop(&self) -> StopReport {
269        self.inner.stop()
270    }
271
272    /// Returns the current lifecycle state.
273    #[inline]
274    fn lifecycle(&self) -> ExecutorServiceLifecycle {
275        self.inner.lifecycle()
276    }
277
278    /// Returns whether shutdown has started.
279    #[inline]
280    fn is_not_running(&self) -> bool {
281        self.inner.is_not_running()
282    }
283
284    /// Returns whether the scheduler thread has exited.
285    #[inline]
286    fn is_terminated(&self) -> bool {
287        self.inner.is_terminated()
288    }
289
290    /// Blocks until this scheduled service has terminated.
291    #[inline]
292    fn wait_termination(&self) {
293        self.inner.wait_for_termination();
294    }
295}
296
297impl ScheduledExecutorService for SingleThreadScheduledExecutorService {
298    /// Schedules a callable task to start at a monotonic instant.
299    fn schedule_callable_at<C, R, E>(
300        &self,
301        instant: Instant,
302        task: C,
303    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
304    where
305        C: Callable<R, E> + Send + 'static,
306        R: Send + 'static,
307        E: Send + 'static,
308    {
309        let (tracked, slot) = TaskEndpointPair::new().into_tracked_parts();
310        let cancellation_marker = Arc::new(Atomic::new(false));
311        let entry = CompletableScheduledTask::new(task, slot, Arc::clone(&cancellation_marker));
312        self.schedule_entry(instant, Box::new(entry))?;
313        Ok(ScheduledTaskHandle::new(
314            tracked,
315            cancellation_marker,
316            self.cancellation_callback(),
317        ))
318    }
319}
320
321/// Records queued cancellation if the scheduled service still exists.
322///
323/// # Parameters
324///
325/// * `inner` - Weak reference to shared scheduled service state.
326fn finish_queued_cancellation(inner: &Weak<SingleThreadScheduledExecutorServiceInner>) {
327    if let Some(inner) = inner.upgrade() {
328        inner.finish_queued_cancellation();
329    }
330}