use std::ffi::c_void;
use std::ptr;
use libxev_sys as sys;
#[derive(Debug, Clone, Copy, Default)]
pub struct Config {
pub stack_size: Option<u32>,
pub max_threads: Option<u32>,
}
pub struct ThreadPool {
raw: Box<sys::xev_threadpool>,
}
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
impl ThreadPool {
pub fn new(config: Config) -> std::io::Result<Self> {
let mut raw: Box<std::mem::MaybeUninit<sys::xev_threadpool>> =
Box::new(std::mem::MaybeUninit::zeroed());
let mut cfg: sys::xev_threadpool_config =
unsafe { std::mem::zeroed::<sys::xev_threadpool_config>() };
unsafe { sys::xev_threadpool_config_init(&mut cfg) };
if let Some(v) = config.stack_size {
unsafe { sys::xev_threadpool_config_set_stack_size(&mut cfg, v) };
}
if let Some(v) = config.max_threads {
unsafe { sys::xev_threadpool_config_set_max_threads(&mut cfg, v) };
}
let rc = unsafe { sys::xev_threadpool_init(raw.as_mut_ptr(), &mut cfg) };
if rc != 0 {
return Err(std::io::Error::from_raw_os_error(rc));
}
let raw: Box<sys::xev_threadpool> = unsafe { Box::from_raw(Box::into_raw(raw).cast()) };
Ok(Self { raw })
}
pub fn schedule(&self, batch: &mut Batch) {
let raw = &*self.raw as *const sys::xev_threadpool as *mut sys::xev_threadpool;
unsafe { sys::xev_threadpool_schedule(raw, &mut batch.raw) };
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
unsafe {
sys::xev_threadpool_shutdown(&mut *self.raw);
sys::xev_threadpool_deinit(&mut *self.raw);
}
}
}
pub struct Task {
inner: *mut Inner,
}
unsafe impl Send for Task {}
#[repr(C)]
struct Inner {
pool_task: sys::xev_threadpool_task,
closure: Option<Box<dyn FnOnce() + Send + 'static>>,
}
impl Task {
pub fn new<F>(closure: F) -> Self
where
F: FnOnce() + Send + 'static,
{
let boxed: Box<dyn FnOnce() + Send + 'static> = Box::new(closure);
let inner = Box::new(Inner {
pool_task: unsafe { std::mem::zeroed() },
closure: Some(boxed),
});
let inner_ptr = Box::into_raw(inner);
unsafe {
sys::xev_threadpool_task_init(
ptr::addr_of_mut!((*inner_ptr).pool_task),
Some(trampoline),
);
}
Task { inner: inner_ptr }
}
}
unsafe extern "C" fn trampoline(t: *mut sys::xev_threadpool_task) {
let inner_ptr = t as *mut Inner;
let mut boxed = unsafe { Box::from_raw(inner_ptr) };
if let Some(closure) = boxed.closure.take() {
closure();
}
}
pub struct Batch {
raw: sys::xev_threadpool_batch,
}
impl Batch {
pub fn new() -> Self {
let mut raw: sys::xev_threadpool_batch = unsafe { std::mem::zeroed() };
unsafe { sys::xev_threadpool_batch_init(&mut raw) };
Batch { raw }
}
pub fn push(&mut self, task: Task) {
unsafe {
sys::xev_threadpool_batch_push_task(
&mut self.raw,
ptr::addr_of_mut!((*task.inner).pool_task),
);
}
let _ = task.inner;
#[expect(clippy::forget_non_drop)]
std::mem::forget(task);
}
}
impl Default for Batch {
fn default() -> Self {
Self::new()
}
}
#[allow(dead_code)]
fn _unused_cvoid(_: *mut c_void) {}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn run_tasks_to_completion() {
let pool = ThreadPool::new(Config::default()).expect("init pool");
let counter = Arc::new(AtomicUsize::new(0));
let (tx, rx) = std::sync::mpsc::channel::<()>();
let mut batch = Batch::new();
for _ in 0..32 {
let counter = Arc::clone(&counter);
let tx = tx.clone();
batch.push(Task::new(move || {
counter.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(());
}));
}
drop(tx);
pool.schedule(&mut batch);
let mut got = 0;
while rx.recv().is_ok() {
got += 1;
}
assert_eq!(got, 32);
assert_eq!(counter.load(Ordering::SeqCst), 32);
}
}