zero_pool/pool.rs
1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{sync::Arc, sync::Barrier, thread::JoinHandle};
3
4pub struct ZeroPool {
5 queue: Arc<Queue>,
6 workers: Vec<JoinHandle<()>>,
7}
8
9impl ZeroPool {
10 /// Creates a new thread pool with worker count equal to available parallelism
11 ///
12 /// Worker count is determined by `std::thread::available_parallelism()`,
13 /// falling back to 1 if unavailable. This is usually the optimal choice
14 /// for CPU-bound workloads.
15 ///
16 /// # Examples
17 ///
18 /// ```rust
19 /// use zero_pool::ZeroPool;
20 /// let pool = ZeroPool::new();
21 /// ```
22 pub fn new() -> Self {
23 let worker_count = std::thread::available_parallelism()
24 .map(std::num::NonZeroUsize::get)
25 .unwrap_or(1);
26 Self::with_workers(worker_count)
27 }
28
29 /// Creates a new thread pool with the specified number of workers
30 ///
31 /// Use this when you need precise control over the worker count,
32 /// for example when coordinating with other thread pools or
33 /// when you know the optimal count for your specific workload.
34 ///
35 /// # Panics
36 ///
37 /// Panics if `worker_count` is 0.
38 ///
39 /// ```rust
40 /// use zero_pool::ZeroPool;
41 /// let pool = ZeroPool::with_workers(4);
42 /// ```
43 pub fn with_workers(worker_count: usize) -> Self {
44 assert!(worker_count > 0, "Must have at least one worker");
45
46 let queue = Arc::new(Queue::new(worker_count));
47
48 let barrier = Arc::new(Barrier::new(worker_count + 1));
49 let workers: Vec<JoinHandle<()>> = (0..worker_count)
50 .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
51 .collect();
52
53 barrier.wait();
54
55 ZeroPool { queue, workers }
56 }
57
58 /// Submit a single typed task with automatic pointer conversion
59 ///
60 /// This method provides type safety while maintaining performance.
61 /// The parameter struct must remain valid until the future completes.
62 /// This is the recommended method for submitting individual tasks.
63 ///
64 /// # Examples
65 ///
66 /// ```rust
67 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
68 ///
69 /// struct MyTaskParams { value: u64, result: *mut u64 }
70 ///
71 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
72 /// zp_write!(params.result, params.value * 2);
73 /// });
74 ///
75 /// let pool = ZeroPool::new();
76 /// let mut result = 0u64;
77 /// let task_params = MyTaskParams { value: 42, result: &mut result };
78 /// let future = pool.submit_task(my_task_fn, &task_params);
79 /// future.wait();
80 /// assert_eq!(result, 84);
81 /// ```
82 pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
83 let slice = std::slice::from_ref(params);
84 self.queue.push_task_batch(task_fn, slice)
85 }
86
87 /// Submit a batch of uniform tasks with automatic pointer conversion
88 ///
89 /// All tasks in the batch must be the same type and use the same task function.
90 /// This method handles the pointer conversion automatically and is the most
91 /// convenient way to submit large batches of similar work.
92 ///
93 /// # Examples
94 ///
95 /// ```rust
96 /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
97 ///
98 /// struct MyTaskParams { value: u64, result: *mut u64 }
99 ///
100 /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
101 /// zp_write!(params.result, params.value * 2);
102 /// });
103 ///
104 /// let pool = ZeroPool::new();
105 /// let mut results = vec![0u64; 1000];
106 /// let task_params: Vec<_> = (0..1000)
107 /// .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
108 /// .collect();
109 /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
110 /// future.wait();
111 /// assert_eq!(results[0], 0);
112 /// assert_eq!(results[1], 2);
113 /// assert_eq!(results[999], 1998);
114 /// ```
115 pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
116 self.queue.push_task_batch(task_fn, params_vec)
117 }
118}
119
120impl Default for ZeroPool {
121 /// Creates a new thread pool with default settings
122 ///
123 /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
124 /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130impl Drop for ZeroPool {
131 fn drop(&mut self) {
132 self.queue.shutdown();
133
134 let workers = std::mem::take(&mut self.workers);
135 for handle in workers {
136 let _ = handle.join();
137 }
138 }
139}