mod debughide;
use self::debughide::DebugHide;
use crate::execution::{
commander::{Commander, Instance},
progress::{Data, Observation},
BoxError, BoxFuture, BoxInstance, BoxStream,
};
use async_process::{ChildStdin, Command};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_lite::{
io::{self, BufReader},
stream::{self, empty, once_future},
AsyncBufReadExt, AsyncRead, AsyncWriteExt, Stream, StreamExt,
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::process::{ExitStatus, Stdio};
use std::sync::Arc;
const DEFAULT_FEEDBACK: &str = "\necho .$?$feedback\n";
pub fn provide(_name: &str, config: Config) -> Result<SubProcess, BoxError> {
let Config {
provider: _,
executable,
arguments,
feedback,
} = config;
Ok(SubProcess {
executable,
arguments,
feedback: feedback.unwrap_or_else(|| DEFAULT_FEEDBACK.into()),
})
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
provider: Box<str>,
executable: Box<str>,
#[serde(default)]
arguments: Vec<Box<str>>,
feedback: Option<Box<str>>,
}
#[derive(Debug, Default)]
pub struct SubProcess {
executable: Box<str>,
arguments: Vec<Box<str>>,
feedback: Box<str>,
}
impl SubProcess {
pub fn new(executable: Box<str>, arguments: Vec<Box<str>>, feedback: Box<str>) -> Self {
Self {
executable,
arguments,
feedback,
}
}
}
impl Commander for SubProcess {
fn start(&self) -> Result<BoxInstance, BoxError> {
log::debug!("STARTING {:?}", self.executable);
let mut child = Command::new(self.executable.as_ref())
.args(self.arguments.iter().map(|a| a.as_ref()))
.reap_on_drop(true)
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
log::debug!("STARTED {}: {child:?}", self.executable);
let stdin = child.stdin.take();
let stream = Box::pin(stream_child(
child.stderr.take(),
child.stdout.take(),
child.status(),
));
Ok(Box::new(Process {
stdin,
stream,
feedback: self.feedback.clone(),
}))
}
}
struct Process {
feedback: Box<str>,
stdin: Option<ChildStdin>,
stream: Pin<Box<dyn Send + Sync + Stream<Item = Observation>>>,
}
impl Instance for Process {
fn shutdown(&mut self) -> BoxFuture<Result<BoxStream<Observation>, BoxError>> {
log::debug!("SHUTTING DOWN!");
Box::pin(async move {
if let Some(mut inp) = self.stdin.take() {
inp.close().await?;
}
let stream = core::mem::replace(&mut self.stream, Box::pin(empty()));
Ok(stream as BoxStream<Observation>)
})
}
fn execute(&mut self, command: &str) -> BoxFuture<Result<BoxStream<Observation>, BoxError>> {
let feedbackid = rand::rngs::OsRng.gen_range(usize::MAX / 10usize..usize::MAX);
let feedbackid = format!("{feedbackid:016X}");
let feedback = self.feedback.replace("$feedback", &feedbackid);
let command = format!("{command}{feedback}");
log::debug!("FBID: {feedbackid}");
log::debug!("CMD: {command:?}");
Box::pin(async move {
if let Some(inp) = self.stdin.as_mut() {
inp.write_all(command.as_bytes()).await?;
inp.flush().await?;
}
Ok(
Box::pin(Feedback::new(feedbackid.into(), self.stream.as_mut()))
as BoxStream<Observation>,
)
})
}
}
#[derive(Debug)]
struct Feedback<'a> {
feedbackid: Vec<u8>,
completed: bool,
completion: Option<usize>,
stream: DebugHide<Pin<Box<dyn 'a + Send + Stream<Item = Observation>>>>,
}
impl<'a> Feedback<'a> {
pub fn new(feedbackid: Vec<u8>, stream: impl 'a + Send + Stream<Item = Observation>) -> Self {
Feedback {
feedbackid,
completed: false,
completion: None,
stream: DebugHide::new(Box::pin(stream), "..."),
}
}
fn extract_feedback(&mut self, log: &mut Observation) {
match log {
Observation::Failure { .. } | Observation::Completed(_) => {
self.completed = true;
}
Observation::Error(data) | Observation::Output(data) => {
let buff = match data {
Data::Binary(vec) => vec.as_slice(),
Data::Utf8(s) => s.as_bytes(),
};
let (mut feedback_len, line) = match buff {
[line @ .., b'\n'] => (1, line),
line => (0, line),
};
feedback_len += self.feedbackid.len();
if !line.ends_with(self.feedbackid.as_slice()) {
return;
}
let code_len = line
.iter()
.rev()
.skip(self.feedbackid.len())
.take_while(|c| u8::is_ascii_digit(c))
.count();
feedback_len += code_len;
let code_start = buff.len() - feedback_len;
let feedback_start = code_start.saturating_sub(1); if buff[feedback_start] != b'.' {
return;
}
let Ok(code) = core::str::from_utf8(&buff[code_start..code_start + code_len])
.expect("we've already checked with is_ascii_digit")
.parse::<usize>()
else {
return;
};
let stripped = match data {
Data::Binary(v) => Data::Binary(v[..feedback_start].to_vec()),
Data::Utf8(s) => Data::Utf8(s[..feedback_start].into()),
};
self.completed = true;
if stripped.as_bytes().is_empty() {
*log = Observation::Completed(code);
} else {
*log = match log {
Observation::Error(_) => Observation::Error(stripped),
Observation::Output(_) => Observation::Output(stripped),
_ => unreachable!("only matching these here"),
};
self.completion = Some(code);
}
}
}
}
}
impl Stream for Feedback<'_> {
type Item = Observation;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(completion) = self.completion.take() {
return Poll::Ready(Some(Observation::Completed(completion)));
}
let mut log = match self.stream.poll_next(cx) {
Poll::Ready(Some(log)) => log,
Poll::Ready(None) => {
self.completed = true;
return Poll::Ready(None);
}
Poll::Pending if self.completed => {
return Poll::Ready(None);
}
Poll::Pending => {
return Poll::Pending;
}
};
self.extract_feedback(&mut log);
Poll::Ready(Some(log.try_utf8().into_owned()))
}
}
#[test]
fn test_feedback() {
let s = stream::iter(vec![
Observation::Output(Data::Utf8("abc\n".into())),
Observation::Output(Data::Utf8("not a feedback: x3123\n".into())),
Observation::Output(Data::Utf8("this is the feedback: .3123\n".into())),
Observation::Output(Data::Utf8("finish it off\n".into())),
]);
let mut sut = Feedback::new(b"123".to_vec(), s);
match futures_lite::future::block_on(sut.next()) {
Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "abc\n" => {}
fail => panic!("Expected abc\\n, got {fail:?}"),
}
match futures_lite::future::block_on(sut.next()) {
Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "not a feedback: x3123\n" => {}
fail => panic!("Expected not a feedback, got {fail:?}"),
}
match futures_lite::future::block_on(sut.next()) {
Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "this is the feedback: " => {}
fail => panic!("Expected a feedback line, got {fail:?}"),
}
match futures_lite::future::block_on(sut.next()) {
Some(Observation::Completed(3)) => {}
fail => panic!("Expected completion, got {fail:?}"),
}
match futures_lite::future::block_on(sut.next()) {
Some(Observation::Output(Data::Utf8(o))) if o.as_ref() == "finish it off\n" => {}
fail => panic!("Expected a finish it off line, got {fail:?}"),
}
match futures_lite::future::block_on(sut.next()) {
None => {}
fail => panic!("Expected None, got {fail:?}"),
}
}
fn stream_child(
e: Option<impl AsyncRead>,
o: Option<impl AsyncRead>,
c: impl Future<Output = io::Result<ExitStatus>>,
) -> impl Stream<Item = Observation> {
let errors = stream::iter(e.map(BufReader::new))
.flat_map(|r| {
r.split(b'\n').map(|v| {
eprintln!(
"err: {:?}",
v.as_ref().map(|v| String::from_utf8_lossy(v.as_slice()))
);
v.map(|mut v| {
v.push(b'\n');
v
})
.map(Data::Binary)
.map(Observation::Error)
})
})
.map(|result| {
result.unwrap_or_else(|e| Observation::Failure {
message: "Failed reading stderr".into(),
source: Some(Arc::new(e)),
})
})
.fuse();
let outputs = stream::iter(o.map(BufReader::new))
.flat_map(|r| {
r.split(b'\n').map(|v| {
log::debug!(
"out: {:?}",
v.as_ref().map(|v| String::from_utf8_lossy(v.as_slice()))
);
v.map(|mut v| {
v.push(b'\n');
v
})
.map(Data::Binary)
.map(Observation::Output)
})
})
.map(|result| {
result.unwrap_or_else(|e| Observation::Failure {
message: "Failed reading stdout".into(),
source: Some(Arc::new(e)),
})
})
.fuse();
let completion = once_future(c)
.map(|result| {
log::debug!("exit: {:?}", result);
result
.map(|r| r.code().unwrap_or_default() as usize)
.map(Observation::Completed)
.unwrap_or_else(|e| Observation::Failure {
message: "Failed waiting for exit".into(),
source: Some(Arc::new(e)),
})
})
.fuse();
outputs.or(errors).chain(completion).fuse()
}