use std::{
cell::LazyCell,
sync::{
Arc, LazyLock, Mutex,
mpsc::{Sender, channel},
},
thread::{self, spawn},
};
/// Abstraction over anything that can accept a boxed, sendable, one-shot job.
///
/// When and on which thread the job actually runs is decided by the
/// implementation.
pub trait LeptonThreadPool {
/// Hands the job `f` to the pool for execution.
fn run(&self, f: Box<dyn FnOnce() + Send + 'static>);
}
/// A thread pool held either by reference or by ownership, so that code can
/// treat both cases uniformly through [`LeptonThreadPool`].
pub enum ThreadPoolHolder<'a> {
/// A pool borrowed for the lifetime `'a`.
Dyn(&'a dyn LeptonThreadPool),
/// A boxed pool owned by this holder.
Owned(Box<dyn LeptonThreadPool>),
}
/// A holder is itself a pool: it simply delegates to whatever it wraps.
impl LeptonThreadPool for ThreadPoolHolder<'_> {
    /// Forwards the job `f` to the borrowed or owned pool inside this holder.
    fn run(&self, f: Box<dyn FnOnce() + Send + 'static>) {
        // Resolve both variants to a plain trait-object reference first,
        // then dispatch exactly once.
        let pool: &dyn LeptonThreadPool = match self {
            Self::Dyn(borrowed) => *borrowed,
            Self::Owned(boxed) => &**boxed,
        };
        pool.run(f)
    }
}
/// Priority requested for the worker threads of a [`SimpleThreadPool`].
///
/// The priority is only applied on Windows and Linux targets; on other
/// targets the value is stored but never acted upon.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LeptonThreadPriority {
/// Workers request the minimum thread priority.
Low,
/// Workers leave their thread priority untouched (the default).
#[default]
Normal,
/// Workers request the maximum thread priority.
High,
}
/// Thread pool that spawns a worker thread whenever no idle worker is
/// available and keeps a bounded list of idle workers around for reuse.
#[derive(Default)]
pub struct SimpleThreadPool {
/// Priority that newly spawned worker threads request for themselves.
priority: LeptonThreadPriority,
/// One channel sender per worker that has registered itself as idle;
/// sending a job through a sender wakes that worker. The list is created
/// lazily on first access so the pool can be built in a `const fn`.
idle_threads: LazyLock<Arc<Mutex<Vec<Sender<Box<dyn FnOnce() + Send + 'static>>>>>>,
}
impl SimpleThreadPool {
    /// Creates a pool whose worker threads request the given `priority`.
    ///
    /// This is a `const fn`, so a pool can live in a `static`; the idle-worker
    /// list is only allocated on first access.
    pub const fn new(priority: LeptonThreadPriority) -> Self {
        SimpleThreadPool {
            priority,
            idle_threads: LazyLock::new(|| Arc::new(Mutex::new(Vec::new()))),
        }
    }

    /// Returns how many worker threads are currently registered as idle.
    ///
    /// # Panics
    ///
    /// Panics if the idle-list mutex has been poisoned.
    #[allow(dead_code)]
    pub fn get_idle_threads(&self) -> usize {
        self.idle_threads.lock().unwrap().len()
    }

    /// Runs `f` on an idle worker if one is registered, otherwise on a newly
    /// spawned, detached worker thread.
    ///
    /// After finishing a job, a worker registers itself as idle and waits for
    /// the next job. A worker exits instead of registering when more than
    /// `NUM_CPUS` workers are already idle, or when the idle-list mutex is
    /// poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the idle-list mutex has been poisoned, or if a registered
    /// idle worker has unexpectedly gone away.
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Pop in its own statement so the mutex guard is released before the
        // job is handed over or a thread is spawned.
        let idle_worker = self.idle_threads.lock().unwrap().pop();
        if let Some(sender) = idle_worker {
            sender
                .send(Box::new(f))
                .expect("an idle worker keeps its receiver alive while it is registered");
            return;
        }

        let (tx_schedule, rx_schedule) = channel();
        let idle_threads = self.idle_threads.clone();
        // Only read the priority on targets where it is actually applied.
        #[cfg(any(target_os = "windows", target_os = "linux"))]
        let priority = self.priority;

