zero_pool/
pool.rs

1use crate::{
2    TaskFnPointer, TaskParamPointer, WorkItem, future::WorkFuture, queue::BatchQueue,
3    uniform_tasks_to_pointers, worker::spawn_worker,
4};
5use std::{sync::Arc, thread::JoinHandle};
6
7pub struct ZeroPool {
8    queue: Arc<BatchQueue>,
9    workers: Vec<JoinHandle<()>>,
10}
11
12impl ZeroPool {
13    pub fn new() -> Self {
14        let worker_count = std::thread::available_parallelism()
15            .map(std::num::NonZeroUsize::get)
16            .unwrap_or(1);
17        Self::with_workers(worker_count)
18    }
19
20    pub fn with_workers(worker_count: usize) -> Self {
21        assert!(worker_count > 0, "Must have at least one worker");
22
23        let queue = Arc::new(BatchQueue::new());
24
25        let workers: Vec<JoinHandle<()>> = (0..worker_count)
26            .map(|id| spawn_worker(id, queue.clone()))
27            .collect();
28
29        ZeroPool { queue, workers }
30    }
31
32    pub fn submit_raw_task(&self, task_fn: TaskFnPointer, params: TaskParamPointer) -> WorkFuture {
33        self.queue.push_single_task(task_fn, params)
34    }
35
36    pub fn submit_raw_task_batch(&self, tasks: &[WorkItem]) -> WorkFuture {
37        self.queue.push_task_batch(tasks)
38    }
39
40    pub fn submit_task<T>(&self, task_fn: TaskFnPointer, params: &T) -> WorkFuture {
41        let params_ptr = params as *const T as TaskParamPointer;
42        self.submit_raw_task(task_fn, params_ptr)
43    }
44
45    pub fn submit_batch_uniform<T>(&self, task_fn: TaskFnPointer, params_vec: &[T]) -> WorkFuture {
46        let tasks = uniform_tasks_to_pointers(task_fn, params_vec);
47        self.submit_raw_task_batch(&tasks)
48    }
49}
50
51impl Drop for ZeroPool {
52    fn drop(&mut self) {
53        self.queue.shutdown();
54
55        let workers = std::mem::take(&mut self.workers);
56        for handle in workers {
57            let _ = handle.join();
58        }
59    }
60}