use crate::job::JobId;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::borrow::Cow;
use std::sync::{mpsc::Sender, Arc};
/// A sink for [`Progress`] events that can also report how the tracked work
/// ended.
///
/// Every method takes `&self`, so an implementor that records anything has to
/// do so through interior mutability. Implementors must also be `Debug`.
pub trait ProgressTracker: core::fmt::Debug {
    /// Feeds a single progress event into the tracker.
    fn update(&self, log: Progress);

    /// Returns the status code recorded so far, or `None` when the tracker has
    /// no final status to report yet.
    fn get_status(&self) -> Option<usize>;

    /// Tells whether the tracked work is known to have failed.
    fn is_failed(&self) -> bool;
}
/// Atomic record of whether tracked work has started, whether it has finished,
/// and with which status code.
///
/// Each field lives behind its own `Arc`, so the derived `Clone` produces a
/// handle that reads and writes the very same flags as the value it was
/// cloned from. `Default` yields "not started, not completed, status 0".
#[derive(Debug, Default, Clone)]
pub(crate) struct Outcome {
    /// Raised by every call to `update`, regardless of the event received.
    started: Arc<AtomicBool>,
    /// Raised once a completion, error or failure observation was received.
    completed: Arc<AtomicBool>,
    /// Status code; stays `0` until a non-zero code is recorded.
    completion: Arc<AtomicUsize>,
}
impl ProgressTracker for Outcome {
fn get_status(&self) -> Option<usize> {
self.completed
.load(Ordering::Relaxed)
.then_some(self.completion.load(Ordering::Relaxed))
}
fn update(&self, log: Progress) {
self.started.store(true, Ordering::Relaxed);
match &log {
Progress::Shutdown
| Progress::Command(_)
| Progress::Start
| Progress::Observation(Observation::Output(_)) => {}
Progress::Observation(Observation::Completed(code)) => {
self.completed.store(true, Ordering::Relaxed);
let _ = self
.completion
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
(c == 0).then_some(*code)
});
}
Progress::Observation(Observation::Error(_) | Observation::Failure { .. }) => {
self.completed.store(true, Ordering::Relaxed);
let _ = self
.completion
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
(c == 0).then_some(1)
});
}
}
}
fn is_failed(&self) -> bool {
matches!(self.get_status(), Some(1..))
}
}
/// A [`ProgressTracker`] wrapper that ties a tracker to a job id and a channel.
///
/// Status queries are answered by `inner`; the `ProgressTracker`
/// implementation for this type additionally logs each event and forwards it
/// through `sender`.
#[derive(Debug)]
pub(crate) struct RichOutcome {
    /// Identifier of the job; used to label the debug log line of each event.
    job: JobId,
    /// Tracker that actually stores the status.
    inner: Box<dyn ProgressTracker + Send + Sync>,
    /// Channel every event is forwarded to.
    sender: Sender<Progress>,
}
impl RichOutcome {
pub(crate) fn new(job: JobId, outcome: Outcome, sender: Sender<Progress>) -> Self {
Self {
job,
inner: Box::new(outcome),
sender,
}
}
}
/// Delegating tracker: status comes from the inner tracker, events are also
/// logged and forwarded over the channel.
impl ProgressTracker for RichOutcome {
    /// Logs the event at debug level, hands a copy to the inner tracker and
    /// then sends the original through the channel.
    ///
    /// # Panics
    ///
    /// Panics when the send fails, i.e. when the receiving end of the channel
    /// has been dropped.
    fn update(&self, log: Progress) {
        log::debug!("job {} log {log:?}", self.job);

        // The inner tracker gets its own copy so the original can still be
        // moved into the channel afterwards.
        let for_inner = log.clone();
        self.inner.update(for_inner);

        let delivery = self.sender.send(log);
        delivery.expect("sender is closed");
    }

    /// Status as reported by the inner tracker.
    fn get_status(&self) -> Option<usize> {
        let inner = &self.inner;
        inner.get_status()
    }

