rusty-pv 0.1.0

Pipe viewer — a Rust port of Andrew Wood's `pv(1)` with progress bar, ETA, rate display, token-bucket rate limiting, IEC/SI unit math, SIGWINCH-aware terminal redraw, SIGUSR1 size refresh, multi-instance cursor coordination, and a typed library API.
Documentation
//! # rusty-pv
//!
//! A Rust port of Andrew Wood's `pv(1)` pipe viewer — show progress, elapsed
//! time, ETA, rate, and bytes transferred while data flows through a pipe.
//!
//! ## Quick start
//!
//! ```
//! use rusty_pv::PvBuilder;
//! use std::io::Cursor;
//!
//! let pv = PvBuilder::new().total_bytes(1024).build();
//! let mut reader = Cursor::new(vec![0u8; 1024]);
//! let mut writer = Vec::new();
//! let n = pv.copy(&mut reader, &mut writer).unwrap();
//! assert_eq!(n, 1024);
//! ```
//!
//! ## Stability (lockstep SemVer)
//!
//! Library and binary share a single crate version. The EMA smoothing
//! constant α = 0.3 is **locked at v0.1.0** per [`ema::EMA_ALPHA`] — any
//! change is a MAJOR bump. The default display field order is
//! `[<name>:] <p> <t> <e/I> <r> <b> <a> <n>` regardless of CLI argv order.

#![deny(missing_docs)]

pub mod ema;
pub mod error;
pub mod throttle;
pub mod units;

#[cfg(feature = "cli")]
pub mod cursor;
#[cfg(all(feature = "cli", unix))]
pub mod signals;

pub use error::PvError;
pub use units::UnitSystem;

use std::io::{Read, Write};
use std::time::{Duration, Instant};

/// Per-tick progress snapshot delivered to [`Reporter::report`] (FR-045).
///
/// [`Pv::copy`] builds one snapshot per display tick and one more after the
/// input is exhausted.
///
/// `#[non_exhaustive]` allows additive fields in SemVer-minor releases per
/// FR-056. Downstream code MUST NOT exhaustively match or construct via
/// struct-literal syntax outside the defining crate.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Progress {
    /// Bytes transferred so far.
    pub bytes_done: u64,
    /// Expected total bytes, if known.
    pub bytes_total: Option<u64>,
    /// Transfer rate in bytes/second (non-negative): the EMA-smoothed rate on
    /// periodic ticks, and the whole-run average (`bytes_done / elapsed`) on
    /// the final snapshot.
    pub rate: f64,
    /// Estimated time remaining. `None` when the total is unknown, the rate
    /// is 0, `bytes_done` has already reached the total, or this is the final
    /// snapshot.
    pub eta: Option<Duration>,
    /// Time elapsed since the copy loop began, measured with the monotonic
    /// [`std::time::Instant`] clock.
    pub elapsed: Duration,
}

/// Per-tick progress reporter (FR-044).
///
/// Implementations receive progress updates from the copy loop. [`Pv::copy`]
/// calls [`Reporter::report`] each time the configured interval has elapsed
/// (checked after a successful read) and once more after the input is
/// exhausted. The trait requires `Send` so the renderer may move the callback
/// onto a worker thread; `Sync` is intentionally NOT required.
pub trait Reporter: Send {
    /// Report one progress snapshot.
    ///
    /// Takes `&mut self`, so implementations may keep mutable state between
    /// calls without interior mutability.
    fn report(&mut self, progress: &Progress);
}

/// Reporter that discards every snapshot it is handed.
///
/// [`PvBuilder::build`] installs it when no custom reporter was supplied.
#[derive(Default, Debug)]
pub struct NoopReporter;

impl Reporter for NoopReporter {
    /// Intentionally does nothing.
    fn report(&mut self, _: &Progress) {}
}

