zero_pool/pool.rs
use crate::{
    atomic_latch::AtomicLatch, queue::Queue, task_future::TaskFuture, worker::spawn_worker,
};
use std::{
    num::NonZeroUsize,
    sync::Arc,
    thread::{self, JoinHandle},
};

pub struct ZeroPool {
    queue: Arc<Queue>,
    workers: Box<[JoinHandle<()>]>,
}

impl ZeroPool {
    /// Creates a new thread pool with worker count equal to available parallelism
    ///
    /// Worker count is determined by `std::thread::available_parallelism()`,
    /// falling back to 1 if unavailable. This is usually the optimal choice
    /// for CPU-bound workloads.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use zero_pool::ZeroPool;
    /// let pool = ZeroPool::new();
    /// ```
    pub fn new() -> Self {
        let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
        Self::with_workers(worker_count)
    }

    /// Creates a new thread pool with the specified number of workers
    ///
    /// Use this when you need precise control over the worker count,
    /// for example when coordinating with other thread pools or
    /// when you know the optimal count for your specific workload.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::num::NonZeroUsize;
    /// use zero_pool::ZeroPool;
    /// let pool = ZeroPool::with_workers(NonZeroUsize::new(4).unwrap());
    /// ```
    pub fn with_workers(worker_count: NonZeroUsize) -> Self {
        let worker_count = worker_count.get();

        let queue = Arc::new(Queue::new(worker_count));

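        // Startup synchronization: each worker is handed a clone of the latch,
        // and `latch.wait()` below blocks until all of them have signaled it,
        // so the pool is fully initialized before the constructor returns.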
        let latch = AtomicLatch::new(worker_count);
        let workers = (0..worker_count)
            .map(|id| spawn_worker(id, queue.clone(), latch.clone()))
            .collect();

        latch.wait();

        ZeroPool { queue, workers }
    }

    /// Submit a single typed task
    ///
    /// The parameter struct must remain valid until the future completes.
    /// This is the recommended method for submitting individual tasks.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use zero_pool::ZeroPool;
    ///
    /// struct MyTaskParams { value: u64, result: *mut u64 }
    ///
    /// fn my_task_fn(params: &MyTaskParams) {
    ///     unsafe { *params.result = params.value * 2; }
    /// }
    ///
    /// let pool = ZeroPool::new();
    /// let mut result = 0u64;
    /// let task_params = MyTaskParams { value: 42, result: &mut result };
    /// let future = pool.submit_task(my_task_fn, &task_params);
    /// future.wait();
    /// assert_eq!(result, 84);
    /// ```
    #[inline]
    pub fn submit_task<T>(&self, task_fn: fn(&T), params: &T) -> TaskFuture {
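        // A single task is pushed as a one-element batch so both submission
        // paths share the same queue machinery.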
        let slice = std::slice::from_ref(params);
        self.queue.push_task_batch(task_fn, slice)
    }

    /// Submit a batch of uniform tasks
    ///
    /// All tasks in the batch must be the same type and use the same task function.
    /// This method handles the pointer conversion automatically and is the most
    /// convenient way to submit large batches of similar work.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use zero_pool::ZeroPool;
    ///
    /// struct MyTaskParams { value: u64, result: *mut u64 }
    ///
    /// fn my_task_fn(params: &MyTaskParams) {
    ///     unsafe { *params.result = params.value * 2; }
    /// }
    ///
    /// let pool = ZeroPool::new();
    /// let mut results = vec![0u64; 1000];
    /// let task_params: Vec<_> = (0..1000)
    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
    ///     .collect();
    /// let future = pool.submit_batch(my_task_fn, &task_params);
    /// future.wait();
    /// assert_eq!(results[0], 0);
    /// assert_eq!(results[1], 2);
    /// assert_eq!(results[999], 1998);
    /// ```
    #[inline]
    pub fn submit_batch<T>(&self, task_fn: fn(&T), params_vec: &[T]) -> TaskFuture {
        self.queue.push_task_batch(task_fn, params_vec)
    }
}

impl Default for ZeroPool {
    /// Creates a new thread pool with default settings
    ///
    /// Equivalent to calling `ZeroPool::new()`. Worker count is determined by
    /// `std::thread::available_parallelism()`, falling back to 1 if unavailable.
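    ///
    /// # Examples
    ///
    /// ```rust
    /// use zero_pool::ZeroPool;
    /// let pool = ZeroPool::default();
    /// ```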
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for ZeroPool {
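    /// Signals the queue to shut down, then joins every worker thread so no
    /// worker outlives the pool.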
    fn drop(&mut self) {
        self.queue.shutdown();

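        // Move the join handles out of `self` so they can be joined by value.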
        let workers = std::mem::take(&mut self.workers);
        for handle in workers {
            let _ = handle.join();
        }
    }
}