async_copy_progress/lib.rs
1//! [![github]](https://github.com/mkroening/async-copy-progress) [![crates-io]](https://crates.io/crates/async-copy-progress) [![docs-rs]](https://docs.rs/async-copy-progress)
2//!
3//! [github]: https://img.shields.io/badge/github-8da0cb?style=for-the-badge&labelColor=555555&logo=github
4//! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust
5//! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logoColor=white&logo=
6//!
7//! <br>
8//!
9//! Asynchronous copying with progress callbacks.
10//!
11//! See [`copy`] for details.
12//!
13//! <br>
14//!
15//! # Example
16//!
17//! ```
18//! # spin_on::spin_on(async {
19//! # use std::sync::atomic::{AtomicU64, Ordering};
20//! let mut reader: &[u8] = b"hello";
21//! let mut writer: Vec<u8> = vec![];
22//!
23//! let progress = AtomicU64::new(0);
24//! let report_progress = |amt| progress.store(amt, Ordering::Relaxed);
25//!
26//! async_copy_progress::copy(&mut reader, &mut writer, report_progress).await?;
27//!
28//! assert_eq!(&b"hello"[..], &writer[..]);
29//! assert_eq!(5, progress.load(Ordering::Relaxed));
30//! # std::io::Result::Ok(()) });
31//! ```
32
33#![deny(rust_2018_idioms)]
34
35use std::{
36 future::Future,
37 io,
38 pin::Pin,
39 task::{Context, Poll},
40};
41
42use futures_core::ready;
43use futures_io::{AsyncBufRead, AsyncWrite};
44use pin_project_lite::pin_project;
45
46/// Creates a future which copies all the bytes from one object to another while
47/// reporting the progress.
48///
49/// The returned future will copy all the bytes read from `reader` into the
50/// `writer` specified. After each write, `report_progress` is called with the
51/// current amount of copied bytes. This future will only complete once the
52/// `reader` has hit EOF and all bytes have been written to and flushed from the
53/// `writer` provided.
54///
55/// On success the number of bytes is returned.
56///
57/// <br>
58///
59/// # Errors
60///
61/// This function will return an error immediately if any call to
62/// [`poll_fill_buf`], [`poll_write`] or [`poll_flush`] returns an error.
63///
64/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
65/// [`poll_write`]: AsyncWrite::poll_write
66/// [`poll_flush`]: AsyncWrite::poll_flush
67///
68/// <br>
69///
70/// # Example
71///
72/// ```
73/// # spin_on::spin_on(async {
74/// # use std::sync::atomic::{AtomicU64, Ordering};
75/// let mut reader: &[u8] = b"hello";
76/// let mut writer: Vec<u8> = vec![];
77///
78/// let progress = AtomicU64::new(0);
79/// let report_progress = |amt| progress.store(amt, Ordering::Relaxed);
80///
81/// async_copy_progress::copy(&mut reader, &mut writer, report_progress).await?;
82///
83/// assert_eq!(&b"hello"[..], &writer[..]);
84/// assert_eq!(5, progress.load(Ordering::Relaxed));
85/// # std::io::Result::Ok(()) });
86/// ```
87pub fn copy<R, W, F>(reader: R, writer: W, report_progress: F) -> Copy<R, W, F>
88where
89 R: AsyncBufRead,
90 W: AsyncWrite,
91 F: FnMut(u64),
92{
93 Copy {
94 reader,
95 writer,
96 amt: 0,
97 report_progress,
98 }
99}
100
101pin_project! {
102 /// Future for the [`copy`] function.
103 #[derive(Debug)]
104 #[must_use = "futures do nothing unless you `.await` or poll them"]
105 pub struct Copy<R, W, F> {
106 #[pin]
107 reader: R,
108 #[pin]
109 writer: W,
110 amt: u64,
111 report_progress: F,
112 }
113}
114
115impl<R, W, F> Future for Copy<R, W, F>
116where
117 R: AsyncBufRead,
118 W: AsyncWrite,
119 F: FnMut(u64),
120{
121 type Output = io::Result<u64>;
122
123 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124 let mut this = self.project();
125 loop {
126 let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
127 if buffer.is_empty() {
128 ready!(this.writer.poll_flush(cx))?;
129 return Poll::Ready(Ok(*this.amt));
130 }
131
132 let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
133 if i == 0 {
134 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
135 }
136 *this.amt += i as u64;
137 this.reader.as_mut().consume(i);
138 (this.report_progress)(*this.amt);
139 }
140 }
141}