use std::fmt::Display;
use std::io::{IsTerminal, Write};
use std::pin::Pin;
use std::task::Poll;
use std::time::{Duration, Instant};
use crossterm::{cursor, QueueableCommand};
use futures_lite::stream::Pending;
use futures_lite::{stream, Stream};
use crate::bar::Bar;
use crate::layout::{Layout, RenderContext};
use crate::spinner::Ticks;
use crate::term::{clear_line, CursorGuard};
use crate::Theme;
/// Stream adapter that forwards the items of an inner stream while drawing a
/// progress line on stdout.
///
/// Values are created by [`StreamExt::progress`] and customised with the
/// `with_*` builder methods. Rendering happens inside `poll_next`, so nothing
/// is drawn until the adapter is polled.
///
/// Type parameters:
/// * `S` – the wrapped stream,
/// * `F` – closure mapping `(items_seen, &item)` to the progress value,
/// * `M` – auxiliary stream whose items replace the displayed message.
#[must_use = "streams do nothing unless polled"]
pub struct StreamProgressBuilder<'a, S, F, M> {
    /// Wrapped stream; its items are yielded unchanged.
    inner: S,
    /// Bar description taken from the theme.
    bar: Bar<'a>,
    /// Bar width computed by `Theme::effective_bar_width`.
    bar_width: usize,
    /// Stream of spinner frames taken from the theme's spinner.
    ticks: Ticks<'a>,
    /// Set once `ticks` has returned `None`; a finished stream must not be
    /// polled again.
    ticks_done: bool,
    /// Computes the progress value handed to the layout for every item.
    fraction_fn: F,
    /// Stream of status messages; the latest item overwrites `message`.
    messages: M,
    /// Set once `messages` has returned `None`; a finished stream must not be
    /// polled again.
    messages_done: bool,
    /// Number of items yielded by `inner` so far.
    current: usize,
    /// Most recent spinner frame, if any.
    spinner_char: Option<char>,
    /// Text currently shown next to the bar.
    message: Option<String>,
    /// Whether the elapsed time is passed to the layout for display.
    with_elapsed_time: bool,
    /// Instant of the first rendered item; initialised lazily.
    start: Option<Instant>,
    /// Scratch buffer reused for every rendered frame.
    render_buf: String,
    /// Layout that turns a `RenderContext` into text.
    layout: Layout,
    /// Carries the "stdout is a terminal" flag.
    // NOTE(review): presumably also restores the cursor on drop — confirm in `crate::term`.
    guard: CursorGuard,
}

impl<'a, S, F, M> StreamProgressBuilder<'a, S, F, M> {
    /// Sets the text shown next to the bar.
    ///
    /// The text is stored in the same slot that items of the message stream
    /// overwrite, so it stays visible only until the first message arrives.
    pub fn with_label(mut self, label: impl Display) -> Self {
        self.message = Some(label.to_string());
        self
    }

    /// Enables the elapsed-time display.
    ///
    /// The clock starts when the first item is rendered, not when the adapter
    /// is created.
    pub fn with_elapsed_time(mut self) -> Self {
        self.with_elapsed_time = true;
        self
    }

    /// Replaces the message stream.
    ///
    /// Every item produced by `messages` is converted with `to_string` and
    /// becomes the displayed message. All other settings are carried over; the
    /// previous message stream is dropped.
    pub fn with_messages<S2>(self, messages: S2) -> StreamProgressBuilder<'a, S, F, S2>
    where
        S2: Stream + Unpin,
        S2::Item: Display,
    {
        StreamProgressBuilder {
            inner: self.inner,
            bar: self.bar,
            bar_width: self.bar_width,
            ticks: self.ticks,
            ticks_done: self.ticks_done,
            fraction_fn: self.fraction_fn,
            messages,
            // The new stream has not been polled yet, so it cannot be finished.
            messages_done: false,
            current: self.current,
            spinner_char: self.spinner_char,
            message: self.message,
            with_elapsed_time: self.with_elapsed_time,
            start: self.start,
            render_buf: self.render_buf,
            layout: self.layout,
            guard: self.guard,
        }
    }
}

