use std::{
sync::{
Arc,
atomic::{
AtomicU8,
Ordering,
},
},
thread,
time::{
Duration,
Instant,
},
};
use qubit_executor::service::{
ExecutorServiceLifecycle,
StopReport,
SubmissionError,
};
use super::delayed_task_handle::DelayedTaskHandle;
use super::delayed_task_scheduler_inner::DelayedTaskSchedulerInner;
use super::delayed_task_scheduler_worker::DelayedTaskSchedulerWorker;
use super::delayed_task_state::TASK_PENDING;
use super::scheduled_task::ScheduledTask;
use crate::ExecutorServiceBuilderError;
/// Handle to a delayed-task scheduler backed by one dedicated worker thread.
///
/// Tasks are submitted together with a delay; they are placed on a queue that
/// lives in the shared state and the worker thread is notified. Dropping the
/// scheduler requests shutdown of that shared state.
pub struct DelayedTaskScheduler {
    /// State shared between this handle, the worker thread (which receives a
    /// strong clone when it is spawned) and task handles (which keep only a
    /// weak reference for their cancellation callback).
    inner: Arc<DelayedTaskSchedulerInner>,
}
impl DelayedTaskScheduler {
    /// Creates a scheduler whose worker thread is named `thread_name`.
    ///
    /// Equivalent to [`Self::with_stack_size`] with no explicit stack size, so
    /// the worker thread gets the default stack size.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] when the worker
    /// thread cannot be spawned.
    pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
        Self::with_stack_size(thread_name, None)
    }

    /// Creates a scheduler and spawns its worker thread.
    ///
    /// # Parameters
    ///
    /// * `thread_name` - name given to the worker thread.
    /// * `stack_size` - stack size in bytes for the worker thread, or `None`
    ///   to keep the default.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] (with
    /// `index: Some(0)`, since there is exactly one worker) carrying the I/O
    /// error reported by the operating system when the thread cannot be
    /// spawned.
    pub fn with_stack_size(
        thread_name: &str,
        stack_size: Option<usize>,
    ) -> Result<Self, ExecutorServiceBuilderError> {
        let inner = Arc::new(DelayedTaskSchedulerInner::new());
        let worker_inner = Arc::clone(&inner);
        let mut builder = thread::Builder::new().name(thread_name.to_string());
        if let Some(stack_size) = stack_size {
            builder = builder.stack_size(stack_size);
        }
        match builder.spawn(move || DelayedTaskSchedulerWorker::run(worker_inner)) {
            // The join handle is dropped on purpose: the worker runs detached
            // and is only reachable through the shared state.
            Ok(_worker) => Ok(Self { inner }),
            Err(source) => Err(ExecutorServiceBuilderError::SpawnWorker {
                index: Some(0),
                source,
            }),
        }
    }

    /// Queues `task` with a deadline of "now + `delay`" and wakes the worker.
    ///
    /// The deadline is computed before the state lock is taken. If
    /// `now + delay` is not representable as an [`Instant`] (for example when
    /// `Duration::MAX` is passed to mean "never"), the deadline is clamped to
    /// roughly thirty years from now instead of panicking.
    ///
    /// # Parameters
    ///
    /// * `delay` - how long to wait, measured from the moment of this call.
    /// * `task` - closure stored in the queue alongside the deadline.
    ///
    /// # Returns
    ///
    /// A [`DelayedTaskHandle`] sharing the task's state flag. The handle's
    /// callback only holds a weak reference to the scheduler state, so an
    /// outstanding handle does not keep the scheduler alive.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] when the scheduler lifecycle is
    /// anything other than [`ExecutorServiceLifecycle::Running`].
    pub fn schedule<F>(
        &self,
        delay: Duration,
        task: F,
    ) -> Result<DelayedTaskHandle, SubmissionError>
    where
        F: FnOnce() + Send + 'static,
    {
        // Stand-in delay (about 30 years) used when `now + delay` overflows.
        const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

        let task_state = Arc::new(AtomicU8::new(TASK_PENDING));
        let inner_for_cancel = Arc::downgrade(&self.inner);
        let handle = DelayedTaskHandle::new(
            Arc::clone(&task_state),
            Arc::new(move || {
                // If the scheduler state is already gone there is nothing
                // left to update.
                if let Some(inner) = inner_for_cancel.upgrade() {
                    inner.finish_queued_cancellation();
                }
            }),
        );

        // `Instant + Duration` panics on overflow; clamp instead so that an
        // oversized delay behaves as "effectively never".
        let now = Instant::now();
        let deadline = now.checked_add(delay).unwrap_or_else(|| now + FAR_FUTURE);

        let mut state = self.inner.state.lock();
        if state.lifecycle != ExecutorServiceLifecycle::Running {
            return Err(SubmissionError::Shutdown);
        }
        // The sequence number is stored with the task; presumably it orders
        // tasks that share a deadline (verify against `ScheduledTask`).
        let sequence = state.next_sequence;
        state.next_sequence = state.next_sequence.wrapping_add(1);
        state.tasks.push(ScheduledTask::new(
            deadline,
            sequence,
            task_state,
            Box::new(task),
        ));
        // Counter update and notification both happen while the state lock is
        // still held.
        self.inner.queued_task_count.fetch_add(1, Ordering::AcqRel);
        self.inner.state.notify_all();
        Ok(handle)
    }

    /// Requests shutdown by delegating to the shared state's `shutdown`.
    ///
    /// The exact semantics (for example, what happens to already queued
    /// tasks) are defined by the shared state, not by this wrapper.
    pub fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Stops the scheduler by delegating to the shared state's `stop` and
    /// returns the resulting [`StopReport`].
    pub fn stop(&self) -> StopReport {
        self.inner.stop()
    }

    /// Returns the lifecycle state reported by the shared state.
    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.inner.lifecycle()
    }

    /// Returns `true` when the lifecycle is [`ExecutorServiceLifecycle::Running`].
    pub fn is_running(&self) -> bool {
        self.lifecycle() == ExecutorServiceLifecycle::Running
    }

    /// Returns `true` when the lifecycle is
    /// [`ExecutorServiceLifecycle::ShuttingDown`].
    pub fn is_shutting_down(&self) -> bool {
        self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
    }

    /// Returns `true` when the lifecycle is [`ExecutorServiceLifecycle::Stopping`].
    pub fn is_stopping(&self) -> bool {
        self.lifecycle() == ExecutorServiceLifecycle::Stopping
    }

    /// Returns the shared state's `is_not_running` flag.
    ///
    /// NOTE(review): presumably the negation of [`Self::is_running`]; confirm
    /// against the shared state implementation.
    pub fn is_not_running(&self) -> bool {
        self.inner.is_not_running()
    }

    /// Returns the shared state's `is_terminated` flag.
    pub fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Returns the queued-task count reported by the shared state.
    ///
    /// NOTE(review): presumably backed by the counter that
    /// [`Self::schedule`] increments for every accepted task; confirm.
    pub fn queued_count(&self) -> usize {
        self.inner.queued_count()
    }

    /// Delegates to the shared state's `wait_for_termination`.
    ///
    /// NOTE(review): expected to block the calling thread until the scheduler
    /// has terminated; confirm against the shared state implementation.
    pub fn wait_termination(&self) {
        self.inner.wait_for_termination();
    }
}
impl Drop for DelayedTaskScheduler {
    /// Requests shutdown when the scheduler handle goes out of scope.
    ///
    /// Only the shutdown request is issued here; dropping does not call
    /// `wait_termination`, so it does not wait for the scheduler to terminate.
    fn drop(&mut self) {
        self.shutdown();
    }
}