time_sigil 0.0.2

task scheduler
Documentation
use std::collections::VecDeque;

use tokio_util::sync::CancellationToken;

use crate::{tasks::OneWay, Error, IntoTaskRunner, Queue, Server, Task, TaskRunner};

pub fn new_inmemory<T, Q, F>(task: F) -> (Runner<VecDeque<T>, T, VecDeque<Q>, Q, F>, OneWay<Q, T>)
where
    F: IntoTaskRunner<T, Q> + Clone,
{
    new(VecDeque::new(), VecDeque::new(), task)
}

pub fn new<T, Q, A, B, F>(task_q: A, res_q: B, task: F) -> (Runner<A, T, B, Q, F>, OneWay<Q, T>)
where
    A: Queue<T>,
    B: Queue<Q>,
    F: IntoTaskRunner<T, Q> + Clone,
{
    let (t_s, t_h) = task_q.into_server(10);
    let (r_s, r_h) = res_q.into_server(10);
    let task_oneway = OneWay::new(t_h.clone(), r_h.clone());
    let task = task.into_task_runner(task_oneway);
    let runner = Runner {
        task_q: t_s,
        res_q: r_s,
        task,
    };
    (runner, OneWay::new(r_h.clone(), t_h.clone()))
}

pub struct Runner<Q, A, T, B, F> {
    task_q: Server<Q, A>,
    res_q: Server<T, B>,
    task: TaskRunner<A, B, F>,
}

impl<Q, A, T, B, F> Runner<Q, A, T, B, F>
where
    T: Queue<B> + Sync + Send + 'static,
    Q: Queue<A> + Sync + Send + 'static,
    F: Task<A, B> + Clone + Sync + Send + 'static,
    A: Sync + Send + 'static,
    B: Sync + Send + 'static,
{
    pub async fn listen(self, num_task: usize, cancel: CancellationToken) -> Result<(), Error> {
        tokio::spawn(self.listening(num_task, cancel));
        Ok(())
    }

    async fn listening(self, num_task: usize, cancel: CancellationToken) -> Result<(), Error> {
        let mut set = tokio::task::JoinSet::new();
        set.spawn(self.task_q.listen(cancel.clone()));
        set.spawn(self.res_q.listen(cancel.clone()));
        set.spawn(self.task.listen(num_task, cancel.clone()));

        tokio::select! {
            _ = cancel.cancelled() => Ok(())
        }
    }
}