use std::{
sync::{
Arc,
Weak,
atomic::AtomicBool,
},
thread,
time::Instant,
};
use qubit_function::{
Callable,
Runnable,
};
use crate::{
TaskHandle,
service::{
ExecutorService,
ExecutorServiceBuilderError,
ExecutorServiceLifecycle,
StopReport,
SubmissionError,
},
task::spi::TaskEndpointPair,
};
use super::{
completable_scheduled_task::CompletableScheduledTask,
scheduled_executor_service::ScheduledExecutorService,
scheduled_task::ScheduledTask,
scheduled_task_entry::ScheduledTaskEntry,
scheduled_task_handle::ScheduledTaskHandle,
scheduled_worker::ScheduledWorker,
single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
};
/// Scheduled executor service driven by a single worker thread.
///
/// The value is a handle around reference-counted shared state. The
/// constructors in this file spawn one worker thread that receives its own
/// clone of that state.
pub struct SingleThreadScheduledExecutorService {
/// Shared state (task queue, lifecycle, counters) that the worker thread
/// spawned by the constructor also holds a clone of.
inner: Arc<SingleThreadScheduledExecutorServiceInner>,
}
impl SingleThreadScheduledExecutorService {
    /// Creates a service whose worker thread is named `thread_name`, without
    /// requesting an explicit stack size.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`Self::with_stack_size`] when the worker
    /// thread cannot be spawned.
    #[inline]
    pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
        Self::with_stack_size(thread_name, None)
    }

    /// Creates a service whose worker thread is named `thread_name` and,
    /// when `stack_size` is `Some`, is spawned with that stack size in bytes.
    ///
    /// The worker thread runs [`ScheduledWorker::run`] on a clone of the
    /// shared inner state.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] with worker index
    /// `Some(0)` and the underlying I/O error when the thread cannot be
    /// spawned.
    pub fn with_stack_size(
        thread_name: &str,
        stack_size: Option<usize>,
    ) -> Result<Self, ExecutorServiceBuilderError> {
        let inner = Arc::new(SingleThreadScheduledExecutorServiceInner::new());
        let named = thread::Builder::new().name(thread_name.to_string());
        let builder = match stack_size {
            Some(bytes) => named.stack_size(bytes),
            None => named,
        };
        let worker_inner = Arc::clone(&inner);
        match builder.spawn(move || ScheduledWorker::run(worker_inner)) {
            // The join handle is dropped right away, so the worker is detached.
            Ok(_join_handle) => Ok(Self { inner }),
            Err(source) => Err(ExecutorServiceBuilderError::SpawnWorker {
                index: Some(0),
                source,
            }),
        }
    }

    /// Returns the queued-task count reported by the shared inner state.
    #[inline]
    pub fn queued_count(&self) -> usize {
        self.inner.queued_count()
    }

    /// Returns the running-task count reported by the shared inner state.
    #[inline]
    pub fn running_count(&self) -> usize {
        self.inner.running_count()
    }

    /// Builds the callback handed to tracked task handles.
    ///
    /// The callback captures only a weak reference to the inner state, so it
    /// does not keep the service alive; once the state has been dropped the
    /// callback does nothing.
    fn cancellation_callback(&self) -> Arc<dyn Fn() + Send + Sync + 'static> {
        let weak_inner = Arc::downgrade(&self.inner);
        Arc::new(move || finish_queued_cancellation(&weak_inner))
    }

    /// Enqueues `entry` to run at `deadline`.
    ///
    /// All steps happen while the state lock is held: the lifecycle check,
    /// `entry.accept()`, sequence-number assignment, the push onto the task
    /// collection, the queued-count update and the notification.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] when the lifecycle is anything
    /// other than [`ExecutorServiceLifecycle::Running`]; in that case the
    /// entry is dropped without `accept` being called.
    fn schedule_entry(
        &self,
        deadline: Instant,
        entry: Box<dyn ScheduledTaskEntry>,
    ) -> Result<(), SubmissionError> {
        let mut guard = self.inner.state.lock();
        if guard.lifecycle != ExecutorServiceLifecycle::Running {
            return Err(SubmissionError::Shutdown);
        }
        entry.accept();

        // Take the current sequence number and advance the counter, wrapping
        // on overflow instead of panicking.
        let sequence = guard.next_sequence;
        guard.next_sequence = sequence.wrapping_add(1);

        let scheduled = ScheduledTask::new(deadline, sequence, entry);
        guard.tasks.push(scheduled);
        self.inner.add_queued_task();
        // NOTE(review): presumably wakes the worker waiting on the state
        // monitor so it can re-evaluate the earliest deadline - confirm.
        self.inner.state.notify_all();
        Ok(())
    }

    /// Schedules `task` for `deadline` and returns a plain result handle.
    ///
    /// The cancellation flag created here is given only to the queued entry;
    /// the returned handle keeps no clone of it.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`Self::schedule_entry`].
    fn schedule_result_handle<C, R, E>(
        &self,
        deadline: Instant,
        task: C,
    ) -> Result<TaskHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let unshared_flag = Arc::new(AtomicBool::new(false));
        let (result_handle, completion_slot) = TaskEndpointPair::new().into_parts();
        let queued = CompletableScheduledTask::new(task, completion_slot, unshared_flag);
        self.schedule_entry(deadline, Box::new(queued))
            .map(|()| result_handle)
    }
}
impl Drop for SingleThreadScheduledExecutorService {
/// Calls `shutdown` on the shared inner state when this handle is dropped.
///
/// NOTE(review): presumably this lets the detached worker thread finish
/// and exit instead of waiting forever - confirm against the inner
/// state's `shutdown` implementation.
fn drop(&mut self) {
self.inner.shutdown();
}
}
impl ExecutorService for SingleThreadScheduledExecutorService {
    /// Handle type returned by [`Self::submit_callable`].
    type ResultHandle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Handle type returned by [`Self::submit_tracked_callable`].
    type TrackedHandle<R, E>
        = ScheduledTaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Submits a runnable with the current instant as its deadline.
    ///
    /// The runnable is wrapped in a closure and routed through
    /// [`Self::submit_callable`]; the resulting handle is dropped
    /// immediately, so its outcome is not observable through this call.
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] of [`Self::submit_callable`].
    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        let mut runnable = task;
        self.submit_callable(move || runnable.run()).map(drop)
    }

    /// Submits a callable with the current instant as its deadline and
    /// returns a plain result handle.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] when the service is not in the
    /// running lifecycle state.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let deadline = Instant::now();
        self.schedule_result_handle(deadline, task)
    }

    /// Submits a callable with the current instant as its deadline and
    /// returns a tracked handle, by delegating to `schedule_callable_at`.
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] of `schedule_callable_at`.
    fn submit_tracked_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let deadline = Instant::now();
        self.schedule_callable_at(deadline, task)
    }

    /// Forwards the shutdown request to the shared inner state.
    #[inline]
    fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Forwards the stop request to the shared inner state and returns the
    /// report it produces.
    #[inline]
    fn stop(&self) -> StopReport {
        self.inner.stop()
    }

    /// Returns the lifecycle state reported by the shared inner state.
    #[inline]
    fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.inner.lifecycle()
    }

    /// Returns the inner state's answer to `is_not_running`.
    #[inline]
    fn is_not_running(&self) -> bool {
        self.inner.is_not_running()
    }

    /// Returns the inner state's answer to `is_terminated`.
    #[inline]
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Delegates to the inner state's `wait_for_termination`.
    #[inline]
    fn wait_termination(&self) {
        self.inner.wait_for_termination();
    }
}
impl ScheduledExecutorService for SingleThreadScheduledExecutorService {
    /// Schedules `task` to run at `instant` and returns a tracked handle.
    ///
    /// The queued entry and the returned handle share one cancellation flag.
    /// The handle also receives a callback that notifies the service's inner
    /// state (through a weak reference) via `finish_queued_cancellation`.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] when the service is not in the
    /// running lifecycle state; no handle is produced in that case.
    fn schedule_callable_at<C, R, E>(
        &self,
        instant: Instant,
        task: C,
    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let shared_flag = Arc::new(AtomicBool::new(false));
        let (tracked_end, completion_slot) = TaskEndpointPair::new().into_tracked_parts();
        let queued =
            CompletableScheduledTask::new(task, completion_slot, Arc::clone(&shared_flag));

        // Enqueue first: a rejected submission must not yield a handle.
        self.schedule_entry(instant, Box::new(queued))?;

        let on_cancel = self.cancellation_callback();
        Ok(ScheduledTaskHandle::new(tracked_end, shared_flag, on_cancel))
    }
}
/// Calls `finish_queued_cancellation` on the inner state if it is still
/// alive.
///
/// `inner` is a weak reference, so this is a no-op once every strong
/// reference to the state has been dropped.
fn finish_queued_cancellation(inner: &Weak<SingleThreadScheduledExecutorServiceInner>) {
    let Some(alive) = inner.upgrade() else {
        return;
    };
    alive.finish_queued_cancellation();
}