use std::{
future::Future,
pin::Pin,
sync::Arc,
};
use qubit_executor::TaskExecutionError;
use crate::TokioTaskHandle;
use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
use qubit_executor::service::{
RejectedExecution,
ShutdownReport,
};
#[derive(Default, Clone)]
pub struct TokioIoExecutorService {
state: Arc<TokioIoExecutorServiceState>,
}
impl TokioIoExecutorService {
#[inline]
pub fn new() -> Self {
Self::default()
}
pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
where
F: Future<Output = Result<R, E>> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
let submission_guard = self.state.lock_submission();
if self.state.shutdown.load() {
return Err(RejectedExecution::Shutdown);
}
self.state.active_tasks.inc();
let marker = Arc::new(());
let guard = TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
let handle = tokio::spawn(async move {
let _guard = guard;
future.await.map_err(TaskExecutionError::Failed)
});
self.state
.register_abort_handle(marker, handle.abort_handle());
drop(submission_guard);
Ok(TokioTaskHandle::new(handle))
}
pub fn shutdown(&self) {
let _guard = self.state.lock_submission();
self.state.shutdown.store(true);
self.state.notify_if_terminated();
}
pub fn shutdown_now(&self) -> ShutdownReport {
let _guard = self.state.lock_submission();
self.state.shutdown.store(true);
let running = self.state.active_tasks.get();
let cancellation_count = self.state.abort_tracked_tasks();
self.state.notify_if_terminated();
ShutdownReport::new(0, running, cancellation_count)
}
#[inline]
pub fn is_shutdown(&self) -> bool {
self.state.shutdown.load()
}
#[inline]
pub fn is_terminated(&self) -> bool {
self.is_shutdown() && self.state.active_tasks.is_zero()
}
pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move {
let notified = self.state.terminated_notify.notified();
tokio::pin!(notified);
loop {
notified.as_mut().enable();
if self.is_terminated() {
return;
}
notified.as_mut().await;
notified.set(self.state.terminated_notify.notified());
}
})
}
}