qubit_thread_pool/delayed/
delayed_task_scheduler_worker.rs1use std::{
11 sync::{
12 Arc,
13 atomic::Ordering,
14 },
15 time::Instant,
16};
17
18use qubit_executor::service::ExecutorServiceLifecycle;
19
20use super::delayed_task_scheduler_inner::DelayedTaskSchedulerInner;
21use super::delayed_task_scheduler_state::DelayedTaskSchedulerState;
22use super::delayed_task_state::is_task_cancelled;
23
24pub struct DelayedTaskSchedulerWorker;
26
27impl DelayedTaskSchedulerWorker {
28 pub fn run(inner: Arc<DelayedTaskSchedulerInner>) {
34 run_delayed_scheduler(inner);
35 }
36}
37
38fn run_delayed_scheduler(inner: Arc<DelayedTaskSchedulerInner>) {
44 loop {
45 let task = {
46 let mut state = inner.state.lock();
47 loop {
48 prune_cancelled_front(&mut state);
49 if state.lifecycle == ExecutorServiceLifecycle::Stopping {
50 inner.terminate(&mut state);
51 return;
52 }
53 if state.tasks.is_empty() && state.lifecycle != ExecutorServiceLifecycle::Running {
54 inner.terminate(&mut state);
55 return;
56 }
57 let Some(next_deadline) = state.tasks.peek().map(|task| task.deadline) else {
58 state = state.wait();
59 continue;
60 };
61 let now = Instant::now();
62 if next_deadline > now {
63 let timeout = next_deadline.saturating_duration_since(now);
64 let (next_state, _) = state.wait_timeout(timeout);
65 state = next_state;
66 continue;
67 }
68 break state.tasks.pop();
69 }
70 };
71 if let Some(mut task) = task {
72 if !inner.start_task_state(&task.state) {
73 continue;
74 }
75 let Some(action) = task.task.take() else {
76 continue;
77 };
78 inner.running_task_count.fetch_add(1, Ordering::AcqRel);
79 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(action));
80 inner.running_task_count.fetch_sub(1, Ordering::AcqRel);
81 inner.completed_task_count.fetch_add(1, Ordering::AcqRel);
82 inner.state.notify_all();
83 }
84 }
85}
86
87fn prune_cancelled_front(state: &mut DelayedTaskSchedulerState) {
93 while state
94 .tasks
95 .peek()
96 .is_some_and(|task| is_task_cancelled(&task.state))
97 {
98 state.tasks.pop();
99 }
100}