zero_pool/
pool.rs

1use crate::{
2    TaskFnPointer, TaskItem, TaskParamPointer, queue::Queue, task_future::TaskFuture,
3    uniform_tasks_to_pointers, worker::spawn_worker,
4};
5use std::{sync::Arc, thread::JoinHandle};
6
/// A thread pool made of a shared task queue and a fixed set of worker threads.
///
/// Workers are spawned when the pool is constructed and are joined when the
/// pool is dropped.
pub struct ZeroPool {
    /// Task queue; each worker is handed its own `Arc` handle to it at spawn time.
    queue: Arc<Queue>,
    /// Join handles of the spawned workers, joined in `Drop`.
    workers: Vec<JoinHandle<()>>,
}
11
12impl ZeroPool {
13    /// Creates a new thread pool with worker count equal to available parallelism
14    ///
15    /// Worker count is determined by `std::thread::available_parallelism()`,
16    /// falling back to 1 if unavailable. This is usually the optimal choice
17    /// for CPU-bound workloads.
18    ///
19    /// # Examples
20    ///
21    /// ```rust
22    /// use zero_pool::ZeroPool;
23    /// let pool = ZeroPool::new();
24    /// ```
25    pub fn new() -> Self {
26        let worker_count = std::thread::available_parallelism()
27            .map(std::num::NonZeroUsize::get)
28            .unwrap_or(1);
29        Self::with_workers(worker_count)
30    }
31
32    /// Creates a new thread pool with the specified number of workers
33    ///
34    /// Use this when you need precise control over the worker count,
35    /// for example when coordinating with other thread pools or
36    /// when you know the optimal count for your specific workload.
37    ///
38    /// # Panics
39    ///
40    /// Panics if `worker_count` is 0.
41    ///
42    /// ```rust
43    /// use zero_pool::ZeroPool;
44    /// let pool = ZeroPool::with_workers(4);
45    /// ```
46    pub fn with_workers(worker_count: usize) -> Self {
47        assert!(worker_count > 0, "Must have at least one worker");
48
49        let queue = Arc::new(Queue::new());
50
51        let workers: Vec<JoinHandle<()>> = (0..worker_count)
52            .map(|id| spawn_worker(id, queue.clone()))
53            .collect();
54
55        ZeroPool { queue, workers }
56    }
57
58    /// Submit a single task using raw function and parameter pointers
59    ///
60    /// This is the lowest-level submission method with minimal overhead.
61    /// Most users should prefer the type-safe `submit_task` method.
62    ///
63    /// # Safety
64    ///
65    /// The parameter pointer must remain valid until the returned future completes.
66    /// The caller must ensure the pointer points to the correct parameter type
67    /// expected by the task function.
68    ///
69    /// # Examples
70    ///
71    /// ```rust
72    /// use zero_pool::{ZeroPool, TaskFnPointer, TaskParamPointer, zp_task_params, zp_define_task_fn, zp_write};
73    ///
74    /// zp_task_params! {
75    ///     MyTask { value: u64, result: *mut u64 }
76    /// }
77    ///
78    /// zp_define_task_fn!(my_task, MyTask, |params| {
79    ///     zp_write!(params.result, params.value * 2);
80    /// });
81    ///
82    /// let pool = ZeroPool::new();
83    /// let mut result = 0u64;
84    /// let params = MyTask::new(42, &mut result);
85    /// let future = pool.submit_raw_task(
86    ///     my_task as TaskFnPointer,
87    ///     &params as *const _ as TaskParamPointer
88    /// );
89    /// future.wait();
90    /// ```
91    pub fn submit_raw_task(&self, task_fn: TaskFnPointer, params: TaskParamPointer) -> TaskFuture {
92        self.queue.push_single_task(task_fn, params)
93    }
94
95    /// Submit a batch of tasks using raw work items
96    ///
97    /// This method provides optimal performance for pre-converted task batches.
98    /// Use `uniform_tasks_to_pointers` to convert typed tasks efficiently,
99    /// or build work items manually for mixed task types.
100    ///
101    /// # Examples
102    ///
103    /// ```rust
104    /// use zero_pool::{ZeroPool, uniform_tasks_to_pointers, zp_task_params, zp_define_task_fn, zp_write};
105    ///
106    /// zp_task_params! {
107    ///     MyTask { value: u64, result: *mut u64 }
108    /// }
109    ///
110    /// zp_define_task_fn!(my_task, MyTask, |params| {
111    ///     zp_write!(params.result, params.value * 2);
112    /// });
113    ///
114    /// let pool = ZeroPool::new();
115    /// let mut results = [0u64; 2];
116    /// let params_vec = [
117    ///     MyTask::new(1, &mut results[0]),
118    ///     MyTask::new(2, &mut results[1])
119    /// ];
120    /// let tasks = uniform_tasks_to_pointers(my_task, &params_vec);
121    /// let future = pool.submit_raw_task_batch(&tasks);
122    /// future.wait();
123    /// ```
124    pub fn submit_raw_task_batch(&self, tasks: &[TaskItem]) -> TaskFuture {
125        self.queue.push_task_batch(tasks)
126    }
127
128    /// Submit a single typed task with automatic pointer conversion
129    ///
130    /// This method provides type safety while maintaining performance.
131    /// The parameter struct must remain valid until the future completes.
132    /// This is the recommended method for submitting individual tasks.
133    ///
134    /// # Examples
135    ///
136    /// ```rust
137    /// use zero_pool::{ZeroPool, zp_task_params, zp_define_task_fn, zp_write};
138    ///
139    /// zp_task_params! {
140    ///     MyTask { value: u64, result: *mut u64 }
141    /// }
142    ///
143    /// zp_define_task_fn!(my_task_fn, MyTask, |params| {
144    ///     zp_write!(params.result, params.value * 2);
145    /// });
146    ///
147    /// let pool = ZeroPool::new();
148    /// let mut result = 0u64;
149    /// let task_params = MyTask::new(42, &mut result);
150    /// let future = pool.submit_task(my_task_fn, &task_params);
151    /// future.wait();
152    /// assert_eq!(result, 84);
153    /// ```
154    pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> TaskFuture {
155        let params_ptr = params as *const T as TaskParamPointer;
156        self.submit_raw_task(task_fn, params_ptr)
157    }
158
159    /// Submit a batch of uniform tasks with automatic pointer conversion
160    ///
161    /// All tasks in the batch must be the same type and use the same task function.
162    /// This method handles the pointer conversion automatically and is the most
163    /// convenient way to submit large batches of similar work.
164    ///
165    /// # Examples
166    ///
167    /// ```rust
168    /// use zero_pool::{ZeroPool, zp_task_params, zp_define_task_fn, zp_write};
169    ///
170    /// zp_task_params! {
171    ///     MyTask { value: u64, result: *mut u64 }
172    /// }
173    ///
174    /// zp_define_task_fn!(my_task_fn, MyTask, |params| {
175    ///     zp_write!(params.result, params.value * 2);
176    /// });
177    ///
178    /// let pool = ZeroPool::new();
179    /// let mut results = vec![0u64; 1000];
180    /// let tasks: Vec<_> = (0..1000)
181    ///     .map(|i| MyTask::new(i as u64, &mut results[i]))
182    ///     .collect();
183    /// let future = pool.submit_batch_uniform(my_task_fn, &tasks);
184    /// future.wait();
185    /// ```
186    pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> TaskFuture {
187        let tasks = uniform_tasks_to_pointers(task_fn, params_vec);
188        self.submit_raw_task_batch(&tasks)
189    }
190}
191
192impl Drop for ZeroPool {
193    fn drop(&mut self) {
194        self.queue.shutdown();
195
196        let workers = std::mem::take(&mut self.workers);
197        for handle in workers {
198            let _ = handle.join();
199        }
200    }
201}