#![deny(missing_docs)]
pub mod ema;
pub mod error;
pub mod throttle;
pub mod units;
#[cfg(feature = "cli")]
pub mod cursor;
#[cfg(all(feature = "cli", unix))]
pub mod signals;
pub use error::PvError;
pub use units::UnitSystem;
use std::io::{Read, Write};
use std::time::{Duration, Instant};
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Progress {
pub bytes_done: u64,
pub bytes_total: Option<u64>,
pub rate: f64,
pub eta: Option<Duration>,
pub elapsed: Duration,
}
pub trait Reporter: Send {
fn report(&mut self, progress: &Progress);
}
#[derive(Debug, Default)]
pub struct NoopReporter;
impl Reporter for NoopReporter {
fn report(&mut self, _progress: &Progress) {}
}
#[non_exhaustive]
pub struct Pv {
total_bytes: Option<u64>,
rate_limit: Option<u64>,
buffer_size: usize,
interval: Duration,
name: Option<String>,
reporter: Box<dyn Reporter>,
}
impl Pv {
pub fn copy<R: Read + ?Sized, W: Write + ?Sized>(
mut self,
reader: &mut R,
writer: &mut W,
) -> Result<u64, PvError> {
let start = Instant::now();
let mut buf = vec![0u8; self.buffer_size];
let mut bytes_done: u64 = 0;
let throttle = self.rate_limit.map(throttle::TokenBucket::new);
let mut ema = ema::Ema::new();
let mut last_tick = start;
let mut last_bytes: u64 = 0;
loop {
if let Some(tb) = &throttle {
tb.maybe_sleep(bytes_done);
}
let n = reader.read(&mut buf).map_err(PvError::from)?;
if n == 0 {
break;
}
writer.write_all(&buf[..n]).map_err(PvError::from)?;
bytes_done += n as u64;
let now = Instant::now();
if now.duration_since(last_tick) >= self.interval {
let dt = now.duration_since(last_tick).as_secs_f64().max(1e-9);
let dn = (bytes_done - last_bytes) as f64;
let sample = dn / dt;
let smoothed = ema.update(sample);
let elapsed = now.duration_since(start);
let eta = match (self.total_bytes, smoothed > 0.0) {
(Some(total), true) if total > bytes_done => Some(Duration::from_secs_f64(
(total - bytes_done) as f64 / smoothed,
)),
_ => None,
};
let progress = Progress {
bytes_done,
bytes_total: self.total_bytes,
rate: smoothed,
eta,
elapsed,
};
self.reporter.report(&progress);
last_tick = now;
last_bytes = bytes_done;
}
}
let elapsed = start.elapsed();
let avg_rate = if elapsed.as_secs_f64() > 0.0 {
bytes_done as f64 / elapsed.as_secs_f64()
} else {
0.0
};
let progress = Progress {
bytes_done,
bytes_total: self.total_bytes,
rate: avg_rate,
eta: None,
elapsed,
};
self.reporter.report(&progress);
writer.flush().map_err(PvError::from)?;
Ok(bytes_done)
}
#[must_use]
pub fn total_bytes(&self) -> Option<u64> {
self.total_bytes
}
#[must_use]
pub fn rate_limit(&self) -> Option<u64> {
self.rate_limit
}
#[must_use]
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
#[must_use]
pub fn interval(&self) -> Duration {
self.interval
}
#[must_use]
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
}
impl std::fmt::Debug for Pv {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pv")
.field("total_bytes", &self.total_bytes)
.field("rate_limit", &self.rate_limit)
.field("buffer_size", &self.buffer_size)
.field("interval", &self.interval)
.field("name", &self.name)
.field("reporter", &"<dyn Reporter>")
.finish()
}
}
pub struct PvBuilder {
total_bytes: Option<u64>,
rate_limit: Option<u64>,
buffer_size: usize,
interval: Duration,
name: Option<String>,
reporter: Option<Box<dyn Reporter>>,
}
impl PvBuilder {
#[must_use]
pub fn new() -> Self {
PvBuilder {
total_bytes: None,
rate_limit: None,
buffer_size: 1 << 20,
interval: Duration::from_secs(1),
name: None,
reporter: None,
}
}
#[must_use]
pub fn total_bytes(mut self, n: u64) -> Self {
self.total_bytes = Some(n);
self
}
#[must_use]
pub fn rate_limit(mut self, bytes_per_sec: u64) -> Self {
self.rate_limit = Some(bytes_per_sec);
self
}
#[must_use]
pub fn buffer_size(mut self, n: usize) -> Self {
self.buffer_size = n;
self
}
#[must_use]
pub fn interval(mut self, d: Duration) -> Self {
self.interval = d;
self
}
#[must_use]
pub fn name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
#[must_use]
pub fn reporter(mut self, r: Box<dyn Reporter>) -> Self {
self.reporter = Some(r);
self
}
#[must_use]
pub fn build(self) -> Pv {
Pv {
total_bytes: self.total_bytes,
rate_limit: self.rate_limit,
buffer_size: self.buffer_size,
interval: self.interval,
name: self.name,
reporter: self.reporter.unwrap_or_else(|| Box::new(NoopReporter)),
}
}
}
impl Default for PvBuilder {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for PvBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PvBuilder")
.field("total_bytes", &self.total_bytes)
.field("rate_limit", &self.rate_limit)
.field("buffer_size", &self.buffer_size)
.field("interval", &self.interval)
.field("name", &self.name)
.field(
"reporter",
&self.reporter.as_ref().map(|_| "<dyn Reporter>"),
)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use static_assertions::assert_impl_all;
assert_impl_all!(Pv: Send);
assert_impl_all!(PvBuilder: Send);
assert_impl_all!(Progress: Send, Sync);
assert_impl_all!(PvError: Send, Sync);
#[test]
fn copy_through_in_memory_buffer() {
let pv = PvBuilder::new().build();
let src = vec![0xABu8; 4096];
let mut reader = std::io::Cursor::new(src.clone());
let mut writer = Vec::new();
let n = pv.copy(&mut reader, &mut writer).unwrap();
assert_eq!(n, src.len() as u64);
assert_eq!(writer, src);
}
#[test]
fn builder_setters_are_independent() {
let p1 = PvBuilder::new()
.total_bytes(1024)
.rate_limit(500)
.interval(Duration::from_millis(100))
.name("a")
.build();
let p2 = PvBuilder::new()
.name("a")
.interval(Duration::from_millis(100))
.rate_limit(500)
.total_bytes(1024)
.build();
assert_eq!(p1.total_bytes(), p2.total_bytes());
assert_eq!(p1.rate_limit(), p2.rate_limit());
assert_eq!(p1.interval(), p2.interval());
assert_eq!(p1.name(), p2.name());
}
#[test]
fn custom_reporter_receives_updates() {
struct Counter(std::sync::Arc<std::sync::atomic::AtomicUsize>);
impl Reporter for Counter {
fn report(&mut self, _progress: &Progress) {
self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let pv = PvBuilder::new()
.interval(Duration::from_millis(1))
.reporter(Box::new(Counter(count.clone())))
.build();
let mut reader = std::io::Cursor::new(vec![0u8; 65536]);
let mut writer = Vec::new();
let _ = pv.copy(&mut reader, &mut writer).unwrap();
assert!(count.load(std::sync::atomic::Ordering::Relaxed) >= 1);
}
}