transfer_progress/
lib.rs

1#[cfg(feature = "bytesize")]
2use std::fmt;
3use std::{
4    io::{self, prelude::*},
5    sync::{
6        atomic::{AtomicBool, AtomicU64, Ordering},
7        Arc,
8    },
9    thread,
10    time::{Duration, Instant},
11};
12
13#[cfg(feature = "bytesize")]
14use bytesize::ByteSize;
15use progress_streams::ProgressReader;
16
/// Progress state shared between a `Transfer` handle and its worker thread.
#[derive(Default)]
struct TransferState {
    /// Set to `true` by the worker thread once the copy has stopped, whether it succeeded or
    /// failed.
    complete: AtomicBool,
    /// Running total of the byte counts reported by the progress reader so far.
    transferred: AtomicU64,
}
22
23/// Monitors the progress of a transfer from a [reader][Read] to a [writer][Write].
24pub struct Transfer<R, W>
25where
26    R: Read + Send + 'static,
27    W: Write + Send + 'static,
28{
29    start_time: Instant,
30    state: Arc<TransferState>,
31    handle: thread::JoinHandle<io::Result<(R, W)>>,
32}
33
34impl<R, W> Transfer<R, W>
35where
36    R: Read + Send + 'static,
37    W: Write + Send + 'static,
38{
39    /// Creates and starts a new `Transfer`.
40    /// # Example
41    /// ```no_run
42    /// use transfer_progress::Transfer;
43    /// use std::fs::File;
44    /// let reader = File::open("file1.txt")?;
45    /// let writer = File::create("file2.txt")?;
46    /// let transfer = Transfer::new(reader, writer);
47    /// # Ok::<_, std::io::Error>(())
48    /// ```
49    pub fn new(reader: R, mut writer: W) -> Self {
50        let state = Arc::new(TransferState::default());
51        let state_clone = Arc::clone(&state);
52        let handle = thread::spawn(move || -> io::Result<(R, W)> {
53            let mut reader = ProgressReader::new(reader, |bytes| {
54                // If someone would like to confirm the correctness of the ordering guarantees, that would
55                // be much appreciated.
56                state_clone
57                    .transferred
58                    .fetch_add(bytes as u64, Ordering::Release);
59            });
60            // We need to store the result and bubble it later so we can set the complete flag.
61            let res = io::copy(&mut reader, &mut writer);
62            state_clone.complete.store(true, Ordering::Release);
63            res.map(|_| (reader.into_inner(), writer))
64        });
65        Self {
66            start_time: Instant::now(),
67            state,
68            handle,
69        }
70    }
71
72    /// Consumes the `Transfer`, blocking until the transfer is complete.
73    ///
74    /// If the transfer was successful, returns `Ok(reader, writer)`, otherwise returns
75    /// the error.
76    ///
77    /// If the transfer is already complete, returns immediately.
78    /// # Example
79    /// ```no_run
80    /// use transfer_progress::Transfer;
81    /// use std::fs::File;
82    /// let reader = File::open("file1.txt")?;
83    /// let writer = File::create("file2.txt")?;
84    /// let transfer = Transfer::new(reader, writer);
85    /// let (reader, writer) = transfer.finish()?;
86    /// # Ok::<_, std::io::Error>(())
87    /// ```
88    pub fn finish(self) -> io::Result<(R, W)> {
89        self.handle.join().unwrap()
90    }
91
92    /// Tests if the transfer is complete
93    /// # Example
94    /// ```no_run
95    /// use transfer_progress::Transfer;
96    /// use std::fs::File;
97    /// let reader = File::open("file1.txt")?;
98    /// let writer = File::create("file2.txt")?;
99    /// let transfer = Transfer::new(reader, writer);
100    /// while !transfer.is_complete() {
101    /// println!("Not complete yet");
102    /// std::thread::sleep(std::time::Duration::from_secs(1));
103    /// }
104    /// println!("Complete!");
105    /// # Ok::<_, std::io::Error>(())
106    /// ```
107    pub fn is_complete(&self) -> bool {
108        // If someone would like to confirm the correctness of the ordering guarantees, that would
109        // be much appreciated.
110        self.state.complete.load(Ordering::Acquire)
111    }
112
113    /// Returns the number of bytes transferred thus far between the reader and the writer.
114    /// # Example
115    /// ```no_run
116    /// use transfer_progress::Transfer;
117    /// use std::fs::File;
118    /// let reader = File::open("file1.txt")?;
119    /// let writer = File::create("file2.txt")?;
120    /// let transfer = Transfer::new(reader, writer);
121    /// while !transfer.is_complete() {
122    /// println!("{} bytes transferred so far", transfer.transferred());
123    /// std::thread::sleep(std::time::Duration::from_secs(1));
124    /// }
125    /// println!("Complete!");
126    /// # Ok::<_, std::io::Error>(())
127    /// ```
128    pub fn transferred(&self) -> u64 {
129        // If someone would like to confirm the correctness of the ordering guarantees, that would
130        // be much appreciated.
131        self.state.transferred.load(Ordering::Acquire)
132    }
133
134    /// Returns the elapsed time since the transfer started.
135    /// # Example
136    /// ```no_run
137    /// use transfer_progress::Transfer;
138    /// use std::fs::File;
139    /// let reader = File::open("file1.txt")?;
140    /// let writer = File::create("file2.txt")?;
141    /// let transfer = Transfer::new(reader, writer);
142    /// while !transfer.is_complete() {}
143    /// println!("Transfer took {:?}", transfer.running_time());
144    /// # Ok::<_, std::io::Error>(())
145    /// ```
146    pub fn running_time(&self) -> Duration {
147        self.start_time.elapsed()
148    }
149
150    /// Returns the average speed, in bytes per second, of the transfer.
151    /// # Example
152    /// ```no_run
153    /// use transfer_progress::Transfer;
154    /// use std::fs::File;
155    /// let reader = File::open("file1.txt")?;
156    /// let writer = File::create("file2.txt")?;
157    /// let transfer = Transfer::new(reader, writer);
158    /// while !transfer.is_complete() {
159    /// println!("{}B/s", transfer.speed());
160    /// std::thread::sleep(std::time::Duration::from_secs(1));
161    /// }
162    /// # Ok::<_, std::io::Error>(())
163    /// ```
164    pub fn speed(&self) -> u64 {
165        (self.transferred() as f64 / self.running_time().as_secs_f64()).round() as u64
166    }
167}
168
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Debug for Transfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Writes the amount transferred followed by the average speed in parentheses.
    ///
    /// With the alternate flag (`{:#?}`), both figures are rendered through
    /// `ByteSize::to_string_as(true)`; otherwise `ByteSize`'s own formatting is used.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = ByteSize::b(self.transferred());
        let rate = ByteSize::b(self.speed());
        if !f.alternate() {
            return write!(f, "{:#} ({:#}/s)", total, rate);
        }
        // Use SI units
        let total = total.to_string_as(true);
        let rate = rate.to_string_as(true);
        write!(f, "{:#} ({:#}/s)", total, rate)
    }
}
191
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Display for Transfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Produces the same text as the [`Debug`][fmt::Debug] implementation, honouring the same
    /// formatter flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}
