futures_copy/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47mod arc_io_result;
48mod copy_buf;
49mod copy_buf_bidi;
50pub mod eof;
51mod fuse_buf_reader;
52use std::{
53    future::Future,
54    pin::Pin,
55    task::{Context, Poll},
56};
57
58pub use copy_buf::{CopyBuf, copy_buf};
59pub use copy_buf_bidi::{CopyBufBidirectional, copy_buf_bidirectional};
60pub use eof::EofStrategy;
61
62use futures::{AsyncRead, AsyncWrite, io::BufReader};
63use pin_project::pin_project;
64
65/// Return a future to copy bytes from `reader` to `writer`.
66///
67/// See [`copy_buf()`] for full details.
68///
69/// Unlike `copy_buf`, this function does not require that `reader` implements AsyncBufRead:
70/// it wraps `reader` internally in a new `BufReader` with default capacity.
71///
72/// ## Limitations
73///
74/// If an error occurs during transmission, buffered data that was read from `reader`
75/// but not written to `writer` will be lost.
76/// To avoid this, use [`copy_buf()`].
77///
78/// Similarly, if you drop this future while it is still pending,
79/// any buffered data will be lost.
80///
81/// See the crate-level documentation for further
82/// [discussion of this function's limitations](crate#Limitations).
83pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
84where
85    R: AsyncRead,
86    W: AsyncWrite,
87{
88    let reader = BufReader::new(reader);
89    Copy(copy_buf(reader, writer))
90}
91
92/// Return a future to copies bytes from `stream_a` to `stream_b`,
93/// and from `stream_b` to `stream_a`.
94///
95/// See [`copy_buf_bidirectional()`] for full details.
96///
97/// Unlike `copy_buf_bidirectional`, this function does not require that either stream implements AsyncBufRead:
98/// it wraps them internally in a new `BufReader` with default capacity.
99///
100/// ## Limitations
101///
102/// If an error occurs during transmission, data that was read from one stream,
103/// but not written to the other, will be lost.
104/// To avoid this, use [`copy_buf_bidirectional()`].
105///
106/// Similarly, if you drop this future while it is still pending,
107/// any buffered data will be lost.
108///
109/// See the crate-level documentation for further
110/// [discussion of this function's limitations](crate#Limitations).
111pub fn copy_bidirectional<A, B, AE, BE>(
112    stream_a: A,
113    stream_b: B,
114    on_a_eof: AE,
115    on_b_eof: BE,
116) -> CopyBidirectional<A, B, AE, BE>
117where
118    A: AsyncRead + AsyncWrite,
119    B: AsyncRead + AsyncWrite,
120    AE: EofStrategy<B>,
121    BE: EofStrategy<A>,
122{
123    let stream_a = BufReader::new(stream_a);
124    let stream_b = BufReader::new(stream_b);
125    CopyBidirectional(copy_buf_bidirectional(
126        stream_a,
127        stream_b,
128        eof::BufReaderEofWrapper(on_a_eof),
129        eof::BufReaderEofWrapper(on_b_eof),
130    ))
131}
132
133/// A future returned by [`copy`].
134#[derive(Debug)]
135#[pin_project]
136#[must_use = "futures do nothing unless you `.await` or poll them"]
137pub struct Copy<R, W>(#[pin] CopyBuf<BufReader<R>, W>);
138
139/// A future returned by [`copy_bidirectional`].
140#[derive(Debug)]
141#[pin_project]
142#[must_use = "futures do nothing unless you `.await` or poll them"]
143pub struct CopyBidirectional<A, B, AE, BE>(
144    #[pin]
145    CopyBufBidirectional<
146        BufReader<A>,
147        BufReader<B>,
148        eof::BufReaderEofWrapper<AE>,
149        eof::BufReaderEofWrapper<BE>,
150    >,
151);
152
153// Note: There is intentionally no `into_inner` implementation for these types,
154// since returning the original streams would discard any buffered data.
155
156impl<R, W> Future for Copy<R, W>
157where
158    R: AsyncRead,
159    W: AsyncWrite,
160{
161    type Output = std::io::Result<u64>;
162
163    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164        self.project().0.poll(cx)
165    }
166}
167
168impl<A, B, AE, BE> Future for CopyBidirectional<A, B, AE, BE>
169where
170    A: AsyncRead + AsyncWrite,
171    B: AsyncRead + AsyncWrite,
172    AE: EofStrategy<B>,
173    BE: EofStrategy<A>,
174{
175    type Output = std::io::Result<(u64, u64)>;
176
177    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
178        self.project().0.poll(cx)
179    }
180}
181
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use std::io;

    /// A stream implementing both AsyncRead and AsyncWrite
    /// whose every operation fails with an error of the stored kind.
    #[derive(Debug, Clone)]
    pub(crate) struct ErrorRW(pub(crate) io::ErrorKind);

    impl ErrorRW {
        /// Return an already-ready poll result holding an error of our stored kind.
        fn failure<T>(&self) -> Poll<io::Result<T>> {
            Poll::Ready(Err(io::Error::from(self.0)))
        }
    }

    impl AsyncRead for ErrorRW {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            self.failure()
        }
    }

    impl AsyncWrite for ErrorRW {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            self.failure()
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.failure()
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.failure()
        }
    }

    /// An AsyncRead implementation that never yields any data.
    ///
    /// (Every read on it is _pending_.)
    pub(crate) struct PausedRead;

    impl AsyncRead for PausedRead {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // Note that the context is ignored: no waker is ever registered here.
            Poll::Pending
        }
    }

    /// A reader and a writer, stapled together into a single Read+Write stream.
    ///
    /// Reads are served by field 0; writes, flushes, and closes go to field 1.
    #[pin_project]
    pub(crate) struct RWPair<R, W>(#[pin] pub(crate) R, #[pin] pub(crate) W);

    impl<R, W> AsyncRead for RWPair<R, W>
    where
        R: AsyncRead,
    {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let halves = self.project();
            halves.0.poll_read(cx, buf)
        }
    }

    impl<R, W> AsyncWrite for RWPair<R, W>
    where
        W: AsyncWrite,
    {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let halves = self.project();
            halves.1.poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let halves = self.project();
            halves.1.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let halves = self.project();
            halves.1.poll_close(cx)
        }
    }
}