use std::{
future::Future,
pin::Pin,
sync::Arc,
time::Duration,
};
use qubit_executor::{
TaskHandle,
TrackedTask,
};
use qubit_function::{
Callable,
Runnable,
};
use qubit_thread_pool::{
ThreadPool,
ThreadPoolBuilder,
};
use qubit_tokio_executor::TokioExecutorService;
use super::{
ExecutionServicesBuildError,
ExecutionServicesBuilder,
ExecutionServicesStopReport,
ExecutorService,
ExecutorServiceLifecycle,
RayonExecutorService,
RayonTaskHandle,
SubmissionError,
TokioBlockingTaskHandle,
TokioIoExecutorService,
TokioTaskHandle,
};
pub type BlockingExecutorService = ThreadPool;
pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
pub type TokioBlockingExecutorService = TokioExecutorService;
pub struct ExecutionServices {
blocking: Arc<BlockingExecutorService>,
cpu: RayonExecutorService,
tokio_blocking: TokioBlockingExecutorService,
io: TokioIoExecutorService,
}
impl ExecutionServices {
pub(crate) fn from_parts(
blocking: BlockingExecutorService,
cpu: RayonExecutorService,
tokio_blocking: TokioBlockingExecutorService,
io: TokioIoExecutorService,
) -> Self {
Self {
blocking: Arc::new(blocking),
cpu,
tokio_blocking,
io,
}
}
#[inline]
pub fn new() -> Result<Self, ExecutionServicesBuildError> {
Self::builder().build()
}
#[inline]
pub fn builder() -> ExecutionServicesBuilder {
ExecutionServicesBuilder::default()
}
#[inline]
pub fn blocking(&self) -> &BlockingExecutorService {
self.blocking.as_ref()
}
#[inline]
pub fn cpu(&self) -> &RayonExecutorService {
&self.cpu
}
#[inline]
pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
&self.tokio_blocking
}
#[inline]
pub fn io(&self) -> &TokioIoExecutorService {
&self.io
}
#[inline]
pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.blocking.submit(task)
}
#[inline]
pub fn submit_tracked_blocking<T, E>(
&self,
task: T,
) -> Result<TrackedTask<(), E>, SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.blocking.submit_tracked(task)
}
#[inline]
pub fn submit_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TaskHandle<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.blocking.submit_callable(task)
}
#[inline]
pub fn submit_tracked_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TrackedTask<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.blocking.submit_tracked_callable(task)
}
#[inline]
pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.cpu.submit(task)
}
#[inline]
pub fn submit_tracked_cpu<T, E>(
&self,
task: T,
) -> Result<RayonTaskHandle<(), E>, SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.cpu.submit_tracked(task)
}
#[inline]
pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.cpu.submit_callable(task)
}
#[inline]
pub fn submit_tracked_cpu_callable<C, R, E>(
&self,
task: C,
) -> Result<RayonTaskHandle<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.cpu.submit_tracked_callable(task)
}
#[inline]
pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit(task)
}
#[inline]
pub fn submit_tracked_tokio_blocking<T, E>(
&self,
task: T,
) -> Result<TokioBlockingTaskHandle<(), E>, SubmissionError>
where
T: Runnable<E> + Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit_tracked(task)
}
#[inline]
pub fn submit_tokio_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TaskHandle<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit_callable(task)
}
#[inline]
pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
&self,
task: C,
) -> Result<TokioBlockingTaskHandle<R, E>, SubmissionError>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.tokio_blocking.submit_tracked_callable(task)
}
#[inline]
pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
where
F: Future<Output = Result<R, E>> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
self.io.spawn(future)
}
pub fn shutdown(&self) {
self.blocking.shutdown();
self.cpu.shutdown();
self.tokio_blocking.shutdown();
self.io.shutdown();
}
pub fn stop(&self) -> ExecutionServicesStopReport {
ExecutionServicesStopReport {
blocking: self.blocking.stop(),
cpu: self.cpu.stop(),
tokio_blocking: self.tokio_blocking.stop(),
io: self.io.stop(),
}
}
pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
let lifecycles = [
self.blocking.lifecycle(),
self.cpu.lifecycle(),
self.tokio_blocking.lifecycle(),
self.io.lifecycle(),
];
if lifecycles
.iter()
.all(|state| *state == ExecutorServiceLifecycle::Terminated)
{
ExecutorServiceLifecycle::Terminated
} else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
ExecutorServiceLifecycle::Stopping
} else if lifecycles
.iter()
.any(|state| *state != ExecutorServiceLifecycle::Running)
{
ExecutorServiceLifecycle::ShuttingDown
} else {
ExecutorServiceLifecycle::Running
}
}
#[inline]
pub fn is_running(&self) -> bool {
self.lifecycle() == ExecutorServiceLifecycle::Running
}
#[inline]
pub fn is_shutting_down(&self) -> bool {
self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
}
#[inline]
pub fn is_stopping(&self) -> bool {
self.lifecycle() == ExecutorServiceLifecycle::Stopping
}
#[inline]
pub fn is_not_running(&self) -> bool {
self.lifecycle() != ExecutorServiceLifecycle::Running
}
#[inline]
pub fn is_terminated(&self) -> bool {
self.lifecycle() == ExecutorServiceLifecycle::Terminated
}
pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move {
let blocking = Arc::clone(&self.blocking);
let cpu = self.cpu.clone();
let tokio_blocking = self.tokio_blocking.clone();
let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
let tokio_blocking_wait =
tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
while !self.io.is_terminated() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let _ = blocking_wait.await;
let _ = cpu_wait.await;
let _ = tokio_blocking_wait.await;
})
}
}