/// Configured pipe-viewer runner. Construct via [`PvBuilder`].
///
/// All fields are private: read them back through the accessor methods and
/// run the transfer with [`Pv::copy`], which consumes the value.
#[non_exhaustive]
pub struct Pv {
    /// Expected total bytes, if known; used by `copy` to compute the ETA.
    total_bytes: Option<u64>,
    /// Rate limit in bytes/second; when set, `copy` builds a token bucket from it.
    rate_limit: Option<u64>,
    /// Requested size in bytes of the buffer `copy` reads into.
    buffer_size: usize,
    /// Minimum time between two periodic reporter calls.
    interval: Duration,
    /// Optional display name; not consulted by `copy` itself.
    name: Option<String>,
    /// Sink that receives every progress snapshot.
    reporter: Box<dyn Reporter>,
}

impl Pv {
    /// Stream `reader` → `writer` while reporting progress to the configured
    /// reporter. Returns the number of bytes transferred or an error (FR-046).
    ///
    /// The reporter receives one snapshot each time at least `interval` has
    /// elapsed since the previous one (checked after every successful read),
    /// plus a final snapshot carrying the whole-run average rate once the
    /// reader reports end of input. The writer is flushed before returning.
    ///
    /// Reads that fail with [`std::io::ErrorKind::Interrupted`] are retried,
    /// as the [`Read::read`] contract recommends. A configured buffer size of
    /// 0 is treated as 1 so that an empty buffer is never mistaken for end of
    /// input. An ETA that cannot be represented as a [`Duration`] is reported
    /// as `None` instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns [`PvError::Io`] on any I/O failure during read, write, or the
    /// final flush.
    pub fn copy<R: Read + ?Sized, W: Write + ?Sized>(
        mut self,
        reader: &mut R,
        writer: &mut W,
    ) -> Result<u64, PvError> {
        let start = Instant::now();
        // A zero-length buffer makes `read` return `Ok(0)`, which is
        // indistinguishable from EOF; clamp so the loop always makes progress.
        let mut buf = vec![0u8; self.buffer_size.max(1)];
        let mut bytes_done: u64 = 0;
        let throttle = self.rate_limit.map(throttle::TokenBucket::new);
        let mut ema = ema::Ema::new();
        let mut last_tick = start;
        let mut last_bytes: u64 = 0;

        loop {
            // Throttle (before each read).
            if let Some(tb) = &throttle {
                tb.maybe_sleep(bytes_done);
            }

            // Retry reads interrupted by a signal instead of aborting the
            // whole transfer; `write_all` below already does the same.
            let n = loop {
                match reader.read(&mut buf) {
                    Ok(n) => break n,
                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                    Err(e) => return Err(PvError::from(e)),
                }
            };
            if n == 0 {
                break;
            }
            writer.write_all(&buf[..n]).map_err(PvError::from)?;
            bytes_done += n as u64;

            // Display / Reporter tick.
            let now = Instant::now();
            let since_tick = now.duration_since(last_tick);
            if since_tick >= self.interval {
                // Floor `dt` so a zero interval cannot divide by zero.
                let dt = since_tick.as_secs_f64().max(1e-9);
                let dn = (bytes_done - last_bytes) as f64;
                let smoothed = ema.update(dn / dt);
                let eta = match self.total_bytes {
                    Some(total) if total > bytes_done && smoothed > 0.0 => {
                        Self::eta_from_secs((total - bytes_done) as f64 / smoothed)
                    }
                    _ => None,
                };
                let progress = Progress {
                    bytes_done,
                    bytes_total: self.total_bytes,
                    rate: smoothed,
                    eta,
                    elapsed: now.duration_since(start),
                };
                self.reporter.report(&progress);
                last_tick = now;
                last_bytes = bytes_done;
            }
        }

        // Final flush tick: report the whole-run average rather than the EMA.
        let elapsed = start.elapsed();
        let avg_rate = if elapsed.as_secs_f64() > 0.0 {
            bytes_done as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };
        let progress = Progress {
            bytes_done,
            bytes_total: self.total_bytes,
            rate: avg_rate,
            eta: None,
            elapsed,
        };
        self.reporter.report(&progress);

        writer.flush().map_err(PvError::from)?;
        Ok(bytes_done)
    }

