use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::utils::friendly_size;
/// Progress reporting state for file-processing jobs, rendered with `indicatif`.
///
/// The counters are kept in `Arc<AtomicU64>` so that the closures handed out
/// by `bytes_callback` / `iop_callback` can update them without borrowing the
/// tracker itself.
pub struct ProgressTracker {
    /// Container that owns the drawn bars; `None` when no bars were created.
    multi: Option<MultiProgress>,
    /// Bar counting completed files; only present in multi-file mode.
    file_progress_bar: Option<ProgressBar>,
    /// Bar showing byte progress within the file currently being processed.
    current_file_bar: Option<ProgressBar>,
    /// Total number of bytes reported through the byte callback.
    bytes_read: Arc<AtomicU64>,
    /// Bytes reported for the current file; reset by `start_file`.
    current_file_bytes: Arc<AtomicU64>,
    /// Size of the current file as passed to `start_file` / `new_single_file`.
    current_file_size: Arc<AtomicU64>,
    /// Number of I/O operations reported through the op callback.
    iops: Arc<AtomicU64>,
    /// Moment the tracker was constructed; all rates are measured from here.
    start_time: Instant,
    /// Milliseconds since `start_time` at which the status was last refreshed.
    last_update: Arc<AtomicU64>,
    /// Value of `bytes_read` at the last refresh done by the byte callback.
    last_bytes: Arc<AtomicU64>,
}
impl ProgressTracker {
/// Creates a tracker for a batch of `total_files` files.
///
/// When `total_files` is greater than zero two bars are created: an overall
/// bar counting files (initially showing `message`) and a second bar for the
/// byte progress of the file currently being processed. When `total_files`
/// is zero no bars are created and the tracker only maintains its counters.
///
/// # Panics
///
/// Panics only if one of the hard-coded bar templates fails to parse.
pub fn new(total_files: u64, message: &str) -> Self {
    let (multi, file_progress_bar, current_file_bar) = if total_files > 0 {
        let multi = MultiProgress::new();

        let file_pb = multi.add(ProgressBar::new(total_files));
        file_pb.set_style(
            ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({percent}%) {msg}")
                .expect("file progress template is a valid constant")
                .progress_chars("#>-"),
        );
        file_pb.set_message(message.to_string());

        // Starts with length 0; `start_file` sets the real length per file.
        let current_pb = multi.add(ProgressBar::new(0));
        current_pb.set_style(
            ProgressStyle::default_bar()
                // `{msg}` is required for the file name set by `start_file`
                // to be rendered; without it `set_message` has no visible effect.
                .template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise} {msg}")
                .expect("current-file progress template is a valid constant")
                .progress_chars("=>-"),
        );

        (Some(multi), Some(file_pb), Some(current_pb))
    } else {
        (None, None, None)
    };

