use std::sync::Arc;
use qubit_function::Callable;
use qubit_executor::TaskRunner;
use crate::TokioTaskHandle;
use crate::tokio_executor_service_state::TokioExecutorServiceState;
use crate::tokio_service_task_guard::TokioServiceTaskGuard;
use qubit_executor::service::{
ExecutorService,
RejectedExecution,
ShutdownReport,
};
/// Executor service that runs submitted callables through
/// `tokio::task::spawn_blocking`.
///
/// The only field is an [`Arc`] around the shared service state, so cloning
/// the service yields another handle onto the same state rather than an
/// independent service.
#[derive(Clone, Default)]
pub struct TokioExecutorService {
    /// Shared state: shutdown flag, active-task counter, abort-handle
    /// registry and termination notifier, as used by the trait
    /// implementation in this file.
    state: Arc<TokioExecutorServiceState>,
}
/// Alias for [`TokioExecutorService`].
///
/// Both names refer to the exact same type; this one spells out that tasks
/// are dispatched with `tokio::task::spawn_blocking`.
pub type TokioBlockingExecutorService = TokioExecutorService;
impl TokioExecutorService {
    /// Creates a service backed by freshly default-constructed shared state.
    ///
    /// This is exactly the value produced by the type's `Default`
    /// implementation.
    #[inline]
    pub fn new() -> Self {
        Default::default()
    }
}
impl ExecutorService for TokioExecutorService {
    /// Handle returned to the caller for every accepted task.
    type Handle<R, E>
        = TokioTaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Boxed, `Send` future returned by `await_termination`; it borrows the
    /// service for `'a`.
    type Termination<'a>
        = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
    where
        Self: 'a;

    /// Submits `task` for execution via `tokio::task::spawn_blocking`.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] when the shutdown flag is
    /// already set; in that case nothing is spawned and the active-task
    /// counter is left untouched.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let join_handle = {
            // `shutdown` and `shutdown_now` take this same lock, so (assuming
            // it is mutually exclusive) the flag cannot flip between the check
            // below and the registration of the abort handle.
            let _submission = self.state.lock_submission();
            if self.state.shutdown.load() {
                return Err(RejectedExecution::Shutdown);
            }

            // Count the task before it is spawned.
            self.state.active_tasks.inc();

            // `marker` is shared between the task guard and the abort-handle
            // registration made below.
            let marker = Arc::new(());
            let task_guard =
                TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));

            let join_handle = tokio::task::spawn_blocking(move || {
                // Owned by the closure so the guard is dropped when the
                // closure finishes, or when it is discarded without running.
                let _task_guard = task_guard;
                TaskRunner::new(task).call()
            });

            let abort_handle = join_handle.abort_handle();
            self.state.register_abort_handle(marker, abort_handle);
            join_handle
            // `_submission` is released here, before the handle is wrapped.
        };

        Ok(TokioTaskHandle::new(join_handle))
    }

    /// Sets the shutdown flag so that later submissions are rejected.
    ///
    /// Already accepted tasks are not aborted. `notify_if_terminated` is
    /// called afterwards on the shared state.
    fn shutdown(&self) {
        let state = &self.state;
        let _submission = state.lock_submission();
        state.shutdown.store(true);
        state.notify_if_terminated();
    }

    /// Sets the shutdown flag and asks the shared state to abort every
    /// tracked task.
    ///
    /// The returned [`ShutdownReport`] is built from the active-task count
    /// sampled *before* the abort request and from the value returned by
    /// `abort_tracked_tasks`.
    fn shutdown_now(&self) -> ShutdownReport {
        let state = &self.state;
        let _submission = state.lock_submission();
        state.shutdown.store(true);

        let running_before_abort = state.active_tasks.get();
        let cancelled = state.abort_tracked_tasks();
        state.notify_if_terminated();

        // The first argument is fixed at 0 — presumably the queued-task
        // count, as this service keeps no queue of its own; TODO confirm
        // against `ShutdownReport::new`.
        ShutdownReport::new(0, running_before_abort, cancelled)
    }

    /// Returns the current value of the shutdown flag.
    fn is_shutdown(&self) -> bool {
        self.state.shutdown.load()
    }

    /// Returns `true` once the shutdown flag is set and the active-task
    /// counter reads zero.
    fn is_terminated(&self) -> bool {
        if !self.is_shutdown() {
            return false;
        }
        self.state.active_tasks.is_zero()
    }

    /// Returns a future that resolves once [`Self::is_terminated`] reports
    /// `true`.
    ///
    /// The future resolves on its first poll if the service is already
    /// terminated at that point.
    fn await_termination(&self) -> Self::Termination<'_> {
        Box::pin(async move {
            let notify = &self.state.terminated_notify;
            let waiter = notify.notified();
            tokio::pin!(waiter);

            // Enable the waiter *before* testing the condition so that a
            // notification sent between the test and the await is not lost.
            waiter.as_mut().enable();
            while !self.is_terminated() {
                waiter.as_mut().await;
                // A completed waiter cannot be reused; install and enable a
                // fresh one before re-checking the condition.
                waiter.set(notify.notified());
                waiter.as_mut().enable();
            }
        })
    }
}