    /// Convert an estimated number of remaining seconds into a [`Duration`].
    ///
    /// Returns `None` when the estimate is negative, non-finite, or too large
    /// for `Duration`, because `Duration::from_secs_f64` panics on such input.
    fn eta_from_secs(secs: f64) -> Option<Duration> {
        // `u64::MAX as f64` rounds up to 2^64, the first value that overflows
        // `Duration`, hence the strict comparison.
        if secs.is_finite() && secs >= 0.0 && secs < u64::MAX as f64 {
            Some(Duration::from_secs_f64(secs))
        } else {
            None
        }
    }

    /// Configured total-byte hint (`-s SIZE`).
    #[must_use]
    pub fn total_bytes(&self) -> Option<u64> {
        self.total_bytes
    }

    /// Configured rate limit in bytes/sec (`-L RATE`).
    #[must_use]
    pub fn rate_limit(&self) -> Option<u64> {
        self.rate_limit
    }

    /// Configured internal buffer size (`-B BYTES`).
    ///
    /// Returns the value exactly as configured, even though `copy` never
    /// allocates fewer than one byte.
    #[must_use]
    pub fn buffer_size(&self) -> usize {
        self.buffer_size
    }

    /// Configured display interval (`-i SEC`).
    #[must_use]
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// Configured name (`-N NAME`).
    #[must_use]
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }
}

impl std::fmt::Debug for Pv {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Pv")
            .field("total_bytes", &self.total_bytes)
            .field("rate_limit", &self.rate_limit)
            .field("buffer_size", &self.buffer_size)
            .field("interval", &self.interval)
            .field("name", &self.name)
            .field("reporter", &"<dyn Reporter>")
            .finish()
    }
}

/// Builder for [`Pv`] (FR-043, FR-055). All methods are independent and
/// order-agnostic; each setter is idempotent (last-write-wins). `build()` is
/// INFALLIBLE.
///
/// Fields mirror those of [`Pv`]; see [`PvBuilder::new`] for the defaults.
pub struct PvBuilder {
    /// Expected total bytes; `None` until `total_bytes` is called.
    total_bytes: Option<u64>,
    /// Rate limit in bytes/second; `None` until `rate_limit` is called.
    rate_limit: Option<u64>,
    /// Copy-loop buffer size in bytes.
    buffer_size: usize,
    /// Display update interval.
    interval: Duration,
    /// Display name; `None` until `name` is called.
    name: Option<String>,
    /// Custom reporter; when still `None`, `build` substitutes `NoopReporter`.
    reporter: Option<Box<dyn Reporter>>,
}

impl PvBuilder {
    /// Create a builder pre-populated with the defaults:
    /// - `total_bytes`: `None`
    /// - `rate_limit`: `None`
    /// - `buffer_size`: 1 MiB
    /// - `interval`: 1 second
    /// - `name`: `None`
    /// - `reporter`: `NoopReporter`
    #[must_use]
    pub fn new() -> Self {
        const ONE_MIB: usize = 1024 * 1024;
        Self {
            reporter: None,
            name: None,
            interval: Duration::from_secs(1),
            buffer_size: ONE_MIB,
            rate_limit: None,
            total_bytes: None,
        }
    }

    /// Set the expected total number of bytes (`-s SIZE`).
    #[must_use]
    pub fn total_bytes(self, n: u64) -> Self {
        Self {
            total_bytes: Some(n),
            ..self
        }
    }

    /// Set the rate limit in bytes/sec (`-L RATE`).
    #[must_use]
    pub fn rate_limit(self, bytes_per_sec: u64) -> Self {
        Self {
            rate_limit: Some(bytes_per_sec),
            ..self
        }
    }

    /// Set the internal copy-loop buffer size (`-B BYTES`).
    #[must_use]
    pub fn buffer_size(self, n: usize) -> Self {
        Self {
            buffer_size: n,
            ..self
        }
    }

    /// Set the display update interval (`-i SEC`).
    #[must_use]
    pub fn interval(self, d: Duration) -> Self {
        Self {
            interval: d,
            ..self
        }
    }