    /// Failure flag as reported by the inner tracker.
    fn is_failed(&self) -> bool {
        let inner = &self.inner;
        inner.is_failed()
    }
}
/// An event handed to a [`ProgressTracker`].
#[derive(Debug, Clone)]
pub enum Progress {
    /// Start marker without payload.
    /// NOTE(review): presumably emitted when work begins; confirm with the producers.
    Start,
    /// A text payload.
    /// NOTE(review): presumably the command being run; confirm with the producers.
    Command(Box<str>),
    /// Something observed about the work: output, an error, a failure or its
    /// completion code (see [`Observation`]).
    Observation(Observation),
    /// Shutdown marker without payload.
    Shutdown,
}
/// A single thing observed about running work.
#[derive(Debug, Clone)]
pub enum Observation {
    /// The work finished with the given status code. `0` is the only code
    /// that `is_failure` does not treat as a failure.
    Completed(usize),
    /// An error payload (text or raw bytes); always counted as a failure.
    Error(Data),
    /// An output payload (text or raw bytes); never counted as a failure.
    Output(Data),
    /// A failure described by a message and, optionally, the error behind it.
    Failure {
        /// Description of what failed.
        message: Box<str>,
        /// Underlying error, if there is one. Shared through `Arc` so the
        /// observation stays cheaply cloneable.
        source: Option<Arc<dyn core::error::Error + Send + Sync>>,
    },
}
impl Observation {
pub fn is_failure(&self) -> bool {
match self {
Observation::Completed(0) => false,
Observation::Completed(_) => true,
Observation::Error(_) => true,
Observation::Output(_) => false,
Observation::Failure { .. } => true,
}
}
pub fn is_completion(&self) -> Option<usize> {
match self {
Observation::Completed(code) => Some(*code),
_ => None,
}
}
pub fn try_utf8(&self) -> Cow<'_, Observation> {
match self {
Observation::Failure { .. } | Observation::Completed(_) => Cow::Borrowed(self),
Observation::Error(data) => match data.try_utf8() {
Cow::Borrowed(_) => Cow::Borrowed(self),
Cow::Owned(data) => Cow::Owned(Observation::Error(data)),
},
Observation::Output(data) => match data.try_utf8() {
Cow::Borrowed(_) => Cow::Borrowed(self),
Cow::Owned(data) => Cow::Owned(Observation::Output(data)),
},
}
}
}
impl core::fmt::Display for Observation {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self.try_utf8().as_ref() {
Observation::Completed(code) => write!(f, "completed: {code}"),
Observation::Error(data) => write!(f, "error: {data:?}",),
Observation::Output(data) => write!(f, "output: {data:?}"),
Observation::Failure { message, source } => write!(
f,
"failed - {message}: {source}",
source = source.as_deref().map(|e| e.to_string()).unwrap_or_default()
),
}
}
}
/// A payload that is either raw bytes or text.
#[derive(Debug, Clone)]
pub enum Data {
    /// Arbitrary bytes; may or may not be valid UTF-8.
    Binary(Vec<u8>),
    /// Text, stored as a boxed string slice.
    Utf8(Box<str>),
}
impl Data {
    /// Borrows the payload as bytes, whichever variant it is.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Data::Utf8(text) => text.as_bytes(),
            Data::Binary(raw) => raw.as_slice(),
        }
    }

    /// Returns the payload as text when possible.
    ///
    /// Binary data that is valid UTF-8 is copied into a new `Data::Utf8`
    /// (owned result). Data that is already text, or binary data that does not
    /// decode, is returned borrowed and untouched.
    pub fn try_utf8(&self) -> Cow<'_, Data> {
        if let Data::Binary(raw) = self {
            if let Ok(text) = core::str::from_utf8(raw.as_slice()) {
                return Cow::Owned(Data::Utf8(Box::from(text)));
            }
        }
        Cow::Borrowed(self)
    }
}