use std::fmt::Display;
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{io::Write, pin::Pin};
use crossterm::{QueueableCommand, cursor};
use futures_lite::{FutureExt as _, Stream, StreamExt as _};
use futures_util::stream::FuturesUnordered;
use owo_colors::OwoColorize;
use crate::Theme;
use crate::bar::Bar;
use crate::spinner::Ticks;
use crate::term::{clear_line, reset};
/// A future tagged with the numeric id it was registered under.
///
/// Its `Future` implementation resolves to `(output, id)`, which lets the
/// owner of a collection of these futures tell which one just finished.
struct Annotated<F> {
/// The wrapped future.
inner: F,
/// Id returned alongside the wrapped future's output.
id: usize,
}
impl<F> Annotated<F>
where
F: Future,
{
fn new(inner: F, id: usize) -> Self {
Self { inner, id }
}
}
impl<F> Future for Annotated<F>
where
F: Future + Unpin,
{
type Output = (F::Output, usize);
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let id = this.id;
match this.inner.poll(cx) {
Poll::Ready(output) => Poll::Ready((output, id)),
Poll::Pending => Poll::Pending,
}
}
}
/// A future to be driven by a [`Group`], together with optional display
/// metadata (label, message stream, progress stream).
pub struct Task<'a, F> {
/// The future to run.
fut: F,
/// Optional label; set through `with_label`.
label: Option<String>,
/// Optional stream of status messages, already converted to `String`;
/// set through `with_messages`.
messages: Option<Box<dyn Stream<Item = String> + Unpin + 'a>>,
/// Optional stream of progress values; set through `with_progress`.
/// `Group` clamps every value it reads from it to `0.0..=1.0`.
progress: Option<Box<dyn Stream<Item = f64> + Unpin + 'a>>,
}
impl<'a, F> From<F> for Task<'a, F>
where
F: Future,
{
fn from(fut: F) -> Self {
Self {
fut,
label: None,
messages: None,
progress: None,
}
}
}
impl<'a, F> Task<'a, F> {
    /// Returns the task with its label set to `label`.
    pub fn with_label(self, label: impl Into<String>) -> Self {
        Self {
            label: Some(label.into()),
            ..self
        }
    }

    /// Returns the task with `messages` attached as its status-message source.
    ///
    /// Every item is converted to a `String` through its `Display`
    /// implementation.
    pub fn with_messages<S, D>(self, messages: S) -> Self
    where
        S: Stream<Item = D> + Unpin + 'a,
        D: Display + 'a,
    {
        let rendered = messages.map(|item| item.to_string());
        Self {
            messages: Some(Box::new(rendered)),
            ..self
        }
    }

    /// Returns the task with `progress` attached as its progress source.
    pub fn with_progress<S>(self, progress: S) -> Self
    where
        S: Stream<Item = f64> + Unpin + 'a,
    {
        Self {
            progress: Some(Box::new(progress)),
            ..self
        }
    }
}
/// Display state that `Group` keeps for one pushed task.
struct TaskState<'a> {
/// Label printed in front of the task's line, if any.
label: Option<String>,
/// Most recent message read from `messages`, if any.
message: Option<String>,
/// Source of status messages for this task.
messages: Option<Box<dyn Stream<Item = String> + Unpin + 'a>>,
/// Most recent progress value read from `progress_stream`, clamped to
/// `0.0..=1.0` by `Group::poll_next`.
progress: Option<f64>,
/// Source of progress values for this task.
progress_stream: Option<Box<dyn Stream<Item = f64> + Unpin + 'a>>,
}
/// A set of concurrently running futures that is rendered as one terminal
/// line per unfinished task while it is polled as a `Stream` of outputs.
pub struct Group<'a, F> {
/// The running futures, each tagged with its index into `tasks`.
inner: FuturesUnordered<Annotated<F>>,
/// Stream polled for spinner frames.
ticks: Ticks<'a>,
/// One slot per pushed task, indexed by task id; a slot is set to `None`
/// once its future has completed.
tasks: Vec<Option<TaskState<'a>>>,
/// Style applied to task labels.
annotation_style: owo_colors::Style,
/// Latest value produced by `ticks`, if any.
spinner: Option<char>,
/// Style applied to the spinner character.
spinner_style: owo_colors::Style,
/// Renders a task's progress value.
bar: Bar<'a>,
/// Width passed to `bar.render`.
bar_width: usize,
/// Whether each line includes the elapsed time.
with_elapsed_time: bool,
/// Time of the first poll; `None` until then.
start: Option<Instant>,
/// Whether the display has to be redrawn on the next poll.
dirty: bool,
/// Number of task lines drawn by the previous render.
rendered_lines: usize,
}
impl<'a, F> Group<'a, F>
where
F: Future,
{
pub fn new<S: Into<Theme<'a>>>(theme: S) -> Self {
let theme = theme.into();
let bar_width = theme.effective_bar_width();
Self {
inner: FuturesUnordered::new(),
ticks: theme.spinner.ticks(),
tasks: Vec::new(),
annotation_style: owo_colors::Style::new(),
spinner: None,
spinner_style: owo_colors::Style::new(),
bar: theme.bar,
bar_width,
with_elapsed_time: false,
start: None,
dirty: true,
rendered_lines: 0,
}
}
pub fn with_spinner_style(mut self, spinner_style: owo_colors::Style) -> Self {
self.spinner_style = spinner_style;
self
}
pub fn with_annotation_style(mut self, annotation_style: owo_colors::Style) -> Self {
self.annotation_style = annotation_style;
self
}
pub fn with_elapsed_time(mut self, with_elapsed_time: bool) -> Self {
self.with_elapsed_time = with_elapsed_time;
self
}
pub fn push(&mut self, task: impl Into<Task<'a, F>>) {
let task = task.into();
let id = self.tasks.len();
self.tasks.push(Some(TaskState {
label: task.label,
message: None,
messages: task.messages,
progress: None,
progress_stream: task.progress,
}));
self.inner.push(Annotated::new(task.fut, id));
}
}
/// Polls the stream in `slot` until it is pending or exhausted and returns
/// the most recent item it produced during this call, if any.
///
/// Polling until `Pending` ensures the stream has registered the current
/// waker. An exhausted stream is removed from `slot`, because a stream must
/// not be polled again after it has returned `None`.
fn poll_latest<'s, T>(
    slot: &mut Option<Box<dyn Stream<Item = T> + Unpin + 's>>,
    cx: &mut Context<'_>,
) -> Option<T> {
    let mut latest = None;
    let mut exhausted = false;
    if let Some(stream) = slot.as_mut() {
        loop {
            match Pin::new(&mut *stream).poll_next(cx) {
                Poll::Ready(Some(item)) => latest = Some(item),
                Poll::Ready(None) => {
                    exhausted = true;
                    break;
                }
                Poll::Pending => break,
            }
        }
    }
    if exhausted {
        *slot = None;
    }
    latest
}

