tokio_copy_with_buffer/
lib.rs

//! Customized variant of the [`tokio_io::io::copy` function] where you can set your own
//! buffer and retrieve it after copying. This may increase performance in some cases.
//!
//! [`tokio_io::io::copy` function]: https://docs.rs/tokio-io/0.1/tokio_io/io/fn.copy.html
4#![deny(missing_docs)]
5
6extern crate futures;
7#[macro_use]
8extern crate tokio_io;
9
10use std::io;
11
12use futures::{Future, Poll};
13
14use tokio_io::{AsyncRead, AsyncWrite};
15
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] and [`copy_with_buffer`] functions, this future will
/// resolve to the number of bytes copied together with the reader, the writer
/// and the buffer that was used, or to an error if one happens.
///
/// [`copy`]: fn.copy.html
/// [`copy_with_buffer`]: fn.copy_with_buffer.html
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Copy<R, W> {
    // Source of the data; wrapped in `Option` so it can be moved out and
    // handed back to the caller when the future resolves.
    reader: Option<R>,
    // Set once the reader has reported end-of-file (a read of zero bytes).
    read_done: bool,
    // Destination of the data; `Option` for the same reason as `reader`.
    writer: Option<W>,
    // Start of the not-yet-written region of `buffer`.
    pos: usize,
    // End of the valid (read) region of `buffer`; `pos == cap` means the
    // buffer holds no pending data.
    cap: usize,
    // Total number of bytes written to the writer so far.
    amt: u64,
    // Scratch buffer the data travels through; returned on completion.
    buffer: Option<Box<[u8]>>,
}
32
33/// Creates a future which represents copying all the bytes from one object to
34/// another, just like [the original `copy`] function. This version uses large
35/// buffer (65536) by default, unlike 4096 as in the original.
36///
37/// The returned future will copy all the bytes read from `reader` into the
38/// `writer` specified. This future will only complete once the `reader` has hit
39/// EOF and all bytes have been written to and flushed from the `writer`
40/// provided.
41///
42/// On success the number of bytes is returned and the `reader` and `writer` are
43/// consumed. Additionally the buffer used for copying is also returned.
44/// On error the error is returned and the I/O objects are consumed as
45/// well.
46/// [the original `copy`]: https://docs.rs/tokio-io/0.1/tokio_io/io/fn.copy.html
47pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
48    where R: AsyncRead,
49          W: AsyncWrite,
50{
51    copy_with_buffer(reader, writer, Box::new([0; 65536]))
52}
53
54/// Advanced version of [`copy`] where you can specify your own buffer.
55/// Buffer may be reused for multiple copy operations.
56///
57/// For other description text see the [`copy` function documentation].
58/// [`copy` function documentation]: fn.copy.html
59pub fn copy_with_buffer<R, W>(reader: R, writer: W, buffer: Box<[u8]>) -> Copy<R, W>
60    where R: AsyncRead,
61          W: AsyncWrite,
62{
63    Copy {
64        reader: Some(reader),
65        read_done: false,
66        writer: Some(writer),
67        amt: 0,
68        pos: 0,
69        cap: 0,
70        buffer: Some(buffer),
71    }
72}
73
74impl<R, W> Future for Copy<R, W>
75    where R: AsyncRead,
76          W: AsyncWrite,
77{
78    type Item = (u64, R, W, Box<[u8]>);
79    type Error = io::Error;
80
81    fn poll(&mut self) -> Poll<(u64, R, W, Box<[u8]>), io::Error> {
82        loop {
83            // If our buffer is empty, then we need to read some data to
84            // continue.
85            if self.pos == self.cap && !self.read_done {
86                let buf = self.buffer.as_mut().unwrap();
87                let reader = self.reader.as_mut().unwrap();
88                let n = try_nb!(reader.read(buf));
89                if n == 0 {
90                    self.read_done = true;
91                } else {
92                    self.pos = 0;
93                    self.cap = n;
94                }
95            }
96
97            // If our buffer has some data, let's write it out!
98            while self.pos < self.cap {
99                let buf = self.buffer.as_mut().unwrap();
100                let writer = self.writer.as_mut().unwrap();
101                let i = try_nb!(writer.write(&mut buf[self.pos..self.cap]));
102                if i == 0 {
103                    return Err(io::Error::new(io::ErrorKind::WriteZero,
104                                              "write zero byte into writer"));
105                } else {
106                    self.pos += i;
107                    self.amt += i as u64;
108                }
109            }
110
111            // If we've written al the data and we've seen EOF, flush out the
112            // data and finish the transfer.
113            // done with the entire transfer.
114            if self.pos == self.cap && self.read_done {
115                try_nb!(self.writer.as_mut().unwrap().flush());
116                let reader = self.reader.take().unwrap();
117                let writer = self.writer.take().unwrap();
118                let buffer = self.buffer.take().unwrap();
119                return Ok((self.amt, reader, writer, buffer).into())
120            }
121        }
122    }
123}