use std::{
future::Future,
pin::Pin,
thread,
time::Duration,
};
use qubit_function::{
Callable,
Runnable,
};
use thiserror::Error;
use qubit_executor::TaskHandle;
use super::{
BlockingExecutorService,
BlockingExecutorServiceBuilder,
ExecutorService,
RayonExecutorService,
RayonExecutorServiceBuildError,
RayonExecutorServiceBuilder,
RayonTaskHandle,
RejectedExecution,
ShutdownReport,
TokioBlockingExecutorService,
TokioIoExecutorService,
TokioTaskHandle,
};
/// Error returned by [`ExecutionServicesBuilder::build`] when one of the
/// executor services it constructs through a fallible builder cannot be built.
#[derive(Debug, Error)]
pub enum ExecutionServicesBuildError {
/// The blocking executor service could not be built.
///
/// The `#[from]` attribute also provides a `From` conversion from the
/// wrapped error type into this variant.
#[error("failed to build blocking executor service: {source}")]
Blocking {
/// Error reported by the blocking executor service builder.
#[from]
source: super::ThreadPoolBuildError,
},
/// The CPU (Rayon) executor service could not be built.
///
/// The `#[from]` attribute also provides a `From` conversion from the
/// wrapped error type into this variant.
#[error("failed to build cpu executor service: {source}")]
Cpu {
/// Error reported by the Rayon executor service builder.
#[from]
source: RayonExecutorServiceBuildError,
},
}
/// Collection of the individual [`ShutdownReport`]s of the four services
/// managed by [`ExecutionServices`], as produced by
/// [`ExecutionServices::shutdown_now`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ExecutionServicesShutdownReport {
/// Report returned by the blocking executor service.
pub blocking: ShutdownReport,
/// Report returned by the CPU (Rayon) executor service.
pub cpu: ShutdownReport,
/// Report returned by the Tokio blocking executor service.
pub tokio_blocking: ShutdownReport,
/// Report returned by the Tokio I/O executor service.
pub io: ShutdownReport,
}
impl ExecutionServicesShutdownReport {
    /// Returns the sum of the `queued` counters of all four per-service
    /// reports.
    #[inline]
    pub const fn total_queued(&self) -> usize {
        // Accumulate left to right: blocking, cpu, tokio_blocking, io.
        let mut total = self.blocking.queued;
        total += self.cpu.queued;
        total += self.tokio_blocking.queued;
        total += self.io.queued;
        total
    }

    /// Returns the sum of the `running` counters of all four per-service
    /// reports.
    #[inline]
    pub const fn total_running(&self) -> usize {
        let mut total = self.blocking.running;
        total += self.cpu.running;
        total += self.tokio_blocking.running;
        total += self.io.running;
        total
    }

    /// Returns the sum of the `cancelled` counters of all four per-service
    /// reports.
    #[inline]
    pub const fn total_cancelled(&self) -> usize {
        let mut total = self.blocking.cancelled;
        total += self.cpu.cancelled;
        total += self.tokio_blocking.cancelled;
        total += self.io.cancelled;
        total
    }
}
/// Builder for [`ExecutionServices`].
///
/// Holds the builders of the two configurable services; the
/// `blocking_*` methods forward to the blocking builder and the `cpu_*`
/// methods forward to the Rayon builder.
#[derive(Debug, Clone)]
pub struct ExecutionServicesBuilder {
// Configuration of the blocking executor service.
blocking: BlockingExecutorServiceBuilder,
// Configuration of the CPU (Rayon) executor service.
cpu: RayonExecutorServiceBuilder,
}
impl ExecutionServicesBuilder {
    /// Forwards `pool_size` to the blocking builder's `pool_size` setting.
    #[inline]
    pub fn blocking_pool_size(self, pool_size: usize) -> Self {
        Self {
            blocking: self.blocking.pool_size(pool_size),
            ..self
        }
    }

    /// Forwards `core_pool_size` to the blocking builder's `core_pool_size`
    /// setting.
    #[inline]
    pub fn blocking_core_pool_size(self, core_pool_size: usize) -> Self {
        Self {
            blocking: self.blocking.core_pool_size(core_pool_size),
            ..self
        }
    }

    /// Forwards `maximum_pool_size` to the blocking builder's
    /// `maximum_pool_size` setting.
    #[inline]
    pub fn blocking_maximum_pool_size(self, maximum_pool_size: usize) -> Self {
        Self {
            blocking: self.blocking.maximum_pool_size(maximum_pool_size),
            ..self
        }
    }

