async_std/io/copy.rs
1use std::future::Future;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, BufReader, Read, Write};
7use crate::task::{Context, Poll};
8use crate::utils::Context as _;
9
10/// Copies the entire contents of a reader into a writer.
11///
12/// This function will continuously read data from `reader` and then
13/// write it into `writer` in a streaming fashion until `reader`
14/// returns EOF.
15///
16/// On success, the total number of bytes that were copied from
17/// `reader` to `writer` is returned.
18///
19/// If you’re wanting to copy the contents of one file to another and you’re
20/// working with filesystem paths, see the [`fs::copy`] function.
21///
22/// This function is an async version of [`std::io::copy`].
23///
24/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
25/// [`fs::copy`]: ../fs/fn.copy.html
26///
27/// # Errors
28///
29/// This function will return an error immediately if any call to `read` or
30/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
31/// handled by this function and the underlying operation is retried.
32///
33/// # Examples
34///
35/// ```
36/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
37/// #
38/// use async_std::io;
39///
40/// let mut reader: &[u8] = b"hello";
41/// let mut writer = io::stdout();
42///
43/// io::copy(&mut reader, &mut writer).await?;
44/// #
45/// # Ok(()) }) }
46/// ```
47#[cfg(any(feature = "docs", not(feature = "unstable")))]
48pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
49where
50 R: Read + Unpin + ?Sized,
51 W: Write + Unpin + ?Sized,
52{
53 pin_project! {
54 struct CopyFuture<R, W> {
55 #[pin]
56 reader: R,
57 #[pin]
58 writer: W,
59 amt: u64,
60 }
61 }
62
63 impl<R, W> Future for CopyFuture<R, W>
64 where
65 R: BufRead,
66 W: Write + Unpin,
67 {
68 type Output = io::Result<u64>;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 let mut this = self.project();
72 loop {
73 let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?;
74 if buffer.is_empty() {
75 futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
76 return Poll::Ready(Ok(*this.amt));
77 }
78
79 let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?;
80 if i == 0 {
81 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
82 }
83 *this.amt += i as u64;
84 this.reader.as_mut().consume(i);
85 }
86 }
87 }
88
89 let future = CopyFuture {
90 reader: BufReader::new(reader),
91 writer,
92 amt: 0,
93 };
94 future.await.context(|| String::from("io::copy failed"))
95}
96
97/// Copies the entire contents of a reader into a writer.
98///
99/// This function will continuously read data from `reader` and then
100/// write it into `writer` in a streaming fashion until `reader`
101/// returns EOF.
102///
103/// On success, the total number of bytes that were copied from
104/// `reader` to `writer` is returned.
105///
106/// If you’re wanting to copy the contents of one file to another and you’re
107/// working with filesystem paths, see the [`fs::copy`] function.
108///
109/// This function is an async version of [`std::io::copy`].
110///
111/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
112/// [`fs::copy`]: ../fs/fn.copy.html
113///
114/// # Errors
115///
116/// This function will return an error immediately if any call to `read` or
117/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
118/// handled by this function and the underlying operation is retried.
119///
120/// # Examples
121///
122/// ```
123/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
124/// #
125/// use async_std::io;
126///
127/// let mut reader: &[u8] = b"hello";
128/// let mut writer = io::stdout();
129///
130/// io::copy(&mut reader, &mut writer).await?;
131/// #
132/// # Ok(()) }) }
133/// ```
134#[cfg(all(feature = "unstable", not(feature = "docs")))]
135pub async fn copy<R, W>(reader: R, writer: W) -> io::Result<u64>
136where
137 R: Read + Unpin,
138 W: Write + Unpin,
139{
140 pin_project! {
141 struct CopyFuture<R, W> {
142 #[pin]
143 reader: R,
144 #[pin]
145 writer: W,
146 amt: u64,
147 }
148 }
149
150 impl<R, W> Future for CopyFuture<R, W>
151 where
152 R: BufRead,
153 W: Write + Unpin,
154 {
155 type Output = io::Result<u64>;
156
157 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
158 let mut this = self.project();
159 loop {
160 let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?;
161 if buffer.is_empty() {
162 futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
163 return Poll::Ready(Ok(*this.amt));
164 }
165
166 let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?;
167 if i == 0 {
168 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
169 }
170 *this.amt += i as u64;
171 this.reader.as_mut().consume(i);
172 }
173 }
174 }
175
176 let future = CopyFuture {
177 reader: BufReader::new(reader),
178 writer,
179 amt: 0,
180 };
181 future.await.context(|| String::from("io::copy failed"))
182}