impl<S, F, M> Stream for StreamProgressBuilder<'_, S, F, M>
where
    S: Stream + Unpin,
    F: FnMut(usize, &S::Item) -> f64 + Unpin,
    M: Stream + Unpin,
    M::Item: Display,
{
    type Item = S::Item;

    /// Polls the spinner and message streams for fresh display state, then
    /// polls the inner stream.
    ///
    /// When the inner stream yields an item and stdout is a terminal, the
    /// progress line is redrawn before the item is returned. When the inner
    /// stream ends, the line is cleared and the cursor is shown again. If
    /// stdout is not a terminal, items pass through without any output and
    /// `fraction_fn` is not called.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // A stream that has returned `None` may panic or misbehave when polled
        // again, so remember completion and skip it from then on.
        if !this.ticks_done {
            if let Poll::Ready(spinner) = Pin::new(&mut this.ticks).poll_next(cx) {
                this.ticks_done = spinner.is_none();
                this.spinner_char = spinner;
            }
        }

        // Drain every message that is already available; only the newest one
        // is displayed. Stop for good once the stream reports completion.
        while !this.messages_done {
            match Pin::new(&mut this.messages).poll_next(cx) {
                Poll::Ready(Some(msg)) => this.message = Some(msg.to_string()),
                Poll::Ready(None) => this.messages_done = true,
                Poll::Pending => break,
            }
        }

        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                this.current += 1;
                if this.guard.is_tty {
                    let elapsed = if this.with_elapsed_time {
                        this.start.get_or_insert_with(Instant::now).elapsed()
                    } else {
                        Duration::ZERO
                    };
                    let completed = (this.fraction_fn)(this.current, &item);
                    let ctx = RenderContext {
                        spinner: this.spinner_char,
                        elapsed,
                        show_elapsed: this.with_elapsed_time,
                        bar: &this.bar,
                        bar_width: this.bar_width,
                        progress: Some(completed),
                        label: None,
                        message: this.message.as_deref(),
                        spinner_style: owo_colors::Style::new(),
                        annotation_style: owo_colors::Style::new(),
                    };
                    this.render_buf.clear();
                    this.layout.render(&ctx, &mut this.render_buf);

                    // Terminal output is best effort: a failed write must not
                    // disturb the data flowing through the stream.
                    let mut stdout = std::io::stdout().lock();
                    let _ = clear_line(&mut stdout);
                    let _ = stdout.queue(cursor::Hide);
                    let _ = stdout.write_all(this.render_buf.as_bytes());
                    let _ = stdout.flush();
                }
                Poll::Ready(Some(item))
            }
            Poll::Ready(None) => {
                if this.guard.is_tty {
                    // Remove the progress line and give the cursor back.
                    let mut stdout = std::io::stdout().lock();
                    let _ = clear_line(&mut stdout);
                    let _ = stdout.queue(cursor::Show);
                    let _ = stdout.flush();
                }
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Extension trait that adds progress rendering to any [`Stream`].
pub trait StreamExt: Stream {
    /// Wraps `self` in a [`StreamProgressBuilder`] that draws a progress line
    /// on stdout as items are yielded.
    ///
    /// * `theme` – anything convertible into a [`Theme`]; supplies the bar,
    ///   spinner, layout and bar width.
    /// * `fraction_fn` – called for every yielded item with the number of
    ///   items seen so far (starting at 1) and a reference to the item; its
    ///   result is passed to the layout as the progress value.
    ///   NOTE(review): presumably expected to lie in `0.0..=1.0` — confirm
    ///   against `Layout::render`.
    ///
    /// Whether stdout is a terminal is detected once, here. The returned
    /// adapter starts with a message stream that never yields.
    fn progress<'a, F>(
        self,
        theme: impl Into<Theme<'a>>,
        fraction_fn: F,
    ) -> StreamProgressBuilder<'a, Self, F, Pending<&'static str>>
    where
        Self: Sized,
        F: FnMut(usize, &Self::Item) -> f64 + Unpin,
    {
        let theme = theme.into();
        let bar_width = theme.effective_bar_width();
        StreamProgressBuilder {
            inner: self,
            bar: theme.bar,
            bar_width,
            ticks: theme.spinner.ticks(),
            ticks_done: false,
            fraction_fn,
            messages: stream::pending(),
            messages_done: false,
            current: 0,
            spinner_char: None,
            message: None,
            with_elapsed_time: false,
            start: None,
            render_buf: String::new(),
            layout: theme.layout,
            guard: CursorGuard {
                is_tty: std::io::stdout().is_terminal(),
            },
        }
    }
}
/// Blanket implementation: makes [`StreamExt`] available on every type that
/// implements [`Stream`].
impl<S: Stream> StreamExt for S {}