    Self {
        multi,
        file_progress_bar,
        current_file_bar,
        bytes_read: Arc::new(AtomicU64::new(0)),
        current_file_bytes: Arc::new(AtomicU64::new(0)),
        current_file_size: Arc::new(AtomicU64::new(0)),
        iops: Arc::new(AtomicU64::new(0)),
        start_time: Instant::now(),
        last_update: Arc::new(AtomicU64::new(0)),
        last_bytes: Arc::new(AtomicU64::new(0)),
    }
}
/// Creates a tracker for processing one file of `file_size` bytes.
///
/// Only the byte-progress bar is created (there is no file-count bar); its
/// message is set to `file_name` prefixed with the processing label.
///
/// # Panics
///
/// Panics only if the hard-coded bar template fails to parse.
pub fn new_single_file(file_size: u64, file_name: &str) -> Self {
    let multi = MultiProgress::new();

    let current_pb = multi.add(ProgressBar::new(file_size));
    current_pb.set_style(
        ProgressStyle::default_bar()
            // `{msg}` is required for the file name below, and for the message
            // passed to `finish`, to actually be rendered on this bar.
            .template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise} {msg}")
            .expect("single-file progress template is a valid constant")
            .progress_chars("=>-"),
    );
    current_pb.set_message(format!("处理: {}", file_name));

    Self {
        multi: Some(multi),
        file_progress_bar: None,
        current_file_bar: Some(current_pb),
        bytes_read: Arc::new(AtomicU64::new(0)),
        current_file_bytes: Arc::new(AtomicU64::new(0)),
        current_file_size: Arc::new(AtomicU64::new(file_size)),
        iops: Arc::new(AtomicU64::new(0)),
        start_time: Instant::now(),
        last_update: Arc::new(AtomicU64::new(0)),
        last_bytes: Arc::new(AtomicU64::new(0)),
    }
}
/// Prepares the current-file bar for a new file.
///
/// Sets the bar length to `file_size`, rewinds it to zero, sets its message
/// to `file_name` with the processing prefix, and resets the per-file
/// counters. Does nothing when the tracker has no current-file bar.
pub fn start_file(&self, file_size: u64, file_name: &str) {
    let bar = match self.current_file_bar.as_ref() {
        Some(bar) => bar,
        None => return,
    };

    bar.set_length(file_size);
    bar.set_position(0);
    bar.set_message(format!("处理: {}", file_name));

    self.current_file_size.store(file_size, Ordering::Relaxed);
    self.current_file_bytes.store(0, Ordering::Relaxed);
}
/// Marks the file currently being processed as done on the current-file bar.
///
/// In multi-file mode (a file-count bar exists) the bar is kept alive and
/// only its length is reset to zero so it can be reused by the next file.
/// Without a file-count bar the current-file bar is finished.
pub fn finish_current_file(&self) {
    let bar = match self.current_file_bar.as_ref() {
        Some(bar) => bar,
        None => return,
    };

    if self.file_progress_bar.is_none() {
        bar.finish();
    } else {
        bar.set_length(0);
    }
}
/// Records completion of one file.
///
/// Finishes/resets the current-file bar, advances the file-count bar by one
/// and refreshes its status message on every tenth file or when more than
/// 500 ms have passed since the last recorded refresh.
pub fn finish_file(&self) {
    self.finish_current_file();

    let files_bar = match self.file_progress_bar.as_ref() {
        Some(bar) => bar,
        None => return,
    };

    files_bar.inc(1);
    let done = files_bar.position();

    // Timestamps are stored as milliseconds elapsed since `start_time`.
    let now_ms = self.start_time.elapsed().as_millis() as u64;
    let previous_ms = self.last_update.load(Ordering::Relaxed);

    let every_tenth_file = done % 10 == 0;
    let refresh_overdue = now_ms.saturating_sub(previous_ms) > 500;

    if every_tenth_file || refresh_overdue {
        self.update_message();
        self.last_update.store(now_ms, Ordering::Relaxed);
    }
}
/// Rebuilds the status message of the file-count bar.
///
/// The message contains the average byte throughput and operation rate
/// since `start_time`, plus an ETA extrapolated from the average time per
/// completed file. Nothing is written when there is no file-count bar, no
/// bytes have been counted yet, or no time has elapsed.
fn update_message(&self) {
    let bar = match self.file_progress_bar.as_ref() {
        Some(bar) => bar,
        None => return,
    };

    let secs = self.start_time.elapsed().as_secs_f64();
    let bytes = self.bytes_read.load(Ordering::Relaxed);
    let ops = self.iops.load(Ordering::Relaxed);
    if bytes == 0 || secs <= 0.0 {
        return;
    }

    let throughput = friendly_size((bytes as f64 / secs) as u64);
    let ops_per_sec = ops as f64 / secs;

    let done = bar.position();
    let remaining = bar.length().unwrap_or(0).saturating_sub(done);

    // Average seconds per finished file; zero until the first file completes.
    let secs_per_file = if done > 0 { secs / done as f64 } else { 0.0 };
    let eta_secs = secs_per_file * remaining as f64;
    let eta = if eta_secs > 0.0 {
        format!("ETA: {:.0}s", eta_secs)
    } else {
        String::new()
    };

    bar.set_message(format!(
        "IO速度: {}/s | IOPS: {:.0} | {}",
        throughput, ops_per_sec, eta
    ));
}
/// Finishes the tracker with a final `message`.
///
/// In multi-file mode the file-count bar is finished with `message` and the
/// current-file bar merely has its length reset to zero. In single-file
/// mode the current-file bar itself is finished with `message`.
pub fn finish(&self, message: &str) {
    if let Some(files_bar) = self.file_progress_bar.as_ref() {
        files_bar.finish_with_message(message.to_string());
    }

    let has_files_bar = self.file_progress_bar.is_some();
    match (self.current_file_bar.as_ref(), has_files_bar) {
        (Some(bar), false) => bar.finish_with_message(message.to_string()),
        (Some(bar), true) => bar.set_length(0),
        (None, _) => {}
    }
}
pub fn bytes_callback(&self) -> impl FnMut(u64) {
let bytes_read = self.bytes_read.clone();
let current_file_bytes = self.current_file_bytes.clone();
let last_bytes = self.last_bytes.clone();
let last_update = self.last_update.clone();
let start_time = self.start_time;
let file_progress_bar = self.file_progress_bar.clone();
let current_file_bar = self.current_file_bar.clone();
let bytes_read_clone = self.bytes_read.clone();
let iops = self.iops.clone();
move |bytes| {
bytes_read.fetch_add(bytes, Ordering::Relaxed);
current_file_bytes.fetch_add(bytes, Ordering::Relaxed);
if let Some(pb) = ¤t_file_bar {
let current_bytes = current_file_bytes.load(Ordering::Relaxed);
pb.set_position(current_bytes);
}
let total_bytes = bytes_read.load(Ordering::Relaxed);
let last_bytes_value = last_bytes.load(Ordering::Relaxed);
let now = start_time.elapsed().as_millis() as u64;
let last_update_value = last_update.load(Ordering::Relaxed);
let bytes_diff = total_bytes.saturating_sub(last_bytes_value);
let time_diff = now.saturating_sub(last_update_value);
const UPDATE_BYTES_THRESHOLD: u64 = 16 * 1024 * 1024; const UPDATE_TIME_THRESHOLD: u64 = 200;
if bytes_diff >= UPDATE_BYTES_THRESHOLD || time_diff >= UPDATE_TIME_THRESHOLD {
if let Some(pb) = &file_progress_bar {
let elapsed = start_time.elapsed().as_secs_f64();
let total_bytes = bytes_read_clone.load(Ordering::Relaxed);
let total_ops = iops.load(Ordering::Relaxed);
if total_bytes > 0 && elapsed > 0.0 {
let speed_bytes_per_sec = total_bytes as f64 / elapsed;
let speed_str = friendly_size(speed_bytes_per_sec as u64);
let iops_value = total_ops as f64 / elapsed;
let files_processed = pb.position();
let files_total = pb.length().unwrap_or(0);
let files_remaining = files_total.saturating_sub(files_processed);
let avg_time_per_file = if files_processed > 0 {
elapsed / files_processed as f64
} else {
0.0
};
let eta_seconds = avg_time_per_file * files_remaining as f64;
let eta_str = if eta_seconds > 0.0 {
format!("ETA: {:.0}s", eta_seconds)
} else {
String::new()
};
pb.set_message(format!(
"IO速度: {}/s | IOPS: {:.0} | {}",
speed_str, iops_value, eta_str
));
}
}
last_bytes.store(total_bytes, Ordering::Relaxed);
last_update.store(now, Ordering::Relaxed);
}
}
}
pub fn iop_callback(&self) -> impl FnMut() {
let iops = self.iops.clone();
move || {
iops.fetch_add(1, Ordering::Relaxed);
}
}
/// Borrows the underlying `MultiProgress`, if this tracker created one.
///
/// Returns `None` for trackers built by `new` with zero files.
pub fn multi(&self) -> Option<&MultiProgress> {
    Option::as_ref(&self.multi)
}
}