use std::{
future::Future,
pin::Pin,
};
use qubit_executor::TaskHandle;
use qubit_function::{
Callable,
Runnable,
};
use qubit_thread_pool::{
ThreadPool,
ThreadPoolBuilder,
};
use qubit_tokio_executor::TokioExecutorService;
use super::{
ExecutionServicesBuildError,
ExecutionServicesBuilder,
ExecutionServicesShutdownReport,
ExecutorService,
RayonExecutorService,
RayonTaskHandle,
RejectedExecution,
TokioIoExecutorService,
TokioTaskHandle,
};
/// Alias for [`ThreadPool`]; this is the type of the `blocking` service held by
/// [`ExecutionServices`].
pub type BlockingExecutorService = ThreadPool;
/// Alias for [`ThreadPoolBuilder`].
///
/// NOTE(review): presumably the builder that produces a
/// [`BlockingExecutorService`] - confirm against `qubit_thread_pool`.
pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
/// Alias for [`TokioExecutorService`]; this is the type of the `tokio_blocking`
/// service held by [`ExecutionServices`].
pub type TokioBlockingExecutorService = TokioExecutorService;
/// Facade that owns four executor services and forwards work to them.
///
/// The inherent methods on this type either hand out a shared reference to one
/// of the services, forward a task to exactly one of them, or fan a lifecycle
/// call (`shutdown`, `shutdown_now`, `is_shutdown`, `is_terminated`,
/// `await_termination`) out to all four in field order.
pub struct ExecutionServices {
/// Service used by `submit_blocking` and `submit_blocking_callable`.
blocking: BlockingExecutorService,
/// Service used by `submit_cpu` and `submit_cpu_callable`.
cpu: RayonExecutorService,
/// Service used by `submit_tokio_blocking` and `submit_tokio_blocking_callable`.
tokio_blocking: TokioBlockingExecutorService,
/// Service used by `spawn_io` to run futures.
io: TokioIoExecutorService,
}
impl ExecutionServices {
pub(crate) fn from_parts(
blocking: BlockingExecutorService,
cpu: RayonExecutorService,
tokio_blocking: TokioBlockingExecutorService,
io: TokioIoExecutorService,
) -> Self {
Self {
blocking,
cpu,
tokio_blocking,
io,
}
}
#[inline]
pub fn new() -> Result<Self, ExecutionServicesBuildError> {
Self::builder().build()
}
#[inline]
pub fn builder() -> ExecutionServicesBuilder {
ExecutionServicesBuilder::default()
}
#[inline]
pub fn blocking(&self) -> &BlockingExecutorService {
&self.blocking
}
#[inline]
pub fn cpu(&self) -> &RayonExecutorService {
&self.cpu
}
#[inline]
pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
&self.tokio_blocking
}
#[inline]
pub fn io(&self) -> &TokioIoExecutorService {
&self.io
}
#[inline]
pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.blocking.submit(task)
}
#[inline]
pub fn submit_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TaskHandle<R, E>, RejectedExecution>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.blocking.submit_callable(task)
}
#[inline]
pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.cpu.submit(task)
}
#[inline]
pub fn submit_cpu_callable<C, R, E>(
&self,
task: C,
) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.cpu.submit_callable(task)
}
#[inline]
pub fn submit_tokio_blocking<T, E>(
&self,
task: T,
) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit(task)
}
#[inline]
pub fn submit_tokio_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit_callable(task)
}
#[inline]
pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
where
F: Future<Output = Result<R, E>> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.io.spawn(future)
}
pub fn shutdown(&self) {
self.blocking.shutdown();
self.cpu.shutdown();
self.tokio_blocking.shutdown();
self.io.shutdown();
}
pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
ExecutionServicesShutdownReport {
blocking: self.blocking.shutdown_now(),
cpu: self.cpu.shutdown_now(),
tokio_blocking: self.tokio_blocking.shutdown_now(),
io: self.io.shutdown_now(),
}
}
#[inline]
pub fn is_shutdown(&self) -> bool {
self.blocking.is_shutdown()
&& self.cpu.is_shutdown()
&& self.tokio_blocking.is_shutdown()
&& self.io.is_shutdown()
}
#[inline]
pub fn is_terminated(&self) -> bool {
self.blocking.is_terminated()
&& self.cpu.is_terminated()
&& self.tokio_blocking.is_terminated()
&& self.io.is_terminated()
}
pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move {
self.blocking.await_termination().await;
self.cpu.await_termination().await;
self.tokio_blocking.await_termination().await;
self.io.await_termination().await;
})
}
}