willdo 0.0.1

Task manager with DAG
Documentation
//! A not-totally-simple job runner: it runs jobs concurrently and reports when each one starts and completes.

use crate::execution::BoxError;
use crate::execution::{
    progress::Observation, repository::Execution, BoxFuture, BoxStream, JobRepository, Progress,
    Runner,
};
use crate::job::JobId;
use core::{pin::Pin, task::{Context, Poll}};
use futures_lite::{FutureExt, Stream, StreamExt as _};
use std::collections::VecDeque;

/// A not-completely-simple runner that will run all required jobs to completion.
///
/// It will run alternative paths in parallel and will skip unnecessary jobs.
/// Which jobs are runnable at any moment is decided by the [`JobRepository`]
/// handed to [`Runner::run`]; this runner only drives whatever executions the
/// repository returns.
///
/// It only reports the basic start and completion of jobs (that's why it's simple).
pub struct Simple;

impl Runner for Simple {
    fn run<'f, 'j: 'f, J: 'j + JobRepository + Send + Unpin>(
        &self,
        jobs: J,
        goals: &[Box<str>],
    ) -> BoxStream<'f, (JobId, Progress)> {
        let goals = goals.to_vec();
        // let ready = jobs
        //     .select(&goals)
        //     .into_iter()
        //     .map(|j| (j, Progress::Select))
        //     .collect();
        let ready = vec![].into();
        Box::pin(SimpleEvents {
            jobs,
            goals,
            running: vec![],
            ready,
            reload: false,
        })
    }
}

/// State behind the stream returned by [`Simple`]'s `run`.
///
/// Its `poll_next` drains `ready` first, then asks `jobs` for more executions
/// when needed, and otherwise polls the futures in `running`.
struct SimpleEvents<J> {
    /// Repository asked (via `execute`) for the executions to run next.
    jobs: J,
    /// Goals handed to `jobs.execute` every time work is requested.
    goals: Vec<Box<str>>,
    /// In-flight executions; each future resolves to its `Execution` once `run` has finished.
    running: Vec<BoxFuture<'static, Execution>>,
    /// Events waiting to be yielded, in the order they were queued.
    ready: VecDeque<(JobId, Progress)>,
    /// Set when an execution completes so the repository is queried again.
    reload: bool,
}
impl<J> Stream for SimpleEvents<J>
where
    J: JobRepository + Send + Unpin,
{
    type Item = (JobId, Progress);

    /// Yield the next `(job id, progress)` event.
    ///
    /// Each loop turn does the first applicable step:
    /// 1. return a queued event, if any;
    /// 2. if a job just completed or nothing is running, ask the repository
    ///    for executions, queue a `Progress::Start` for each and start them;
    ///    the stream ends when nothing is running afterwards;
    /// 3. poll the running executions in order and queue a `Completed`
    ///    observation for the first one that has finished, or return
    ///    `Poll::Pending` when none has.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self {
            jobs: repository,
            goals,
            running: in_flight,
            ready: queue,
            reload: needs_reload,
        } = self.as_mut().get_mut();

        loop {
            // Queued events always go out before any further work is done.
            if let Some(event) = queue.pop_front() {
                return Poll::Ready(Some(event));
            }

            if *needs_reload || in_flight.is_empty() {
                *needs_reload = false;
                for mut execution in repository.execute(goals) {
                    queue.push_back((execution.job().id.clone(), Progress::Start));
                    // The future hands the execution back so its final
                    // progress can be inspected once it completes.
                    in_flight.push(Box::pin(async move {
                        run(&mut execution).await;
                        execution
                    }) as BoxFuture<'static, Execution>);
                }
                if in_flight.is_empty() {
                    // Nothing running and nothing new to run: the stream is done.
                    return Poll::Ready(None);
                }
                continue;
            }

            // Stop at the first finished future; the ones after it are not
            // polled in this pass.
            let finished = in_flight
                .iter_mut()
                .enumerate()
                .find_map(|(index, future)| match future.poll(cx) {
                    Poll::Ready(execution) => Some((index, execution)),
                    Poll::Pending => None,
                });
            let Some((index, execution)) = finished else {
                return Poll::Pending;
            };

            *needs_reload = true;
            drop(in_flight.remove(index));
            // A missing status is reported as `1`.
            let status = execution.progress().get_status().unwrap_or(1);
            queue.push_back((
                execution.job().id.clone(),
                Progress::Observation(Observation::Completed(status)),
            ));
        }
    }
}

/// Run a single execution to the end.
///
/// Never fails itself: an error returned by [`run_job_fallible`] is recorded
/// on the execution's progress as an `Observation::Failure` carrying the
/// error as its source.
async fn run(execution: &mut Execution) {
    let Err(cause) = run_job_fallible(execution).await else {
        return;
    };
    let failure = Observation::Failure {
        message: "job execution failed".into(),
        source: Some(cause.into()),
    };
    execution.progress().update(Progress::Observation(failure));
}

/// Run the job's script through a freshly started interpreter.
///
/// Progress updates are recorded in this order: `Start`, then for each
/// command a `Command` update followed by every observation it produces, then
/// `Shutdown` followed by the observations of the shutdown phase. Remaining
/// commands are skipped as soon as the progress reports a failure.
///
/// # Errors
///
/// Returns the error from starting the interpreter, executing a command, or
/// shutting the interpreter down.
///
/// NOTE(review): an error from `execute` returns early through `?`, so the
/// shutdown phase is skipped in that case — confirm the interpreter cleans up
/// when dropped.
async fn run_job_fallible(
    execution: &mut Execution,
) -> Result<(), BoxError> {
    // Own a copy of the script so the execution can be used freely while iterating.
    let script = execution.job().script.clone();
    let mut interpreter = execution.commander().start()?;
    execution.progress().update(Progress::Start);

    for command in &script {
        execution
            .progress()
            .update(Progress::Command(command.clone()));

        // Forward everything the command reports into the progress.
        let mut observations = interpreter.execute(command).await?;
        while let Some(observation) = observations.next().await {
            execution
                .progress()
                .update(Progress::Observation(observation));
        }

        if execution.progress().is_failed() {
            break;
        }
    }

    execution.progress().update(Progress::Shutdown);
    let mut closing = interpreter.shutdown().await?;
    while let Some(observation) = closing.next().await {
        execution
            .progress()
            .update(Progress::Observation(observation));
    }

    Ok(())
}