mod ci;
mod render;
use ci::{CiState, format_duration};
use clx::progress::{
ProgressJob, ProgressJobBuilder, ProgressJobDoneBehavior, ProgressOutput, ProgressStatus,
};
use clx::style;
use std::collections::HashMap;
use std::io::{IsTerminal, Write};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock, Weak};
use std::time::{Duration, Instant};
/// Maximum number of individual per-package fetch rows shown under the root
/// progress job at once. `start_fetch` gives a fetch its own row only while
/// fewer than this many are visible; later fetches are counted on a single
/// shared overflow row instead.
const TTY_MAX_VISIBLE_FETCH_ROWS: usize = 5;
/// Builds the label for the shared overflow row, e.g. `1 more package…` or
/// `2 more packages…`, with the noun pluralized to match `count`.
fn overflow_fetch_label(count: usize) -> String {
    format!(
        "{count} more {}…",
        pluralizer::pluralize("package", count as isize, false)
    )
}
/// Prefixes `msg` with the styled `aube <version> by en.dev ·` banner and
/// returns the combined line (without a trailing newline).
pub(crate) fn aube_prefix_line(msg: &str) -> String {
    let brand = style::emagenta("aube").bold();
    let version = style::edim(crate::version::VERSION.as_str());
    let byline = style::edim("by en.dev");
    let separator = style::edim("·");
    format!("{brand} {version} {byline} {separator} {msg}")
}
/// Handle used to report install progress.
///
/// The handle runs in one of two modes (see `Mode`): a TTY mode that drives a
/// `clx` progress job, or a CI mode backed by `CiState`. Clones share the same
/// underlying state through `Arc`s.
pub struct InstallProgress {
// Which renderer this handle feeds, plus that renderer's shared counters.
mode: Mode,
// Most recent size passed to `inc_estimated_bytes` for each dep path; read
// back by `reconcile_estimated_bytes` to recompute the estimate.
unpacked_sizes: Arc<Mutex<HashMap<String, u64>>>,
}
/// Rendering backend for an `InstallProgress`, together with its shared state.
#[derive(Clone)]
enum Mode {
/// Progress is rendered through a root `ProgressJob` with child rows.
Tty {
/// Root progress job; per-fetch rows are added to it as children.
root: Arc<ProgressJob>,
/// Total item count, written by `set_total` / `inc_total`.
total: Arc<AtomicUsize>,
/// Items counted via `inc_reused`.
reused: Arc<AtomicUsize>,
/// Items counted when a `FetchRow` finishes.
downloaded: Arc<AtomicUsize>,
/// Current phase as set by `set_phase`: 1 = "resolving", 2 = "fetching",
/// 3 = "linking", 0 = anything else.
phase_num: Arc<AtomicUsize>,
/// Bytes accumulated through `inc_downloaded_bytes`.
downloaded_bytes: Arc<AtomicU64>,
/// Estimated bytes accumulated through `inc_estimated_bytes` and
/// overwritten by `reconcile_estimated_bytes`.
estimated_bytes: Arc<AtomicU64>,
/// Instant recorded the first time the phase becomes "fetching".
fetch_start: Arc<OnceLock<Instant>>,
/// `reused + downloaded` captured the first time the phase becomes
/// "fetching"; `usize::MAX` means it has not been captured yet.
completed_at_fetch_start: Arc<AtomicUsize>,
/// Bookkeeping for visible and overflow fetch rows.
fetch_state: Arc<Mutex<FetchState>>,
},
/// Progress is tracked in a shared `CiState` (defined in the `ci` module).
Ci(Arc<CiState>),
}
/// Bookkeeping for the per-fetch rows displayed in TTY mode.
struct FetchState {
// Number of fetches that currently own an individual row.
visible: usize,
// Number of in-flight fetches folded into the shared overflow row.
overflow: usize,
// The shared overflow row, created on demand in `start_fetch` and taken
// (then marked done) when `overflow` drops back to zero.
overflow_row: Option<Arc<ProgressJob>>,
}
impl Clone for InstallProgress {
    /// Returns another handle onto the same shared progress state.
    ///
    /// In CI mode the state's `alive` counter is incremented first; the
    /// matching decrement happens in `Drop`, which uses it to detect the last
    /// handle.
    fn clone(&self) -> Self {
        match &self.mode {
            Mode::Ci(state) => {
                state.alive.fetch_add(1, Ordering::Relaxed);
            }
            Mode::Tty { .. } => {}
        }
        let mode = self.mode.clone();
        let unpacked_sizes = Arc::clone(&self.unpacked_sizes);
        Self {
            mode,
            unpacked_sizes,
        }
    }
}
impl InstallProgress {
/// Creates a progress reporter, or returns `None` when `clx` progress output
/// is set to `ProgressOutput::Text`.
///
/// TTY mode is chosen when stderr is a terminal and `is_ci::cached()` is
/// false; every other case gets CI mode.
pub fn try_new() -> Option<Self> {
    if clx::progress::output() == ProgressOutput::Text {
        return None;
    }
    let interactive = std::io::stderr().is_terminal() && !is_ci::cached();
    let progress = if interactive {
        Self::new_tty()
    } else {
        Self::new_ci()
    };
    Some(progress)
}
/// Builds the TTY-mode reporter: starts the root progress job with empty
/// phase/bytes/rate/eta segments and initializes every shared counter.
fn new_tty() -> Self {
    // Templates handed to the builder's `body` and `body_text` respectively.
    const BODY: &str =
        "{{aube}}{{phase}} {{progress_bar(flex=true)}} {{cur}}/{{total}}{{bytes}}{{rate}}{{eta}}";
    const BODY_TEXT: &str = "{{aube}}{{phase}} {{cur}}/{{total}}{{bytes}}{{rate}}{{eta}}";

    let brand = style::emagenta("aube").bold();
    let version = style::edim(crate::version::VERSION.as_str());
    let byline = style::edim("by en.dev");
    let header = format!("{brand} {version} {byline}");

    let root = ProgressJobBuilder::new()
        .body(BODY)
        .body_text(Some(BODY_TEXT))
        .prop("aube", &header)
        .prop("phase", "")
        .prop("bytes", "")
        .prop("rate", "")
        .prop("eta", "")
        .progress_current(0)
        .progress_total(0)
        .on_done(ProgressJobDoneBehavior::Hide)
        .start();

    let zeroed = || Arc::new(AtomicUsize::new(0));
    let fetch_state = FetchState {
        visible: 0,
        overflow: 0,
        overflow_row: None,
    };
    let mode = Mode::Tty {
        root,
        total: zeroed(),
        reused: zeroed(),
        downloaded: zeroed(),
        phase_num: zeroed(),
        downloaded_bytes: Arc::new(AtomicU64::new(0)),
        estimated_bytes: Arc::new(AtomicU64::new(0)),
        fetch_start: Arc::new(OnceLock::new()),
        // `usize::MAX` marks "baseline not captured yet"; `set_phase`
        // replaces it the first time fetching starts.
        completed_at_fetch_start: Arc::new(AtomicUsize::new(usize::MAX)),
        fetch_state: Arc::new(Mutex::new(fetch_state)),
    };
    Self {
        mode,
        unpacked_sizes: Arc::new(Mutex::new(HashMap::new())),
    }
}
/// Builds the CI-mode reporter: creates a fresh `CiState`, passes it to
/// `CiState::spawn_heartbeat`, and wraps it in a handle with an empty size map.
fn new_ci() -> Self {
    let shared = Arc::new(CiState::new());
    CiState::spawn_heartbeat(&shared);
    let unpacked_sizes = Arc::new(Mutex::new(HashMap::new()));
    Self {
        mode: Mode::Ci(shared),
        unpacked_sizes,
    }
}
/// Replaces the running total with `total`.
///
/// In TTY mode this also updates the root job's total and refreshes the ETA
/// segment; in CI mode it overwrites the `resolved` counter.
pub fn set_total(&self, total: usize) {
    let (root, counter) = match &self.mode {
        Mode::Ci(state) => {
            state.resolved.store(total, Ordering::Relaxed);
            return;
        }
        Mode::Tty {
            root,
            total: counter,
            ..
        } => (root, counter),
    };
    counter.store(total, Ordering::Relaxed);
    root.progress_total(total);
    self.refresh_eta();
}
/// Adds `n` to the running total.
///
/// In TTY mode the root job's total is set to the new sum and the ETA segment
/// is refreshed; in CI mode `n` is added to the `resolved` counter.
pub fn inc_total(&self, n: usize) {
    let (root, counter) = match &self.mode {
        Mode::Ci(state) => {
            state.resolved.fetch_add(n, Ordering::Relaxed);
            return;
        }
        Mode::Tty { root, total, .. } => (root, total),
    };
    // `fetch_add` yields the value before the addition.
    let previous = counter.fetch_add(n, Ordering::Relaxed);
    root.progress_total(previous + n);
    self.refresh_eta();
}
/// Records `bytes` as the estimated size for `dep_path` and folds it into the
/// shared estimated-bytes counter.
///
/// A repeated call for the same `dep_path` replaces that path's previous
/// figure instead of adding on top of it. In TTY mode the bytes segment is
/// refreshed afterwards.
///
/// # Panics
///
/// Panics if the `unpacked_sizes` mutex is poisoned.
pub fn inc_estimated_bytes(&self, dep_path: &str, bytes: u64) {
    let prior = self
        .unpacked_sizes
        .lock()
        .unwrap()
        .insert(dep_path.to_string(), bytes)
        .unwrap_or(0);
    // Swap the old contribution for the new one in a single atomic step.
    // Saturating arithmetic matters because `reconcile_estimated_bytes` may
    // have stored a sum that no longer includes `prior`; a plain `fetch_sub`
    // would then wrap the counter around to a huge value. Doing it in one
    // update also means readers never observe the intermediate dip between
    // the subtraction and the addition.
    let swap = |current: u64| Some(current.saturating_sub(prior).saturating_add(bytes));
    match &self.mode {
        Mode::Tty {
            estimated_bytes, ..
        } => {
            let _ = estimated_bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed, swap);
            self.refresh_bytes_segment();
        }
        Mode::Ci(s) => {
            let _ = s
                .estimated_bytes
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, swap);
        }
    }
}
/// Recomputes the estimated-bytes counter as the sum of the recorded sizes of
/// `surviving_dep_paths` and stores that sum, replacing the previous value.
///
/// Paths with no recorded size contribute nothing. In TTY mode the bytes
/// segment is refreshed afterwards.
///
/// # Panics
///
/// Panics if the `unpacked_sizes` mutex is poisoned.
pub fn reconcile_estimated_bytes<I, S>(&self, surviving_dep_paths: I)
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let mut surviving_total: u64 = 0;
    {
        // Hold the lock only while summing; it is released before any
        // counter or display update below.
        let sizes = self.unpacked_sizes.lock().unwrap();
        for path in surviving_dep_paths {
            if let Some(size) = sizes.get(path.as_ref()) {
                surviving_total += *size;
            }
        }
    }
    match &self.mode {
        Mode::Ci(state) => {
            state.estimated_bytes.store(surviving_total, Ordering::Relaxed);
        }
        Mode::Tty {
            estimated_bytes, ..
        } => {
            estimated_bytes.store(surviving_total, Ordering::Relaxed);
            self.refresh_bytes_segment();
        }
    }
}
/// Switches the reporter to `phase`.
///
/// CI mode forwards to `CiState::set_phase`. TTY mode updates the phase label
/// (empty for an empty `phase`), stores the numeric phase (1 = "resolving",
/// 2 = "fetching", 3 = "linking", 0 otherwise), and refreshes the bytes, rate
/// and ETA segments. Entering "fetching" records the fetch start instant and
/// the already-completed baseline; only the first such entry takes effect.
/// Entering "linking" clears the rate and ETA segments.
pub fn set_phase(&self, phase: &str) {
    let (root, phase_num, fetch_start, reused, downloaded, completed_at_fetch_start) =
        match &self.mode {
            Mode::Ci(state) => {
                state.set_phase(phase);
                return;
            }
            Mode::Tty {
                root,
                phase_num,
                fetch_start,
                reused,
                downloaded,
                completed_at_fetch_start,
                ..
            } => (
                root,
                phase_num,
                fetch_start,
                reused,
                downloaded,
                completed_at_fetch_start,
            ),
        };

    let phase_label = if phase.is_empty() {
        String::new()
    } else {
        let tinted = match phase {
            "resolving" => style::eyellow(phase).to_string(),
            "linking" => style::ecyan(phase).to_string(),
            _ => style::edim(phase).to_string(),
        };
        format!(" {} {}", style::edim("—"), tinted)
    };
    root.prop("phase", phase_label.as_str());

    let phase_id: usize = match phase {
        "resolving" => 1,
        "fetching" => 2,
        "linking" => 3,
        _ => 0,
    };
    phase_num.store(phase_id, Ordering::Relaxed);

    match phase_id {
        2 => {
            // Both writes are first-one-wins: `OnceLock::set` and the
            // compare-exchange against the `usize::MAX` sentinel are no-ops
            // once a value has been recorded.
            let _ = fetch_start.set(Instant::now());
            let already_done =
                reused.load(Ordering::Relaxed) + downloaded.load(Ordering::Relaxed);
            let _ = completed_at_fetch_start.compare_exchange(
                usize::MAX,
                already_done,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
        }
        3 => {
            root.prop("rate", "");
            root.prop("eta", "");
        }
        _ => {}
    }

    self.refresh_bytes_segment();
    self.refresh_rate();
    self.refresh_eta();
}
/// Counts `n` more items as reused.
///
/// In TTY mode the root job is advanced by `n` and the ETA segment is
/// refreshed; in CI mode only the `reused` counter changes.
pub fn inc_reused(&self, n: usize) {
    let (root, reused) = match &self.mode {
        Mode::Ci(state) => {
            state.reused.fetch_add(n, Ordering::Relaxed);
            return;
        }
        Mode::Tty { root, reused, .. } => (root, reused),
    };
    reused.fetch_add(n, Ordering::Relaxed);
    root.increment(n);
    self.refresh_eta();
}
/// Adds `bytes` to the downloaded-bytes counter.
///
/// In TTY mode the bytes, rate and ETA segments are refreshed afterwards.
pub fn inc_downloaded_bytes(&self, bytes: u64) {
    let counter = match &self.mode {
        Mode::Ci(state) => {
            state.downloaded_bytes.fetch_add(bytes, Ordering::Relaxed);
            return;
        }
        Mode::Tty {
            downloaded_bytes, ..
        } => downloaded_bytes,
    };
    counter.fetch_add(bytes, Ordering::Relaxed);
    self.refresh_bytes_segment();
    self.refresh_rate();
    self.refresh_eta();
}
/// Re-renders the `bytes` segment of the root job (TTY mode only).
///
/// The segment is blank before the fetching phase or when there is nothing to
/// show. Otherwise it shows `received / ~estimate` while the estimate exceeds
/// what has been received, just the received amount once it reaches or passes
/// the estimate, and just `~estimate` while nothing has been received.
fn refresh_bytes_segment(&self) {
    let Mode::Tty {
        root,
        downloaded_bytes,
        estimated_bytes,
        phase_num,
        ..
    } = &self.mode
    else {
        return;
    };
    let received = downloaded_bytes.load(Ordering::Relaxed);
    let expected = render::estimated_download_bytes(estimated_bytes.load(Ordering::Relaxed));
    let before_fetch = phase_num.load(Ordering::Relaxed) < 2;
    if before_fetch || (received == 0 && expected == 0) {
        root.prop("bytes", "");
        return;
    }

    let received_text = || style::ebold(render::format_bytes(received));
    let expected_text = || style::edim(format!("~{}", render::format_bytes(expected)));
    let segment = if received == 0 {
        format!(" · {}", expected_text())
    } else if expected > received {
        format!(
            " · {} {} {}",
            received_text(),
            style::edim("/"),
            expected_text(),
        )
    } else {
        format!(" · {}", received_text())
    };
    root.prop("bytes", &segment);
}
/// Re-renders the `rate` segment of the root job (TTY mode only).
///
/// The segment is cleared outside the fetching phase, and also while no bytes
/// have arrived or no whole millisecond has elapsed. If the fetch start
/// instant has not been recorded the segment is left untouched.
fn refresh_rate(&self) {
    let Mode::Tty {
        root,
        phase_num,
        downloaded_bytes,
        fetch_start,
        ..
    } = &self.mode
    else {
        return;
    };
    let fetching = phase_num.load(Ordering::Relaxed) == 2;
    if !fetching {
        root.prop("rate", "");
        return;
    }
    let received = downloaded_bytes.load(Ordering::Relaxed);
    let elapsed_ms = match fetch_start.get() {
        Some(started) => started.elapsed().as_millis() as u64,
        None => return,
    };
    if received == 0 || elapsed_ms == 0 {
        root.prop("rate", "");
        return;
    }
    // Bytes per second, computed in integer math from millisecond precision.
    let per_second = received.saturating_mul(1000) / elapsed_ms;
    let rate_text = format!("{}/s", render::format_bytes(per_second));
    root.prop("rate", &format!(" · {}", style::edim(rate_text)));
}
/// Re-renders the `eta` segment of the root job (TTY mode only).
///
/// Phases 0 and 3 show nothing. In the other phases the segment shows a
/// projected remaining time, extrapolated from how many items completed since
/// fetching started, or the `ETA …` placeholder whenever that projection
/// cannot be computed yet.
fn refresh_eta(&self) {
    let Mode::Tty {
        root,
        total,
        reused,
        downloaded,
        phase_num,
        fetch_start,
        completed_at_fetch_start,
        ..
    } = &self.mode
    else {
        return;
    };
    if matches!(phase_num.load(Ordering::Relaxed), 0 | 3) {
        root.prop("eta", "");
        return;
    }
    let total_n = total.load(Ordering::Relaxed);
    let finished =
        total_n.min(reused.load(Ordering::Relaxed) + downloaded.load(Ordering::Relaxed));
    let baseline = completed_at_fetch_start.load(Ordering::Relaxed);

    // `None` means "not enough information": nothing left to do, no baseline
    // captured, no start instant, or no measurable progress so far.
    let projected_ms: Option<u64> = match fetch_start.get() {
        Some(started) if total_n != 0 && finished < total_n && baseline != usize::MAX => {
            let elapsed_ms = started.elapsed().as_millis() as u64;
            let fetched = finished.saturating_sub(baseline);
            if fetched == 0 || elapsed_ms == 0 {
                None
            } else {
                let remaining = (total_n - finished) as u64;
                Some(elapsed_ms.saturating_mul(remaining) / fetched as u64)
            }
        }
        _ => None,
    };

    let eta_text = match projected_ms {
        Some(ms) => {
            let pretty = format_duration(Duration::from_millis(ms));
            style::edim(format!("ETA {pretty}")).to_string()
        }
        None => style::edim("ETA …").to_string(),
    };
    root.prop("eta", &format!(" · {eta_text}"));
}
pub fn start_fetch(&self, name: &str, version: &str) -> FetchRow {
match &self.mode {
Mode::Tty {
root,
fetch_state,
downloaded,
..
} => {
let mut st = fetch_state.lock().unwrap();
if st.visible < TTY_MAX_VISIBLE_FETCH_ROWS {
st.visible += 1;
drop(st);
let child = ProgressJobBuilder::new()
.body(" {{spinner()}} {{label | flex}}")
.body_text(None::<String>)
.prop("label", &format!("{name}@{version}"))
.status(ProgressStatus::Running)
.on_done(ProgressJobDoneBehavior::Hide)
.build();
let child = root.add(child);
return FetchRow {
inner: FetchRowInner::Tty {
child,
root: Arc::downgrade(root),
fetch_state: Arc::downgrade(fetch_state),
downloaded: Arc::downgrade(downloaded),
visible: true,
},
completed: false,
};
}
st.overflow += 1;
if st.overflow_row.is_none() {
let row = ProgressJobBuilder::new()
.body(" {{spinner()}} {{label | flex}}")
.body_text(None::<String>)
.prop("label", &overflow_fetch_label(st.overflow))
.status(ProgressStatus::Running)
.on_done(ProgressJobDoneBehavior::Hide)
.build();
st.overflow_row = Some(root.add(row));
} else if let Some(row) = &st.overflow_row {
row.prop("label", &overflow_fetch_label(st.overflow));
}
FetchRow {
inner: FetchRowInner::Tty {
child: st.overflow_row.as_ref().unwrap().clone(),
root: Arc::downgrade(root),
fetch_state: Arc::downgrade(fetch_state),
downloaded: Arc::downgrade(downloaded),
visible: false,
},
completed: false,
}
}
Mode::Ci(s) => FetchRow {
inner: FetchRowInner::Ci(Arc::downgrade(s)),
completed: false,
},
}
}
/// Ends progress reporting.
///
/// TTY mode marks the root job done and calls `clx::progress::stop_clear`.
/// CI mode calls `CiState::stop`, passing `print_ci_summary` through.
pub fn finish(&self, print_ci_summary: bool) {
    match &self.mode {
        Mode::Ci(state) => state.stop(print_ci_summary),
        Mode::Tty { root, .. } => {
            root.set_status(ProgressStatus::Done);
            clx::progress::stop_clear();
        }
    }
}
/// Prints the final one-line install summary to stderr.
///
/// * Nothing linked at all (`linked` and `top_level_linked` both zero):
///   prints "Already up to date", with the package count when
///   `total_packages` is non-zero.
/// * `linked` zero but `top_level_linked` non-zero: prints nothing.
/// * Otherwise prints "installed N packages in <elapsed>", unless CI mode
///   reports via its `shown` flag that output was already produced.
///
/// Write errors on stderr are ignored.
pub fn print_install_summary(
    &self,
    linked: usize,
    top_level_linked: usize,
    total_packages: usize,
    elapsed: Duration,
) {
    let emit = |msg: String| {
        let line = aube_prefix_line(&msg);
        let _ = writeln!(std::io::stderr(), "{line}");
    };

    if linked == 0 {
        if top_level_linked == 0 {
            let body = match total_packages {
                0 => "Already up to date".to_string(),
                count => format!(
                    "Already up to date ({})",
                    pluralizer::pluralize("package", count as isize, true)
                ),
            };
            emit(format!(
                "{} {}",
                style::egreen("✓").bold(),
                style::ebold(&body)
            ));
        }
        return;
    }

    let already_shown = match &self.mode {
        Mode::Tty { .. } => false,
        Mode::Ci(state) => state.shown.load(Ordering::Relaxed),
    };
    if already_shown {
        return;
    }

    emit(format!(
        "{} installed {} in {}",
        style::egreen("✓").bold(),
        style::ebold(pluralizer::pluralize("package", linked as isize, true)),
        style::edim(format_duration(elapsed)),
    ));
}
}
impl Drop for InstallProgress {
    /// Shuts the renderer down when the last handle goes away.
    ///
    /// TTY mode treats a strong count of 1 on the root job as "last handle";
    /// CI mode decrements the `alive` counter and stops (without a summary)
    /// when the value before the decrement was 1.
    fn drop(&mut self) {
        match &self.mode {
            Mode::Tty { root, .. } => {
                let last_handle = Arc::strong_count(root) == 1;
                if last_handle {
                    root.set_status(ProgressStatus::Done);
                    clx::progress::stop_clear();
                }
            }
            Mode::Ci(state) => {
                // `fetch_sub` returns the count before this handle was removed.
                let before = state.alive.fetch_sub(1, Ordering::Relaxed);
                if before == 1 {
                    state.stop(false);
                }
            }
        }
    }
}
/// Handle for one in-flight fetch, returned by `InstallProgress::start_fetch`.
///
/// Dropping the handle marks the fetch as finished.
pub struct FetchRow {
// Mode-specific references needed to record completion.
inner: FetchRowInner,
// Set once completion has been recorded so it is never counted twice.
completed: bool,
}
/// Mode-specific payload of a `FetchRow`.
enum FetchRowInner {
/// TTY mode: the displayed row plus weak links back to the shared state.
Tty {
// The row representing this fetch: its own row when `visible` is true,
// otherwise a clone of the shared overflow row.
child: Arc<ProgressJob>,
// Weak link to the root job, incremented on completion if still alive.
root: Weak<ProgressJob>,
// Weak link to the visible/overflow bookkeeping.
fetch_state: Weak<Mutex<FetchState>>,
// Weak link to the downloaded-items counter.
downloaded: Weak<AtomicUsize>,
// Whether this fetch owns an individual row (true) or is counted on the
// overflow row (false).
visible: bool,
},
/// CI mode: weak link to the shared `CiState`.
Ci(Weak<CiState>),
}
impl FetchRow {
    /// Records this fetch as finished; only the first call has any effect.
    ///
    /// CI mode bumps the shared `downloaded` counter if the state is still
    /// alive. TTY mode advances the root job and the downloaded counter (each
    /// only if still alive), then either retires this fetch's own row or
    /// decrements the overflow count, retiring the shared overflow row once
    /// the count reaches zero and relabelling it otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the fetch-state mutex is poisoned.
    fn finish_inner(&mut self) {
        // `replace` returns the previous flag, so a `true` here means the
        // completion was already recorded.
        if std::mem::replace(&mut self.completed, true) {
            return;
        }
        let (child, root, fetch_state, downloaded, visible) = match &self.inner {
            FetchRowInner::Ci(weak) => {
                if let Some(state) = weak.upgrade() {
                    state.downloaded.fetch_add(1, Ordering::Relaxed);
                }
                return;
            }
            FetchRowInner::Tty {
                child,
                root,
                fetch_state,
                downloaded,
                visible,
            } => (child, root, fetch_state, downloaded, *visible),
        };

        if let Some(root_job) = root.upgrade() {
            root_job.increment(1);
        }
        if let Some(counter) = downloaded.upgrade() {
            counter.fetch_add(1, Ordering::Relaxed);
        }
        if visible {
            child.set_status(ProgressStatus::Done);
        }

        let Some(shared) = fetch_state.upgrade() else {
            return;
        };
        let mut slots = shared.lock().unwrap();
        if visible {
            slots.visible = slots.visible.saturating_sub(1);
            return;
        }
        slots.overflow = slots.overflow.saturating_sub(1);
        if slots.overflow == 0 {
            if let Some(row) = slots.overflow_row.take() {
                row.set_status(ProgressStatus::Done);
            }
        } else if let Some(row) = &slots.overflow_row {
            row.prop("label", &overflow_fetch_label(slots.overflow));
        }
    }
}
impl Drop for FetchRow {
/// Marks the fetch as finished when the handle goes out of scope.
fn drop(&mut self) {
// `finish_inner` guards on `completed`, so completion is recorded at most once.
self.finish_inner();
}
}
/// Writes `msg` followed by a newline to stderr while `clx` progress is paused.
///
/// Progress is paused only if it was not already paused, and resumed
/// afterwards only in that case. The write and flush happen inside
/// `clx::progress::with_terminal_lock`; their errors are ignored.
pub fn safe_eprintln(msg: &str) {
    use std::io::Write;
    let paused_here = !clx::progress::is_paused();
    if paused_here {
        clx::progress::pause();
    }
    let _: () = clx::progress::with_terminal_lock(|| {
        let mut err = std::io::stderr().lock();
        let _ = writeln!(err, "{msg}");
        let _ = err.flush();
    });
    if paused_here {
        clx::progress::resume();
    }
}
/// `tracing_subscriber` writer factory whose writers are `PausingWriterGuard`s:
/// each one buffers its output and emits it to stderr, with `clx` progress
/// paused, when it is dropped.
#[derive(Clone, Copy, Default)]
pub struct PausingWriter;
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for PausingWriter {
    type Writer = PausingWriterGuard;
    /// Returns a new guard with an empty buffer.
    fn make_writer(&'a self) -> Self::Writer {
        let buf = Vec::new();
        PausingWriterGuard { buf }
    }
}
/// Writer that collects everything written to it in memory.
pub struct PausingWriterGuard {
    // Bytes accumulated by `write`, in order.
    buf: Vec<u8>,
}
impl Write for PausingWriterGuard {
    /// Appends all of `data` to the buffer and reports its full length.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        // `Vec<u8>`'s own `Write` impl appends the whole slice and returns
        // its length.
        self.buf.write(data)
    }
    /// Does nothing: the bytes stay in the buffer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.buf.flush()
    }
}
impl Drop for PausingWriterGuard {
fn drop(&mut self) {
if self.buf.is_empty() {
return;
}
let buf = std::mem::take(&mut self.buf);
let was_paused = clx::progress::is_paused();
if !was_paused {
clx::progress::pause();
}
let _: () = clx::progress::with_terminal_lock(|| {
let mut stderr = std::io::stderr().lock();
let _ = stderr.write_all(&buf);
let _ = stderr.flush();
});
if !was_paused {
clx::progress::resume();
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// The overflow label uses the singular noun for one package and the
    /// plural for two.
    #[test]
    fn overflow_fetch_label_pluralizes_count() {
        let cases = [(1, "1 more package…"), (2, "2 more packages…")];
        for (count, expected) in cases {
            assert_eq!(overflow_fetch_label(count), expected);
        }
    }
}