zero_pool/
pool.rs

1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{
3    num::NonZeroUsize,
4    sync::{Arc, Barrier},
5    thread::{self, JoinHandle},
6};
7
8pub struct ZeroPool {
9    queue: Arc<Queue>,
10    workers: Vec<JoinHandle<()>>,
11}
12
13impl ZeroPool {
14    /// Creates a new thread pool with worker count equal to available parallelism
15    ///
16    /// Worker count is determined by `std::thread::available_parallelism()`,
17    /// falling back to 1 if unavailable. This is usually the optimal choice
18    /// for CPU-bound workloads.
19    ///
20    /// # Examples
21    ///
22    /// ```rust
23    /// use zero_pool::ZeroPool;
24    /// let pool = ZeroPool::new();
25    /// ```
26    pub fn new() -> Self {
27        let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
28        Self::with_workers(worker_count)
29    }
30
31    /// Creates a new thread pool with the specified number of workers
32    ///
33    /// Use this when you need precise control over the worker count,
34    /// for example when coordinating with other thread pools or
35    /// when you know the optimal count for your specific workload.
36    ///
37    /// # Examples
38    ///
39    /// ```rust
40    /// use std::num::NonZeroUsize;
41    /// use zero_pool::ZeroPool;
42    /// let pool = ZeroPool::with_workers(NonZeroUsize::new(4).unwrap());
43    /// ```
44    pub fn with_workers(worker_count: NonZeroUsize) -> Self {
45        let worker_count = worker_count.get();
46
47        let queue = Arc::new(Queue::new(worker_count));
48
49        let barrier = Arc::new(Barrier::new(worker_count + 1));
50        let workers: Vec<JoinHandle<()>> = (0..worker_count)
51            .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
52            .collect();
53
54        barrier.wait();
55
56        ZeroPool { queue, workers }
57    }
58
59    /// Submit a single typed task with automatic pointer conversion
60    ///
61    /// This method provides type safety while maintaining performance.
62    /// The parameter struct must remain valid until the future completes.
63    /// This is the recommended method for submitting individual tasks.
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
69    ///
70    /// struct MyTaskParams { value: u64, result: *mut u64 }
71    ///
72    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
73    ///     zp_write!(params.result, params.value * 2);
74    /// });
75    ///
76    /// let pool = ZeroPool::new();
77    /// let mut result = 0u64;
78    /// let task_params = MyTaskParams { value: 42, result: &mut result };
79    /// let future = pool.submit_task(my_task_fn, &task_params);
80    /// future.wait();
81    /// assert_eq!(result, 84);
82    /// ```
83    #[inline]
84    pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
85        let slice = std::slice::from_ref(params);
86        self.queue.push_task_batch(task_fn, slice)
87    }
88
89    /// Submit a batch of uniform tasks with automatic pointer conversion
90    ///
91    /// All tasks in the batch must be the same type and use the same task function.
92    /// This method handles the pointer conversion automatically and is the most
93    /// convenient way to submit large batches of similar work.
94    ///
95    /// # Examples
96    ///
97    /// ```rust
98    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
99    ///
100    /// struct MyTaskParams { value: u64, result: *mut u64 }
101    ///
102    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
103    ///     zp_write!(params.result, params.value * 2);
104    /// });
105    ///
106    /// let pool = ZeroPool::new();
107    /// let mut results = vec![0u64; 1000];
108    /// let task_params: Vec<_> = (0..1000)
109    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
110    ///     .collect();
111    /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
112    /// future.wait();
113    /// assert_eq!(results[0], 0);
114    /// assert_eq!(results[1], 2);
115    /// assert_eq!(results[999], 1998);
116    /// ```
117    #[inline]
118    pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
119        self.queue.push_task_batch(task_fn, params_vec)
120    }
121}
122
123impl Default for ZeroPool {
124    /// Creates a new thread pool with default settings
125    ///
126    /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
127    /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl Drop for ZeroPool {
134    fn drop(&mut self) {
135        self.queue.shutdown();
136
137        let workers = std::mem::take(&mut self.workers);
138        for handle in workers {
139            let _ = handle.join();
140        }
141    }
142}