Skip to main content

futures_copy/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49mod arc_io_result;
50mod copy_buf;
51mod copy_buf_bidi;
52pub mod eof;
53mod fuse_buf_reader;
54use std::{
55    future::Future,
56    pin::Pin,
57    task::{Context, Poll},
58};
59
60pub use copy_buf::{CopyBuf, copy_buf};
61pub use copy_buf_bidi::{CopyBufBidirectional, copy_buf_bidirectional};
62pub use eof::EofStrategy;
63
64use futures::{AsyncRead, AsyncWrite, io::BufReader};
65use pin_project::pin_project;
66
67/// Return a future to copy bytes from `reader` to `writer`.
68///
69/// See [`copy_buf()`] for full details.
70///
71/// Unlike `copy_buf`, this function does not require that `reader` implements AsyncBufRead:
72/// it wraps `reader` internally in a new `BufReader` with default capacity.
73///
74/// ## Limitations
75///
76/// If an error occurs during transmission, buffered data that was read from `reader`
77/// but not written to `writer` will be lost.
78/// To avoid this, use [`copy_buf()`].
79///
80/// Similarly, if you drop this future while it is still pending,
81/// any buffered data will be lost.
82///
83/// See the crate-level documentation for further
84/// [discussion of this function's limitations](crate#Limitations).
85pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
86where
87    R: AsyncRead,
88    W: AsyncWrite,
89{
90    let reader = BufReader::new(reader);
91    Copy(copy_buf(reader, writer))
92}
93
94/// Return a future to copies bytes from `stream_a` to `stream_b`,
95/// and from `stream_b` to `stream_a`.
96///
97/// See [`copy_buf_bidirectional()`] for full details.
98///
99/// Unlike `copy_buf_bidirectional`, this function does not require that either stream implements AsyncBufRead:
100/// it wraps them internally in a new `BufReader` with default capacity.
101///
102/// ## Limitations
103///
104/// If an error occurs during transmission, data that was read from one stream,
105/// but not written to the other, will be lost.
106/// To avoid this, use [`copy_buf_bidirectional()`].
107///
108/// Similarly, if you drop this future while it is still pending,
109/// any buffered data will be lost.
110///
111/// See the crate-level documentation for further
112/// [discussion of this function's limitations](crate#Limitations).
113pub fn copy_bidirectional<A, B, AE, BE>(
114    stream_a: A,
115    stream_b: B,
116    on_a_eof: AE,
117    on_b_eof: BE,
118) -> CopyBidirectional<A, B, AE, BE>
119where
120    A: AsyncRead + AsyncWrite,
121    B: AsyncRead + AsyncWrite,
122    AE: EofStrategy<B>,
123    BE: EofStrategy<A>,
124{
125    let stream_a = BufReader::new(stream_a);
126    let stream_b = BufReader::new(stream_b);
127    CopyBidirectional(copy_buf_bidirectional(
128        stream_a,
129        stream_b,
130        eof::BufReaderEofWrapper(on_a_eof),
131        eof::BufReaderEofWrapper(on_b_eof),
132    ))
133}
134
135/// A future returned by [`copy`].
136#[derive(Debug)]
137#[pin_project]
138#[must_use = "futures do nothing unless you `.await` or poll them"]
139pub struct Copy<R, W>(#[pin] CopyBuf<BufReader<R>, W>);
140
141/// A future returned by [`copy_bidirectional`].
142#[derive(Debug)]
143#[pin_project]
144#[must_use = "futures do nothing unless you `.await` or poll them"]
145pub struct CopyBidirectional<A, B, AE, BE>(
146    #[pin]
147    CopyBufBidirectional<
148        BufReader<A>,
149        BufReader<B>,
150        eof::BufReaderEofWrapper<AE>,
151        eof::BufReaderEofWrapper<BE>,
152    >,
153);
154
155// Note: There is intentionally no `into_inner` implementation for these types,
156// since returning the original streams would discard any buffered data.
157
158impl<R, W> Future for Copy<R, W>
159where
160    R: AsyncRead,
161    W: AsyncWrite,
162{
163    type Output = std::io::Result<u64>;
164
165    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166        self.project().0.poll(cx)
167    }
168}
169
170impl<A, B, AE, BE> Future for CopyBidirectional<A, B, AE, BE>
171where
172    A: AsyncRead + AsyncWrite,
173    B: AsyncRead + AsyncWrite,
174    AE: EofStrategy<B>,
175    BE: EofStrategy<A>,
176{
177    type Output = std::io::Result<(u64, u64)>;
178
179    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180        self.project().0.poll(cx)
181    }
182}
183
184#[cfg(test)]
185mod test {
186    // @@ begin test lint list maintained by maint/add_warning @@
187    #![allow(clippy::bool_assert_comparison)]
188    #![allow(clippy::clone_on_copy)]
189    #![allow(clippy::dbg_macro)]
190    #![allow(clippy::mixed_attributes_style)]
191    #![allow(clippy::print_stderr)]
192    #![allow(clippy::print_stdout)]
193    #![allow(clippy::single_char_pattern)]
194    #![allow(clippy::unwrap_used)]
195    #![allow(clippy::unchecked_time_subtraction)]
196    #![allow(clippy::useless_vec)]
197    #![allow(clippy::needless_pass_by_value)]
198    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
199
200    use super::*;
201    use std::io;
202
203    /// A struct that implements AsyncRead and AsyncWrite, but always returns an error.
204    #[derive(Debug, Clone)]
205    pub(crate) struct ErrorRW(pub(crate) io::ErrorKind);
206
207    impl AsyncRead for ErrorRW {
208        fn poll_read(
209            self: Pin<&mut Self>,
210            _cx: &mut Context<'_>,
211            _buf: &mut [u8],
212        ) -> Poll<io::Result<usize>> {
213            Poll::Ready(Err(io::Error::from(self.0)))
214        }
215    }
216
217    impl AsyncWrite for ErrorRW {
218        fn poll_write(
219            self: Pin<&mut Self>,
220            _cx: &mut Context<'_>,
221            _buf: &[u8],
222        ) -> Poll<io::Result<usize>> {
223            Poll::Ready(Err(io::Error::from(self.0)))
224        }
225
226        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
227            Poll::Ready(Err(io::Error::from(self.0)))
228        }
229
230        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
231            Poll::Ready(Err(io::Error::from(self.0)))
232        }
233    }
234
235    /// A struct that implements AsyncRead, but never returns any data.
236    ///
237    /// (This reader is always _pending_.)
238    pub(crate) struct PausedRead;
239
240    impl AsyncRead for PausedRead {
241        fn poll_read(
242            self: Pin<&mut Self>,
243            _cx: &mut Context<'_>,
244            _buf: &mut [u8],
245        ) -> Poll<io::Result<usize>> {
246            Poll::Pending
247        }
248    }
249
250    /// A read-write pair, stapled into a Read+Write stream.
251    #[pin_project]
252    pub(crate) struct RWPair<R, W>(#[pin] pub(crate) R, #[pin] pub(crate) W);
253
254    impl<R: AsyncRead, W> AsyncRead for RWPair<R, W> {
255        fn poll_read(
256            self: Pin<&mut Self>,
257            cx: &mut Context<'_>,
258            buf: &mut [u8],
259        ) -> Poll<io::Result<usize>> {
260            self.project().0.poll_read(cx, buf)
261        }
262    }
263
264    impl<R, W: AsyncWrite> AsyncWrite for RWPair<R, W> {
265        fn poll_write(
266            self: Pin<&mut Self>,
267            cx: &mut Context<'_>,
268            buf: &[u8],
269        ) -> Poll<io::Result<usize>> {
270            self.project().1.poll_write(cx, buf)
271        }
272
273        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
274            self.project().1.poll_flush(cx)
275        }
276
277        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
278            self.project().1.poll_close(cx)
279        }
280    }
281}