time_sigil 0.0.1

task scheduler
Documentation
use tokio_util::sync::CancellationToken;

use crate::{errors::Error, queue::Handler};

pub trait Task<T, Q>: Sized {
    fn run(&self, req: T) -> Q;

    fn into_runner(self, req_q: Handler<T>, res_q: Handler<Q>) -> TaskRunner<T, Q, Self> {
        TaskRunner {
            task: self,
            req_q,
            res_q,
        }
    }
}

impl<F: Fn(T) -> Q + Clone, T, Q> Task<T, Q> for F {
    fn run(&self, req: T) -> Q {
        (self)(req)
    }
}

#[derive(Debug)]
pub struct TaskRunner<T, Q, F> {
    task: F,
    req_q: Handler<T>,
    res_q: Handler<Q>,
}

impl<T, Q, F: Clone> Clone for TaskRunner<T, Q, F> {
    fn clone(&self) -> Self {
        Self {
            task: self.task.clone(),
            req_q: self.req_q.clone(),
            res_q: self.res_q.clone(),
        }
    }
}

impl<T, Q, F: Task<T, Q>> TaskRunner<T, Q, F> {
    pub async fn listen(self, cancel: CancellationToken) -> Result<(), Error> {
        let req_q = self.req_q.clone();
        let res_q = self.res_q.clone();
        self.listening(req_q, res_q, cancel).await
    }

    async fn listening(
        self,
        request_queue: Handler<T>,
        result_queue: Handler<Q>,
        cancel: CancellationToken,
    ) -> Result<(), Error> {
        tokio::select! {
            res = self.work(request_queue, result_queue) => match res {
                Ok(_) => Ok(()),
                Err(e) => Err(e),
            },
            _ = cancel.cancelled() => Ok(())
        }
    }

    async fn work(self, request_queue: Handler<T>, result_queue: Handler<Q>) -> Result<(), Error> {
        loop {
            if let Ok(Some(req)) = request_queue.deque().await {
                let res = self.task.run(req);
                match result_queue.enque(res).await {
                    Ok(_) => {}
                    Err(e) => return Err(e),
                }
            }
        }
    }
}