use color_eyre::Result;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream;
use tokio_stream::wrappers::IntervalStream;
use tracing::instrument;
use crate::process_wrapper;
use crate::process_wrapper::Cmd;
use crate::process_wrapper::ExitSts;
use crate::process_wrapper::ProcessStream;
use crate::sys::SysApi;
use crate::time_wrapper::Duration;
use crate::time_wrapper::Instant;
use crate::user_wrapper::UserStream;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EData {
StartRun,
StartSleep(Instant),
LineOut(String),
LineErr(String),
Done(ExitSts),
Err(std::io::ErrorKind),
Tick,
}
impl From<process_wrapper::Item> for EData {
fn from(item: process_wrapper::Item) -> Self {
match item {
process_wrapper::Item::Stdout(l) => EData::LineOut(l),
process_wrapper::Item::Stderr(l) => EData::LineErr(l),
process_wrapper::Item::Done(Ok(sts)) => EData::Done(sts),
process_wrapper::Item::Done(Err(e)) => EData::Err(e),
}
}
}
impl From<std::io::ErrorKind> for EData {
fn from(e: std::io::ErrorKind) -> Self {
EData::Err(e)
}
}
impl From<std::io::Error> for EData {
fn from(e: std::io::Error) -> Self {
e.kind().into()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EItem {
pub time: Instant,
pub data: EData,
}
impl EItem {
pub fn new<D>(time: Instant, data: D) -> EItem
where
D: Into<EData>,
{
Self {
time,
data: data.into(),
}
}
}
#[derive(Debug, Default)]
enum State {
#[default]
Start,
StartSleeping,
Running {
process: ProcessStream,
ticker: IntervalStream,
},
Sleeping {
deadline: Instant,
ticker: IntervalStream,
},
Done,
}
#[pin_project]
#[derive(Default, Debug)]
pub struct Engine<SI: SysApi> {
sys: SI,
cmd: Cmd,
refresh: Duration,
sleep: Duration,
exit_on_success: bool,
exit_on_failure: bool,
state: State,
user: Option<UserStream>,
exit_by_user: bool,
}
impl<SI: SysApi> Engine<SI> {
pub fn new(mut sys: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
let user_stream = sys.user_stream();
Ok(Self {
sys,
cmd,
refresh,
sleep,
exit_on_success: false,
exit_on_failure: false,
state: State::Start,
user: user_stream,
exit_by_user: false,
})
}
fn run(&mut self) -> std::result::Result<(), std::io::Error> {
let process = self.sys.run_command(self.cmd.clone())?;
let ticker = IntervalStream::new(self.refresh.into());
self.state = State::Running { process, ticker };
Ok(())
}
fn sleep(&mut self, now: Instant) -> EItem {
let deadline = &now + &self.sleep;
let ticker = IntervalStream::new(Duration::seconds(1).into());
self.state = State::Sleeping { deadline, ticker };
EItem::new(now, EData::StartSleep(deadline))
}
}
impl<SI: SysApi> Stream for Engine<SI> {
type Item = EItem;
#[instrument(level = "debug", ret, skip(cx))]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
let now = this.sys.now();
if let Some(user) = this.user {
match Pin::new(user).poll_next(cx) {
Poll::Ready(Some(s)) if s == "q" => {
*this.exit_by_user = true;
*this.user = None;
}
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => {
*this.user = None;
}
Poll::Pending => {}
};
}
let mut state = std::mem::take(&mut *this.state);
return match state {
State::Start => match self.run() {
Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
Err(e) => Poll::Ready(Some(EItem::new(now, e))),
},
State::StartSleeping => {
let item = self.sleep(now);
Poll::Ready(Some(item))
}
State::Sleeping {
deadline,
ref mut ticker,
} => {
if *this.exit_by_user {
*this.state = State::Done;
Poll::Ready(None)
} else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
let tick = EData::Tick;
if now < deadline {
*this.state = state;
Poll::Ready(Some(EItem::new(now, tick)))
} else {
*this.state = State::Start;
Poll::Ready(Some(EItem::new(now, tick)))
}
} else {
*this.state = state;
Poll::Pending
}
}
State::Running {
ref mut process,
ref mut ticker,
} => match Pin::new(process).poll_next(cx) {
Poll::Ready(Some(item)) => match item {
process_wrapper::Item::Stdout(_) => {
*this.state = state;
Poll::Ready(Some(EItem::new(now, item)))
}
process_wrapper::Item::Stderr(_) => {
*this.state = state;
Poll::Ready(Some(EItem::new(now, item)))
}
process_wrapper::Item::Done(Ok(ref exitsts)) => {
let success = exitsts.success();
if *this.exit_by_user
|| success && *this.exit_on_success
|| !success && *this.exit_on_failure
{
*this.state = State::Done;
} else {
*this.state = State::StartSleeping;
}
Poll::Ready(Some(EItem::new(now, item)))
}
process_wrapper::Item::Done(Err(e)) => {
*this.state = State::Done;
Poll::Ready(Some(EItem::new(now, e)))
}
},
Poll::Ready(None) => {
#[cfg(not(test))]
panic!("We should never see the underlying stream end");
#[cfg(test)]
{
*this.state = State::Done;
Poll::Ready(None)
}
}
Poll::Pending => {
if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
*this.state = state;
Poll::Ready(Some(EItem::new(now, EData::Tick)))
} else {
*this.state = state;
Poll::Pending
}
}
},
State::Done => {
*this.state = state;
Poll::Ready(None)
}
};
}
}
#[cfg(test)]
mod tests {
use color_eyre::Result;
use std::io;
use tokio_stream::StreamExt;
use crate::process_wrapper::Item;
use crate::sys::SysVirtual;
use crate::time_wrapper::Instant;
use super::*;
impl Engine<SysVirtual> {
pub fn new_virtual(
mut sys: SysVirtual,
exit_on_success: bool,
exit_on_failure: bool,
) -> Result<Self> {
let user_stream = sys.user_stream();
Ok(Self {
sys,
cmd: Cmd::default(),
refresh: Duration::INFINITE,
sleep: Duration::INFINITE,
exit_on_success,
exit_on_failure,
state: State::Start,
user: user_stream,
exit_by_user: false,
})
}
}
#[tokio::test]
async fn test_basic_success() -> Result<()> {
let list = vec![
Item::Stdout("stdout".into()),
Item::Stderr("stderr".into()),
Item::Done(Ok(ExitSts::default())),
];
let mut sys = SysVirtual::default();
sys.set_items(list.clone());
let streamer = Engine::new_virtual(sys, true, true)?;
let streamed = streamer.collect::<Vec<_>>().await;
let mut now = Instant::default();
assert_eq!(
streamed,
vec![
EItem {
time: now.incr(),
data: EData::StartRun
},
EItem {
time: now.incr(),
data: EData::LineOut("stdout".to_owned())
},
EItem {
time: now.incr(),
data: EData::LineErr("stderr".to_owned())
},
EItem {
time: now.incr(),
data: EData::Done(ExitSts::default())
}
]
);
Ok(())
}
#[tokio::test]
async fn test_done_err() -> Result<()> {
let list = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
let mut sys = SysVirtual::default();
sys.set_items(list.clone());
let streamer = Engine::new_virtual(sys, false, false)?;
let streamed = streamer.collect::<Vec<_>>().await;
let mut now = Instant::default();
assert_eq!(
streamed,
vec![
EItem {
time: now.incr(),
data: EData::StartRun,
},
EItem {
time: now.incr(),
data: EData::Err(io::ErrorKind::UnexpectedEof)
}
]
);
Ok(())
}
}