use std::io::Write;
use anyhow::Context;
use carlog::Status;
use console;
use indicatif::{
ProgressBar,
ProgressDrawTarget,
ProgressStyle,
};
use portable_pty::{
CommandBuilder,
PtySize,
native_pty_system,
};
/// Terminal logger that owns at most one indicatif progress bar drawn on
/// stderr and keeps track of how many status lines it has put on screen.
pub struct Logger {
/// The bar currently shown, if any; `None` once it has been cleared.
progress_bar: Option<ProgressBar>,
/// Number of terminal lines the logger believes it has drawn. Set to 1 by
/// `status`, reset to 0 when the status is cleared, and passed to
/// `Term::clear_last_lines` when those lines have to be wiped.
line_count: usize,
}
impl Logger {
pub fn new() -> Self {
Self {
progress_bar: None,
line_count: 0,
}
}
#[allow(dead_code)] pub fn progress(&mut self, message: &str) {
let pb = ProgressBar::new_spinner();
pb.set_draw_target(ProgressDrawTarget::stderr());
pb.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.green} {msg}")
.unwrap(),
);
pb.set_message(message.to_string());
pb.enable_steady_tick(std::time::Duration::from_millis(100));
self.progress_bar = Some(pb);
}
#[allow(dead_code)] pub fn set_progress_message(&self, message: &str) {
if let Some(pb) = &self.progress_bar {
pb.set_message(message.to_string());
}
}
pub fn status(&mut self, action: &str, target: &str) {
if let Some(pb) = self.progress_bar.take() {
pb.finish_and_clear();
}
use console::style;
let formatted_message = format!("{:>12} {}", style(action).cyan().bold(), target);
let pb = ProgressBar::new_spinner();
pb.set_draw_target(ProgressDrawTarget::stderr());
pb.set_style(ProgressStyle::default_spinner().template("{msg}").unwrap());
pb.set_message(formatted_message);
self.progress_bar = Some(pb);
self.line_count = 1;
}
#[allow(dead_code)] pub fn status_permanent(&self, action: &str, target: &str) {
let status = Status::new()
.bold()
.justify()
.color(carlog::CargoColor::Green)
.status(action);
let formatted_target = format!(" {}", target);
if let Some(pb) = &self.progress_bar {
pb.suspend(|| {
let _ = status.print_stderr(&formatted_target);
});
} else {
let _ = status.print_stderr(&formatted_target);
}
}
#[allow(dead_code)] pub fn print_message(&self, msg: &str) {
if let Some(pb) = &self.progress_bar {
pb.suspend(|| {
eprintln!("{}", msg);
});
} else {
eprintln!("{}", msg);
}
}
#[allow(dead_code)] pub fn info(&self, action: &str, target: &str) {
let status = Status::new()
.bold()
.justify()
.color(carlog::CargoColor::Cyan)
.status(action);
let formatted_target = format!(" {}", target);
if let Some(pb) = &self.progress_bar {
pb.suspend(|| {
let _ = status.print_stderr(&formatted_target);
});
} else {
let _ = status.print_stderr(&formatted_target);
}
}
pub fn warning(&self, action: &str, target: &str) {
let status = Status::new()
.bold()
.justify()
.color(carlog::CargoColor::Yellow)
.status(action);
let formatted_target = format!(" {}", target);
if let Some(pb) = &self.progress_bar {
pb.suspend(|| {
let _ = status.print_stderr(&formatted_target);
});
} else {
let _ = status.print_stderr(&formatted_target);
}
}
#[allow(dead_code)] pub fn error(&self, action: &str, target: &str) {
let status = Status::new()
.bold()
.justify()
.color(carlog::CargoColor::Red)
.status(action);
let formatted_target = format!(" {}", target);
if let Some(pb) = &self.progress_bar {
pb.suspend(|| {
let _ = status.print_stderr(&formatted_target);
});
} else {
let _ = status.print_stderr(&formatted_target);
}
}
pub fn clear_status(&mut self) {
if let Some(pb) = self.progress_bar.take() {
pb.finish_and_clear();
self.line_count = 0;
}
}
pub fn suspend<F, R>(&mut self, f: F) -> R
where
F: FnOnce() -> R,
{
if let Some(pb) = &self.progress_bar {
pb.suspend(f)
} else {
f()
}
}
pub fn finish(&mut self) {
if let Some(pb) = self.progress_bar.take() {
pb.finish_and_clear();
self.line_count = 0;
}
}
}
/// Raw result of a finished subprocess: captured bytes plus the exit code.
#[derive(Debug, Clone)]
pub struct SubprocessOutput {
/// Bytes captured as standard output. `run_subprocess` in this file always
/// leaves this empty, because the child is attached to a single PTY stream.
pub stdout: Vec<u8>,
/// Bytes captured as standard error. `run_subprocess` in this file stores
/// everything it read from the PTY here.
pub stderr: Vec<u8>,
/// Exit code reported for the process; `success` treats 0 as success.
pub exit_code: u32,
}
impl SubprocessOutput {
    /// Returns a copy of the captured stdout decoded as UTF-8.
    ///
    /// # Errors
    ///
    /// Fails with the context "Failed to parse stdout as UTF-8" when the
    /// bytes are not valid UTF-8.
    pub fn stdout_str(&self) -> anyhow::Result<String> {
        let bytes = self.stdout.to_vec();
        let decoded = String::from_utf8(bytes);
        decoded.context("Failed to parse stdout as UTF-8")
    }

