// qubit_thread_pool/delayed/delayed_task_scheduler_inner.rs
use std::sync::atomic::{
11 AtomicU8,
12 AtomicUsize,
13 Ordering,
14};
15
16use qubit_executor::service::{
17 ExecutorServiceLifecycle,
18 StopReport,
19};
20use qubit_lock::Monitor;
21
22use super::delayed_task_scheduler_state::DelayedTaskSchedulerState;
23use super::delayed_task_state::{
24 cancel_task_state,
25 start_task_state,
26};
27
/// Shared interior state of the delayed task scheduler: a monitor-guarded
/// mutable state plus lock-free task counters.
pub struct DelayedTaskSchedulerInner {
    /// Monitor-protected scheduler state (lifecycle, task queue, `terminated` flag).
    pub state: Monitor<DelayedTaskSchedulerState>,
    /// Tasks accepted but not yet started or cancelled; decremented in
    /// `start_task_state` and `finish_queued_cancellation`.
    pub queued_task_count: AtomicUsize,
    /// Tasks currently executing. NOTE(review): not written anywhere in this
    /// file — presumably maintained by worker code elsewhere; confirm.
    pub running_task_count: AtomicUsize,
    /// Tasks that ran to completion. NOTE(review): not written in this file —
    /// presumably maintained by worker code elsewhere; confirm.
    pub completed_task_count: AtomicUsize,
    /// Tasks cancelled before they started; incremented in
    /// `finish_queued_cancellation`.
    pub cancelled_task_count: AtomicUsize,
}
41
impl DelayedTaskSchedulerInner {
    /// Builds a fresh inner scheduler: a new monitor-wrapped state and all
    /// task counters initialized to zero.
    pub fn new() -> Self {
        Self {
            state: Monitor::new(DelayedTaskSchedulerState::new()),
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Current number of queued (not yet started or cancelled) tasks.
    #[inline]
    pub fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Current number of running tasks (see the NOTE on `running_task_count`:
    /// this counter is not updated anywhere in this file).
    #[inline]
    pub fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Accounts for one queued task having been cancelled: moves one unit from
    /// the queued counter to the cancelled counter, then wakes all monitor
    /// waiters so they can observe the change.
    pub fn finish_queued_cancellation(&self) {
        // fetch_sub returns the previous value; 0 would mean double-accounting.
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "delayed scheduler queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::AcqRel);
        self.state.notify_all();
    }

    /// Attempts to cancel a single task via the free function
    /// `cancel_task_state`; on success updates the counters and notifies
    /// waiters. Returns `true` only when this call won the cancellation race.
    pub fn cancel_task_state(&self, task_state: &AtomicU8) -> bool {
        if cancel_task_state(task_state) {
            self.finish_queued_cancellation();
            true
        } else {
            false
        }
    }

    /// Attempts to transition a task from queued to started via the free
    /// function `start_task_state`; on success decrements the queued counter.
    /// Returns `true` only when this call won the start race.
    ///
    /// NOTE(review): unlike cancellation, a successful start does not bump
    /// `running_task_count` here — presumably the worker that runs the task
    /// does that; confirm against the caller.
    pub fn start_task_state(&self, task_state: &AtomicU8) -> bool {
        if start_task_state(task_state) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "delayed scheduler queued counter underflow");
            true
        } else {
            false
        }
    }

    /// Initiates a graceful shutdown: Running -> ShuttingDown (any other
    /// lifecycle state is left untouched), then wakes all waiters.
    pub fn shutdown(&self) {
        self.state.write(|state| {
            if state.lifecycle == ExecutorServiceLifecycle::Running {
                state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
            }
        });
        self.state.notify_all();
    }

    /// Forces a stop: marks the lifecycle Stopping, drains the task queue
    /// cancelling every task whose cancellation it wins, and reports the
    /// outcome.
    ///
    /// NOTE(review): `self.cancel_task_state` ends in
    /// `self.state.notify_all()` while the guard from `self.state.lock()` is
    /// still held — confirm `Monitor::notify_all` does not re-acquire the
    /// lock, otherwise this deadlocks.
    /// NOTE(review): `StopReport::new(cancelled, running, cancelled)` passes
    /// `cancelled` for both the first and third argument — verify this
    /// matches the intended parameter meanings of `StopReport::new`.
    pub fn stop(&self) -> StopReport {
        let mut state = self.state.lock();
        state.lifecycle = ExecutorServiceLifecycle::Stopping;
        let mut cancelled = 0;
        while let Some(task) = state.tasks.pop() {
            if self.cancel_task_state(&task.state) {
                cancelled += 1;
            }
        }
        let running = self.running_count();
        self.state.notify_all();
        StopReport::new(cancelled, running, cancelled)
    }

    /// Returns `true` when the lifecycle has left the Running state.
    pub fn is_not_running(&self) -> bool {
        self.state
            .read(|state| state.lifecycle != ExecutorServiceLifecycle::Running)
    }

    /// Reports the effective lifecycle; the `terminated` flag takes
    /// precedence over the stored lifecycle value.
    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.state.read(|state| {
            if state.terminated {
                ExecutorServiceLifecycle::Terminated
            } else {
                state.lifecycle
            }
        })
    }

    /// Returns `true` once `terminate` has marked the scheduler terminated.
    pub fn is_terminated(&self) -> bool {
        self.state.read(|state| state.terminated)
    }

    /// Blocks the caller until the `terminated` flag is set (woken by
    /// `terminate` via `notify_all`).
    pub fn wait_for_termination(&self) {
        self.state.wait_until(|state| state.terminated, |_| ());
    }

    /// Marks the given state terminated and wakes all waiters.
    ///
    /// NOTE(review): the caller supplies `&mut DelayedTaskSchedulerState` —
    /// presumably it already holds the monitor lock; confirm that calling
    /// `notify_all` in that situation is safe (same concern as `stop`).
    pub fn terminate(&self, state: &mut DelayedTaskSchedulerState) {
        state.terminated = true;
        self.state.notify_all();
    }
}
198
199impl Default for DelayedTaskSchedulerInner {
200 fn default() -> Self {
201 Self::new()
202 }
203}