use std::{
sync::{
Arc, LazyLock, Mutex,
mpsc::{Sender, channel},
},
thread::{self, spawn},
};
/// Abstraction over an executor that accepts boxed closures, so callers can plug in
/// their own thread pool instead of the built-in one.
pub trait LeptonThreadPool {
/// Returns the degree of parallelism this pool advertises to callers.
///
/// The implementations visible in this file return either the CPU count or `1`.
fn max_parallelism(&self) -> usize;
/// Hands the boxed closure `f` to the pool to be executed.
///
/// The closure is `Send + 'static`, so an implementation is free to move it to
/// another thread.
fn run(&self, f: Box<dyn FnOnce() + Send + 'static>);
}
/// Holds a thread pool either by reference or by ownership, so code can accept both
/// through a single type.
pub enum ThreadPoolHolder<'a> {
/// A pool borrowed for the lifetime `'a`.
Dyn(&'a dyn LeptonThreadPool),
/// A heap-allocated pool owned by the holder.
Owned(Box<dyn LeptonThreadPool>),
}
/// Forwards every trait call to whichever pool the holder wraps, borrowed or owned.
impl LeptonThreadPool for ThreadPoolHolder<'_> {
    /// Reports the `max_parallelism` of the wrapped pool.
    fn max_parallelism(&self) -> usize {
        // Collapse both variants to a plain trait-object reference first.
        let pool: &dyn LeptonThreadPool = match self {
            Self::Dyn(borrowed) => *borrowed,
            Self::Owned(boxed) => &**boxed,
        };
        pool.max_parallelism()
    }

    /// Passes `f` on to the wrapped pool's `run`.
    fn run(&self, f: Box<dyn FnOnce() + Send + 'static>) {
        let pool: &dyn LeptonThreadPool = match self {
            Self::Dyn(borrowed) => *borrowed,
            Self::Owned(boxed) => &**boxed,
        };
        pool.run(f)
    }
}
/// Scheduling priority requested for the worker threads of a `SimpleThreadPool`.
///
/// In the visible code the priority is only applied when compiling for Windows or Linux.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LeptonThreadPriority {
/// Workers request the minimum thread priority.
Low,
/// Workers keep whatever priority they were created with (the default).
#[default]
Normal,
/// Workers request the maximum thread priority.
High,
}
/// Thread pool that spawns a worker when no idle one is available and keeps finished
/// workers around for reuse.
#[derive(Default)]
pub struct SimpleThreadPool {
// Priority applied to each newly spawned worker thread.
priority: LeptonThreadPriority,
// One sender per worker that is currently waiting for a job; created lazily on first
// use so the pool can be built in a `const` context.
idle_threads: LazyLock<Arc<Mutex<Vec<Sender<Box<dyn FnOnce() + Send + 'static>>>>>>,
}
impl SimpleThreadPool {
pub const fn new(priority: LeptonThreadPriority) -> Self {
SimpleThreadPool {
priority,
idle_threads: LazyLock::new(|| Arc::new(Mutex::new(Vec::new()))),
}
}
#[allow(dead_code)]
pub fn get_idle_threads(&self) -> usize {
self.idle_threads.lock().unwrap().len()
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
if let Some(sender) = self.idle_threads.lock().unwrap().pop() {
sender.send(Box::new(f)).unwrap();
} else {
let (tx_schedule, rx_schedule) = channel();
let priority = self.priority;
let idle_threads = self.idle_threads.clone();
spawn(move || {
#[cfg(any(target_os = "windows", target_os = "linux"))]
match priority {
LeptonThreadPriority::Low => thread_priority::set_current_thread_priority(
thread_priority::ThreadPriority::Min,
)
.unwrap(),
LeptonThreadPriority::Normal => {}
LeptonThreadPriority::High => thread_priority::set_current_thread_priority(
thread_priority::ThreadPriority::Max,
)
.unwrap(),
}
f();
loop {
if let Ok(mut i) = idle_threads.lock() {
if i.len() > *NUM_CPUS {
break;
}
i.push(tx_schedule.clone());
} else {
break;
}
if let Ok(f) = rx_schedule.recv() {
f();
} else {
break;
}
}
});
}
}
}
/// Process-wide `SimpleThreadPool` created with `LeptonThreadPriority::Normal`.
pub static DEFAULT_THREAD_POOL: SimpleThreadPool =
SimpleThreadPool::new(LeptonThreadPriority::Normal);
impl LeptonThreadPool for SimpleThreadPool {
fn max_parallelism(&self) -> usize {
*NUM_CPUS
}
fn run(&self, f: Box<dyn FnOnce() + Send + 'static>) {
self.execute(f);
}
}
/// Amount of parallelism reported by `thread::available_parallelism`, queried once on
/// first use.
///
/// The query can fail (the standard library documents it as fallible), so fall back to a
/// single lane instead of panicking the first time the pool is touched.
static NUM_CPUS: LazyLock<usize> =
    LazyLock::new(|| thread::available_parallelism().map_or(1, |n| n.get()));
/// Submits 100 counter increments to `DEFAULT_THREAD_POOL` and checks that all of them run.
///
/// The wait is bounded by a deadline so that a lost job fails the test instead of hanging
/// the whole test run.
#[test]
fn test_threadpool() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{Duration, Instant};

    const JOBS: u32 = 100;

    let counter: Arc<AtomicU32> = Arc::new(AtomicU32::new(0));
    for _ in 0..JOBS {
        let counter = Arc::clone(&counter);
        DEFAULT_THREAD_POOL.execute(move || {
            counter.fetch_add(1, Ordering::AcqRel);
        });
    }

    let deadline = Instant::now() + Duration::from_secs(30);
    while counter.load(Ordering::Acquire) < JOBS {
        assert!(
            Instant::now() < deadline,
            "thread pool did not finish all jobs before the deadline"
        );
        thread::yield_now();
    }
    assert_eq!(counter.load(Ordering::Acquire), JOBS);

    // A worker only registers as idle while at most NUM_CPUS others are registered, so the
    // list can never hold more than NUM_CPUS + 1 entries.
    let idle = DEFAULT_THREAD_POOL.get_idle_threads();
    assert!(idle <= *NUM_CPUS + 1, "too many idle threads: {}", idle);
    println!("Idle threads: {}", idle);
}
/// Stateless pool that advertises a parallelism of one; its `LeptonThreadPool::run`
/// implementation in this file panics rather than executing anything.
#[derive(Default)]
pub struct SingleThreadPool {}
/// Trait implementation for the pool that has no worker threads.
impl LeptonThreadPool for SingleThreadPool {
    /// Always reports a single lane of parallelism.
    fn max_parallelism(&self) -> usize {
        const SINGLE_LANE: usize = 1;
        SINGLE_LANE
    }

    /// Not supported for this pool.
    ///
    /// # Panics
    /// Always panics; the message tells the caller to execute the work directly.
    fn run(&self, _job: Box<dyn FnOnce() + Send + 'static>) {
        panic!("SingleThreadPool does not support run; execute directly instead");
    }
}