willdo 0.0.1

Task manager with DAG
Documentation
//! Run job scripts in a child sub-process

mod debughide;

use self::debughide::DebugHide;
use crate::execution::{
    commander::{Commander, Instance},
    progress::{Data, Observation},
    BoxError, BoxFuture, BoxInstance, BoxStream,
};
use async_process::{ChildStdin, Command};
use core::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use futures_lite::{
    io::{self, BufReader},
    stream::{self, empty, once_future},
    AsyncBufReadExt, AsyncRead, AsyncWriteExt, Stream, StreamExt,
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::process::{ExitStatus, Stdio};
use std::sync::Arc;

const DEFAULT_FEEDBACK: &str = "\necho .$?$feedback\n";

/// Make a sub-process commander
pub fn provide(_name: &str, config: Config) -> Result<SubProcess, BoxError> {
    let Config {
        provider: _,
        executable,
        arguments,
        feedback,
    } = config;

    Ok(SubProcess {
        executable,
        arguments,
        feedback: feedback.unwrap_or_else(|| DEFAULT_FEEDBACK.into()),
    })
}

/// SubProcess commander configuration
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
    provider: Box<str>,
    executable: Box<str>,
    #[serde(default)]
    arguments: Vec<Box<str>>,
    feedback: Option<Box<str>>,
}

/// SubProcess commander instance
#[derive(Debug, Default)]
pub struct SubProcess {
    executable: Box<str>,
    arguments: Vec<Box<str>>,
    feedback: Box<str>,
}

impl SubProcess {
    /// Create a new SubProcess commander instance
    pub fn new(executable: Box<str>, arguments: Vec<Box<str>>, feedback: Box<str>) -> Self {
        Self {
            executable,
            arguments,
            feedback,
        }
    }
}

impl Commander for SubProcess {
    fn start(&self) -> Result<BoxInstance, BoxError> {
        log::debug!("STARTING {:?}", self.executable);
        let mut child = Command::new(self.executable.as_ref())
            .args(self.arguments.iter().map(|a| a.as_ref()))
            .reap_on_drop(true)
            .kill_on_drop(true)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        log::debug!("STARTED {}: {child:?}", self.executable);

        let stdin = child.stdin.take();
        let stream = Box::pin(stream_child(
            child.stderr.take(),
            child.stdout.take(),
            child.status(),
        ));

        Ok(Box::new(Process {
            stdin,
            stream,
            feedback: self.feedback.clone(),
        }))
    }
}

struct Process {
    feedback: Box<str>,
    stdin: Option<ChildStdin>,
    stream: Pin<Box<dyn Send + Sync + Stream<Item = Observation>>>,
}
impl Instance for Process {
    fn shutdown(&mut self) -> BoxFuture<Result<BoxStream<Observation>, BoxError>> {
        log::debug!("SHUTTING DOWN!");
        Box::pin(async move {
            if let Some(mut inp) = self.stdin.take() {
                inp.close().await?;
            }
            let stream = core::mem::replace(&mut self.stream, Box::pin(empty()));
            // add an option to signal shutdown to stream as well
            Ok(stream as BoxStream<Observation>)
        })
    }
    fn execute(&mut self, command: &str) -> BoxFuture<Result<BoxStream<Observation>, BoxError>> {
        let feedbackid = rand::rngs::OsRng.gen_range(usize::MAX / 10usize..usize::MAX);
        let feedbackid = format!("{feedbackid:016X}");
        let feedback = self.feedback.replace("$feedback", &feedbackid);
        let command = format!("{command}{feedback}");

        log::debug!("FBID: {feedbackid}");
        log::debug!("CMD: {command:?}");
        Box::pin(async move {
            if let Some(inp) = self.stdin.as_mut() {
                inp.write_all(command.as_bytes()).await?;
                inp.flush().await?;
            }

            Ok(
                Box::pin(Feedback::new(feedbackid.into(), self.stream.as_mut()))
                    as BoxStream<Observation>,
            )
        })
    }
}