    /// Forwards `capacity` to the blocking builder's `queue_capacity` setting.
    #[inline]
    pub fn blocking_queue_capacity(self, capacity: usize) -> Self {
        Self {
            blocking: self.blocking.queue_capacity(capacity),
            ..self
        }
    }

    /// Applies the blocking builder's `unbounded_queue` setting.
    #[inline]
    pub fn blocking_unbounded_queue(self) -> Self {
        Self {
            blocking: self.blocking.unbounded_queue(),
            ..self
        }
    }

    /// Forwards `prefix` to the blocking builder's `thread_name_prefix`
    /// setting.
    #[inline]
    pub fn blocking_thread_name_prefix(self, prefix: &str) -> Self {
        Self {
            blocking: self.blocking.thread_name_prefix(prefix),
            ..self
        }
    }

    /// Forwards `stack_size` to the blocking builder's `stack_size` setting.
    #[inline]
    pub fn blocking_stack_size(self, stack_size: usize) -> Self {
        Self {
            blocking: self.blocking.stack_size(stack_size),
            ..self
        }
    }

    /// Forwards `keep_alive` to the blocking builder's `keep_alive` setting.
    #[inline]
    pub fn blocking_keep_alive(self, keep_alive: Duration) -> Self {
        Self {
            blocking: self.blocking.keep_alive(keep_alive),
            ..self
        }
    }

    /// Forwards `allow` to the blocking builder's
    /// `allow_core_thread_timeout` setting.
    #[inline]
    pub fn blocking_allow_core_thread_timeout(self, allow: bool) -> Self {
        Self {
            blocking: self.blocking.allow_core_thread_timeout(allow),
            ..self
        }
    }

    /// Applies the blocking builder's `prestart_core_threads` setting.
    #[inline]
    pub fn blocking_prestart_core_threads(self) -> Self {
        Self {
            blocking: self.blocking.prestart_core_threads(),
            ..self
        }
    }

    /// Forwards `num_threads` to the Rayon builder's `num_threads` setting.
    #[inline]
    pub fn cpu_threads(self, num_threads: usize) -> Self {
        Self {
            cpu: self.cpu.num_threads(num_threads),
            ..self
        }
    }

    /// Forwards `prefix` to the Rayon builder's `thread_name_prefix` setting.
    #[inline]
    pub fn cpu_thread_name_prefix(self, prefix: &str) -> Self {
        Self {
            cpu: self.cpu.thread_name_prefix(prefix),
            ..self
        }
    }

    /// Forwards `stack_size` to the Rayon builder's `stack_size` setting.
    #[inline]
    pub fn cpu_stack_size(self, stack_size: usize) -> Self {
        Self {
            cpu: self.cpu.stack_size(stack_size),
            ..self
        }
    }