202
203/// Monitors the progress of a transfer with a known size.
204pub struct SizedTransfer<R, W>
205where
206    R: Read + Send + 'static,
207    W: Write + Send + 'static,
208{
209    inner: Transfer<R, W>,
210    size: u64,
211}
212
213impl<R, W> SizedTransfer<R, W>
214where
215    R: Read + Send + 'static,
216    W: Write + Send + 'static,
217{
218    /// Creates and starts a new `SizedTransfer`.
219    /// # Example
220    /// ```no_run
221    /// use transfer_progress::SizedTransfer;
222    /// use std::fs::File;
223    /// use std::io::Read;
224    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
225    /// let writer = File::create("file2.txt")?;
226    /// let transfer = SizedTransfer::new(reader, writer, 1024);
227    /// # Ok::<_, std::io::Error>(())
228    /// ```
229    pub fn new(reader: R, writer: W, size: u64) -> Self {
230        Self {
231            inner: Transfer::new(reader, writer),
232            size,
233        }
234    }
235
236    /// Returns the total size (in bytes) of the transfer, as specified when calling
237    /// [`new`][SizedTransfer::new].
238    /// # Example
239    /// ```no_run
240    /// use transfer_progress::SizedTransfer;
241    /// use std::fs::File;
242    /// use std::io::Read;
243    /// # fn main() -> std::io::Result<()> {
244    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
245    /// let writer = File::create("file2.txt")?;
246    /// let transfer = SizedTransfer::new(reader, writer, 1024);
247    /// assert_eq!(transfer.size(), 1024);
248    /// # Ok(())
249    /// # }
250    /// ```
251    pub fn size(&self) -> u64 {
252        self.size
253    }
254
255    /// Returns the number of bytes remaining.
256    /// # Example
257    /// ```no_run
258    /// use transfer_progress::SizedTransfer;
259    /// use std::fs::File;
260    /// use std::io::Read;
261    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
262    /// let writer = File::create("file2.txt")?;
263    /// let transfer = SizedTransfer::new(reader, writer, 1024);
264    /// while !transfer.is_complete() {
265    /// println!("{} bytes remaining", transfer.remaining());
266    /// std::thread::sleep(std::time::Duration::from_secs(1));
267    /// }
268    /// # Ok::<_, std::io::Error>(())
269    /// ```
270    pub fn remaining(&self) -> u64 {
271        self.size - self.inner.transferred()
272    }
273
274    /// Consumes the `SizedTransfer`, blocking until the transfer is complete.
275    ///
276    /// If the transfer was successful, returns `Ok(reader, writer)`, otherwise returns
277    /// the error.
278    ///
279    /// If the transfer is already complete, returns immediately.
280    /// # Example
281    /// ```no_run
282    /// use transfer_progress::SizedTransfer;
283    /// use std::fs::File;
284    /// use std::io::Read;
285    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
286    /// let writer = File::create("file2.txt")?;
287    /// let transfer = SizedTransfer::new(reader, writer, 1024);
288    /// let (reader, writer) = transfer.finish()?;
289    /// # Ok::<_, std::io::Error>(())
290    /// ```
291    pub fn finish(self) -> io::Result<(R, W)> {
292        self.inner.finish()
293    }
294
295    /// Returns a fraction between 0.0 and 1.0 representing the state of the transfer.
296    /// # Example
297    /// ```no_run
298    /// use transfer_progress::SizedTransfer;
299    /// use std::fs::File;
300    /// use std::io::Read;
301    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
302    /// let writer = File::create("file2.txt")?;
303    /// let transfer = SizedTransfer::new(reader, writer, 1024);
304    /// while !transfer.is_complete() {
305    /// println!("Transfer is {:.0}% complete", transfer.fraction_transferred() * 100.0);
306    /// std::thread::sleep(std::time::Duration::from_secs(1));
307    /// }
308    /// # Ok::<_, std::io::Error>(())
309    /// ```
310    pub fn fraction_transferred(&self) -> f64 {
311        self.transferred() as f64 / self.size as f64
312    }
313
314    /// Returns the approximate remaining time until this transfer completes. Returns `None` if
315    /// this cannot be calculated (I.E. no bytes have been transferred yet, so a speed cannot be
316    /// determined).
317    /// # Example
318    /// ```no_run
319    /// use transfer_progress::SizedTransfer;
320    /// use std::fs::File;
321    /// use std::io::Read;
322    /// let reader = File::open("file1.txt")?.take(1024); // Bytes
323    /// let writer = File::create("file2.txt")?;
324    /// let transfer = SizedTransfer::new(reader, writer, 1024);
325    /// while !transfer.is_complete() {
326    /// if let Some(eta) = transfer.eta() {
327    /// println!("Transfer will complete in approximately {:?}", eta);
328    /// } else {
329    /// println!("Transfer completion time is unknown");
330    /// }
331    /// std::thread::sleep(std::time::Duration::from_secs(1));
332    /// }
333    /// # Ok::<_, std::io::Error>(())
334    /// ```
335    pub fn eta(&self) -> Option<Duration> {
336        // Cache this so we don't have to perform an atomic access twice
337        let transferred = self.inner.transferred();
338        if transferred == 0 {
339            return None;
340        }
341        let remaining = self.size - transferred;
342        let elapsed = self.running_time().as_secs_f64();
343        let eta = (elapsed / transferred as f64) * remaining as f64;
344        Some(Duration::from_secs_f64(eta))
345    }
346}
347
348impl<R, W> std::ops::Deref for SizedTransfer<R, W>
349where
350    R: Read + Send + 'static,
351    W: Write + Send + 'static,
352{
353    type Target = Transfer<R, W>;
354
355    fn deref(&self) -> &Self::Target {
356        &self.inner
357    }
358}
359
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Debug for SizedTransfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Writes the percentage complete, followed by the amount transferred, the total size and
    /// the average speed in parentheses.
    ///
    /// With the alternate flag (`{:#?}`), the byte figures are rendered through
    /// `ByteSize::to_string_as(true)`; otherwise `ByteSize`'s own formatting is used. This is
    /// the same flag convention as the `Debug` implementation for [`Transfer`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let percentage = self.fraction_transferred() * 100.0;
        let transferred = ByteSize::b(self.transferred());
        let size = ByteSize::b(self.size);
        let speed = ByteSize::b(self.speed());
        if f.alternate() {
            // Use SI units
            write!(
                f,
                "{:.1} % ({} of {}, {}/s)",
                percentage,
                transferred.to_string_as(true),
                size.to_string_as(true),
                speed.to_string_as(true)
            )
        } else {
            write!(
                f,
                "{:.1} % ({} of {}, {}/s)",
                percentage, transferred, size, speed
            )
        }
    }
}
389
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Display for SizedTransfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Produces the same text as the [`Debug`][fmt::Debug] implementation, honouring the same
    /// formatter flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}