// time_sigil 0.0.1
//
// task scheduler
// Documentation
use std::collections::VecDeque;

use tokio::sync::{
    mpsc::{self},
    oneshot,
};
use tokio_util::sync::CancellationToken;

use crate::errors::Error;

pub trait Queue<T>: Sized {
    fn enque(&mut self, item: T);
    fn deque(&mut self) -> Option<T>;
    fn into_server(self, channels: usize) -> (Server<Self, T>, Handler<T>) {
        Server::new(self, channels)
    }
}

/// First-in, first-out [`Queue`] backed by a [`VecDeque`]: items are appended
/// at the back and taken from the front.
impl<T> Queue<T> for VecDeque<T> {
    /// Appends `item` to the back of the deque.
    fn enque(&mut self, item: T) {
        VecDeque::push_back(self, item);
    }

    /// Pops the item at the front of the deque, if any.
    fn deque(&mut self) -> Option<T> {
        VecDeque::pop_front(self)
    }
}

/// Owner of a queue that is mutated only in response to [`Command`]s
/// received over a channel.
///
/// `Q` is the queue implementation and `T` the item type carried by the
/// commands.
#[derive(Debug)]
pub struct Server<Q, T> {
    /// The queue that incoming commands operate on.
    queue: Q,
    /// Receiving half of the command channel; the sending half lives in
    /// [`Handler`].
    rx: mpsc::Receiver<Command<T>>,
}

impl<Q, T> Server<Q, T> {
    /// Builds a server around `queue` plus the [`Handler`] that talks to it.
    ///
    /// A bounded command channel of capacity `channels` is created; the
    /// receiving half is stored in the server and the sending half in the
    /// handler.
    fn new(queue: Q, channels: usize) -> (Server<Q, T>, Handler<T>) {
        let (sender, receiver) = mpsc::channel(channels);
        (
            Server {
                queue,
                rx: receiver,
            },
            Handler { tx: sender },
        )
    }
}

impl<Q, T> Server<Q, T>
where
    Q: Queue<T>,
{
    pub async fn listen(self, cancel: CancellationToken) -> Result<(), Error> {
        tokio::select! {
            res = self.listening() => match res {
                Ok(_) => Ok(()),
                Err(e) => {
                    return Err(e)
                },
            },
            _ = cancel.cancelled() => Ok(()),
        }
    }

    async fn listening(mut self) -> Result<(), Error> {
        loop {
            if let Some(recv) = self.rx.recv().await {
                match recv {
                    Command::Enque(item) => {
                        self.queue.enque(item);
                    }
                    Command::Deque(tx) => {
                        let item = self.queue.deque();
                        match tx.send(item) {
                            Ok(_) => {}
                            Err(_) => return Err(Error::ChannelClosed),
                        }
                    }
                }
            }
        }
    }
}

/// Message sent from a [`Handler`] to the [`Server`].
#[derive(Debug)]
enum Command<T> {
    /// Insert the carried item into the server's queue.
    Enque(T),
    /// Take the next item from the server's queue and send it (or `None` if
    /// the queue is empty) back through the carried one-shot sender.
    Deque(oneshot::Sender<Option<T>>),
}

/// Client-side handle used to send [`Command`]s to a [`Server`].
#[derive(Debug)]
pub struct Handler<T> {
    /// Sending half of the server's command channel.
    tx: mpsc::Sender<Command<T>>,
}

impl<T> Clone for Handler<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<T> Handler<T> {
    pub async fn enque(&self, item: T) -> Result<(), Error> {
        self.tx
            .send(Command::Enque(item))
            .await
            .map_err(|_| Error::ChannelClosed)
    }

    pub async fn deque(&self) -> Result<Option<T>, Error> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(Command::Deque(tx))
            .await
            .map_err(|_| Error::ChannelClosed)?;
        rx.await.map_err(|_| Error::ChannelClosed)
    }
}