zero_pool/pool.rs
1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{
3 num::NonZeroUsize,
4 sync::{Arc, Barrier},
5 thread::{self, JoinHandle},
6};
7
8pub struct ZeroPool {
9 queue: Arc<Queue>,
10 workers: Vec<JoinHandle<()>>,
11}
12
13impl ZeroPool {
14 /// Creates a new thread pool with worker count equal to available parallelism
15 ///
16 /// Worker count is determined by `std::thread::available_parallelism()`,
17 /// falling back to 1 if unavailable. This is usually the optimal choice
18 /// for CPU-bound workloads.
19 ///
20 /// # Examples
21 ///
22 /// ```rust
23 /// use zero_pool::ZeroPool;
24 /// let pool = ZeroPool::new();
25 /// ```
26 pub fn new() -> Self {
27 let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
28 Self::with_workers(worker_count)
29 }
30
31 /// Creates a new thread pool with the specified number of workers
32 ///
33 /// Use this when you need precise control over the worker count,
34 /// for example when coordinating with other thread pools or
35 /// when you know the optimal count for your specific workload.
36 ///
37 /// # Examples
38 ///
39 /// ```rust
40 /// use std::num::NonZeroUsize;
41 /// use zero_pool::ZeroPool;
42 /// let pool = ZeroPool::with_workers(NonZeroUsize::new(4).unwrap());
43 /// ```
44 pub fn with_workers(worker_count: NonZeroUsize) -> Self {
45 let worker_count = worker_count.get();
46
47 let queue = Arc::new(Queue::new(worker_count));
48
49 let barrier = Arc::new(Barrier::new(worker_count + 1));
50 let workers: Vec<JoinHandle<()>> = (0..worker_count)
51 .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
52 .collect();
53
54 barrier.wait();
55
56 ZeroPool { queue, workers }
57 }
58
59 /// Submit a single typed task with automatic pointer conversion
60 ///
61 /// This method provides type safety while maintaining performance.
62 /// The parameter struct must remain valid until the future completes.
63 /// This is the recommended method for submitting individual tasks.
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
69 ///
70 /// struct MyTaskParams { value: u64, result: *mut u64 }
71 ///
72 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
73 /// zp_write!(params.result, params.value * 2);
74 /// });
75 ///
76 /// let pool = ZeroPool::new();
77 /// let mut result = 0u64;
78 /// let task_params = MyTaskParams { value: 42, result: &mut result };
79 /// let future = pool.submit_task(my_task_fn, &task_params);
80 /// future.wait();
81 /// assert_eq!(result, 84);
82 /// ```
83 #[inline]
84 pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
85 let slice = std::slice::from_ref(params);
86 self.queue.push_task_batch(task_fn, slice)
87 }
88
89 /// Submit a batch of uniform tasks with automatic pointer conversion
90 ///
91 /// All tasks in the batch must be the same type and use the same task function.
92 /// This method handles the pointer conversion automatically and is the most
93 /// convenient way to submit large batches of similar work.
94 ///
95 /// # Examples
96 ///
97 /// ```rust
98 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
99 ///
100 /// struct MyTaskParams { value: u64, result: *mut u64 }
101 ///
102 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
103 /// zp_write!(params.result, params.value * 2);
104 /// });
105 ///
106 /// let pool = ZeroPool::new();
107 /// let mut results = vec![0u64; 1000];
108 /// let task_params: Vec<_> = (0..1000)
109 /// .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
110 /// .collect();
111 /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
112 /// future.wait();
113 /// assert_eq!(results[0], 0);
114 /// assert_eq!(results[1], 2);
115 /// assert_eq!(results[999], 1998);
116 /// ```
117 #[inline]
118 pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
119 self.queue.push_task_batch(task_fn, params_vec)
120 }
121}
122
123impl Default for ZeroPool {
124 /// Creates a new thread pool with default settings
125 ///
126 /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
127 /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl Drop for ZeroPool {
134 fn drop(&mut self) {
135 self.queue.shutdown();
136
137 let workers = std::mem::take(&mut self.workers);
138 for handle in workers {
139 let _ = handle.join();
140 }
141 }
142}