Skip to main content

qubit_thread_pool/delayed/
delayed_task_scheduler_inner.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::atomic::{
11    AtomicU8,
12    AtomicUsize,
13    Ordering,
14};
15
16use qubit_executor::service::{
17    ExecutorServiceLifecycle,
18    StopReport,
19};
20use qubit_lock::Monitor;
21
22use super::delayed_task_scheduler_state::DelayedTaskSchedulerState;
23use super::delayed_task_state::{
24    cancel_task_state,
25    start_task_state,
26};
27
28/// Shared delayed scheduler state.
29pub struct DelayedTaskSchedulerInner {
30    /// Mutable lifecycle and heap state.
31    pub state: Monitor<DelayedTaskSchedulerState>,
32    /// Number of tasks still pending in the delay heap.
33    pub queued_task_count: AtomicUsize,
34    /// Number of tasks currently executing on the scheduler thread.
35    pub running_task_count: AtomicUsize,
36    /// Number of tasks that ran to completion.
37    pub completed_task_count: AtomicUsize,
38    /// Number of delayed tasks cancelled before execution.
39    pub cancelled_task_count: AtomicUsize,
40}
41
42impl DelayedTaskSchedulerInner {
43    /// Creates an empty delayed scheduler.
44    ///
45    /// # Returns
46    ///
47    /// Shared scheduler state before its worker thread starts.
48    pub fn new() -> Self {
49        Self {
50            state: Monitor::new(DelayedTaskSchedulerState::new()),
51            queued_task_count: AtomicUsize::new(0),
52            running_task_count: AtomicUsize::new(0),
53            completed_task_count: AtomicUsize::new(0),
54            cancelled_task_count: AtomicUsize::new(0),
55        }
56    }
57
58    /// Returns the queued delayed task count.
59    ///
60    /// # Returns
61    ///
62    /// Number of tasks that have not started or been cancelled.
63    #[inline]
64    pub fn queued_count(&self) -> usize {
65        self.queued_task_count.load(Ordering::Acquire)
66    }
67
68    /// Returns the currently running task count.
69    ///
70    /// # Returns
71    ///
72    /// `1` when the scheduler thread is running a task, otherwise `0`.
73    #[inline]
74    pub fn running_count(&self) -> usize {
75        self.running_task_count.load(Ordering::Acquire)
76    }
77
78    /// Records a pending task cancellation.
79    pub fn finish_queued_cancellation(&self) {
80        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
81        debug_assert!(previous > 0, "delayed scheduler queued counter underflow");
82        self.cancelled_task_count.fetch_add(1, Ordering::AcqRel);
83        self.state.notify_all();
84    }
85
86    /// Attempts to cancel a task state before it starts.
87    ///
88    /// # Parameters
89    ///
90    /// * `task_state` - Shared task lifecycle state.
91    ///
92    /// # Returns
93    ///
94    /// `true` if this call cancelled the task.
95    pub fn cancel_task_state(&self, task_state: &AtomicU8) -> bool {
96        if cancel_task_state(task_state) {
97            self.finish_queued_cancellation();
98            true
99        } else {
100            false
101        }
102    }
103
104    /// Marks a task as started if it has not been cancelled.
105    ///
106    /// # Parameters
107    ///
108    /// * `task_state` - Shared task lifecycle state.
109    ///
110    /// # Returns
111    ///
112    /// `true` if the task may execute.
113    pub fn start_task_state(&self, task_state: &AtomicU8) -> bool {
114        if start_task_state(task_state) {
115            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
116            debug_assert!(previous > 0, "delayed scheduler queued counter underflow");
117            true
118        } else {
119            false
120        }
121    }
122
123    /// Requests graceful shutdown.
124    pub fn shutdown(&self) {
125        self.state.write(|state| {
126            if state.lifecycle == ExecutorServiceLifecycle::Running {
127                state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
128            }
129        });
130        self.state.notify_all();
131    }
132
133    /// Requests immediate shutdown and cancels all queued delayed tasks.
134    ///
135    /// # Returns
136    ///
137    /// Count-based shutdown report.
138    pub fn stop(&self) -> StopReport {
139        let mut state = self.state.lock();
140        state.lifecycle = ExecutorServiceLifecycle::Stopping;
141        let mut cancelled = 0;
142        while let Some(task) = state.tasks.pop() {
143            if self.cancel_task_state(&task.state) {
144                cancelled += 1;
145            }
146        }
147        let running = self.running_count();
148        self.state.notify_all();
149        StopReport::new(cancelled, running, cancelled)
150    }
151
152    /// Returns whether shutdown has started.
153    ///
154    /// # Returns
155    ///
156    /// `true` if new delayed tasks are rejected.
157    pub fn is_not_running(&self) -> bool {
158        self.state
159            .read(|state| state.lifecycle != ExecutorServiceLifecycle::Running)
160    }
161
162    /// Returns the current lifecycle state.
163    ///
164    /// # Returns
165    ///
166    /// [`ExecutorServiceLifecycle::Terminated`] after the worker has exited,
167    /// otherwise the stored lifecycle state.
168    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
169        self.state.read(|state| {
170            if state.terminated {
171                ExecutorServiceLifecycle::Terminated
172            } else {
173                state.lifecycle
174            }
175        })
176    }
177
178    /// Returns whether the scheduler thread has exited.
179    ///
180    /// # Returns
181    ///
182    /// `true` after shutdown and scheduler termination.
183    pub fn is_terminated(&self) -> bool {
184        self.state.read(|state| state.terminated)
185    }
186
187    /// Waits until the scheduler thread exits.
188    pub fn wait_for_termination(&self) {
189        self.state.wait_until(|state| state.terminated, |_| ());
190    }
191
192    /// Marks the scheduler thread as terminated.
193    pub fn terminate(&self, state: &mut DelayedTaskSchedulerState) {
194        state.terminated = true;
195        self.state.notify_all();
196    }
197}
198
199impl Default for DelayedTaskSchedulerInner {
200    fn default() -> Self {
201        Self::new()
202    }
203}