tokio_copy_with_buffer/lib.rs
1//! Customized variant of [`tokio_io::io::copy` function] where you can set your own
2//! buffer and retrieve it after copying. This may increase performance in some cases.
3//! [`tokio_io::io::copy` function]: https://docs.rs/tokio-io/0.1/tokio_io/io/fn.copy.html
4#![deny(missing_docs)]
5
6extern crate futures;
7#[macro_use]
8extern crate tokio_io;
9
10use std::io;
11
12use futures::{Future, Poll};
13
14use tokio_io::{AsyncRead, AsyncWrite};
15
16/// A future which will copy all data from a reader into a writer.
17///
18/// Created by the [`copy_with_buffer`] function, this future will resolve to the number of
19/// bytes copied (along with other things) or an error if one happens.
20///
21/// [`copy_with_buffer`]: fn.copy_with_buffer.html
22#[derive(Debug)]
23pub struct Copy<R, W> {
24 reader: Option<R>,
25 read_done: bool,
26 writer: Option<W>,
27 pos: usize,
28 cap: usize,
29 amt: u64,
30 buffer: Option<Box<[u8]>>,
31}
32
33/// Creates a future which represents copying all the bytes from one object to
34/// another, just like [the original `copy`] function. This version uses large
35/// buffer (65536) by default, unlike 4096 as in the original.
36///
37/// The returned future will copy all the bytes read from `reader` into the
38/// `writer` specified. This future will only complete once the `reader` has hit
39/// EOF and all bytes have been written to and flushed from the `writer`
40/// provided.
41///
42/// On success the number of bytes is returned and the `reader` and `writer` are
43/// consumed. Additionally the buffer used for copying is also returned.
44/// On error the error is returned and the I/O objects are consumed as
45/// well.
46/// [the original `copy`]: https://docs.rs/tokio-io/0.1/tokio_io/io/fn.copy.html
47pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
48 where R: AsyncRead,
49 W: AsyncWrite,
50{
51 copy_with_buffer(reader, writer, Box::new([0; 65536]))
52}
53
54/// Advanced version of [`copy`] where you can specify your own buffer.
55/// Buffer may be reused for multiple copy operations.
56///
57/// For other description text see the [`copy` function documentation].
58/// [`copy` function documentation]: fn.copy.html
59pub fn copy_with_buffer<R, W>(reader: R, writer: W, buffer: Box<[u8]>) -> Copy<R, W>
60 where R: AsyncRead,
61 W: AsyncWrite,
62{
63 Copy {
64 reader: Some(reader),
65 read_done: false,
66 writer: Some(writer),
67 amt: 0,
68 pos: 0,
69 cap: 0,
70 buffer: Some(buffer),
71 }
72}
73
74impl<R, W> Future for Copy<R, W>
75 where R: AsyncRead,
76 W: AsyncWrite,
77{
78 type Item = (u64, R, W, Box<[u8]>);
79 type Error = io::Error;
80
81 fn poll(&mut self) -> Poll<(u64, R, W, Box<[u8]>), io::Error> {
82 loop {
83 // If our buffer is empty, then we need to read some data to
84 // continue.
85 if self.pos == self.cap && !self.read_done {
86 let buf = self.buffer.as_mut().unwrap();
87 let reader = self.reader.as_mut().unwrap();
88 let n = try_nb!(reader.read(buf));
89 if n == 0 {
90 self.read_done = true;
91 } else {
92 self.pos = 0;
93 self.cap = n;
94 }
95 }
96
97 // If our buffer has some data, let's write it out!
98 while self.pos < self.cap {
99 let buf = self.buffer.as_mut().unwrap();
100 let writer = self.writer.as_mut().unwrap();
101 let i = try_nb!(writer.write(&mut buf[self.pos..self.cap]));
102 if i == 0 {
103 return Err(io::Error::new(io::ErrorKind::WriteZero,
104 "write zero byte into writer"));
105 } else {
106 self.pos += i;
107 self.amt += i as u64;
108 }
109 }
110
111 // If we've written al the data and we've seen EOF, flush out the
112 // data and finish the transfer.
113 // done with the entire transfer.
114 if self.pos == self.cap && self.read_done {
115 try_nb!(self.writer.as_mut().unwrap().flush());
116 let reader = self.reader.take().unwrap();
117 let writer = self.writer.take().unwrap();
118 let buffer = self.buffer.take().unwrap();
119 return Ok((self.amt, reader, writer, buffer).into())
120 }
121 }
122 }
123}