use std::{
future::Future,
sync::Arc,
};
use qubit_executor::TaskExecutionError;
use crate::TokioTaskHandle;
use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
use crate::tokio_runtime::ensure_tokio_runtime_entered;
use qubit_executor::service::{
ExecutorServiceLifecycle,
StopReport,
SubmissionError,
};
/// Executor service handle that runs submitted futures as Tokio tasks.
///
/// The handle is a thin wrapper around a reference-counted
/// `TokioIoExecutorServiceState`. Cloning the service clones the `Arc`, so
/// every clone observes and drives the same underlying state.
#[derive(Clone, Default)]
pub struct TokioIoExecutorService {
    /// State shared between all clones of this service and the task guards
    /// created in `spawn`.
    state: Arc<TokioIoExecutorServiceState>,
}
impl TokioIoExecutorService {
    /// Creates a service whose shared state is default-constructed.
    #[inline]
    pub fn new() -> Self {
        Default::default()
    }

    /// Submits a fallible future to be run as a Tokio task.
    ///
    /// The future is wrapped so that an `Err(e)` output is mapped to
    /// `TaskExecutionError::Failed(e)`, then handed to `tokio::spawn`. The
    /// returned [`TokioTaskHandle`] wraps the resulting join handle.
    ///
    /// # Errors
    ///
    /// * `SubmissionError::Shutdown` when the shared state reports that the
    ///   service is not running.
    /// * Whatever error `ensure_tokio_runtime_entered` yields (converted via
    ///   `?`) — presumably when the caller is not inside a Tokio runtime;
    ///   confirm against that helper.
    pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
    where
        F: Future<Output = Result<R, E>> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        // Everything that touches the service state happens inside this
        // scope so the submission lock is released before the public handle
        // is constructed. `shutdown` and `stop` take the same lock.
        let join_handle = {
            let _submission = self.state.lock_submission();

            if self.state.is_not_running() {
                return Err(SubmissionError::Shutdown);
            }
            ensure_tokio_runtime_entered()?;

            // Count the task before it exists. The guard built below is moved
            // into the task; NOTE(review): presumably its `Drop` undoes this
            // increment — confirm in `TokioIoServiceTaskGuard`.
            self.state.active_tasks.inc();

            // One clone of the marker travels with the guard, the other is
            // registered together with the abort handle.
            let marker = Arc::new(());
            let task_guard =
                TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));

            let task = async move {
                // Keep the guard alive for the whole task, including the case
                // where the task is dropped before completing.
                let _task_guard = task_guard;
                let outcome = future.await;
                outcome.map_err(TaskExecutionError::Failed)
            };
            let join_handle = tokio::spawn(task);

            self.state
                .register_abort_handle(marker, join_handle.abort_handle());
            join_handle
        };

        Ok(TokioTaskHandle::new(join_handle))
    }

    /// Calls `shutdown` on the shared state while holding the submission lock.
    ///
    /// This method itself does not abort any task that was already spawned.
    pub fn shutdown(&self) {
        let _submission = self.state.lock_submission();
        self.state.shutdown();
    }

    /// Calls `stop` on the shared state and aborts the tracked tasks, all
    /// while holding the submission lock.
    ///
    /// The returned [`StopReport`] is built from three values, in order:
    /// a constant `0`, the active-task counter as read *before* the abort
    /// pass, and the value returned by `abort_tracked_tasks`.
    /// NOTE(review): the leading `0` is presumably the queued-task count —
    /// confirm against `StopReport::new`.
    pub fn stop(&self) -> StopReport {
        let _submission = self.state.lock_submission();
        self.state.stop();

        // Sample the counter first; it is read before any abort is issued.
        let still_running = self.state.active_tasks.get();
        let cancelled = self.state.abort_tracked_tasks();

        StopReport::new(0, still_running, cancelled)
    }

    /// Returns the lifecycle value currently reported by the shared state.
    #[inline]
    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
        self.state.lifecycle()
    }

    /// Returns `true` when the lifecycle is `Running`.
    #[inline]
    pub fn is_running(&self) -> bool {
        matches!(self.lifecycle(), ExecutorServiceLifecycle::Running)
    }

    /// Returns `true` when the lifecycle is `ShuttingDown`.
    #[inline]
    pub fn is_shutting_down(&self) -> bool {
        matches!(self.lifecycle(), ExecutorServiceLifecycle::ShuttingDown)
    }

    /// Returns `true` when the lifecycle is `Stopping`.
    #[inline]
    pub fn is_stopping(&self) -> bool {
        matches!(self.lifecycle(), ExecutorServiceLifecycle::Stopping)
    }

    /// Returns the shared state's own `is_not_running` answer.
    ///
    /// This is the same check `spawn` uses to reject submissions.
    #[inline]
    pub fn is_not_running(&self) -> bool {
        self.state.is_not_running()
    }

    /// Returns `true` when the lifecycle is `Terminated`.
    #[inline]
    pub fn is_terminated(&self) -> bool {
        matches!(self.lifecycle(), ExecutorServiceLifecycle::Terminated)
    }
}