use std::fmt::Display;
use std::future::Future;
use std::io::IsTerminal;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{io::Write, pin::Pin};
use crossterm::{cursor, QueueableCommand};
use futures_lite::{FutureExt as _, Stream, StreamExt as _};
use futures_util::stream::FuturesUnordered;
use crate::bar::Bar;
use crate::layout::{Layout, RenderContext};
use crate::spinner::Ticks;
use crate::term::{clear_line, reset};
use crate::Theme;
struct Annotated<F> {
inner: F,
id: usize,
}
impl<F> Annotated<F>
where
F: Future,
{
fn new(inner: F, id: usize) -> Self {
Self { inner, id }
}
}
impl<F> Future for Annotated<F>
where
F: Future + Unpin,
{
type Output = (F::Output, usize);
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let id = this.id;
match this.inner.poll(cx) {
Poll::Ready(output) => Poll::Ready((output, id)),
Poll::Pending => Poll::Pending,
}
}
}
pub struct Task<'a, F> {
fut: F,
label: Option<String>,
messages: Option<Box<dyn Stream<Item = String> + Unpin + 'a>>,
progress: Option<Box<dyn Stream<Item = f64> + Unpin + 'a>>,
}
impl<F> From<F> for Task<'_, F>
where
F: Future,
{
fn from(fut: F) -> Self {
Self {
fut,
label: None,
messages: None,
progress: None,
}
}
}
impl<'a, F> Task<'a, F> {
pub fn with_label(mut self, label: impl Into<String>) -> Self {
self.label = Some(label.into());
self
}
pub fn with_messages<S, D>(mut self, messages: S) -> Self
where
S: Stream<Item = D> + Unpin + 'a,
D: Display + 'a,
{
self.messages = Some(Box::new(messages.map(|d| d.to_string())));
self
}
pub fn with_progress<S>(mut self, progress: S) -> Self
where
S: Stream<Item = f64> + Unpin + 'a,
{
self.progress = Some(Box::new(progress));
self
}
}
struct TaskState<'a> {
label: Option<String>,
message: Option<String>,
messages: Option<Box<dyn Stream<Item = String> + Unpin + 'a>>,
progress: Option<f64>,
progress_stream: Option<Box<dyn Stream<Item = f64> + Unpin + 'a>>,
}
pub struct Group<'a, F> {
inner: FuturesUnordered<Annotated<F>>,
ticks: Ticks<'a>,
tasks: Vec<Option<TaskState<'a>>>,
annotation_style: owo_colors::Style,
spinner: Option<char>,
spinner_style: owo_colors::Style,
bar: Bar<'a>,
bar_width: usize,
with_elapsed_time: bool,
start: Option<Instant>,
dirty: bool,
rendered_lines: usize,
render_buf: String,
layout: Layout,
is_tty: bool,
}
impl<'a, F> Group<'a, F>
where
F: Future,
{
pub fn new<S: Into<Theme<'a>>>(theme: S) -> Self {
let theme = theme.into();
let bar_width = theme.effective_bar_width();
Self {
inner: FuturesUnordered::new(),
ticks: theme.spinner.ticks(),
tasks: Vec::new(),
annotation_style: owo_colors::Style::new(),
spinner: None,
spinner_style: owo_colors::Style::new(),
bar: theme.bar,
bar_width,
with_elapsed_time: false,
start: None,
dirty: true,
rendered_lines: 0,
render_buf: String::new(),
layout: theme.layout,
is_tty: std::io::stdout().is_terminal(),
}
}
pub fn with_spinner_style(mut self, spinner_style: owo_colors::Style) -> Self {
self.spinner_style = spinner_style;
self
}
pub fn with_annotation_style(mut self, annotation_style: owo_colors::Style) -> Self {
self.annotation_style = annotation_style;
self
}
pub fn with_elapsed_time(mut self) -> Self {
self.with_elapsed_time = true;
self
}
pub fn push(&mut self, task: impl Into<Task<'a, F>>) {
let task = task.into();
let id = self.tasks.len();
self.tasks.push(Some(TaskState {
label: task.label,
message: None,
messages: task.messages,
progress: None,
progress_stream: task.progress,
}));
self.inner.push(Annotated::new(task.fut, id));
}
}
impl<F> Stream for Group<'_, F>
where
F: Future + Unpin,
{
type Item = F::Output;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let inner = Pin::new(&mut this.inner);
let ticks = Pin::new(&mut this.ticks);
let elapsed = this.start.get_or_insert_with(Instant::now).elapsed();
if let Poll::Ready(spinner) = ticks.poll_next(cx) {
this.spinner = spinner;
this.dirty = true;
}
for task in this.tasks.iter_mut().flatten() {
if let Some(messages) = task.messages.as_mut() {
if let Poll::Ready(Some(msg)) = Pin::new(messages).poll_next(cx) {
task.message = Some(msg);
this.dirty = true;
}
}
if let Some(progress_stream) = task.progress_stream.as_mut() {
while let Poll::Ready(Some(p)) = Pin::new(&mut *progress_stream).poll_next(cx) {
task.progress = Some(p.clamp(0.0, 1.0));
this.dirty = true;
}
}
}
let item = match inner.poll_next(cx) {
Poll::Ready(Some((output, id))) => {
this.tasks[id] = None;
this.dirty = true;
Poll::Ready(Some(output))
}
Poll::Ready(None) => {
let _ = reset();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
};
if this.is_tty && this.dirty && !matches!(item, Poll::Ready(None)) {
this.dirty = false;
let mut stdout = std::io::stdout().lock();
let _ = stdout.queue(cursor::Hide);
let active_count = this.tasks.iter().filter(|t| t.is_some()).count();
for task in this.tasks.iter().flatten() {
let _ = clear_line(&mut stdout);
let ctx = RenderContext {
spinner: this.spinner,
elapsed,
show_elapsed: this.with_elapsed_time,
bar: &this.bar,
bar_width: this.bar_width,
progress: task.progress,
label: task.label.as_deref(),
message: task.message.as_deref(),
spinner_style: this.spinner_style,
annotation_style: this.annotation_style,
};
this.render_buf.clear();
this.layout.render(&ctx, &mut this.render_buf);
let _ = stdout.write_all(this.render_buf.as_bytes());
let _ = stdout.write_all(b"\n");
}
let stale_lines = this.rendered_lines.saturating_sub(active_count);
for _ in 0..stale_lines {
let _ = clear_line(&mut stdout);
let _ = stdout.write_all(b"\n");
}
this.rendered_lines = active_count;
let total_advanced = active_count + stale_lines;
if total_advanced > 0 {
let _ = stdout.queue(cursor::MoveUp(total_advanced as u16));
}
let _ = stdout.flush();
}
item
}
}