#[derive(Debug)]
struct Feedback<'a> {
    feedbackid: Vec<u8>,
    completed: bool,
    completion: Option<usize>,
    stream: DebugHide<Pin<Box<dyn 'a + Send + Stream<Item = Observation>>>>,
}
impl<'a> Feedback<'a> {
    pub fn new(feedbackid: Vec<u8>, stream: impl 'a + Send + Stream<Item = Observation>) -> Self {
        Feedback {
            feedbackid,
            completed: false,
            completion: None,
            stream: DebugHide::new(Box::pin(stream), "..."),
        }
    }
    fn extract_feedback(&mut self, log: &mut Observation) {
        match log {
            Observation::Failure { .. } | Observation::Completed(_) => {
                self.completed = true;
            }
            Observation::Error(data) | Observation::Output(data) => {
                let buff = match data {
                    Data::Binary(vec) => vec.as_slice(),
                    Data::Utf8(s) => s.as_bytes(),
                };
                let (mut feedback_len, line) = match buff {
                    [line @ .., b'\n'] => (1, line),
                    // it could be a line without a new line, but that's unlikely
                    line => (0, line),
                };
                feedback_len += self.feedbackid.len();
                if !line.ends_with(self.feedbackid.as_slice()) {
                    // feedback id not found, give up
                    return;
                }
                let code_len = line
                    .iter()
                    .rev()
                    .skip(self.feedbackid.len())
                    .take_while(|c| u8::is_ascii_digit(c))
                    .count();

                feedback_len += code_len;
                let code_start = buff.len() - feedback_len;
                let feedback_start = code_start.saturating_sub(1); // lead .
                if buff[feedback_start] != b'.' {
                    // expecting a full stop to lead the feedback, skip if not
                    return;
                }
                let Ok(code) = core::str::from_utf8(&buff[code_start..code_start + code_len])
                    .expect("we've already checked with is_ascii_digit")
                    .parse::<usize>()
                else {
                    // cannot parse usize, that's not a feedback
                    return;
                };
                let stripped = match data {
                    Data::Binary(v) => Data::Binary(v[..feedback_start].to_vec()),
                    Data::Utf8(s) => Data::Utf8(s[..feedback_start].into()),
                };

                self.completed = true;

                if stripped.as_bytes().is_empty() {
                    *log = Observation::Completed(code);
                } else {
                    // delay completion after content
                    *log = match log {
                        Observation::Error(_) => Observation::Error(stripped),
                        Observation::Output(_) => Observation::Output(stripped),
                        _ => unreachable!("only matching these here"),
                    };
                    self.completion = Some(code);
                }
            }
        }
    }
}
impl Stream for Feedback<'_> {
    type Item = Observation;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(completion) = self.completion.take() {
            return Poll::Ready(Some(Observation::Completed(completion)));
        }
        let mut log = match self.stream.poll_next(cx) {
            Poll::Ready(Some(log)) => log,
            Poll::Ready(None) => {
                self.completed = true;
                return Poll::Ready(None);
            }
            Poll::Pending if self.completed => {
                return Poll::Ready(None);
            }
            Poll::Pending => {
                return Poll::Pending;
            }
        };
        self.extract_feedback(&mut log);
        Poll::Ready(Some(log.try_utf8().into_owned()))
    }
}

#[test]
fn test_feedback() {
    let s = stream::iter(vec![
        Observation::Output(Data::Utf8("abc\n".into())),
        Observation::Output(Data::Utf8("not a feedback: x3123\n".into())),
        Observation::Output(Data::Utf8("this is the feedback: .3123\n".into())),
        Observation::Output(Data::Utf8("finish it off\n".into())),
    ]);

    let mut sut = Feedback::new(b"123".to_vec(), s);

    match futures_lite::future::block_on(sut.next()) {
        Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "abc\n" => {}
        fail => panic!("Expected abc\\n, got {fail:?}"),
    }
    match futures_lite::future::block_on(sut.next()) {
        Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "not a feedback: x3123\n" => {}
        fail => panic!("Expected not a feedback, got {fail:?}"),
    }
    match futures_lite::future::block_on(sut.next()) {
        Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "this is the feedback: " => {}
        fail => panic!("Expected a feedback line, got {fail:?}"),
    }
    match futures_lite::future::block_on(sut.next()) {
        Some(Observation::Completed(3)) => {}
        fail => panic!("Expected completion, got {fail:?}"),
    }
    match futures_lite::future::block_on(sut.next()) {
        Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "finish it off\n" => {}
        fail => panic!("Expected a finish it off line, got {fail:?}"),
    }
    match futures_lite::future::block_on(sut.next()) {
        None => {}
        fail => panic!("Expected None, got {fail:?}"),
    }
}

fn stream_child(
    e: Option<impl AsyncRead>,
    o: Option<impl AsyncRead>,
    c: impl Future<Output = io::Result<ExitStatus>>,
) -> impl Stream<Item = Observation> {
    let errors = stream::iter(e.map(BufReader::new))
        .flat_map(|r| {
            r.split(b'\n').map(|v| {
                eprintln!(
                    "err: {:?}",
                    v.as_ref().map(|v| String::from_utf8_lossy(v.as_slice()))
                );
                v.map(|mut v| {
                    v.push(b'\n');
                    v
                })
                .map(Data::Binary)
                .map(Observation::Error)
            })
        })
        .map(|result| {
            result.unwrap_or_else(|e| Observation::Failure {
                message: "Failed reading stderr".into(),
                source: Some(Arc::new(e)),
            })
        })
        .fuse();
    let outputs = stream::iter(o.map(BufReader::new))
        .flat_map(|r| {
            r.split(b'\n').map(|v| {
                log::debug!(
                    "out: {:?}",
                    v.as_ref().map(|v| String::from_utf8_lossy(v.as_slice()))
                );
                v.map(|mut v| {
                    v.push(b'\n');
                    v
                })
                .map(Data::Binary)
                .map(Observation::Output)
            })
        })
        .map(|result| {
            result.unwrap_or_else(|e| Observation::Failure {
                message: "Failed reading stdout".into(),
                source: Some(Arc::new(e)),
            })
        })
        .fuse();
    let completion = once_future(c)
        .map(|result| {
            log::debug!("exit: {:?}", result);
            result
                .map(|r| r.code().unwrap_or_default() as usize)
                .map(Observation::Completed)
                .unwrap_or_else(|e| Observation::Failure {
                    message: "Failed waiting for exit".into(),
                    source: Some(Arc::new(e)),
                })
        })
        .fuse();

    outputs.or(errors).chain(completion).fuse()
}