transfer_progress/lib.rs
1#[cfg(feature = "bytesize")]
2use std::fmt;
3use std::{
4 io::{self, prelude::*},
5 sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 Arc,
8 },
9 thread,
10 time::{Duration, Instant},
11};
12
13#[cfg(feature = "bytesize")]
14use bytesize::ByteSize;
15use progress_streams::ProgressReader;
16
/// State shared between the worker thread that performs the copy and the
/// handle that observes it.
///
/// `Default` yields the initial state: zero bytes transferred, not complete.
#[derive(Default)]
struct TransferState {
    /// Running total of bytes the reader has produced so far.
    transferred: AtomicU64,
    /// Raised by the worker thread once the copy has ended, whether it
    /// succeeded or failed.
    complete: AtomicBool,
}
22
23/// Monitors the progress of a transfer from a [reader][Read] to a [writer][Write].
24pub struct Transfer<R, W>
25where
26 R: Read + Send + 'static,
27 W: Write + Send + 'static,
28{
29 start_time: Instant,
30 state: Arc<TransferState>,
31 handle: thread::JoinHandle<io::Result<(R, W)>>,
32}
33
34impl<R, W> Transfer<R, W>
35where
36 R: Read + Send + 'static,
37 W: Write + Send + 'static,
38{
39 /// Creates and starts a new `Transfer`.
40 /// # Example
41 /// ```no_run
42 /// use transfer_progress::Transfer;
43 /// use std::fs::File;
44 /// let reader = File::open("file1.txt")?;
45 /// let writer = File::create("file2.txt")?;
46 /// let transfer = Transfer::new(reader, writer);
47 /// # Ok::<_, std::io::Error>(())
48 /// ```
49 pub fn new(reader: R, mut writer: W) -> Self {
50 let state = Arc::new(TransferState::default());
51 let state_clone = Arc::clone(&state);
52 let handle = thread::spawn(move || -> io::Result<(R, W)> {
53 let mut reader = ProgressReader::new(reader, |bytes| {
54 // If someone would like to confirm the correctness of the ordering guarantees, that would
55 // be much appreciated.
56 state_clone
57 .transferred
58 .fetch_add(bytes as u64, Ordering::Release);
59 });
60 // We need to store the result and bubble it later so we can set the complete flag.
61 let res = io::copy(&mut reader, &mut writer);
62 state_clone.complete.store(true, Ordering::Release);
63 res.map(|_| (reader.into_inner(), writer))
64 });
65 Self {
66 start_time: Instant::now(),
67 state,
68 handle,
69 }
70 }
71
72 /// Consumes the `Transfer`, blocking until the transfer is complete.
73 ///
74 /// If the transfer was successful, returns `Ok(reader, writer)`, otherwise returns
75 /// the error.
76 ///
77 /// If the transfer is already complete, returns immediately.
78 /// # Example
79 /// ```no_run
80 /// use transfer_progress::Transfer;
81 /// use std::fs::File;
82 /// let reader = File::open("file1.txt")?;
83 /// let writer = File::create("file2.txt")?;
84 /// let transfer = Transfer::new(reader, writer);
85 /// let (reader, writer) = transfer.finish()?;
86 /// # Ok::<_, std::io::Error>(())
87 /// ```
88 pub fn finish(self) -> io::Result<(R, W)> {
89 self.handle.join().unwrap()
90 }
91
92 /// Tests if the transfer is complete
93 /// # Example
94 /// ```no_run
95 /// use transfer_progress::Transfer;
96 /// use std::fs::File;
97 /// let reader = File::open("file1.txt")?;
98 /// let writer = File::create("file2.txt")?;
99 /// let transfer = Transfer::new(reader, writer);
100 /// while !transfer.is_complete() {
101 /// println!("Not complete yet");
102 /// std::thread::sleep(std::time::Duration::from_secs(1));
103 /// }
104 /// println!("Complete!");
105 /// # Ok::<_, std::io::Error>(())
106 /// ```
107 pub fn is_complete(&self) -> bool {
108 // If someone would like to confirm the correctness of the ordering guarantees, that would
109 // be much appreciated.
110 self.state.complete.load(Ordering::Acquire)
111 }
112
113 /// Returns the number of bytes transferred thus far between the reader and the writer.
114 /// # Example
115 /// ```no_run
116 /// use transfer_progress::Transfer;
117 /// use std::fs::File;
118 /// let reader = File::open("file1.txt")?;
119 /// let writer = File::create("file2.txt")?;
120 /// let transfer = Transfer::new(reader, writer);
121 /// while !transfer.is_complete() {
122 /// println!("{} bytes transferred so far", transfer.transferred());
123 /// std::thread::sleep(std::time::Duration::from_secs(1));
124 /// }
125 /// println!("Complete!");
126 /// # Ok::<_, std::io::Error>(())
127 /// ```
128 pub fn transferred(&self) -> u64 {
129 // If someone would like to confirm the correctness of the ordering guarantees, that would
130 // be much appreciated.
131 self.state.transferred.load(Ordering::Acquire)
132 }
133
134 /// Returns the elapsed time since the transfer started.
135 /// # Example
136 /// ```no_run
137 /// use transfer_progress::Transfer;
138 /// use std::fs::File;
139 /// let reader = File::open("file1.txt")?;
140 /// let writer = File::create("file2.txt")?;
141 /// let transfer = Transfer::new(reader, writer);
142 /// while !transfer.is_complete() {}
143 /// println!("Transfer took {:?}", transfer.running_time());
144 /// # Ok::<_, std::io::Error>(())
145 /// ```
146 pub fn running_time(&self) -> Duration {
147 self.start_time.elapsed()
148 }
149
150 /// Returns the average speed, in bytes per second, of the transfer.
151 /// # Example
152 /// ```no_run
153 /// use transfer_progress::Transfer;
154 /// use std::fs::File;
155 /// let reader = File::open("file1.txt")?;
156 /// let writer = File::create("file2.txt")?;
157 /// let transfer = Transfer::new(reader, writer);
158 /// while !transfer.is_complete() {
159 /// println!("{}B/s", transfer.speed());
160 /// std::thread::sleep(std::time::Duration::from_secs(1));
161 /// }
162 /// # Ok::<_, std::io::Error>(())
163 /// ```
164 pub fn speed(&self) -> u64 {
165 (self.transferred() as f64 / self.running_time().as_secs_f64()).round() as u64
166 }
167}
168
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Debug for Transfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Writes `<transferred> (<speed>/s)` using `ByteSize` for the human-readable amounts.
    ///
    /// With the alternate flag (`{:#?}`) the amounts are rendered through
    /// `ByteSize::to_string_as(true)`; otherwise the `ByteSize` values are formatted directly.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let amount = ByteSize::b(self.transferred());
        let rate = ByteSize::b(self.speed());
        if !f.alternate() {
            return write!(f, "{:#} ({:#}/s)", amount, rate);
        }
        // Use SI units
        let amount_text = amount.to_string_as(true);
        let rate_text = rate.to_string_as(true);
        write!(f, "{:#} ({:#}/s)", amount_text, rate_text)
    }
}
191
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Display for Transfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Produces exactly the same text as the `Debug` implementation, honouring the same
    /// formatter flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}
