Skip to main content

zero_pool/
pool.rs

1use crate::{
2    atomic_latch::AtomicLatch, queue::Queue, task_future::TaskFuture, worker::spawn_worker,
3};
4use std::{
5    num::NonZeroUsize,
6    sync::Arc,
7    thread::{self, JoinHandle},
8};
9
/// A thread pool that owns a shared task queue and a fixed set of worker threads.
///
/// Dropping the pool shuts the queue down and joins every worker thread.
pub struct ZeroPool {
    /// Task queue shared with the workers; each worker is handed a clone of this `Arc`.
    queue: Arc<Queue>,
    /// Join handles for the spawned workers, joined when the pool is dropped.
    workers: Box<[JoinHandle<()>]>,
}
14
15impl ZeroPool {
16    /// Creates a new thread pool with worker count equal to available parallelism
17    ///
18    /// Worker count is determined by `std::thread::available_parallelism()`,
19    /// falling back to 1 if unavailable. This is usually the optimal choice
20    /// for CPU-bound workloads.
21    ///
22    /// # Examples
23    ///
24    /// ```rust
25    /// use zero_pool::ZeroPool;
26    /// let pool = ZeroPool::new();
27    /// ```
28    pub fn new() -> Self {
29        let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
30        Self::with_workers(worker_count)
31    }
32
33    /// Creates a new thread pool with the specified number of workers
34    ///
35    /// Use this when you need precise control over the worker count,
36    /// for example when coordinating with other thread pools or
37    /// when you know the optimal count for your specific workload.
38    ///
39    /// # Examples
40    ///
41    /// ```rust
42    /// use std::num::NonZeroUsize;
43    /// use zero_pool::ZeroPool;
44    /// let pool = ZeroPool::with_workers(NonZeroUsize::new(4).unwrap());
45    /// ```
46    pub fn with_workers(worker_count: NonZeroUsize) -> Self {
47        let worker_count = worker_count.get();
48
49        let queue = Arc::new(Queue::new(worker_count));
50
51        let latch = AtomicLatch::new(worker_count);
52        let workers = (0..worker_count)
53            .map(|id| spawn_worker(id, queue.clone(), latch.clone()))
54            .collect();
55
56        latch.wait();
57
58        ZeroPool { queue, workers }
59    }
60
61    /// Submit a single typed task
62    ///
63    /// The parameter struct must remain valid until the future completes.
64    /// This is the recommended method for submitting individual tasks.
65    ///
66    /// # Examples
67    ///
68    /// ```rust
69    /// use zero_pool::ZeroPool;
70    ///
71    /// struct MyTaskParams { value: u64, result: *mut u64 }
72    ///
73    /// fn my_task_fn(params: &MyTaskParams) {
74    ///     unsafe { *params.result = params.value * 2; }
75    /// }
76    ///
77    /// let pool = ZeroPool::new();
78    /// let mut result = 0u64;
79    /// let task_params = MyTaskParams { value: 42, result: &mut result };
80    /// let future = pool.submit_task(my_task_fn, &task_params);
81    /// future.wait();
82    /// assert_eq!(result, 84);
83    /// ```
84    #[inline]
85    pub fn submit_task<T>(&self, task_fn: fn(&T), params: &T) -> TaskFuture {
86        let slice = std::slice::from_ref(params);
87        self.queue.push_task_batch(task_fn, slice)
88    }
89
90    /// Submit a batch of uniform tasks
91    ///
92    /// All tasks in the batch must be the same type and use the same task function.
93    /// This method handles the pointer conversion automatically and is the most
94    /// convenient way to submit large batches of similar work.
95    ///
96    /// # Examples
97    ///
98    /// ```rust
99    /// use zero_pool::ZeroPool;
100    ///
101    /// struct MyTaskParams { value: u64, result: *mut u64 }
102    ///
103    /// fn my_task_fn(params: &MyTaskParams) {
104    ///     unsafe { *params.result = params.value * 2; }
105    /// }
106    ///
107    /// let pool = ZeroPool::new();
108    /// let mut results = vec![0u64; 1000];
109    /// let task_params: Vec<_> = (0..1000)
110    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
111    ///     .collect();
112    /// let future = pool.submit_batch(my_task_fn, &task_params);
113    /// future.wait();
114    /// assert_eq!(results[0], 0);
115    /// assert_eq!(results[1], 2);
116    /// assert_eq!(results[999], 1998);
117    /// ```
118    #[inline]
119    pub fn submit_batch<T>(&self, task_fn: fn(&T), params_vec: &[T]) -> TaskFuture {
120        self.queue.push_task_batch(task_fn, params_vec)
121    }
122}
123
124impl Default for ZeroPool {
125    /// Creates a new thread pool with default settings
126    ///
127    /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
128    /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134impl Drop for ZeroPool {
135    fn drop(&mut self) {
136        self.queue.shutdown();
137
138        let workers = std::mem::take(&mut self.workers);
139        for handle in workers {
140            let _ = handle.join();
141        }
142    }
143}