time_sigil 0.0.2

task scheduler
Documentation
use crate::{errors::Error, Task};
use tokio_util::sync::CancellationToken;

use super::OneWay;

/// Conversion of a value into a [`TaskRunner`] by pairing it with a [`OneWay`].
///
/// The only requirement on implementors is `Sized`; the provided method moves
/// `self` into the runner without touching it.
pub trait IntoTaskRunner<T, Q>: Sized {
    /// Consumes `self` and wraps it, together with `propagate`, in a [`TaskRunner`].
    fn into_task_runner(self, propagate: OneWay<T, Q>) -> TaskRunner<T, Q, Self> {
        let task = self;
        TaskRunner { task, propagate }
    }
}

impl<F, T, Q> IntoTaskRunner<T, Q> for F
where
    F: Task<T, Q> + Sized,
{
    fn into_task_runner(self, propagate: OneWay<T, Q>) -> TaskRunner<T, Q, Self> {
        TaskRunner {
            task: self,
            propagate,
        }
    }
}

/// Pairs a task with the [`OneWay`] it pulls requests from and pushes results to.
#[derive(Debug)]
pub struct TaskRunner<T, Q, F> {
    /// The task value; workers call `run` on it for each pulled request.
    task: F,
    /// Source of requests (`pull`) and destination of results (`push`).
    propagate: OneWay<T, Q>,
}

/// Hand-written so that only `F: Clone` is required; `#[derive(Clone)]` would
/// additionally demand `T: Clone` and `Q: Clone`.
impl<T, Q, F: Clone> Clone for TaskRunner<T, Q, F> {
    /// Clones both the task and the `OneWay` handle into a new runner.
    fn clone(&self) -> Self {
        let Self { task, propagate } = self;
        Self {
            task: task.clone(),
            propagate: propagate.clone(),
        }
    }
}

impl<T, Q, F: Task<T, Q>> TaskRunner<T, Q, F>
where
    T: Sync + Send + 'static,
    Q: Sync + Send + 'static,
    F: Sync + Send + 'static + Clone,
{
    /// Spawns `num_workers` worker tasks and supervises them until `cancel`
    /// fires or a worker reports an error.
    ///
    /// Each worker is a clone of this runner executing the pull/run/push loop.
    /// When `cancel` is triggered every remaining worker is aborted and
    /// `Ok(())` is returned. With `num_workers == 0` nothing is spawned and the
    /// call simply waits for cancellation.
    ///
    /// # Errors
    ///
    /// Returns the first error a worker exits with (a failed `push`). The
    /// remaining workers are aborted before returning, so a failure is no
    /// longer silently dropped while `listen` keeps waiting for cancellation.
    pub async fn listen(self, num_workers: usize, cancel: CancellationToken) -> Result<(), Error> {
        let mut workers = tokio::task::JoinSet::new();
        for _ in 0..num_workers {
            workers.spawn(self.clone().work());
        }

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    workers.abort_all();
                    return Ok(());
                }
                // `join_next` yields `None` once the set is empty; the pattern
                // then fails to match, which disables this branch and leaves
                // only the cancellation branch to wait on.
                Some(joined) = workers.join_next() => {
                    if let Ok(Err(err)) = joined {
                        workers.abort_all();
                        return Err(err);
                    }
                    // A join error (e.g. a panicked worker) has no visible
                    // conversion into `Error`, so it is dropped as before and
                    // the remaining workers keep running.
                }
            }
        }
    }

    /// Worker loop: pull a request, run the task on it, push the result.
    ///
    /// Never returns `Ok`; the loop only ends when `push` fails, in which case
    /// that error is returned.
    async fn work(self) -> Result<(), Error> {
        loop {
            // NOTE(review): an `Err` or `Ok(None)` from `pull` is ignored and
            // the pull is retried immediately. Confirm that `pull` actually
            // waits in those cases; otherwise this loop spins.
            if let Ok(Some(req)) = self.propagate.pull().await {
                let res = self.task.run(req);
                self.propagate.push(res).await?;
            }
        }
    }
}