use std::fmt::Display;
use std::io::Write;
use std::pin::Pin;
use std::task::Poll;
use futures_lite::stream::Pending;
use futures_lite::{Stream, stream};
use crate::Theme;
use crate::bar::Bar;
use crate::spinner::Ticks;
use crate::term::clear_line;
/// A stream adapter that forwards the items of an inner stream and draws a
/// one-line progress display (optional spinner glyph, bar, optional message)
/// on stdout each time an item arrives.
///
/// Values are created by [`StreamExt::progress`]; [`Self::with_messages`]
/// swaps in a different source of status messages. Despite the name, this
/// type is itself the [`Stream`] that callers poll.
#[must_use = "streams do nothing unless polled"]
pub struct StreamProgressBuilder<'a, S, F, M> {
    /// The wrapped stream; its items are yielded unchanged.
    inner: S,
    /// Renders the bar from a width and a completion value.
    bar: Bar<'a>,
    /// Width handed to `Bar::render` on every redraw.
    bar_width: usize,
    /// Stream polled for the spinner glyph to show in front of the bar.
    ticks: Ticks<'a>,
    /// Called with the running item count and a reference to the item just
    /// received; its result is passed to `Bar::render` as the completion
    /// value (presumably a fraction in `0.0..=1.0` — confirm against `Bar`).
    fraction_fn: F,
    /// Stream of status messages; the most recent one is drawn after the bar.
    messages: M,
    /// Number of items the inner stream has produced so far.
    current: usize,
    /// Latest value obtained from `ticks`, if any.
    spinner_char: Option<char>,
    /// Text of the latest status message, if any.
    message: Option<String>,
}
impl<'a, S, F, M> StreamProgressBuilder<'a, S, F, M> {
    /// Replaces the status-message source with `messages`.
    ///
    /// Every other piece of state — the wrapped stream, bar, spinner ticks,
    /// fraction callback, item counter, and any spinner glyph or message text
    /// already captured — is carried over unchanged. The previous message
    /// stream is discarded.
    pub fn with_messages<S2>(self, messages: S2) -> StreamProgressBuilder<'a, S, F, S2>
    where
        S2: Stream + Unpin,
        S2::Item: Display,
    {
        // Take the builder apart. The `_` pattern leaves the old message
        // stream inside `self`, where it is dropped when this call returns.
        let Self {
            inner,
            bar,
            bar_width,
            ticks,
            fraction_fn,
            messages: _,
            current,
            spinner_char,
            message,
        } = self;

        StreamProgressBuilder {
            inner,
            bar,
            bar_width,
            ticks,
            fraction_fn,
            messages,
            current,
            spinner_char,
            message,
        }
    }
}
/// Forwards items from the wrapped stream, redrawing the progress line on
/// stdout every time one arrives and clearing the line once the wrapped
/// stream ends.
///
/// Drawing is best-effort: a failure to write to stdout (for example a closed
/// pipe) is ignored so that a display problem never panics the consumer or
/// interrupts the flow of items.
impl<S, F, M> Stream for StreamProgressBuilder<'_, S, F, M>
where
    S: Stream + Unpin,
    F: FnMut(usize, &S::Item) -> f64 + Unpin,
    M: Stream + Unpin,
    M::Item: Display,
{
    type Item = S::Item;

    /// Polls the spinner and message streams to refresh the cached display
    /// state, then polls the wrapped stream.
    ///
    /// * `Ready(Some(item))` — bumps the item counter, redraws the line
    ///   (spinner glyph, bar, latest message) and yields `item` unchanged.
    /// * `Ready(None)` — clears the line and reports the end of the stream.
    /// * `Pending` — nothing is drawn.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // NOTE(review): `ticks` and `messages` are polled on every call, even
        // after they may have returned `None`; this relies on both streams
        // tolerating polls past exhaustion — confirm for custom message streams.
        if let Poll::Ready(spinner) = Pin::new(&mut this.ticks).poll_next(cx) {
            this.spinner_char = spinner;
        }

        // Drain every message that is already available; only the newest one
        // is kept for display.
        while let Poll::Ready(Some(msg)) = Pin::new(&mut this.messages).poll_next(cx) {
            this.message = Some(msg.to_string());
        }

        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                this.current += 1;

                // All stdout results below are deliberately discarded: the
                // progress line is cosmetic and must not panic the stream.
                let mut out = std::io::stdout();
                let _ = clear_line(&mut out);
                if let Some(spinner) = this.spinner_char {
                    let _ = write!(out, "{spinner} ");
                }
                let completed = (this.fraction_fn)(this.current, &item);
                let _ = write!(out, "{}", this.bar.render(this.bar_width, completed));
                if let Some(message) = &this.message {
                    let _ = write!(out, " {message}");
                }
                let _ = out.flush();

                Poll::Ready(Some(item))
            }
            Poll::Ready(None) => {
                // Leave a clean line behind once the wrapped stream is done.
                let mut out = std::io::stdout();
                let _ = clear_line(&mut out);
                let _ = out.flush();
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Extension trait that adds progress rendering to any [`Stream`].
pub trait StreamExt: Stream {
    /// Wraps this stream in a [`StreamProgressBuilder`] that draws a progress
    /// line while items are forwarded.
    ///
    /// * `theme` — anything convertible into a [`Theme`]; it supplies the bar,
    ///   the bar width (via `effective_bar_width`) and the spinner ticks.
    /// * `fraction_fn` — called for each item with the running item count
    ///   (the first item is counted as `1`) and a reference to the item; its
    ///   result is handed to the bar renderer as the completion value.
    ///
    /// The returned adapter starts with a message stream that never yields,
    /// so no message is shown unless one is installed with `with_messages`.
    fn progress<'a, F>(
        self,
        theme: impl Into<Theme<'a>>,
        fraction_fn: F,
    ) -> StreamProgressBuilder<'a, Self, F, Pending<&'static str>>
    where
        Self: Sized,
        F: FnMut(usize, &Self::Item) -> f64 + Unpin,
    {
        let resolved: Theme<'a> = theme.into();
        // Read the width before the theme is taken apart below.
        let bar_width = resolved.effective_bar_width();
        let ticks = resolved.spinner.ticks();
        let messages: Pending<&'static str> = stream::pending();

        StreamProgressBuilder {
            inner: self,
            bar: resolved.bar,
            bar_width,
            ticks,
            fraction_fn,
            messages,
            current: 0,
            spinner_char: None,
            message: None,
        }
    }
}
/// Blanket implementation: every [`Stream`] automatically gets
/// [`StreamExt::progress`].
impl<S: Stream> StreamExt for S {}