202
203/// Monitors the progress of a transfer with a known size.
204pub struct SizedTransfer<R, W>
205where
206 R: Read + Send + 'static,
207 W: Write + Send + 'static,
208{
209 inner: Transfer<R, W>,
210 size: u64,
211}
212
213impl<R, W> SizedTransfer<R, W>
214where
215 R: Read + Send + 'static,
216 W: Write + Send + 'static,
217{
218 /// Creates and starts a new `SizedTransfer`.
219 /// # Example
220 /// ```no_run
221 /// use transfer_progress::SizedTransfer;
222 /// use std::fs::File;
223 /// use std::io::Read;
224 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
225 /// let writer = File::create("file2.txt")?;
226 /// let transfer = SizedTransfer::new(reader, writer, 1024);
227 /// # Ok::<_, std::io::Error>(())
228 /// ```
229 pub fn new(reader: R, writer: W, size: u64) -> Self {
230 Self {
231 inner: Transfer::new(reader, writer),
232 size,
233 }
234 }
235
236 /// Returns the total size (in bytes) of the transfer, as specified when calling
237 /// [`new`][SizedTransfer::new].
238 /// # Example
239 /// ```no_run
240 /// use transfer_progress::SizedTransfer;
241 /// use std::fs::File;
242 /// use std::io::Read;
243 /// # fn main() -> std::io::Result<()> {
244 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
245 /// let writer = File::create("file2.txt")?;
246 /// let transfer = SizedTransfer::new(reader, writer, 1024);
247 /// assert_eq!(transfer.size(), 1024);
248 /// # Ok(())
249 /// # }
250 /// ```
251 pub fn size(&self) -> u64 {
252 self.size
253 }
254
255 /// Returns the number of bytes remaining.
256 /// # Example
257 /// ```no_run
258 /// use transfer_progress::SizedTransfer;
259 /// use std::fs::File;
260 /// use std::io::Read;
261 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
262 /// let writer = File::create("file2.txt")?;
263 /// let transfer = SizedTransfer::new(reader, writer, 1024);
264 /// while !transfer.is_complete() {
265 /// println!("{} bytes remaining", transfer.remaining());
266 /// std::thread::sleep(std::time::Duration::from_secs(1));
267 /// }
268 /// # Ok::<_, std::io::Error>(())
269 /// ```
270 pub fn remaining(&self) -> u64 {
271 self.size - self.inner.transferred()
272 }
273
274 /// Consumes the `SizedTransfer`, blocking until the transfer is complete.
275 ///
276 /// If the transfer was successful, returns `Ok(reader, writer)`, otherwise returns
277 /// the error.
278 ///
279 /// If the transfer is already complete, returns immediately.
280 /// # Example
281 /// ```no_run
282 /// use transfer_progress::SizedTransfer;
283 /// use std::fs::File;
284 /// use std::io::Read;
285 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
286 /// let writer = File::create("file2.txt")?;
287 /// let transfer = SizedTransfer::new(reader, writer, 1024);
288 /// let (reader, writer) = transfer.finish()?;
289 /// # Ok::<_, std::io::Error>(())
290 /// ```
291 pub fn finish(self) -> io::Result<(R, W)> {
292 self.inner.finish()
293 }
294
295 /// Returns a fraction between 0.0 and 1.0 representing the state of the transfer.
296 /// # Example
297 /// ```no_run
298 /// use transfer_progress::SizedTransfer;
299 /// use std::fs::File;
300 /// use std::io::Read;
301 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
302 /// let writer = File::create("file2.txt")?;
303 /// let transfer = SizedTransfer::new(reader, writer, 1024);
304 /// while !transfer.is_complete() {
305 /// println!("Transfer is {:.0}% complete", transfer.fraction_transferred() * 100.0);
306 /// std::thread::sleep(std::time::Duration::from_secs(1));
307 /// }
308 /// # Ok::<_, std::io::Error>(())
309 /// ```
310 pub fn fraction_transferred(&self) -> f64 {
311 self.transferred() as f64 / self.size as f64
312 }
313
314 /// Returns the approximate remaining time until this transfer completes. Returns `None` if
315 /// this cannot be calculated (I.E. no bytes have been transferred yet, so a speed cannot be
316 /// determined).
317 /// # Example
318 /// ```no_run
319 /// use transfer_progress::SizedTransfer;
320 /// use std::fs::File;
321 /// use std::io::Read;
322 /// let reader = File::open("file1.txt")?.take(1024); // Bytes
323 /// let writer = File::create("file2.txt")?;
324 /// let transfer = SizedTransfer::new(reader, writer, 1024);
325 /// while !transfer.is_complete() {
326 /// if let Some(eta) = transfer.eta() {
327 /// println!("Transfer will complete in approximately {:?}", eta);
328 /// } else {
329 /// println!("Transfer completion time is unknown");
330 /// }
331 /// std::thread::sleep(std::time::Duration::from_secs(1));
332 /// }
333 /// # Ok::<_, std::io::Error>(())
334 /// ```
335 pub fn eta(&self) -> Option<Duration> {
336 // Cache this so we don't have to perform an atomic access twice
337 let transferred = self.inner.transferred();
338 if transferred == 0 {
339 return None;
340 }
341 let remaining = self.size - transferred;
342 let elapsed = self.running_time().as_secs_f64();
343 let eta = (elapsed / transferred as f64) * remaining as f64;
344 Some(Duration::from_secs_f64(eta))
345 }
346}
347
348impl<R, W> std::ops::Deref for SizedTransfer<R, W>
349where
350 R: Read + Send + 'static,
351 W: Write + Send + 'static,
352{
353 type Target = Transfer<R, W>;
354
355 fn deref(&self) -> &Self::Target {
356 &self.inner
357 }
358}
359
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Debug for SizedTransfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Writes `<percent> % (<transferred> of <size>, <speed>/s)`.
    ///
    /// With the alternate flag (`{:#?}`) the byte amounts are rendered through
    /// `ByteSize::to_string_as(true)`; otherwise the `ByteSize` values are formatted directly.
    /// This matches the flag handling of the `Debug` implementation for `Transfer`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let percentage = self.fraction_transferred() * 100.0;
        let transferred = ByteSize::b(self.transferred());
        let size = ByteSize::b(self.size);
        let speed = ByteSize::b(self.speed());
        if f.alternate() {
            // Use SI units
            write!(
                f,
                "{:.1} % ({} of {}, {}/s)",
                percentage,
                transferred.to_string_as(true),
                size.to_string_as(true),
                speed.to_string_as(true)
            )
        } else {
            write!(
                f,
                "{:.1} % ({} of {}, {}/s)",
                percentage, transferred, size, speed
            )
        }
    }
}
389
#[cfg(feature = "bytesize")]
impl<R, W> fmt::Display for SizedTransfer<R, W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    /// Produces exactly the same text as the `Debug` implementation, honouring the same
    /// formatter flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}
399}