    /// Returns a copy of the captured stderr decoded as UTF-8.
    ///
    /// # Errors
    ///
    /// Fails with the context "Failed to parse stderr as UTF-8" when the
    /// bytes are not valid UTF-8.
    pub fn stderr_str(&self) -> anyhow::Result<String> {
        let bytes = self.stderr.to_vec();
        let decoded = String::from_utf8(bytes);
        decoded.context("Failed to parse stderr as UTF-8")
    }

    /// Reports whether the process exited with code 0.
    pub fn success(&self) -> bool {
        let code = self.exit_code();
        code == 0
    }

    /// Returns the exit code of the process.
    pub fn exit_code(&self) -> u32 {
        self.exit_code
    }
}
pub async fn run_subprocess<F>(
logger: &mut Logger,
cmd_builder: F,
stderr_lines: Option<usize>,
) -> anyhow::Result<SubprocessOutput>
where
F: FnOnce() -> CommandBuilder,
{
let stderr_lines = stderr_lines.unwrap_or(5);
let term = console::Term::stderr();
let is_term = term.is_term();
if is_term {
if let Some(pb) = logger.progress_bar.take() {
pb.finish_and_clear();
}
if logger.line_count > 0 {
let _ = term.clear_last_lines(logger.line_count);
logger.line_count = 0;
}
}
let stderr_lines_u16 = stderr_lines as u16;
let lines_drawn = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let lines_drawn_render = lines_drawn.clone();
let cmd = cmd_builder();
let pty_system = native_pty_system();
let pty_size = PtySize {
rows: stderr_lines_u16,
cols: 80,
pixel_width: 0,
pixel_height: 0,
};
let pty = pty_system
.openpty(pty_size)
.context("Failed to create PTY")?;
let mut child = pty
.slave
.spawn_command(cmd)
.context("Failed to spawn command in PTY")?;
let mut reader = pty
.master
.try_clone_reader()
.context("Failed to clone PTY reader")?;
let master = pty.master;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let tx_clone = tx.clone();
let collected_output = std::sync::Arc::new(std::sync::Mutex::new(Vec::<u8>::new()));
let collected_output_clone = collected_output.clone();
#[allow(clippy::excessive_nesting)]
let pty_task = tokio::spawn(async move {
tokio::task::spawn_blocking(move || {
let mut full_output = Vec::new();
let mut buffer = vec![0u8; 4096];
loop {
match reader.read(&mut buffer) {
Ok(0) => break, Ok(bytes_read) => {
let chunk = &buffer[..bytes_read];
full_output.extend_from_slice(chunk);
if let Ok(mut collected) = collected_output_clone.lock() {
collected.extend_from_slice(chunk);
}
let _ = tx.send(chunk.to_vec());
}
Err(err) => {
let error_msg = format!("<pty read error: {}>", err);
let error_bytes = error_msg.as_bytes();
full_output.extend_from_slice(error_bytes);
if let Ok(mut collected) = collected_output_clone.lock() {
collected.extend_from_slice(error_bytes);
}
let _ = tx.send(error_bytes.to_vec());
break;
}
}
}
drop(tx);
Ok::<Vec<u8>, anyhow::Error>(full_output)
})
.await
.context("Failed to join blocking PTY read task")?
});
let mut output_buffer = Vec::new();
let mut output_ring: Vec<Vec<u8>> = Vec::with_capacity(stderr_lines);
#[allow(clippy::excessive_nesting)]
let render_task = tokio::spawn(async move {
let mut current_lines_displayed: usize = 0;
while let Some(chunk) = rx.recv().await {
output_buffer.extend_from_slice(&chunk);
let mut lines: Vec<Vec<u8>> = Vec::new();
let mut current_line = Vec::new();
for byte in output_buffer.iter().copied() {
current_line.push(byte);
if byte == b'\n' {
lines.push(current_line);
current_line = Vec::new();
}
}
output_buffer = current_line;
for line in lines {
output_ring.push(line);
if output_ring.len() > stderr_lines {
output_ring.remove(0);
}
}
if is_term && !output_ring.is_empty() {
let mut stderr_handle = std::io::stderr();
if current_lines_displayed > 0 {
write!(stderr_handle, "\x1b[{}A", current_lines_displayed).ok();
for _ in 0..current_lines_displayed {
write!(stderr_handle, "\x1b[2K\x1b[1B").ok(); }
write!(stderr_handle, "\x1b[{}A", current_lines_displayed).ok();
}
for line_bytes in &output_ring {
let _ = stderr_handle.write_all(line_bytes);
}
let _ = stderr_handle.flush();
current_lines_displayed = output_ring.len();
lines_drawn_render
.store(current_lines_displayed, std::sync::atomic::Ordering::SeqCst);
}
}
if !output_buffer.is_empty() {
output_ring.push(output_buffer);
if output_ring.len() > stderr_lines {
output_ring.remove(0);
}
if is_term {
let mut stderr_handle = std::io::stderr();
if current_lines_displayed > 0 {
write!(stderr_handle, "\x1b[{}A", current_lines_displayed).ok();
for _ in 0..current_lines_displayed {
write!(stderr_handle, "\x1b[2K\x1b[1B").ok();
}
write!(stderr_handle, "\x1b[{}A", current_lines_displayed).ok();
}
for line_bytes in &output_ring {
let _ = stderr_handle.write_all(line_bytes);
}
let _ = stderr_handle.flush();
lines_drawn_render.store(output_ring.len(), std::sync::atomic::Ordering::SeqCst);
}
}
(output_ring, is_term)
});
let status = tokio::task::spawn_blocking(move || child.wait())
.await
.context("Failed to join process wait task")?
.context("Failed to wait for subprocess")?;
drop(master);
#[cfg(windows)]
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let timeout_duration = if cfg!(windows) {
std::time::Duration::from_millis(500)
} else {
std::time::Duration::from_secs(10)
};
let pty_output = match tokio::time::timeout(timeout_duration, pty_task).await {
Ok(result) => {
result.context("Failed to join PTY task")??
}
Err(_) => {
drop(tx_clone);
collected_output.lock().unwrap().clone()
}
};
let render_timeout = if cfg!(windows) {
std::time::Duration::from_millis(500)
} else {
std::time::Duration::from_secs(5)
};
let (_final_output_ring, was_term) =
match tokio::time::timeout(render_timeout, render_task).await {
Ok(result) => result.context("Failed to join render task")?,
Err(_) => {
(Vec::new(), is_term)
}
};
let stdout_bytes = Vec::new(); let stderr_bytes = pty_output;
let exit_code = status.exit_code();
let final_lines_drawn = lines_drawn.load(std::sync::atomic::Ordering::SeqCst);
if was_term && final_lines_drawn > 0 {
let mut stderr_handle = std::io::stderr();
write!(stderr_handle, "\x1b[{}A", final_lines_drawn).ok();
for _ in 0..final_lines_drawn {
write!(stderr_handle, "\x1b[2K\x1b[1B").ok(); }
write!(stderr_handle, "\x1b[{}A", final_lines_drawn).ok();
let _ = stderr_handle.flush();
}
Ok(SubprocessOutput {
stdout: stdout_bytes,
stderr: stderr_bytes,
exit_code,
})
}
impl Default for Logger {
fn default() -> Self {
Self::new()
}
}
impl Drop for Logger {
fn drop(&mut self) {
if let Some(pb) = self.progress_bar.take() {
pb.finish_and_clear();
}
if self.line_count > 0 {
use console::Term;
let term = Term::stderr();
if term.is_term() {
let _ = term.clear_last_lines(self.line_count);
}
self.line_count = 0;
}
}
}
// Unit tests for `Logger`, `SubprocessOutput` and `run_subprocess`.
// The subprocess tests spawn real commands (`echo`, `sh`, `false`) and are
// compiled on non-Windows targets only.
#[cfg(test)]
mod tests {
#[cfg(not(windows))]
use portable_pty::CommandBuilder;
use super::*;
// A fresh logger has no bar and no recorded lines.
#[tokio::test]
async fn test_logger_new() {
let logger = Logger::new();
assert!(logger.progress_bar.is_none());
assert_eq!(logger.line_count, 0);
}
// `status` installs a bar and records exactly one drawn line.
#[tokio::test]
async fn test_logger_status() {
let mut logger = Logger::new();
logger.status("Building", "test-crate");
assert!(logger.progress_bar.is_some());
assert_eq!(logger.line_count, 1);
}
// `clear_status` removes the bar and resets the line counter.
#[tokio::test]
async fn test_logger_clear_status() {
let mut logger = Logger::new();
logger.status("Building", "test-crate");
assert!(logger.progress_bar.is_some());
logger.clear_status();
assert!(logger.progress_bar.is_none());
assert_eq!(logger.line_count, 0);
}
// `finish` leaves the logger in the same state as `clear_status`.
#[tokio::test]
async fn test_logger_finish() {
let mut logger = Logger::new();
logger.status("Building", "test-crate");
logger.finish();
assert!(logger.progress_bar.is_none());
assert_eq!(logger.line_count, 0);
}
// Exit code 0 counts as success; both streams decode as UTF-8.
#[tokio::test]
async fn test_subprocess_output_success() {
let output = SubprocessOutput {
stdout: b"stdout content".to_vec(),
stderr: b"stderr content".to_vec(),
exit_code: 0,
};
assert!(output.success());
assert_eq!(output.exit_code(), 0);
assert_eq!(output.stdout_str().unwrap(), "stdout content");
assert_eq!(output.stderr_str().unwrap(), "stderr content");
}
// A non-zero exit code is reported as failure.
#[tokio::test]
async fn test_subprocess_output_failure() {
let output = SubprocessOutput {
stdout: b"".to_vec(),
stderr: b"error message".to_vec(),
exit_code: 1,
};
assert!(!output.success());
assert_eq!(output.exit_code(), 1);
assert_eq!(output.stderr_str().unwrap(), "error message");
}
// `echo` succeeds; the captured text is accepted either when it contains
// the echoed string or when nothing was captured at all.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_simple_success() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("echo");
cmd.arg("hello world");
cmd
},
Some(3),
)
.await
.unwrap();
assert!(output.success());
assert_eq!(output.exit_code(), 0);
let stderr = output.stderr_str().unwrap();
assert!(stderr.contains("hello world") || stderr.is_empty());
}
// `false` exits non-zero and must be reported as a failure.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_simple_failure() {
let mut logger = Logger::new();
let output = run_subprocess(&mut logger, || CommandBuilder::new("false"), Some(3))
.await
.unwrap();
assert!(!output.success());
assert_ne!(output.exit_code(), 0);
}
// Six lines are printed with a three-line window: the captured output must
// still contain the first and the last line.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_multiline_output() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("sh");
cmd.arg("-c");
cmd.arg("echo 'line 1'; echo 'line 2'; echo 'line 3'; echo 'line 4'; echo 'line 5'; echo 'line 6'");
cmd
},
Some(3), )
.await
.unwrap();
assert!(output.success());
let stderr = output.stderr_str().unwrap();
assert!(stderr.contains("line 1"));
assert!(stderr.contains("line 6"));
}
// Running a command while a status line is active must still succeed.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_with_progress_bar() {
let mut logger = Logger::new();
logger.status("Preparing", "test");
assert!(logger.progress_bar.is_some());
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("echo");
cmd.arg("test output");
cmd
},
None,
)
.await
.unwrap();
assert!(output.success());
}
// The child's exact exit code (42) is passed through unchanged.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_exit_code_preservation() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("sh");
cmd.arg("-c");
cmd.arg("exit 42");
cmd
},
None,
)
.await
.unwrap();
assert!(!output.success());
assert_eq!(output.exit_code(), 42);
}
// Output with ANSI colour codes is captured; the check passes when either
// the escape sequence or the plain word is present.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_ansi_colors_preserved() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("sh");
cmd.arg("-c");
cmd.arg("echo -e '\\033[31mred\\033[0m'");
cmd
},
None,
)
.await
.unwrap();
assert!(output.success());
let stderr = output.stderr_str().unwrap();
assert!(stderr.contains("\x1b[31m") || stderr.contains("red"));
}
// Passing `None` selects the default window size.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_default_stderr_lines() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("echo");
cmd.arg("test");
cmd
},
None, )
.await
.unwrap();
assert!(output.success());
}
// An explicit window size of 10 lines is accepted.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_custom_stderr_lines() {
let mut logger = Logger::new();
let output = run_subprocess(
&mut logger,
|| {
let mut cmd = CommandBuilder::new("echo");
cmd.arg("test");
cmd
},
Some(10), )
.await
.unwrap();
assert!(output.success());
}
// A command that does not exist must surface as an error.
#[tokio::test]
#[cfg(not(windows))]
async fn test_run_subprocess_nonexistent_command() {
let mut logger = Logger::new();
let result = run_subprocess(
&mut logger,
|| CommandBuilder::new("nonexistent-command-xyz-123"),
None,
)
.await;
assert!(result.is_err());
}
// Multi-byte UTF-8 content round-trips through the string accessors.
#[tokio::test]
async fn test_subprocess_output_utf8_handling() {
let output = SubprocessOutput {
stdout: "hello 世界".as_bytes().to_vec(),
stderr: "error 错误".as_bytes().to_vec(),
exit_code: 0,
};
assert_eq!(output.stdout_str().unwrap(), "hello 世界");
assert_eq!(output.stderr_str().unwrap(), "error 错误");
}
// Bytes that are not valid UTF-8 make `stdout_str` fail.
#[tokio::test]
async fn test_subprocess_output_invalid_utf8() {
let output = SubprocessOutput {
stdout: vec![0xFF, 0xFE, 0xFD], stderr: vec![],
exit_code: 0,
};
assert!(output.stdout_str().is_err());
}
// `suspend` returns the closure's value while a bar is active.
#[tokio::test]
async fn test_logger_suspend() {
let mut logger = Logger::new();
logger.status("Building", "test");
let result = logger.suspend(|| 42);
assert_eq!(result, 42);
}
// `suspend` also works when no bar is active.
#[tokio::test]
async fn test_logger_suspend_without_progress() {
let mut logger = Logger::new();
let result = logger.suspend(|| 42);
assert_eq!(result, 42);
}
// The following five tests only check that the print helpers run without
// panicking when no bar is active; their output is not inspected.
#[tokio::test]
async fn test_logger_status_permanent() {
let logger = Logger::new();
logger.status_permanent("Compiling", "test-crate");
}
#[tokio::test]
async fn test_logger_warning() {
let logger = Logger::new();
logger.warning("Warning", "test message");
}
#[tokio::test]
async fn test_logger_info() {
let logger = Logger::new();
logger.info("Info", "test message");
}
#[tokio::test]
async fn test_logger_error() {
let logger = Logger::new();
logger.error("Error", "test message");
}
#[tokio::test]
async fn test_logger_print_message() {
let logger = Logger::new();
logger.print_message("test message");
}
// `progress` installs a bar.
#[tokio::test]
async fn test_logger_progress() {
let mut logger = Logger::new();
logger.progress("Processing...");
assert!(logger.progress_bar.is_some());
}
// Updating the message keeps the bar installed.
#[tokio::test]
async fn test_logger_set_progress_message() {
let mut logger = Logger::new();
logger.progress("Initial");
logger.set_progress_message("Updated");
assert!(logger.progress_bar.is_some());
}
}