zero_pool/pool.rs
1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{
3 sync::{Arc, Barrier},
4 thread::JoinHandle,
5};
6
7pub struct ZeroPool {
8 queue: Arc<Queue>,
9 workers: Vec<JoinHandle<()>>,
10}
11
12impl ZeroPool {
13 /// Creates a new thread pool with worker count equal to available parallelism
14 ///
15 /// Worker count is determined by `std::thread::available_parallelism()`,
16 /// falling back to 1 if unavailable. This is usually the optimal choice
17 /// for CPU-bound workloads.
18 ///
19 /// # Examples
20 ///
21 /// ```rust
22 /// use zero_pool::ZeroPool;
23 /// let pool = ZeroPool::new();
24 /// ```
25 pub fn new() -> Self {
26 let worker_count = std::thread::available_parallelism()
27 .map(std::num::NonZeroUsize::get)
28 .unwrap_or(1);
29 Self::with_workers(worker_count)
30 }
31
32 /// Creates a new thread pool with the specified number of workers
33 ///
34 /// Use this when you need precise control over the worker count,
35 /// for example when coordinating with other thread pools or
36 /// when you know the optimal count for your specific workload.
37 ///
38 /// # Panics
39 ///
40 /// Panics if `worker_count` is 0.
41 ///
42 /// ```rust
43 /// use zero_pool::ZeroPool;
44 /// let pool = ZeroPool::with_workers(4);
45 /// ```
46 pub fn with_workers(worker_count: usize) -> Self {
47 assert!(worker_count > 0, "Must have at least one worker");
48
49 let queue = Arc::new(Queue::new(worker_count));
50
51 let barrier = Arc::new(Barrier::new(worker_count + 1));
52 let workers: Vec<JoinHandle<()>> = (0..worker_count)
53 .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
54 .collect();
55
56 barrier.wait();
57
58 ZeroPool { queue, workers }
59 }
60
61 /// Submit a single typed task with automatic pointer conversion
62 ///
63 /// This method provides type safety while maintaining performance.
64 /// The parameter struct must remain valid until the future completes.
65 /// This is the recommended method for submitting individual tasks.
66 ///
67 /// # Examples
68 ///
69 /// ```rust
70 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
71 ///
72 /// struct MyTaskParams { value: u64, result: *mut u64 }
73 ///
74 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
75 /// zp_write!(params.result, params.value * 2);
76 /// });
77 ///
78 /// let pool = ZeroPool::new();
79 /// let mut result = 0u64;
80 /// let task_params = MyTaskParams { value: 42, result: &mut result };
81 /// let future = pool.submit_task(my_task_fn, &task_params);
82 /// future.wait();
83 /// assert_eq!(result, 84);
84 /// ```
85 #[inline]
86 pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
87 let slice = std::slice::from_ref(params);
88 self.queue.push_task_batch(task_fn, slice)
89 }
90
91 /// Submit a batch of uniform tasks with automatic pointer conversion
92 ///
93 /// All tasks in the batch must be the same type and use the same task function.
94 /// This method handles the pointer conversion automatically and is the most
95 /// convenient way to submit large batches of similar work.
96 ///
97 /// # Examples
98 ///
99 /// ```rust
100 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
101 ///
102 /// struct MyTaskParams { value: u64, result: *mut u64 }
103 ///
104 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
105 /// zp_write!(params.result, params.value * 2);
106 /// });
107 ///
108 /// let pool = ZeroPool::new();
109 /// let mut results = vec![0u64; 1000];
110 /// let task_params: Vec<_> = (0..1000)
111 /// .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
112 /// .collect();
113 /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
114 /// future.wait();
115 /// assert_eq!(results[0], 0);
116 /// assert_eq!(results[1], 2);
117 /// assert_eq!(results[999], 1998);
118 /// ```
119 #[inline]
120 pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
121 self.queue.push_task_batch(task_fn, params_vec)
122 }
123}
124
125impl Default for ZeroPool {
126 /// Creates a new thread pool with default settings
127 ///
128 /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
129 /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl Drop for ZeroPool {
136 fn drop(&mut self) {
137 self.queue.shutdown();
138
139 let workers = std::mem::take(&mut self.workers);
140 for handle in workers {
141 let _ = handle.join();
142 }
143 }
144}