willdo 0.0.1

Task manager with DAG
Documentation
//! Tracking execution progress

use crate::job::JobId;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::borrow::Cow;
use std::sync::{mpsc::Sender, Arc};

/// Tracks job execution progress.
///
/// Implementations receive [`Progress`] events through
/// [`update`](Self::update) and expose the resulting completion state through
/// the two query methods. Every method takes `&self`, so an implementation
/// that records state has to rely on interior mutability.
pub trait ProgressTracker: core::fmt::Debug {
    /// Feed a new progress event into the tracker.
    fn update(&self, log: Progress);
    /// Get the current completion status.
    ///
    /// Returns `None` while the job has not completed, otherwise `Some` with
    /// the recorded completion code.
    fn get_status(&self) -> Option<usize>;
    /// Tell whether the job is in the failed state.
    fn is_failed(&self) -> bool;
}

/// Shared completion state of a single job.
///
/// Every field sits behind an [`Arc`], so the derived `Clone` yields another
/// handle onto the same flags rather than an independent copy.
#[derive(Debug, Default, Clone)]
pub(crate) struct Outcome {
    /// Set as soon as any progress event has been received.
    started: Arc<AtomicBool>,
    /// Set once a completion, error or failure observation has been received.
    completed: Arc<AtomicBool>,
    /// Completion code: the exit code of a completed command, or `1` for an
    /// error or failure observation. Once non-zero it is not overwritten.
    completion: Arc<AtomicUsize>,
}

impl ProgressTracker for Outcome {
    fn get_status(&self) -> Option<usize> {
        self.completed
            .load(Ordering::Relaxed)
            .then_some(self.completion.load(Ordering::Relaxed))
    }
    fn update(&self, log: Progress) {
        self.started.store(true, Ordering::Relaxed);
        match &log {
            Progress::Shutdown
            | Progress::Command(_)
            | Progress::Start
            | Progress::Observation(Observation::Output(_)) => {}
            Progress::Observation(Observation::Completed(code)) => {
                self.completed.store(true, Ordering::Relaxed);
                let _ = self
                    .completion
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
                        (c == 0).then_some(*code)
                    });
            }
            Progress::Observation(Observation::Error(_) | Observation::Failure { .. }) => {
                self.completed.store(true, Ordering::Relaxed);
                let _ = self
                    .completion
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
                        (c == 0).then_some(1)
                    });
            }
        }
    }
    fn is_failed(&self) -> bool {
        matches!(self.get_status(), Some(1..))
    }
}

/// A [`ProgressTracker`] that wraps another tracker and additionally forwards
/// every event to a channel.
#[derive(Debug)]
pub(crate) struct RichOutcome {
    /// Identifier of the tracked job; included in the debug log line.
    job: JobId,
    /// Tracker that keeps the actual completion state.
    inner: Box<dyn ProgressTracker + Send + Sync>,
    /// Channel end that receives a copy of each event.
    sender: Sender<Progress>,
}
impl RichOutcome {
    pub(crate) fn new(job: JobId, outcome: Outcome, sender: Sender<Progress>) -> Self {
        Self {
            job,
            inner: Box::new(outcome),
            sender,
        }
    }
}

impl ProgressTracker for RichOutcome {
    /// Log the event, record it in the wrapped tracker and then pass a copy
    /// on to the channel.
    ///
    /// # Panics
    ///
    /// Panics when the event cannot be sent through the channel.
    fn update(&self, log: Progress) {
        log::debug!("job {} log {log:?}", self.job);
        // The wrapped tracker and the channel each need an owned event.
        let forwarded = log.clone();
        self.inner.update(log);
        self.sender.send(forwarded).expect("sender is closed");
    }

    /// Completion status as reported by the wrapped tracker.
    fn get_status(&self) -> Option<usize> {
        let Self { inner, .. } = self;
        inner.get_status()
    }

    /// Failed state as reported by the wrapped tracker.
    fn is_failed(&self) -> bool {
        let Self { inner, .. } = self;
        inner.is_failed()
    }
}

/// Job execution progress event
#[derive(Debug, Clone)]
pub enum Progress {
    /// The job script interpreter has been started
    Start,
    /// A script command has been issued
    Command(Box<str>),
    /// An observation about a script command has been made
    Observation(Observation),
    /// The job script interpreter has shut down
    Shutdown,
}

/// Command execution event
#[derive(Debug, Clone)]
pub enum Observation {
    /// The command exited with the given exit code
    Completed(usize),
    /// Error log data (stderr)
    Error(Data),
    /// Output log data (stdout)
    Output(Data),
    /// Other commander failure
    Failure {
        /// Failure description
        message: Box<str>,
        /// Original error, if any
        source: Option<Arc<dyn core::error::Error + Send + Sync>>,
    },
}
impl Observation {
    /// Check if the event represents any kind of command failure
    pub fn is_failure(&self) -> bool {
        match self {
            Observation::Completed(0) => false,
            Observation::Completed(_) => true,
            Observation::Error(_) => true,
            Observation::Output(_) => false,
            Observation::Failure { .. } => true,
        }
    }
    /// Check if the event represents a command completion
    pub fn is_completion(&self) -> Option<usize> {
        match self {
            Observation::Completed(code) => Some(*code),
            _ => None,
        }
    }
    /// Try to convert observed data to UTF-8
    pub fn try_utf8(&self) -> Cow<'_, Observation> {
        match self {
            Observation::Failure { .. } | Observation::Completed(_) => Cow::Borrowed(self),
            Observation::Error(data) => match data.try_utf8() {
                Cow::Borrowed(_) => Cow::Borrowed(self),
                Cow::Owned(data) => Cow::Owned(Observation::Error(data)),
            },
            Observation::Output(data) => match data.try_utf8() {
                Cow::Borrowed(_) => Cow::Borrowed(self),
                Cow::Owned(data) => Cow::Owned(Observation::Output(data)),
            },
        }
    }
}
impl core::fmt::Display for Observation {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self.try_utf8().as_ref() {
            Observation::Completed(code) => write!(f, "completed: {code}"),
            Observation::Error(data) => write!(f, "error: {data:?}",),
            Observation::Output(data) => write!(f, "output: {data:?}"),
            Observation::Failure { message, source } => write!(
                f,
                "failed - {message}: {source}",
                source = source.as_deref().map(|e| e.to_string()).unwrap_or_default()
            ),
        }
    }
}

/// Command output data
#[derive(Debug, Clone)]
pub enum Data {
    /// Raw bytes that have not been converted to UTF-8 (yet)
    Binary(Vec<u8>),
    /// A UTF-8 string
    Utf8(Box<str>),
}
impl Data {
    /// Borrow the payload as raw bytes, whichever variant holds it.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Data::Utf8(text) => text.as_bytes(),
            Data::Binary(bytes) => &bytes[..],
        }
    }

    /// Try to convert the data to UTF-8.
    ///
    /// Binary data that is valid UTF-8 comes back as an owned `Utf8` value.
    /// Data that is already `Utf8`, or binary data that is not valid UTF-8,
    /// is handed back borrowed and unchanged.
    pub fn try_utf8(&self) -> Cow<'_, Data> {
        if let Data::Binary(bytes) = self {
            if let Ok(text) = core::str::from_utf8(bytes) {
                return Cow::Owned(Data::Utf8(Box::from(text)));
            }
        }
        Cow::Borrowed(self)
    }
}