use std::sync::atomic::{
AtomicUsize,
Ordering,
};
use qubit_lock::Monitor;
use crate::service::{
ExecutorServiceLifecycle,
StopReport,
};
use super::single_thread_scheduled_executor_service_state::SingleThreadScheduledExecutorServiceState;
/// Shared inner state of the single-thread scheduled executor service.
///
/// Combines the monitor-protected service state (lifecycle, termination flag
/// and pending tasks) with atomic counters tracking how many tasks are
/// queued, running, completed and cancelled.
pub(crate) struct SingleThreadScheduledExecutorServiceInner {
/// Monitor protecting the mutable service state; the methods of this type
/// notify it after every lifecycle or counter transition.
pub(crate) state: Monitor<SingleThreadScheduledExecutorServiceState>,
/// Number of tasks currently queued: incremented by `add_queued_task`,
/// decremented by `start_task` and `finish_queued_cancellation`.
queued_task_count: AtomicUsize,
/// Number of tasks currently running: incremented by `start_task`,
/// decremented by `finish_running_task`.
running_task_count: AtomicUsize,
/// Number of tasks that finished running; incremented by
/// `finish_running_task`.
completed_task_count: AtomicUsize,
/// Number of queued tasks that were cancelled; incremented by
/// `finish_queued_cancellation`.
cancelled_task_count: AtomicUsize,
}
impl SingleThreadScheduledExecutorServiceInner {
    /// Creates the inner state with a fresh service state and all task
    /// counters set to zero.
    pub(crate) fn new() -> Self {
        let initial_state = SingleThreadScheduledExecutorServiceState::new();
        Self {
            state: Monitor::new(initial_state),
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Returns the current value of the queued-task counter.
    #[inline]
    pub(crate) fn queued_count(&self) -> usize {
        let counter = &self.queued_task_count;
        counter.load(Ordering::Acquire)
    }

    /// Returns the current value of the running-task counter.
    #[inline]
    pub(crate) fn running_count(&self) -> usize {
        let counter = &self.running_task_count;
        counter.load(Ordering::Acquire)
    }

    /// Records that one more task has been queued.
    #[inline]
    pub(crate) fn add_queued_task(&self) {
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records that a queued task was cancelled before it started.
    ///
    /// Moves one unit from the queued counter to the cancelled counter and
    /// then notifies all waiters on the state monitor.
    pub(crate) fn finish_queued_cancellation(&self) {
        // The underflow check is only active when debug assertions are enabled.
        let queued_before = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(
            queued_before > 0,
            "scheduled service queued counter underflow"
        );

        self.cancelled_task_count.fetch_add(1, Ordering::AcqRel);
        self.state.notify_all();
    }

    /// Records that a queued task has started running.
    ///
    /// Moves one unit from the queued counter to the running counter. No
    /// notification is sent by this transition.
    pub(crate) fn start_task(&self) {
        // The underflow check is only active when debug assertions are enabled.
        let queued_before = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(
            queued_before > 0,
            "scheduled service queued counter underflow"
        );

        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records that a running task has finished.
    ///
    /// Moves one unit from the running counter to the completed counter and
    /// then notifies all waiters on the state monitor.
    pub(crate) fn finish_running_task(&self) {
        // The underflow check is only active when debug assertions are enabled.
        let running_before = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(
            running_before > 0,
            "scheduled service running counter underflow"
        );

        self.completed_task_count.fetch_add(1, Ordering::AcqRel);
        self.state.notify_all();
    }

    /// Requests a shutdown of the service.
    ///
    /// The lifecycle moves to `ShuttingDown` only when it is currently
    /// `Running`; any other lifecycle value is left untouched. Waiters on the
    /// state monitor are notified in either case.
    pub(crate) fn shutdown(&self) {
        self.state.write(|service_state| {
            let still_running = service_state.lifecycle == ExecutorServiceLifecycle::Running;
            if still_running {
                service_state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
            }
        });
        self.state.notify_all();
    }

    /// Stops the service and cancels every task still held in the state.
    ///
    /// The lifecycle is set to `Stopping` unconditionally. The returned
    /// report is built from the queued count sampled before the pending tasks
    /// are drained, the running count sampled after the drain, and the number
    /// of drained tasks whose `cancel()` call returned `true`.
    pub(crate) fn stop(&self) -> StopReport {
        let mut guard = self.state.lock();
        guard.lifecycle = ExecutorServiceLifecycle::Stopping;

        // Sample the queue size before the drain below starts shrinking it.
        let queued_at_stop = self.queued_count();

        let mut cancelled_now = 0;
        while let Some(pending) = guard.tasks.pop() {
            // Tasks whose cancellation did not succeed are dropped without
            // touching the counters.
            if !pending.entry.cancel() {
                continue;
            }
            self.finish_queued_cancellation();
            cancelled_now += 1;
        }

        let running_at_stop = self.running_count();
        self.state.notify_all();
        StopReport::new(queued_at_stop, running_at_stop, cancelled_now)
    }

    /// Returns `true` when the stored lifecycle is anything other than
    /// `Running`.
    pub(crate) fn is_not_running(&self) -> bool {
        self.state
            .read(|service_state| service_state.lifecycle != ExecutorServiceLifecycle::Running)
    }

    /// Returns the effective lifecycle of the service.
    ///
    /// Once the `terminated` flag is set this reports `Terminated`,
    /// regardless of the stored lifecycle value; otherwise the stored
    /// lifecycle is returned.
    pub(crate) fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.state.read(|service_state| {
            if !service_state.terminated {
                return service_state.lifecycle;
            }
            ExecutorServiceLifecycle::Terminated
        })
    }

    /// Returns whether the `terminated` flag is set.
    pub(crate) fn is_terminated(&self) -> bool {
        self.state.read(|service_state| service_state.terminated)
    }

    /// Waits on the state monitor until the `terminated` flag is set.
    pub(crate) fn wait_for_termination(&self) {
        self.state
            .wait_until(|service_state| service_state.terminated, |_| ());
    }

    /// Marks the supplied state as terminated and notifies all waiters on the
    /// state monitor.
    ///
    /// The caller provides the state to mutate.
    // NOTE(review): presumably `state` is the state guarded by `self.state`,
    // passed in by a caller that already holds the lock — confirm at call sites.
    pub(crate) fn terminate(&self, state: &mut SingleThreadScheduledExecutorServiceState) {
        state.terminated = true;
        self.state.notify_all();
    }
}
/// `Default` construction is identical to calling
/// `SingleThreadScheduledExecutorServiceInner::new`.
impl Default for SingleThreadScheduledExecutorServiceInner {
/// Creates the inner state by delegating to `Self::new()`.
#[inline]
fn default() -> Self {
Self::new()
}
}