async-cpupool 0.4.0

A simple async threadpool for CPU-bound tasks
Documentation
use std::collections::VecDeque;

use crate::notify::Notify;
use crate::sync::{Arc, Mutex};

thread_local! {
    // Per-thread tally of queues created minus queue states dropped on this thread.
    // Only compiled for test and loom builds. `Wrapping` makes a decrement on a
    // thread that never incremented wrap around instead of overflowing.
    #[cfg(any(loom, test))]
    static QUEUE_COUNT: std::cell::RefCell<std::num::Wrapping<u64>> = std::cell::RefCell::default();
}

/// Adds one to this thread's queue tally.
///
/// The body is compiled only under `test` or `loom`; in other builds the
/// function is empty.
#[inline(always)]
fn increment_queue_count() {
    #[cfg(any(loom, test))]
    QUEUE_COUNT.with(|cell| *cell.borrow_mut() += 1);
}

/// Subtracts one from this thread's queue tally (wrapping on underflow).
///
/// The body is compiled only under `test` or `loom`; in other builds the
/// function is empty.
#[inline(always)]
fn decrement_queue_count() {
    #[cfg(any(loom, test))]
    QUEUE_COUNT.with(|cell| *cell.borrow_mut() -= 1);
}

/// Returns the current value of this thread's queue tally.
///
/// Only available in `test` and `loom` builds.
#[cfg(any(test, loom))]
#[doc(hidden)]
pub fn queue_count() -> u64 {
    QUEUE_COUNT.with(|cell| cell.borrow().0)
}

/// Creates a [`Queue`] that accepts at most `capacity` items.
///
/// Free-function shorthand for [`Queue::bounded`].
#[doc(hidden)]
pub fn bounded<T>(capacity: usize) -> Queue<T> {
    Queue::<T>::bounded(capacity)
}

/// Handle to a bounded first-in, first-out queue.
///
/// Cloning a handle clones the `Arc`, so every clone operates on the same
/// underlying [`QueueState`].
#[doc(hidden)]
pub struct Queue<T> {
    // State shared between all clones of this handle.
    inner: Arc<QueueState<T>>,
    // Maximum number of items the queue accepts before pushes are rejected.
    capacity: usize,
}

/// State shared by every [`Queue`] handle: the stored items plus the two
/// notifiers used to wake waiting pushers and poppers.
struct QueueState<T> {
    // Stored items; pushed at the back, popped from the front.
    queue: Mutex<VecDeque<T>>,
    // Notified once after each successful pop; `push` waits on it while the queue is full.
    push_notify: Notify,
    // Notified once after each successful push; `pop` waits on it while the queue is empty.
    pop_notify: Notify,
}

impl<T> Queue<T> {
    /// Builds an empty queue that accepts at most `capacity` items.
    ///
    /// Also bumps the test-only queue tally and the
    /// `async-cpupool.queue.created` counter.
    #[doc(hidden)]
    pub fn bounded(capacity: usize) -> Self {
        increment_queue_count();
        metrics::counter!("async-cpupool.queue.created").increment(1);

        let state = QueueState {
            queue: Mutex::new(VecDeque::new()),
            push_notify: Notify::new(),
            pop_notify: Notify::new(),
        };

        Self {
            inner: Arc::new(state),
            capacity,
        }
    }

    /// Returns the number of items currently stored.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let items = self.inner.queue.lock().expect("not poisoned");
        items.len()
    }

    /// Returns `Ok(())` when the queue holds at least `capacity` items,
    /// otherwise `Err` carrying the current length.
    pub(super) fn is_full_or(&self) -> Result<(), usize> {
        let current = self.len();

        if current < self.capacity {
            Err(current)
        } else {
            Ok(())
        }
    }

    /// Pushes `item`, waiting for space whenever the queue is full.
    #[doc(hidden)]
    pub async fn push(&self, item: T) {
        // Fast path: no listener is needed if there is room right away.
        let mut pending = match self.try_push(item) {
            None => return,
            Some(rejected) => rejected,
        };

        loop {
            // The listener is obtained before retrying -- presumably so a pop
            // that happens between the retry and the wait is not missed.
            let listener = self.inner.push_notify.listen().await;

            match self.try_push(pending) {
                None => return,
                Some(rejected) => pending = rejected,
            }

            listener.await;
        }
    }

    /// Attempts to push without waiting.
    ///
    /// Returns `None` when the item was stored (one popper is notified and the
    /// `async-cpupool.queue.pushed` counter is bumped), or gives the item back
    /// as `Some(item)` when the queue is full.
    #[doc(hidden)]
    pub fn try_push(&self, item: T) -> Option<T> {
        let rejected = self.try_push_impl(item);

        if rejected.is_none() {
            self.inner.pop_notify.notify_one();
            metrics::counter!("async-cpupool.queue.pushed").increment(1);
        }

        rejected
    }

    /// Stores `item` under the lock if there is room; hands it back otherwise.
    fn try_push_impl(&self, item: T) -> Option<T> {
        let mut items = self.inner.queue.lock().expect("not poisoned");

        if items.len() < self.capacity {
            items.push_back(item);
            None
        } else {
            Some(item)
        }
    }

    /// Pops the front item, waiting until one is available.
    #[doc(hidden)]
    pub async fn pop(&self) -> T {
        // Fast path: no listener is needed if an item is already queued.
        if let Some(ready) = self.try_pop() {
            return ready;
        }

        loop {
            // The listener is obtained before retrying -- presumably so a push
            // that happens between the retry and the wait is not missed.
            let listener = self.inner.pop_notify.listen().await;

            match self.try_pop() {
                Some(ready) => return ready,
                None => {
                    listener.await;
                }
            }
        }
    }

    /// Attempts to pop without waiting.
    ///
    /// On success one pusher is notified and the `async-cpupool.queue.popped`
    /// counter is bumped. Returns `None` when the queue is empty.
    #[doc(hidden)]
    pub fn try_pop(&self) -> Option<T> {
        let popped = self.try_pop_impl();

        if popped.is_some() {
            self.inner.push_notify.notify_one();
            metrics::counter!("async-cpupool.queue.popped").increment(1);
        }

        popped
    }

    /// Removes and returns the front item under the lock, if any.
    fn try_pop_impl(&self) -> Option<T> {
        let mut items = self.inner.queue.lock().expect("not poisoned");
        items.pop_front()
    }
}

impl<T> Clone for Queue<T> {
    /// Produces another handle to the same shared state with the same capacity;
    /// the stored items themselves are not cloned.
    fn clone(&self) -> Self {
        let Self { inner, capacity } = self;

        Self {
            inner: Arc::clone(inner),
            capacity: *capacity,
        }
    }
}

impl<T> Drop for QueueState<T> {
    /// Mirrors the bookkeeping done at construction: lowers the test-only
    /// queue tally and bumps the `async-cpupool.queue.dropped` counter.
    fn drop(&mut self) {
        decrement_queue_count();

        let dropped = metrics::counter!("async-cpupool.queue.dropped");
        dropped.increment(1);
    }
}