zero_pool/
pool.rs

1use crate::{TaskFnPointer, queue::Queue, task_future::TaskFuture, worker::spawn_worker};
2use std::{sync::Arc, sync::Barrier, thread::JoinHandle};
3
4pub struct ZeroPool {
5    queue: Arc<Queue>,
6    workers: Vec<JoinHandle<()>>,
7}
8
9impl ZeroPool {
10    /// Creates a new thread pool with worker count equal to available parallelism
11    ///
12    /// Worker count is determined by `std::thread::available_parallelism()`,
13    /// falling back to 1 if unavailable. This is usually the optimal choice
14    /// for CPU-bound workloads.
15    ///
16    /// # Examples
17    ///
18    /// ```rust
19    /// use zero_pool::ZeroPool;
20    /// let pool = ZeroPool::new();
21    /// ```
22    pub fn new() -> Self {
23        let worker_count = std::thread::available_parallelism()
24            .map(std::num::NonZeroUsize::get)
25            .unwrap_or(1);
26        Self::with_workers(worker_count)
27    }
28
29    /// Creates a new thread pool with the specified number of workers
30    ///
31    /// Use this when you need precise control over the worker count,
32    /// for example when coordinating with other thread pools or
33    /// when you know the optimal count for your specific workload.
34    ///
35    /// # Panics
36    ///
37    /// Panics if `worker_count` is 0.
38    ///
39    /// ```rust
40    /// use zero_pool::ZeroPool;
41    /// let pool = ZeroPool::with_workers(4);
42    /// ```
43    pub fn with_workers(worker_count: usize) -> Self {
44        assert!(worker_count > 0, "Must have at least one worker");
45
46        let queue = Arc::new(Queue::new(worker_count));
47
48        let barrier = Arc::new(Barrier::new(worker_count + 1));
49        let workers: Vec<JoinHandle<()>> = (0..worker_count)
50            .map(|id| spawn_worker(id, queue.clone(), barrier.clone()))
51            .collect();
52
53        barrier.wait();
54
55        ZeroPool { queue, workers }
56    }
57
58    /// Submit a single typed task with automatic pointer conversion
59    ///
60    /// This method provides type safety while maintaining performance.
61    /// The parameter struct must remain valid until the future completes.
62    /// This is the recommended method for submitting individual tasks.
63    ///
64    /// # Examples
65    ///
66    /// ```rust
67    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
68    ///
69    /// struct MyTaskParams { value: u64, result: *mut u64 }
70    ///
71    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
72    ///     zp_write!(params.result, params.value * 2);
73    /// });
74    ///
75    /// let pool = ZeroPool::new();
76    /// let mut result = 0u64;
77    /// let task_params = MyTaskParams { value: 42, result: &mut result };
78    /// let future = pool.submit_task(my_task_fn, &task_params);
79    /// future.wait();
80    /// assert_eq!(result, 84);
81    /// ```
82    pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
83        let slice = std::slice::from_ref(params);
84        self.queue.push_task_batch(task_fn, slice)
85    }
86
87    /// Submit a batch of uniform tasks with automatic pointer conversion
88    ///
89    /// All tasks in the batch must be the same type and use the same task function.
90    /// This method handles the pointer conversion automatically and is the most
91    /// convenient way to submit large batches of similar work.
92    ///
93    /// # Examples
94    ///
95    /// ```rust
96    /// use zero_pool::{ZeroPool, zp_define_task_fn, zp_write};
97    ///
98    /// struct MyTaskParams { value: u64, result: *mut u64 }
99    ///
100    /// zp_define_task_fn!(my_task_fn, MyTaskParams, |params| {
101    ///     zp_write!(params.result, params.value * 2);
102    /// });
103    ///
104    /// let pool = ZeroPool::new();
105    /// let mut results = vec![0u64; 1000];
106    /// let task_params: Vec<_> = (0..1000)
107    ///     .map(|i| MyTaskParams { value: i as u64, result: &mut results[i] })
108    ///     .collect();
109    /// let future = pool.submit_batch_uniform(my_task_fn, &task_params);
110    /// future.wait();
111    /// assert_eq!(results[0], 0);
112    /// assert_eq!(results[1], 2);
113    /// assert_eq!(results[999], 1998);
114    /// ```
115    pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
116        self.queue.push_task_batch(task_fn, params_vec)
117    }
118}
119
120impl Drop for ZeroPool {
121    fn drop(&mut self) {
122        self.queue.shutdown();
123
124        let workers = std::mem::take(&mut self.workers);
125        for handle in workers {
126            let _ = handle.join();
127        }
128    }
129}