    /// Consumes the builder and assembles an [`ExecutionServices`].
    ///
    /// The blocking service is built first, then the CPU service; the two
    /// Tokio-backed services are created afterwards with their `new()`
    /// constructors.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionServicesBuildError::Blocking`] when the blocking
    /// builder fails and [`ExecutionServicesBuildError::Cpu`] when the Rayon
    /// builder fails. A blocking failure is reported before the CPU builder
    /// is invoked.
    pub fn build(self) -> Result<ExecutionServices, ExecutionServicesBuildError> {
        let Self { blocking, cpu } = self;
        // `?` wraps each builder error in its variant through the `#[from]`
        // conversions declared on `ExecutionServicesBuildError`.
        let blocking = blocking.build()?;
        let cpu = cpu.build()?;
        Ok(ExecutionServices {
            blocking,
            cpu,
            tokio_blocking: TokioBlockingExecutorService::new(),
            io: TokioIoExecutorService::new(),
        })
    }
}
impl Default for ExecutionServicesBuilder {
    /// Creates a builder in which both the blocking pool size and the Rayon
    /// thread count are set to the value of `default_pool_size()`.
    fn default() -> Self {
        let threads = default_pool_size();
        let blocking = BlockingExecutorService::builder().pool_size(threads);
        let cpu = RayonExecutorService::builder().num_threads(threads);
        Self { blocking, cpu }
    }
}
/// Bundle of four executor services: a blocking service, a CPU (Rayon)
/// service, a Tokio blocking service and a Tokio I/O service.
///
/// Created with [`ExecutionServices::new`] or through
/// [`ExecutionServices::builder`].
pub struct ExecutionServices {
// Blocking executor service; tasks arrive via `submit_blocking*`.
blocking: BlockingExecutorService,
// Rayon-backed executor service; tasks arrive via `submit_cpu*`.
cpu: RayonExecutorService,
// Tokio blocking executor service; tasks arrive via `submit_tokio_blocking*`.
tokio_blocking: TokioBlockingExecutorService,
// Tokio I/O executor service; futures arrive via `spawn_io`.
io: TokioIoExecutorService,
}
impl ExecutionServices {
    /// Builds an instance from the default [`ExecutionServicesBuilder`].
    ///
    /// # Errors
    ///
    /// Returns the [`ExecutionServicesBuildError`] produced by the builder's
    /// `build` step.
    #[inline]
    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
        ExecutionServicesBuilder::default().build()
    }

    /// Returns a builder initialised with its `Default` configuration.
    #[inline]
    pub fn builder() -> ExecutionServicesBuilder {
        Default::default()
    }

    /// Borrows the blocking executor service.
    #[inline]
    pub fn blocking(&self) -> &BlockingExecutorService {
        &self.blocking
    }

    /// Borrows the CPU (Rayon) executor service.
    #[inline]
    pub fn cpu(&self) -> &RayonExecutorService {
        &self.cpu
    }

    /// Borrows the Tokio blocking executor service.
    #[inline]
    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
        &self.tokio_blocking
    }

    /// Borrows the Tokio I/O executor service.
    #[inline]
    pub fn io(&self) -> &TokioIoExecutorService {
        &self.io
    }

    /// Submits a runnable `task` to the blocking executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        self.blocking().submit(task)
    }

    /// Submits a callable `task` to the blocking executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_blocking_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<TaskHandle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        self.blocking().submit_callable(task)
    }

    /// Submits a runnable `task` to the CPU (Rayon) executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        self.cpu().submit(task)
    }

    /// Submits a callable `task` to the CPU (Rayon) executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_cpu_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        self.cpu().submit_callable(task)
    }

    /// Submits a runnable `task` to the Tokio blocking executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_tokio_blocking<T, E>(
        &self,
        task: T,
    ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        self.tokio_blocking().submit(task)
    }

    /// Submits a callable `task` to the Tokio blocking executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn submit_tokio_blocking_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        self.tokio_blocking().submit_callable(task)
    }

    /// Spawns `future` on the Tokio I/O executor service.
    ///
    /// # Errors
    ///
    /// Propagates the [`RejectedExecution`] returned by the service.
    #[inline]
    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
    where
        F: Future<Output = Result<R, E>> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        self.io().spawn(future)
    }

    /// Calls `shutdown` on every service, in the order blocking, CPU, Tokio
    /// blocking, Tokio I/O.
    pub fn shutdown(&self) {
        self.blocking().shutdown();
        self.cpu().shutdown();
        self.tokio_blocking().shutdown();
        self.io().shutdown();
    }

    /// Calls `shutdown_now` on every service, in the order blocking, CPU,
    /// Tokio blocking, Tokio I/O, and gathers the individual reports.
    pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
        let blocking = self.blocking().shutdown_now();
        let cpu = self.cpu().shutdown_now();
        let tokio_blocking = self.tokio_blocking().shutdown_now();
        let io = self.io().shutdown_now();
        ExecutionServicesShutdownReport {
            blocking,
            cpu,
            tokio_blocking,
            io,
        }
    }

    /// Returns `true` only when all four services report `is_shutdown()`.
    ///
    /// Services are queried in the order blocking, CPU, Tokio blocking,
    /// Tokio I/O; the first `false` ends the evaluation.
    #[inline]
    pub fn is_shutdown(&self) -> bool {
        let pools_shut_down = self.blocking().is_shutdown() && self.cpu().is_shutdown();
        pools_shut_down && self.tokio_blocking().is_shutdown() && self.io().is_shutdown()
    }

    /// Returns `true` only when all four services report `is_terminated()`.
    ///
    /// Services are queried in the order blocking, CPU, Tokio blocking,
    /// Tokio I/O; the first `false` ends the evaluation.
    #[inline]
    pub fn is_terminated(&self) -> bool {
        let pools_terminated = self.blocking().is_terminated() && self.cpu().is_terminated();
        pools_terminated && self.tokio_blocking().is_terminated() && self.io().is_terminated()
    }

    /// Returns a boxed future that awaits `await_termination()` of each
    /// service one after another: blocking, CPU, Tokio blocking, Tokio I/O.
    ///
    /// The future borrows `self`, so it cannot outlive this instance.
    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        let all_terminated = async move {
            self.blocking().await_termination().await;
            self.cpu().await_termination().await;
            self.tokio_blocking().await_termination().await;
            self.io().await_termination().await;
        };
        Box::pin(all_terminated)
    }
}
/// Returns the value reported by [`thread::available_parallelism`], or `1`
/// when that query returns an error.
fn default_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}