// qubit-thread-pool 0.5.0
//
// Dynamic and fixed thread pool executor services for Qubit Rust libraries.
// Documentation
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
use std::{
    sync::Arc,
    thread::JoinHandle,
};

use qubit_executor::service::{
    ExecutorService,
    ExecutorServiceLifecycle,
    StopReport,
    SubmissionError,
};
use qubit_executor::task::spi::TaskEndpointPair;
use qubit_executor::{
    TaskHandle,
    TrackedTask,
};
use qubit_function::{
    Callable,
    Runnable,
};

use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
use super::fixed_thread_pool_inner::FixedThreadPoolInner;
use super::fixed_worker::FixedWorker;
use super::fixed_worker_runtime::FixedWorkerRuntime;
use crate::{
    ExecutorServiceBuilderError,
    PoolJob,
    ThreadPoolStats,
};

/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`crate::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
///
/// The value is a thin handle: every operation is forwarded to the shared
/// [`FixedThreadPoolInner`]. Dropping the handle requests a graceful shutdown
/// of the pool.
pub struct FixedThreadPool {
    /// Shared fixed pool state; each worker thread is handed its own clone of
    /// this `Arc` when it is spawned.
    inner: Arc<FixedThreadPoolInner>,
}

impl FixedThreadPool {
    /// Constructs a running fixed pool from the settings held by `builder`.
    ///
    /// One [`FixedWorkerRuntime`] is created per worker, the shared
    /// [`FixedThreadPoolInner`] is allocated, and then every worker thread is
    /// spawned in index order. Threads are named `<thread_name_prefix>-<index>`
    /// and use the configured stack size when one is set.
    ///
    /// The builder is expected to be validated already; no validation is
    /// performed here.
    ///
    /// # Parameters
    ///
    /// * `builder` - Configuration produced by [`FixedThreadPoolBuilder`].
    ///
    /// # Returns
    ///
    /// A fixed thread-pool handle whose worker threads have all been spawned.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorServiceBuilderError::SpawnWorker`] carrying the index
    /// of the worker whose thread could not be spawned. Before returning, the
    /// slot reserved for that worker is rolled back, the pool is stopped, and
    /// the workers that were already started are joined.
    pub(crate) fn new_with_builder(
        builder: FixedThreadPoolBuilder,
    ) -> Result<Self, ExecutorServiceBuilderError> {
        let FixedThreadPoolBuilder {
            pool_size,
            queue_capacity,
            thread_name_prefix,
            stack_size,
            hooks,
        } = builder;

        // Per-worker runtimes are prepared first, in index order, so that each
        // spawned thread can take ownership of exactly one of them.
        let runtimes: Vec<_> = (0..pool_size).map(FixedWorkerRuntime::new).collect();
        let inner = Arc::new(FixedThreadPoolInner::with_hooks(
            pool_size,
            queue_capacity,
            hooks,
        ));

        let mut started = Vec::with_capacity(pool_size);
        for (index, runtime) in runtimes.into_iter().enumerate() {
            // The slot is reserved before spawning and rolled back below if the
            // spawn fails.
            inner.reserve_worker_slot();
            let shared = Arc::clone(&inner);

            let named = std::thread::Builder::new().name(format!("{thread_name_prefix}-{index}"));
            let thread_builder = match stack_size {
                Some(bytes) => named.stack_size(bytes),
                None => named,
            };

            let handle = match thread_builder.spawn(move || FixedWorker::run(shared, runtime)) {
                Ok(handle) => handle,
                Err(source) => {
                    // Undo the reservation, stop the half-built pool, and wait
                    // for the workers that did start before reporting failure.
                    inner.rollback_worker_slot();
                    inner.stop_after_failed_build();
                    join_started_workers(started);
                    return Err(ExecutorServiceBuilderError::SpawnWorker {
                        index: Some(index),
                        source,
                    });
                }
            };
            started.push(handle);
        }

        // The join handles are dropped here, which detaches the worker threads.
        Ok(Self { inner })
    }

    /// Creates a fixed thread pool with `pool_size` prestarted workers.
    ///
    /// All other settings keep the defaults of [`FixedThreadPool::builder`].
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of worker threads.
    ///
    /// # Returns
    ///
    /// A fixed thread pool.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorServiceBuilderError`] if the worker count is zero or a
    /// worker cannot be spawned.
    pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
        let sized = Self::builder().pool_size(pool_size);
        sized.build()
    }

    /// Creates a builder for configuring a fixed pool.
    ///
    /// # Returns
    ///
    /// Builder with CPU parallelism defaults.
    pub fn builder() -> FixedThreadPoolBuilder {
        FixedThreadPoolBuilder::new()
    }

    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers in this pool.
    pub fn pool_size(&self) -> usize {
        let pool = &self.inner;
        pool.pool_size()
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    pub fn queued_count(&self) -> usize {
        let pool = &self.inner;
        pool.queued_count()
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    pub fn running_count(&self) -> usize {
        let pool = &self.inner;
        pool.running_count()
    }

    /// Returns the live worker count.
    ///
    /// The value is read from the `live_workers` field of the shared pool
    /// state.
    ///
    /// # Returns
    ///
    /// Number of worker loops that have not exited.
    pub fn live_worker_count(&self) -> usize {
        self.inner.state.read(|pool_state| pool_state.live_workers)
    }

    /// Returns a point-in-time stats snapshot.
    ///
    /// # Returns
    ///
    /// Snapshot containing queue, worker, and lifecycle counters.
    pub fn stats(&self) -> ThreadPoolStats {
        let pool = &self.inner;
        pool.stats()
    }

    /// Blocks the caller until all accepted work has completed.
    ///
    /// This is a join-style wait for quiescence: it does not request shutdown
    /// and does not wait for worker threads to exit. Concurrent submissions may
    /// extend the wait until those accepted jobs also drain.
    #[inline]
    pub fn join(&self) {
        let pool = &self.inner;
        pool.wait_until_idle();
    }
}

impl Default for FixedThreadPool {
    /// Builds a pool from [`FixedThreadPoolBuilder::default`].
    ///
    /// # Returns
    ///
    /// A fixed thread pool with CPU parallelism defaults and prestarted workers.
    ///
    /// # Panics
    ///
    /// Panics when the default builder fails to build the pool, for example
    /// because a worker thread cannot be spawned.
    fn default() -> Self {
        let built = FixedThreadPoolBuilder::default().build();
        // `Default::default` cannot return an error, so a failed build is
        // surfaced as a panic.
        built.expect("failed to build default FixedThreadPool")
    }
}

impl Drop for FixedThreadPool {
    /// Requests graceful shutdown of the shared pool state when this handle
    /// goes out of scope.
    fn drop(&mut self) {
        let pool = &self.inner;
        pool.shutdown();
    }
}

impl ExecutorService for FixedThreadPool {
    /// Handle returned by [`ExecutorService::submit_callable`].
    type ResultHandle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Handle returned by [`ExecutorService::submit_tracked_callable`].
    type TrackedHandle<R, E>
        = TrackedTask<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Accepts a runnable and queues it for fixed pool workers.
    ///
    /// The runnable is wrapped in a detached [`PoolJob`], so no handle is
    /// returned.
    ///
    /// # Parameters
    ///
    /// * `task` - Runnable to execute on a fixed pool worker.
    ///
    /// # Returns
    ///
    /// `Ok(())` once the job has been accepted by the pool.
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] reported by the shared pool state
    /// when the job is not accepted.
    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        let job = PoolJob::detached(task);
        self.inner.submit(job)
    }

    /// Accepts a callable and queues it for fixed pool workers.
    ///
    /// # Parameters
    ///
    /// * `task` - Callable to execute on a fixed pool worker.
    ///
    /// # Returns
    ///
    /// A [`TaskHandle`] for the accepted task.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] after shutdown or
    /// [`SubmissionError::Saturated`] when a bounded queue is full.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        // The completion endpoint travels with the job; the handle endpoint is
        // returned to the caller only if the job is accepted.
        let (handle, completion) = TaskEndpointPair::new().into_parts();
        self.inner
            .submit(PoolJob::from_task(task, completion))
            .map(|()| handle)
    }

    /// Accepts a callable and queues it with a tracked handle.
    ///
    /// # Parameters
    ///
    /// * `task` - Callable to execute on a fixed pool worker.
    ///
    /// # Returns
    ///
    /// A [`TrackedTask`] that reports task status and can observe completion,
    /// failure, or queued cancellation.
    ///
    /// # Errors
    ///
    /// Returns [`SubmissionError::Shutdown`] after shutdown or
    /// [`SubmissionError::Saturated`] when a bounded queue is full.
    fn submit_tracked_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        // Same flow as `submit_callable`, but with the tracked endpoint pair.
        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
        self.inner
            .submit(PoolJob::from_task(task, completion))
            .map(|()| handle)
    }

    /// Stops accepting new work and drains accepted queued tasks.
    fn shutdown(&self) {
        let pool = &self.inner;
        pool.shutdown();
    }

    /// Stops accepting work and cancels queued tasks.
    ///
    /// # Returns
    ///
    /// A count-based shutdown report.
    fn stop(&self) -> StopReport {
        let pool = &self.inner;
        pool.stop()
    }

    /// Returns the current lifecycle state.
    ///
    /// # Returns
    ///
    /// The [`ExecutorServiceLifecycle`] reported by the shared pool state.
    fn lifecycle(&self) -> ExecutorServiceLifecycle {
        let pool = &self.inner;
        pool.lifecycle()
    }

    /// Returns whether shutdown has been requested.
    ///
    /// # Returns
    ///
    /// `true` when this pool no longer accepts new work.
    fn is_not_running(&self) -> bool {
        let pool = &self.inner;
        pool.is_not_running()
    }

    /// Returns whether this pool is fully terminated.
    ///
    /// # Returns
    ///
    /// `true` after shutdown and after all workers have exited.
    fn is_terminated(&self) -> bool {
        let pool = &self.inner;
        pool.is_terminated()
    }

    /// Blocks the caller until this fixed pool has terminated.
    fn wait_termination(&self) {
        let pool = &self.inner;
        pool.wait_for_termination();
    }
}

/// Waits for every worker that was spawned before a build failure.
///
/// Joining is best-effort: the result of each join, including a worker panic,
/// is intentionally discarded because the caller is already reporting the
/// build failure.
///
/// # Parameters
///
/// * `worker_handles` - Join handles for workers started before construction failed.
fn join_started_workers(worker_handles: Vec<JoinHandle<()>>) {
    worker_handles.into_iter().for_each(|started| {
        let _ = started.join();
    });
}