Skip to main content

zero_pool/
pool.rs

1use crate::{queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{
3    num::NonZeroUsize,
4    sync::Arc,
5    thread::{self, JoinHandle},
6};
7
8pub struct ZeroPool {
9    queue: Arc<Queue>,
10    workers: Box<[JoinHandle<()>]>,
11}
12
13impl ZeroPool {
14    /// Creates a new thread pool with worker count equal to available parallelism
15    ///
16    /// Worker count is determined by `std::thread::available_parallelism()`,
17    /// falling back to 1 if unavailable. This is usually the optimal choice
18    /// for CPU-bound workloads.
19    ///
20    /// # Examples
21    ///
22    /// ```rust
23    /// use zero_pool::ZeroPool;
24    /// let pool = ZeroPool::new();
25    /// ```
26    pub fn new() -> Self {
27        let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
28        Self::with_workers(worker_count)
29    }
30
31    /// Creates a new thread pool with the specified number of workers
32    ///
33    /// Use this when you need precise control over the worker count,
34    /// for example when coordinating with other thread pools or
35    /// when you know the optimal count for your specific workload.
36    ///
37    /// # Examples
38    ///
39    /// ```rust
40    /// use std::num::NonZeroUsize;
41    /// use zero_pool::ZeroPool;
42    /// let pool = ZeroPool::with_workers(NonZeroUsize::new(4).unwrap());
43    /// ```
44    pub fn with_workers(worker_count: NonZeroUsize) -> Self {
45        let worker_count = worker_count.get();
46
47        let queue = Arc::new(Queue::new(worker_count));
48
49        let latch = TaskFuture::new(worker_count);
50        let workers = (0..worker_count)
51            .map(|id| spawn_worker(id, queue.clone(), latch.clone()))
52            .collect();
53
54        latch.wait();
55
56        ZeroPool { queue, workers }
57    }
58
59    /// Submit a single typed task
60    ///
61    /// The parameter struct must remain valid until the future completes.
62    /// This is the recommended method for submitting individual tasks.
63    ///
64    /// # Examples
65    ///
66    /// ```rust
67    /// use zero_pool::ZeroPool;
68    ///
69    /// struct MyTaskParams { value: u64, result: *mut u64 }
70    ///
71    /// fn my_task_fn(params: &MyTaskParams) {
72    ///     unsafe { *params.result = params.value * 2; }
73    /// }
74    ///
75    /// let pool = ZeroPool::new();
76    /// let mut result = 0u64;
77    /// let task_params = MyTaskParams { value: 42, result: &mut result };
78    /// let future = pool.submit_task(my_task_fn, &task_params);
79    /// future.wait();
80    /// assert_eq!(result, 84);
81    /// ```
82    #[inline]
83    pub fn submit_task<T>(&self, task_fn: fn(&T), params: &T) -> TaskFuture {
84        let slice = std::slice::from_ref(params);
85        self.queue.push_task_batch(task_fn, slice)
86    }
87
88    /// Submit a batch of uniform tasks
89    ///
90    /// All tasks in the batch must be the same type and use the same task function.
91    /// This method handles the pointer conversion automatically and is the most
92    /// convenient way to submit large batches of similar work.
93    ///
94    /// # Examples
95    ///
96    /// ```rust
97    /// use zero_pool::ZeroPool;
98    ///
99    /// struct MyTaskParams { value: u64, result: *mut u64 }
100    ///
101    /// fn my_task_fn(params: &MyTaskParams) {
102    ///     unsafe { *params.result = params.value * 2; }
103    /// }
104    ///
105    /// let pool = ZeroPool::new();
106    /// let mut results = vec![0u64; 1000];
107    /// let task_params: Vec<_> = (0..1000)
108    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
109    ///     .collect();
110    /// let future = pool.submit_batch(my_task_fn, &task_params);
111    /// future.wait();
112    /// assert_eq!(results[0], 0);
113    /// assert_eq!(results[1], 2);
114    /// assert_eq!(results[999], 1998);
115    /// ```
116    #[inline]
117    pub fn submit_batch<T>(&self, task_fn: fn(&T), params_vec: &[T]) -> TaskFuture {
118        self.queue.push_task_batch(task_fn, params_vec)
119    }
120}
121
122impl Default for ZeroPool {
123    /// Creates a new thread pool with default settings
124    ///
125    /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
126    /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl Drop for ZeroPool {
133    fn drop(&mut self) {
134        self.queue.shutdown();
135
136        let workers = std::mem::take(&mut self.workers);
137        for handle in workers {
138            let _ = handle.join();
139        }
140    }
141}