impl<F> Stream for Group<'_, F>
where
    F: Future + Unpin,
{
    type Item = F::Output;

    /// Drives the group one step and redraws the display when needed.
    ///
    /// In order, each call:
    /// 1. records the start time on the first poll,
    /// 2. polls the spinner ticks,
    /// 3. reads the latest message and progress value of every unfinished task,
    /// 4. polls the task futures, yielding the output of one that finished,
    /// 5. redraws one line per unfinished task if anything changed.
    ///
    /// Returns `Poll::Ready(None)` once no futures are left; in that case
    /// `reset()` is called and nothing is drawn.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.inner);
        let ticks = Pin::new(&mut this.ticks);
        let elapsed = this.start.get_or_insert_with(Instant::now).elapsed();

        if let Poll::Ready(spinner) = ticks.poll_next(cx) {
            this.spinner = spinner;
            this.dirty = true;
        }

        for task in this.tasks.iter_mut().flatten() {
            if let Some(msg) = poll_latest(&mut task.messages, cx) {
                task.message = Some(msg);
                this.dirty = true;
            }
            if let Some(p) = poll_latest(&mut task.progress_stream, cx) {
                task.progress = Some(p.clamp(0.0, 1.0));
                this.dirty = true;
            }
        }

        let item = match inner.poll_next(cx) {
            Poll::Ready(Some((output, id))) => {
                // The finished task no longer gets a line.
                this.tasks[id] = None;
                this.dirty = true;
                Poll::Ready(Some(output))
            }
            Poll::Ready(None) => {
                let _ = reset();
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        };

        if this.dirty && !matches!(item, Poll::Ready(None)) {
            this.dirty = false;
            let mut stdout = std::io::stdout();
            let _ = stdout.queue(cursor::Hide);

            let active_count = this.tasks.iter().filter(|t| t.is_some()).count();
            for task in this.tasks.iter().flatten() {
                let _ = clear_line(&mut stdout);
                if let Some(spinner) = &this.spinner {
                    print!("{} ", spinner.style(this.spinner_style));
                }
                if this.with_elapsed_time {
                    print!("[{:.2}s] ", elapsed.as_secs_f64());
                }
                if let Some(label) = &task.label {
                    print!("{} ", label.style(this.annotation_style));
                }
                if let Some(progress) = task.progress {
                    let bar = this.bar.render(this.bar_width, progress);
                    if !bar.is_empty() {
                        print!("{bar} ");
                    }
                }
                if let Some(message) = &task.message {
                    println!("{message}");
                } else {
                    println!();
                }
            }

            // Blank out lines left over from a previous render that drew more tasks.
            let stale_lines = this.rendered_lines.saturating_sub(active_count);
            for _ in 0..stale_lines {
                let _ = clear_line(&mut stdout);
                println!();
            }
            this.rendered_lines = active_count;

            // Move the cursor back to the first line so the next render overwrites this one.
            let total_advanced = active_count + stale_lines;
            if total_advanced > 0 {
                // Saturate instead of truncating if the count exceeds `u16::MAX`.
                let rows = u16::try_from(total_advanced).unwrap_or(u16::MAX);
                let _ = stdout.queue(cursor::MoveUp(rows));
            }
            let _ = stdout.flush();
        }

        item
    }
}