async_std/io/
copy.rs

1use std::future::Future;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, BufReader, Read, Write};
7use crate::task::{Context, Poll};
8use crate::utils::Context as _;
9
10/// Copies the entire contents of a reader into a writer.
11///
12/// This function will continuously read data from `reader` and then
13/// write it into `writer` in a streaming fashion until `reader`
14/// returns EOF.
15///
16/// On success, the total number of bytes that were copied from
17/// `reader` to `writer` is returned.
18///
19/// If you’re wanting to copy the contents of one file to another and you’re
20/// working with filesystem paths, see the [`fs::copy`] function.
21///
22/// This function is an async version of [`std::io::copy`].
23///
24/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
25/// [`fs::copy`]: ../fs/fn.copy.html
26///
27/// # Errors
28///
29/// This function will return an error immediately if any call to `read` or
30/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
31/// handled by this function and the underlying operation is retried.
32///
33/// # Examples
34///
35/// ```
36/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
37/// #
38/// use async_std::io;
39///
40/// let mut reader: &[u8] = b"hello";
41/// let mut writer = io::stdout();
42///
43/// io::copy(&mut reader, &mut writer).await?;
44/// #
45/// # Ok(()) }) }
46/// ```
47#[cfg(any(feature = "docs", not(feature = "unstable")))]
48pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
49where
50    R: Read + Unpin + ?Sized,
51    W: Write + Unpin + ?Sized,
52{
53    pin_project! {
54        struct CopyFuture<R, W> {
55            #[pin]
56            reader: R,
57            #[pin]
58            writer: W,
59            amt: u64,
60        }
61    }
62
63    impl<R, W> Future for CopyFuture<R, W>
64    where
65        R: BufRead,
66        W: Write + Unpin,
67    {
68        type Output = io::Result<u64>;
69
70        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71            let mut this = self.project();
72            loop {
73                let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?;
74                if buffer.is_empty() {
75                    futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
76                    return Poll::Ready(Ok(*this.amt));
77                }
78
79                let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?;
80                if i == 0 {
81                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
82                }
83                *this.amt += i as u64;
84                this.reader.as_mut().consume(i);
85            }
86        }
87    }
88
89    let future = CopyFuture {
90        reader: BufReader::new(reader),
91        writer,
92        amt: 0,
93    };
94    future.await.context(|| String::from("io::copy failed"))
95}
96
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
/// write it into `writer` in a streaming fashion until `reader`
/// returns EOF. Once EOF is reached, `writer` is flushed before the
/// function returns.
///
/// This variant takes `reader` and `writer` by value.
///
/// On success, the total number of bytes that were copied from
/// `reader` to `writer` is returned.
///
/// If you want to copy the contents of one file to another and you are
/// working with filesystem paths, see the [`fs::copy`] function.
///
/// This function is an async version of [`std::io::copy`].
///
/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
/// [`fs::copy`]: ../fs/fn.copy.html
///
/// # Errors
///
/// This function will return an error immediately if any call to `read` or
/// `write` returns an error. All instances of `ErrorKind::Interrupted`
/// reported by a read or a write are handled by this function and the
/// underlying operation is retried. An error from the final flush is returned
/// as-is.
///
/// If the writer accepts zero bytes while there is still data to write, an
/// error of kind `ErrorKind::WriteZero` is returned.
///
/// Any error is passed through the crate's error-context helper with the
/// message `io::copy failed` before being returned.
///
/// # Examples
///
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
///
/// let mut reader: &[u8] = b"hello";
/// let mut writer = io::stdout();
///
/// io::copy(&mut reader, &mut writer).await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg(all(feature = "unstable", not(feature = "docs")))]
pub async fn copy<R, W>(reader: R, writer: W) -> io::Result<u64>
where
    R: Read + Unpin,
    W: Write + Unpin,
{
    pin_project! {
        // Copy state: a buffered reader, the destination writer, and the
        // number of bytes written so far.
        struct CopyFuture<R, W> {
            #[pin]
            reader: R,
            #[pin]
            writer: W,
            amt: u64,
        }
    }

    impl<R, W> Future for CopyFuture<R, W>
    where
        R: BufRead,
        W: Write + Unpin,
    {
        type Output = io::Result<u64>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut this = self.project();
            loop {
                // Obtain the next chunk of buffered data. An interrupted read
                // is retried, as the documented contract promises.
                let buffer = match futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx)) {
                    Ok(buffer) => buffer,
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => return Poll::Ready(Err(err)),
                };

                // An empty buffer signals EOF: flush the writer and finish.
                if buffer.is_empty() {
                    futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
                    return Poll::Ready(Ok(*this.amt));
                }

                let written = match futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer)) {
                    // The writer made no progress on a non-empty buffer.
                    Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                    Ok(n) => n,
                    // Nothing has been consumed yet, so looping re-polls the
                    // reader for the still-pending data and retries the write.
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => return Poll::Ready(Err(err)),
                };

                // `usize` -> `u64` is a widening conversion on supported targets.
                *this.amt += written as u64;
                // Only drop what the writer actually accepted; the rest stays
                // buffered for the next iteration.
                this.reader.as_mut().consume(written);
            }
        }
    }

    let future = CopyFuture {
        reader: BufReader::new(reader),
        writer,
        amt: 0,
    };
    future.await.context(|| String::from("io::copy failed"))
}