zero_pool/
pool.rs

1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{
3    sync::{Arc, Barrier},
4    thread::JoinHandle,
5};
6
7pub struct ZeroPool {
8    queue: Arc<Queue>,
9    workers: Vec<JoinHandle<()>>,
10}
11
12impl ZeroPool {
13    /// Creates a new thread pool with worker count equal to available parallelism
14    ///
15    /// Worker count is determined by `std::thread::available_parallelism()`,
16    /// falling back to 1 if unavailable. This is usually the optimal choice
17    /// for CPU-bound workloads.
18    ///
19    /// # Examples
20    ///
21    /// ```rust
22    /// use zero_pool::ZeroPool;
23    /// let pool = ZeroPool::new();
24    /// ```
25    pub fn new() -> Self {
26        let worker_count = std::thread::available_parallelism()
27            .map(std::num::NonZeroUsize::get)
28            .unwrap_or(1);
29        Self::with_workers(worker_count)
30    }
31
32    /// Creates a new thread pool with the specified number of workers
33    ///
34    /// Use this when you need precise control over the worker count,
35    /// for example when coordinating with other thread pools or
36    /// when you know the optimal count for your specific workload.
37    ///
38    /// # Panics
39    ///
40    /// Panics if `worker_count` is 0.
41    ///
42    /// ```rust
43    /// use zero_pool::ZeroPool;
44    /// let pool = ZeroPool::with_workers(4);
45    /// ```
46    pub fn with_workers(worker_count: usize) -> Self {
47        assert!(worker_count > 0, "Must have at least one worker");
48
49        let queue = Arc::new(Queue::new(worker_count));
50
51        let barrier = Arc::new(Barrier::new(worker_count + 1));
52        let workers: Vec<JoinHandle<()>> = (0..worker_count)
53            .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
54            .collect();
55
56        barrier.wait();
57
58        ZeroPool { queue, workers }
59    }
60
61    /// Submit a single typed task with automatic pointer conversion
62    ///
63    /// This method provides type safety while maintaining performance.
64    /// The parameter struct must remain valid until the future completes.
65    /// This is the recommended method for submitting individual tasks.
66    ///
67    /// # Examples
68    ///
69    /// ```rust
70    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
71    ///
72    /// struct MyTaskParams { value: u64, result: *mut u64 }
73    ///
74    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
75    ///     zp_write!(params.result, params.value * 2);
76    /// });
77    ///
78    /// let pool = ZeroPool::new();
79    /// let mut result = 0u64;
80    /// let task_params = MyTaskParams { value: 42, result: &mut result };
81    /// let future = pool.submit_task(my_task_fn, &task_params);
82    /// future.wait();
83    /// assert_eq!(result, 84);
84    /// ```
85    #[inline]
86    pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
87        let slice = std::slice::from_ref(params);
88        self.queue.push_task_batch(task_fn, slice)
89    }
90
91    /// Submit a batch of uniform tasks with automatic pointer conversion
92    ///
93    /// All tasks in the batch must be the same type and use the same task function.
94    /// This method handles the pointer conversion automatically and is the most
95    /// convenient way to submit large batches of similar work.
96    ///
97    /// # Examples
98    ///
99    /// ```rust
100    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
101    ///
102    /// struct MyTaskParams { value: u64, result: *mut u64 }
103    ///
104    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
105    ///     zp_write!(params.result, params.value * 2);
106    /// });
107    ///
108    /// let pool = ZeroPool::new();
109    /// let mut results = vec![0u64; 1000];
110    /// let task_params: Vec<_> = (0..1000)
111    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
112    ///     .collect();
113    /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
114    /// future.wait();
115    /// assert_eq!(results[0], 0);
116    /// assert_eq!(results[1], 2);
117    /// assert_eq!(results[999], 1998);
118    /// ```
119    #[inline]
120    pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
121        self.queue.push_task_batch(task_fn, params_vec)
122    }
123}
124
125impl Default for ZeroPool {
126    /// Creates a new thread pool with default settings
127    ///
128    /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
129    /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl Drop for ZeroPool {
136    fn drop(&mut self) {
137        self.queue.shutdown();
138
139        let workers = std::mem::take(&mut self.workers);
140        for handle in workers {
141            let _ = handle.join();
142        }
143    }
144}