use std::io::Write;
use std::pin::Pin;
use std::task::Poll;
use futures_lite::{Stream, stream};
use crate::bar::Bar;
use crate::term::clear_line;
pub use crate::style::ProgressStyle;
pub struct Progress<'a, S, F, T, M> {
inner: S,
bar: Bar<'a>,
bar_width: usize,
progress_fn: F,
ticks: T,
messages: M,
current: usize,
spinner: Option<char>,
message: Option<String>,
}
impl<'a, S, F, T, M, D> Stream for Progress<'a, S, F, T, M>
where
S: Stream + Unpin,
F: FnMut(usize, &S::Item) -> f64 + Unpin,
T: Stream<Item = char> + Unpin,
M: Stream<Item = D> + Unpin,
D: std::fmt::Display,
{
type Item = S::Item;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
let inner = Pin::new(&mut this.inner);
let ticks = Pin::new(&mut this.ticks);
let messages = Pin::new(&mut this.messages);
if let Poll::Ready(spinner) = ticks.poll_next(cx) {
this.spinner = spinner;
}
if let Poll::Ready(Some(message)) = messages.poll_next(cx) {
this.message = Some(message.to_string());
}
match inner.poll_next(cx) {
Poll::Ready(Some(item)) => {
this.current += 1;
let _ = clear_line(&mut std::io::stdout());
if let Some(spinner) = &this.spinner {
print!("{spinner} ");
}
let completed = (this.progress_fn)(this.current, &item);
print!("{}", this.bar.render(this.bar_width, completed));
if let Some(message) = &this.message {
print!(" {message}");
}
std::io::stdout().flush().expect("flushing");
Poll::Ready(Some(item))
}
Poll::Ready(None) => {
let _ = clear_line(&mut std::io::stdout());
std::io::stdout().flush().expect("flushing");
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
pub trait StreamExt<'a, F>: Stream {
fn progress(
self,
progress: ProgressStyle<'a>,
progress_fn: F,
) -> Progress<
'a,
Self,
F,
impl Stream<Item = char> + use<'a, F, Self>,
impl Stream<Item = impl std::fmt::Display>,
>
where
Self: Sized,
F: FnMut(usize, &Self::Item) -> f64 + Unpin,
{
let bar_width = progress.effective_bar_width();
Progress {
inner: self,
progress_fn,
bar: progress.bar,
bar_width,
ticks: progress.spinner.ticks(),
messages: stream::pending::<&'static str>(),
current: 0,
spinner: None,
message: None,
}
}
fn progress_with_messages(
self,
progress: ProgressStyle<'a>,
progress_fn: F,
messages: impl Stream<Item = impl std::fmt::Display>,
) -> Progress<'a, Self, F, impl Stream<Item = char>, impl Stream<Item = impl std::fmt::Display>>
where
Self: Sized,
F: FnMut(usize, &Self::Item) -> f64 + Unpin,
{
let bar_width = progress.effective_bar_width();
Progress {
inner: self,
progress_fn,
bar: progress.bar,
bar_width,
ticks: progress.spinner.ticks(),
messages,
current: 0,
spinner: None,
message: None,
}
}
}
impl<'a, S, F> StreamExt<'a, F> for S where S: Stream {}