    /// Set the name prefix for the display line (`-N NAME`).
    #[must_use]
    pub fn name(self, name: impl Into<String>) -> Self {
        let owned: String = name.into();
        Self {
            name: Some(owned),
            ..self
        }
    }

    /// Install a custom progress reporter, replacing any previous one.
    #[must_use]
    pub fn reporter(self, r: Box<dyn Reporter>) -> Self {
        Self {
            reporter: Some(r),
            ..self
        }
    }

    /// Consume the builder and produce a configured [`Pv`]. INFALLIBLE.
    ///
    /// Falls back to [`NoopReporter`] when no reporter was installed.
    #[must_use]
    pub fn build(self) -> Pv {
        let PvBuilder {
            total_bytes,
            rate_limit,
            buffer_size,
            interval,
            name,
            reporter,
        } = self;

        let reporter: Box<dyn Reporter> = match reporter {
            Some(custom) => custom,
            None => Box::new(NoopReporter),
        };

        Pv {
            total_bytes,
            rate_limit,
            buffer_size,
            interval,
            name,
            reporter,
        }
    }
}

impl Default for PvBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for PvBuilder {
    /// Formats every builder field. A configured reporter is rendered as
    /// `Some("<dyn Reporter>")` and an absent one as `None`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only whether a reporter is present is shown.
        let reporter_tag: Option<&str> = self.reporter.as_ref().and(Some("<dyn Reporter>"));

        let mut out = f.debug_struct("PvBuilder");
        out.field("total_bytes", &self.total_bytes);
        out.field("rate_limit", &self.rate_limit);
        out.field("buffer_size", &self.buffer_size);
        out.field("interval", &self.interval);
        out.field("name", &self.name);
        out.field("reporter", &reporter_tag);
        out.finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use static_assertions::assert_impl_all;
    use std::io::Cursor;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Compile-time checks of the auto traits promised by the public types.
    assert_impl_all!(Pv: Send);
    assert_impl_all!(PvBuilder: Send);
    assert_impl_all!(Progress: Send, Sync);
    assert_impl_all!(PvError: Send, Sync);

    /// A default-configured `Pv` copies an in-memory payload unchanged.
    #[test]
    fn copy_through_in_memory_buffer() {
        let payload = vec![0xABu8; 4096];
        let mut source = Cursor::new(payload.clone());
        let mut sink: Vec<u8> = Vec::new();

        let copied = PvBuilder::new()
            .build()
            .copy(&mut source, &mut sink)
            .unwrap();

        assert_eq!(copied, payload.len() as u64);
        assert_eq!(sink, payload);
    }

    /// Calling the setters in a different order yields the same configuration.
    #[test]
    fn builder_setters_are_independent() {
        let tick = Duration::from_millis(100);

        let forward = PvBuilder::new()
            .total_bytes(1024)
            .rate_limit(500)
            .interval(tick)
            .name("a")
            .build();
        let backward = PvBuilder::new()
            .name("a")
            .interval(tick)
            .rate_limit(500)
            .total_bytes(1024)
            .build();

        assert_eq!(forward.total_bytes(), backward.total_bytes());
        assert_eq!(forward.rate_limit(), backward.rate_limit());
        assert_eq!(forward.interval(), backward.interval());
        assert_eq!(forward.name(), backward.name());
    }

    /// A custom reporter installed on the builder is invoked by `copy`.
    #[test]
    fn custom_reporter_receives_updates() {
        /// Counts how many snapshots were delivered.
        struct Counter(Arc<AtomicUsize>);

        impl Reporter for Counter {
            fn report(&mut self, _progress: &Progress) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }

        let ticks = Arc::new(AtomicUsize::new(0));
        let pv = PvBuilder::new()
            .interval(Duration::from_millis(1))
            .reporter(Box::new(Counter(Arc::clone(&ticks))))
            .build();

        let mut source = Cursor::new(vec![0u8; 65536]);
        let mut sink = Vec::new();
        let _ = pv.copy(&mut source, &mut sink).unwrap();

        // At minimum the final flush tick fires.
        assert!(ticks.load(Ordering::Relaxed) >= 1);
    }
}