Skip to main content

qubit_thread_pool/delayed/
delayed_task_scheduler_worker.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::{
12        Arc,
13        atomic::Ordering,
14    },
15    time::Instant,
16};
17
18use qubit_executor::service::ExecutorServiceLifecycle;
19
20use super::delayed_task_scheduler_inner::DelayedTaskSchedulerInner;
21use super::delayed_task_scheduler_state::DelayedTaskSchedulerState;
22use super::delayed_task_state::is_task_cancelled;
23
24/// Worker loop entry point for delayed task schedulers.
25pub struct DelayedTaskSchedulerWorker;
26
27impl DelayedTaskSchedulerWorker {
28    /// Runs the delayed task scheduler loop.
29    ///
30    /// # Parameters
31    ///
32    /// * `inner` - Shared scheduler state.
33    pub fn run(inner: Arc<DelayedTaskSchedulerInner>) {
34        run_delayed_scheduler(inner);
35    }
36}
37
38/// Runs the delayed task scheduler loop.
39///
40/// # Parameters
41///
42/// * `inner` - Shared scheduler state.
43fn run_delayed_scheduler(inner: Arc<DelayedTaskSchedulerInner>) {
44    loop {
45        let task = {
46            let mut state = inner.state.lock();
47            loop {
48                prune_cancelled_front(&mut state);
49                if state.lifecycle == ExecutorServiceLifecycle::Stopping {
50                    inner.terminate(&mut state);
51                    return;
52                }
53                if state.tasks.is_empty() && state.lifecycle != ExecutorServiceLifecycle::Running {
54                    inner.terminate(&mut state);
55                    return;
56                }
57                let Some(next_deadline) = state.tasks.peek().map(|task| task.deadline) else {
58                    state = state.wait();
59                    continue;
60                };
61                let now = Instant::now();
62                if next_deadline > now {
63                    let timeout = next_deadline.saturating_duration_since(now);
64                    let (next_state, _) = state.wait_timeout(timeout);
65                    state = next_state;
66                    continue;
67                }
68                break state.tasks.pop();
69            }
70        };
71        if let Some(mut task) = task {
72            if !inner.start_task_state(&task.state) {
73                continue;
74            }
75            let Some(action) = task.task.take() else {
76                continue;
77            };
78            inner.running_task_count.fetch_add(1, Ordering::AcqRel);
79            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(action));
80            inner.running_task_count.fetch_sub(1, Ordering::AcqRel);
81            inner.completed_task_count.fetch_add(1, Ordering::AcqRel);
82            inner.state.notify_all();
83        }
84    }
85}
86
87/// Removes already-cancelled tasks from the front of the deadline heap.
88///
89/// # Parameters
90///
91/// * `state` - Locked scheduler state.
92fn prune_cancelled_front(state: &mut DelayedTaskSchedulerState) {
93    while state
94        .tasks
95        .peek()
96        .is_some_and(|task| is_task_cancelled(&task.state))
97    {
98        state.tasks.pop();
99    }
100}