async_copy_progress/
lib.rs

1//! [![github]](https://github.com/mkroening/async-copy-progress) [![crates-io]](https://crates.io/crates/async-copy-progress) [![docs-rs]](https://docs.rs/async-copy-progress)
2//!
3//! [github]: https://img.shields.io/badge/github-8da0cb?style=for-the-badge&labelColor=555555&logo=github
4//! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust
5//! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logoColor=white&logo=
6//!
7//! <br>
8//!
9//! Asynchronous copying with progress callbacks.
10//!
11//! See [`copy`] for details.
12//!
13//! <br>
14//!
15//! # Example
16//!
17//! ```
18//! # spin_on::spin_on(async {
19//! # use std::sync::atomic::{AtomicU64, Ordering};
20//! let mut reader: &[u8] = b"hello";
21//! let mut writer: Vec<u8> = vec![];
22//!
23//! let progress = AtomicU64::new(0);
24//! let report_progress = |amt| progress.store(amt, Ordering::Relaxed);
25//!
26//! async_copy_progress::copy(&mut reader, &mut writer, report_progress).await?;
27//!
28//! assert_eq!(&b"hello"[..], &writer[..]);
29//! assert_eq!(5, progress.load(Ordering::Relaxed));
30//! # std::io::Result::Ok(()) });
31//! ```
32
33#![deny(rust_2018_idioms)]
34
35use std::{
36    future::Future,
37    io,
38    pin::Pin,
39    task::{Context, Poll},
40};
41
42use futures_core::ready;
43use futures_io::{AsyncBufRead, AsyncWrite};
44use pin_project_lite::pin_project;
45
46/// Creates a future which copies all the bytes from one object to another while
47/// reporting the progress.
48///
49/// The returned future will copy all the bytes read from `reader` into the
50/// `writer` specified. After each write, `report_progress` is called with the
51/// current amount of copied bytes. This future will only complete once the
52/// `reader` has hit EOF and all bytes have been written to and flushed from the
53/// `writer` provided.
54///
55/// On success the number of bytes is returned.
56///
57/// <br>
58///
59/// # Errors
60///
61/// This function will return an error immediately if any call to
62/// [`poll_fill_buf`], [`poll_write`] or [`poll_flush`] returns an error.
63///
64/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
65/// [`poll_write`]: AsyncWrite::poll_write
66/// [`poll_flush`]: AsyncWrite::poll_flush
67///
68/// <br>
69///
70/// # Example
71///
72/// ```
73/// # spin_on::spin_on(async {
74/// # use std::sync::atomic::{AtomicU64, Ordering};
75/// let mut reader: &[u8] = b"hello";
76/// let mut writer: Vec<u8> = vec![];
77///
78/// let progress = AtomicU64::new(0);
79/// let report_progress = |amt| progress.store(amt, Ordering::Relaxed);
80///
81/// async_copy_progress::copy(&mut reader, &mut writer, report_progress).await?;
82///
83/// assert_eq!(&b"hello"[..], &writer[..]);
84/// assert_eq!(5, progress.load(Ordering::Relaxed));
85/// # std::io::Result::Ok(()) });
86/// ```
87pub fn copy<R, W, F>(reader: R, writer: W, report_progress: F) -> Copy<R, W, F>
88where
89    R: AsyncBufRead,
90    W: AsyncWrite,
91    F: FnMut(u64),
92{
93    Copy {
94        reader,
95        writer,
96        amt: 0,
97        report_progress,
98    }
99}
100
101pin_project! {
102    /// Future for the [`copy`] function.
103    #[derive(Debug)]
104    #[must_use = "futures do nothing unless you `.await` or poll them"]
105    pub struct Copy<R, W, F> {
106        #[pin]
107        reader: R,
108        #[pin]
109        writer: W,
110        amt: u64,
111        report_progress: F,
112    }
113}
114
115impl<R, W, F> Future for Copy<R, W, F>
116where
117    R: AsyncBufRead,
118    W: AsyncWrite,
119    F: FnMut(u64),
120{
121    type Output = io::Result<u64>;
122
123    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124        let mut this = self.project();
125        loop {
126            let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
127            if buffer.is_empty() {
128                ready!(this.writer.poll_flush(cx))?;
129                return Poll::Ready(Ok(*this.amt));
130            }
131
132            let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
133            if i == 0 {
134                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
135            }
136            *this.amt += i as u64;
137            this.reader.as_mut().consume(i);
138            (this.report_progress)(*this.amt);
139        }
140    }
141}