async-cpupool 0.4.0

A simple async threadpool for CPU-bound tasks
Documentation
use crate::{
    drop_notifier::{DropListener, DropNotifier},
    executor::block_on,
    queue::Queue,
    selector::{select, Either},
    Canceled,
};

pub(super) fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let queue = crate::queue::bounded(1);

    let (send_notifier, send_listener) = crate::drop_notifier::notifier();

    let (recv_notifier, recv_listener) = crate::drop_notifier::notifier();

    (
        Sender {
            queue: queue.clone(),
            send_notifier,
            recv_listener,
        },
        Receiver {
            queue,
            recv_notifier,
            send_listener,
        },
    )
}

pub(super) struct Sender<T> {
    queue: Queue<T>,
    #[allow(unused)]
    send_notifier: DropNotifier,
    recv_listener: DropListener,
}

pub(super) struct Receiver<T> {
    queue: Queue<T>,
    #[allow(unused)]
    recv_notifier: DropNotifier,
    send_listener: DropListener,
}

impl<T> Sender<T> {
    pub(super) async fn send(self, item: T) -> Result<(), Canceled> {
        match select(self.queue.push(item), self.recv_listener.listen()).await {
            Either::Left(()) => Ok(()),
            Either::Right(()) => Err(Canceled),
        }
    }

    pub(super) fn blocking_send(self, item: T) -> Result<(), Canceled> {
        block_on(self.send(item))
    }
}

impl<T> Receiver<T> {
    pub(super) async fn recv(self) -> Result<T, Canceled> {
        match select(self.queue.pop(), self.send_listener.listen()).await {
            Either::Left(item) => Ok(item),
            Either::Right(()) => Err(Canceled),
        }
    }
}