use std::num::NonZero;
use std::thread::available_parallelism;
use crossbeam::channel;
use super::{Builder, JoinHandle, ThreadPriority};
/// A fixed-size pool of worker threads that execute submitted jobs.
///
/// Jobs are delivered to the workers through an unbounded crossbeam channel; each worker
/// holds a clone of the receiving half and runs jobs as they arrive.
pub struct Pool {
// NOTE(review): the field order looks deliberate. Rust drops struct fields in declaration
// order, so the pool's `job_sender` is dropped before `_handles`. Presumably `JoinHandle`
// joins its thread on drop and relies on the channel being closed first so the workers
// can exit — confirm against `JoinHandle` before reordering these fields.
/// Sending half of the job queue used by `spawn` to enqueue work.
job_sender: channel::Sender<Job>,
/// Handles of the spawned worker threads. Never read in this file; stored so the handles
/// live exactly as long as the pool.
_handles: Vec<JoinHandle>,
/// Number of worker threads created by `Pool::new`.
parallelism: NonZero<usize>,
}
/// A unit of work queued for execution on one of the pool's worker threads.
struct Job {
/// Priority the worker applies to its own thread before running `f`, unless the thread
/// is already at that priority.
requested_priority: ThreadPriority,
/// The closure to run. Boxed so that jobs built from different closure types can travel
/// through the same channel.
f: Box<dyn FnOnce() + Send + 'static>,
}
impl Pool {
    /// Builds a pool with one worker thread per unit of available parallelism.
    ///
    /// When the available parallelism cannot be queried, four workers are created instead.
    /// Each worker is named `cairo-ls:worker:<index>`, is built with
    /// [`ThreadPriority::Worker`] and a requested stack size of 2 MiB, and then pulls jobs
    /// from a shared unbounded channel until that channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if any worker thread fails to spawn.
    pub fn new() -> Pool {
        const DEFAULT_PARALLELISM: usize = 4;
        const INITIAL_PRIORITY: ThreadPriority = ThreadPriority::Worker;
        const STACK_SIZE: usize = 2 * 1024 * 1024;

        let worker_count = match available_parallelism() {
            Ok(detected) => detected.get(),
            Err(_) => DEFAULT_PARALLELISM,
        };
        let (job_sender, job_receiver) = channel::unbounded::<Job>();

        let workers: Vec<_> = (0..worker_count)
            .map(|index| {
                let queue = job_receiver.clone();
                let run_jobs = move || {
                    // Remember the priority last applied to this thread so that it is
                    // only re-applied when a job asks for a different one.
                    let mut active_priority = INITIAL_PRIORITY;
                    for Job { requested_priority, f } in queue {
                        if requested_priority != active_priority {
                            requested_priority.apply_to_current_thread();
                            active_priority = requested_priority;
                        }
                        f();
                    }
                };

                Builder::new(INITIAL_PRIORITY)
                    .stack_size(STACK_SIZE)
                    .name(format!("cairo-ls:worker:{index}"))
                    .spawn(run_jobs)
                    .expect("failed to spawn thread")
            })
            .collect();

        Pool {
            job_sender,
            _handles: workers,
            parallelism: NonZero::new(worker_count).unwrap(),
        }
    }

    /// Queues `f` to run on a worker thread at the given `priority`.
    ///
    /// The call returns as soon as the job has been placed on the queue; it does not wait
    /// for `f` to run. In builds with debug assertions enabled, the job first asserts that
    /// `priority` is in effect on the thread executing it.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent over the channel.
    pub fn spawn<F>(&self, priority: ThreadPriority, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let task = move || {
            if cfg!(debug_assertions) {
                priority.assert_is_used_on_current_thread();
            }
            f();
        };

        self.job_sender
            .send(Job { requested_priority: priority, f: Box::new(task) })
            .unwrap();
    }

    /// Returns the number of worker threads this pool was created with.
    pub fn parallelism(&self) -> NonZero<usize> {
        self.parallelism
    }
}