use std::sync::atomic::{
AtomicU8,
AtomicUsize,
Ordering,
};
use qubit_executor::service::{
ExecutorServiceLifecycle,
StopReport,
};
use qubit_lock::Monitor;
use super::delayed_task_scheduler_state::DelayedTaskSchedulerState;
use super::delayed_task_state::{
cancel_task_state,
start_task_state,
};
/// Shared internals of the delayed task scheduler: the monitor-protected
/// scheduler state plus atomic task counters that can be read without
/// taking the monitor.
pub struct DelayedTaskSchedulerInner {
/// Monitor guarding the scheduler state (its `lifecycle`, `tasks` and
/// `terminated` fields are accessed through it); also used to wake waiters.
pub state: Monitor<DelayedTaskSchedulerState>,
/// Number of tasks currently counted as queued. Decremented when a queued
/// task is cancelled or started; the increment happens outside this file.
pub queued_task_count: AtomicUsize,
/// Number of tasks currently counted as running. Only read in this file
/// (see `running_count`); updated elsewhere.
pub running_task_count: AtomicUsize,
/// Completed-task counter. Initialised to zero here and not otherwise
/// touched in this file — presumably maintained by the worker; verify there.
pub completed_task_count: AtomicUsize,
/// Number of queued tasks that were cancelled; incremented by
/// `finish_queued_cancellation`.
pub cancelled_task_count: AtomicUsize,
}
impl DelayedTaskSchedulerInner {
    /// Creates scheduler internals with a freshly constructed state monitor
    /// and every task counter at zero.
    pub fn new() -> Self {
        let state = Monitor::new(DelayedTaskSchedulerState::new());
        Self {
            state,
            queued_task_count: AtomicUsize::default(),
            running_task_count: AtomicUsize::default(),
            completed_task_count: AtomicUsize::default(),
            cancelled_task_count: AtomicUsize::default(),
        }
    }

    /// Returns the current value of the queued-task counter.
    #[inline]
    pub fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Returns the current value of the running-task counter.
    #[inline]
    pub fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Removes one task from the queued counter.
    ///
    /// In debug builds this asserts that the counter was non-zero before the
    /// decrement.
    fn decrement_queued_count(&self) {
        let before = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(before > 0, "delayed scheduler queued counter underflow");
    }

    /// Records that one queued task has been cancelled.
    ///
    /// Moves one unit from the queued counter to the cancelled counter and
    /// then notifies every thread waiting on the state monitor.
    pub fn finish_queued_cancellation(&self) {
        self.decrement_queued_count();
        self.cancelled_task_count.fetch_add(1, Ordering::AcqRel);
        self.state.notify_all();
    }

    /// Attempts to cancel the task owning `task_state`.
    ///
    /// Delegates the state transition to the module-level `cancel_task_state`
    /// helper. When that helper returns `true`, the cancellation is also
    /// recorded through [`Self::finish_queued_cancellation`].
    ///
    /// Returns the helper's result unchanged.
    pub fn cancel_task_state(&self, task_state: &AtomicU8) -> bool {
        let cancelled = cancel_task_state(task_state);
        if cancelled {
            self.finish_queued_cancellation();
        }
        cancelled
    }

    /// Attempts to mark the task owning `task_state` as started.
    ///
    /// Delegates the state transition to the module-level `start_task_state`
    /// helper. When that helper returns `true`, the task is removed from the
    /// queued counter.
    ///
    /// Returns the helper's result unchanged.
    pub fn start_task_state(&self, task_state: &AtomicU8) -> bool {
        let started = start_task_state(task_state);
        if started {
            self.decrement_queued_count();
        }
        started
    }

    /// Requests a shutdown.
    ///
    /// Moves the lifecycle from `Running` to `ShuttingDown`; any other
    /// lifecycle value is left as it is. Waiters on the state monitor are
    /// notified in either case.
    pub fn shutdown(&self) {
        self.state.write(|scheduler| {
            if scheduler.lifecycle == ExecutorServiceLifecycle::Running {
                scheduler.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
            }
        });
        self.state.notify_all();
    }

    /// Stops the scheduler and cancels every task still held in the state.
    ///
    /// While holding the state lock, the lifecycle is set to `Stopping` and
    /// all tasks are popped; each one is passed to
    /// [`Self::cancel_task_state`] and successful cancellations are counted.
    ///
    /// The returned report is built from that cancellation count (passed as
    /// both the first and the third argument of `StopReport::new`) and the
    /// running-task counter sampled after the drain.
    pub fn stop(&self) -> StopReport {
        let mut guard = self.state.lock();
        guard.lifecycle = ExecutorServiceLifecycle::Stopping;

        // Drain the task collection, counting only the tasks whose
        // cancellation actually succeeded.
        let cancelled = std::iter::from_fn(|| guard.tasks.pop()).fold(0, |count, task| {
            if self.cancel_task_state(&task.state) {
                count + 1
            } else {
                count
            }
        });

        let running = self.running_count();
        self.state.notify_all();
        StopReport::new(cancelled, running, cancelled)
    }

    /// Returns `true` when the stored lifecycle is anything other than
    /// `Running`.
    pub fn is_not_running(&self) -> bool {
        self.state.read(|scheduler| {
            let current = scheduler.lifecycle;
            current != ExecutorServiceLifecycle::Running
        })
    }

    /// Returns the effective lifecycle.
    ///
    /// The `terminated` flag takes precedence: once it is set, `Terminated`
    /// is reported regardless of the stored lifecycle value.
    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.state.read(|scheduler| {
            if scheduler.terminated {
                return ExecutorServiceLifecycle::Terminated;
            }
            scheduler.lifecycle
        })
    }

    /// Returns whether the `terminated` flag has been set.
    pub fn is_terminated(&self) -> bool {
        self.state.read(|scheduler| scheduler.terminated)
    }

    /// Waits on the state monitor until the `terminated` flag is set.
    pub fn wait_for_termination(&self) {
        self.state
            .wait_until(|scheduler| scheduler.terminated, |_scheduler| {});
    }

    /// Sets the `terminated` flag on the given state and notifies every
    /// waiter on the state monitor.
    ///
    /// The caller supplies the mutable state reference; this method does not
    /// acquire the monitor itself.
    pub fn terminate(&self, state: &mut DelayedTaskSchedulerState) {
        state.terminated = true;
        self.state.notify_all();
    }
}
impl Default for DelayedTaskSchedulerInner {
fn default() -> Self {
Self::new()
}
}