use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream;
use crate::differ::Differ;
use crate::engine::EData;
use crate::engine::EItem;
use crate::engine::Engine;
use crate::output::ClearLine;
use crate::output::MoveCursorUp;
use crate::output::OutputCommand;
use crate::output::WriteAll;
use crate::process_wrapper::Cmd;
use crate::progbar::progbar_running;
use crate::progbar::progbar_sleeping;
use crate::progbar::spinner_get;
use crate::sys::SysApi;
use crate::time_wrapper::Duration;
use crate::time_wrapper::Instant;
#[derive(Debug, Clone, Copy)]
pub enum State {
Running,
Sleeping {
deadline: Instant,
},
}
#[pin_project(project = ViewProjection)]
pub struct View<SI: SysApi> {
cmd: Cmd,
refresh: Duration,
sleep: Duration,
engine: Engine<SI>,
pending: VecDeque<OutputCommand>,
differ: Differ,
spinner: char,
start: Instant, duration: Option<Duration>,
printed_status: bool,
total_runs: u32,
unchanged_runs: u32,
state: State,
}
impl<SI: SysApi> View<SI> {
pub fn new(cmd: Cmd, refresh: Duration, sleep: Duration, engine: Engine<SI>) -> Self {
View {
cmd,
refresh,
sleep,
engine,
pending: VecDeque::default(),
differ: Differ::default(),
spinner: '-',
start: Instant::default(),
duration: None,
printed_status: false,
total_runs: 0,
unchanged_runs: 0,
state: State::Sleeping {
deadline: Default::default(),
},
}
}
}
impl<SI: SysApi> ViewProjection<'_, SI> {
fn _println(&mut self, mut s: String) {
s.push('\n');
self.pending
.push_back(OutputCommand::WriteAll(WriteAll(s.as_bytes().to_vec())))
}
fn status_maybe_clear(&mut self) {
if *self.printed_status {
self.pending
.push_back(OutputCommand::MoveCursorUp(MoveCursorUp(1)));
self.pending
.push_back(OutputCommand::ClearLine(ClearLine {}));
}
}
fn println(&mut self, s: String) {
self.status_maybe_clear();
self._println(s);
*self.printed_status = false;
}
fn process_line(&mut self, line: String) {
self.differ.push(line);
let mut differ = std::mem::take(self.differ);
if differ.has_changed() {
for line in &mut differ {
self.println(line);
}
}
*self.differ = differ;
}
fn status_update_running(&mut self, now: Instant) {
self.status_maybe_clear();
let mut spinner = *self.spinner;
self._println(ofmt!(
&now,
"{}",
progbar_running(
150, *self.unchanged_runs, &now, self.start, *self.duration, self.refresh, spinner_get(&mut spinner) )
.unwrap()
));
*self.spinner = spinner;
*self.printed_status = true;
}
fn status_update_sleeping(&mut self, now: Instant, deadline: Instant) {
self.status_maybe_clear();
let mut spinner = *self.spinner;
self._println(ofmt!(
self.start,
"{}",
progbar_sleeping(
*self.unchanged_runs,
self.sleep,
&now,
&deadline,
spinner_get(&mut spinner)
)
));
*self.spinner = spinner;
*self.printed_status = true;
}
}
impl<SI: SysApi> Stream for View<SI> {
type Item = OutputCommand;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
if let Some(output) = this.pending.pop_front() {
return Poll::Ready(Some(output));
}
let item = Pin::new(&mut this.engine).poll_next(cx);
match this.state {
State::Running => {
match item {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(EItem { time: now, data })) => match data {
EData::StartSleep(deadline) => {
*this.state = State::Sleeping { deadline };
self.poll_next(cx)
}
EData::LineOut(line) => {
this.process_line(line);
this.status_update_running(now);
self.poll_next(cx)
}
EData::LineErr(line) => {
this.process_line(line);
this.status_update_running(now);
self.poll_next(cx)
}
EData::Msg(msg) => {
this.println(ofmt!(&now, "{}", msg));
this.status_update_running(now);
self.poll_next(cx)
}
EData::Done(sts) => {
let line = ofmt_timeless!("subprocess exited with {}", sts);
this.process_line(line);
*this.duration = Some(&now - this.start);
*this.start = now;
*this.total_runs += 1;
if !this.differ.has_changed() {
*this.unchanged_runs += 1;
} else {
*this.unchanged_runs = 0;
}
self.poll_next(cx)
}
EData::Err(e) => {
this.println(ofmt!(&now, "err {:?}", e));
self.poll_next(cx)
}
EData::Tick => {
this.status_update_running(now);
self.poll_next(cx)
}
_ => {
panic!("unexpected data while running: {:?}", data);
}
},
}
}
State::Sleeping { deadline } => match item {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(EItem { time: now, data })) => match data {
EData::StartRun => {
if *this.total_runs == 0 {
this.println(ofmt!(&now, "start execution"));
}
this.differ.reset();
*this.start = now;
this.status_update_running(now);
this.process_line(ofmt_timeless!("+ {}", this.cmd));
*this.state = State::Running;
self.poll_next(cx)
}
EData::Msg(msg) => {
this.println(ofmt!(&now, "{}", msg));
self.poll_next(cx)
}
EData::Err(e) => {
this.println(ofmt!(&now, "err {:?}", e));
self.poll_next(cx)
}
EData::Tick => {
let deadline = *deadline;
this.status_update_sleeping(now, deadline);
self.poll_next(cx)
}
_ => {
panic!("unexpected data while sleeping: {:?}", data);
}
},
},
}
}
}