        spawn(move || {
            #[cfg(any(target_os = "windows", target_os = "linux"))]
            {
                let requested = match priority {
                    LeptonThreadPriority::Low => Some(thread_priority::ThreadPriority::Min),
                    LeptonThreadPriority::Normal => None,
                    LeptonThreadPriority::High => Some(thread_priority::ThreadPriority::Max),
                };
                if let Some(level) = requested {
                    // Best effort: the OS may refuse the change (for example
                    // for lack of privileges). Panicking here would kill this
                    // detached thread before `f` runs and silently drop the
                    // job, so the error is deliberately ignored.
                    let _ = thread_priority::set_current_thread_priority(level);
                }
            }

            // The job that caused this worker to be spawned.
            f();

            loop {
                {
                    // A poisoned list means the pool is broken; just exit.
                    let Ok(mut idle) = idle_threads.lock() else {
                        break;
                    };
                    // Enough idle workers already: let this thread end.
                    if idle.len() > *NUM_CPUS {
                        break;
                    }
                    idle.push(tx_schedule.clone());
                    // Guard dropped here, before blocking on the channel.
                }

                let Ok(job) = rx_schedule.recv() else {
                    break;
                };
                job();
            }
        });
    }
}
/// Process-wide [`SimpleThreadPool`] whose workers run at
/// [`LeptonThreadPriority::Normal`].
pub static DEFAULT_THREAD_POOL: SimpleThreadPool =
SimpleThreadPool::new(LeptonThreadPriority::Normal);
/// Exposes the pool's internal scheduler through the [`LeptonThreadPool`] trait.
impl LeptonThreadPool for SimpleThreadPool {
    /// Schedules the boxed job `f` on an idle or freshly spawned worker.
    fn run(&self, f: Box<dyn FnOnce() + Send + 'static>) {
        Self::execute(self, f)
    }
}
/// Parallelism reported by the standard library, computed once on first use.
///
/// `available_parallelism` can fail on platforms or in sandboxes where the
/// value cannot be determined; in that case this falls back to `1` instead of
/// panicking inside the lazy initializer.
static NUM_CPUS: LazyLock<usize> =
    LazyLock::new(|| thread::available_parallelism().map_or(1, |n| n.get()));
/// Smoke test: submits 100 counter increments to the default pool and spins
/// until every one of them has run, then prints the idle-worker count.
#[test]
fn test_threadpool() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    const JOBS: u32 = 100;

    let counter = Arc::new(AtomicU32::new(0));
    (0..JOBS).for_each(|_| {
        let handle = Arc::clone(&counter);
        DEFAULT_THREAD_POOL.execute(move || {
            handle.fetch_add(1, Ordering::AcqRel);
        });
    });

    // Busy-wait (yielding the time slice) until all jobs have reported in.
    while counter.load(Ordering::Acquire) < JOBS {
        thread::yield_now();
    }

    println!("Idle threads: {}", DEFAULT_THREAD_POOL.get_idle_threads());
}
/// Pool that forwards every job through a single channel.
///
/// NOTE(review): `LazyCell` is not `Sync`, so a `SingleThreadPool` presumably
/// cannot be shared between threads by reference — confirm this is intended.
pub struct SingleThreadPool {
/// Sending half of the job channel, created lazily on first use.
sender: LazyCell<std::sync::mpsc::Sender<Box<dyn FnOnce() + Send + 'static>>>,
}
impl Default for SingleThreadPool {
    /// Builds a pool whose job channel is set up lazily.
    ///
    /// On first use a channel is created and a draining loop is submitted to
    /// `DEFAULT_THREAD_POOL`; that loop runs received jobs one after another
    /// until every sender has been dropped.
    fn default() -> Self {
        type Job = Box<dyn FnOnce() + Send + 'static>;

        SingleThreadPool {
            sender: LazyCell::new(|| {
                let (job_tx, job_rx) = std::sync::mpsc::channel::<Job>();
                let drain = move || {
                    // `recv` fails only once the channel has hung up.
                    while let Ok(job) = job_rx.recv() {
                        job();
                    }
                };
                DEFAULT_THREAD_POOL.run(Box::new(drain));
                job_tx
            }),
        }
    }
}
impl LeptonThreadPool for SingleThreadPool {
fn run(&self, f: Box<dyn FnOnce() + Send + 'static>) {
self.sender.send(f).unwrap();
}
}