futures_util/io/
copy_into.rs

1use std::io;
2use std::boxed::Box;
3
4use {Future, Poll, task};
5
6use futures_io::{AsyncRead, AsyncWrite};
7
8/// A future which will copy all data from a reader into a writer.
9///
10/// Created by the [`copy_into`] function, this future will resolve to the number of
11/// bytes copied or an error if one happens.
12///
13/// [`copy_into`]: fn.copy_into.html
14#[derive(Debug)]
15pub struct CopyInto<R, W> {
16    reader: Option<R>,
17    read_done: bool,
18    writer: Option<W>,
19    pos: usize,
20    cap: usize,
21    amt: u64,
22    buf: Box<[u8]>,
23}
24
25pub fn copy_into<R, W>(reader: R, writer: W) -> CopyInto<R, W> {
26    CopyInto {
27        reader: Some(reader),
28        read_done: false,
29        writer: Some(writer),
30        amt: 0,
31        pos: 0,
32        cap: 0,
33        buf: Box::new([0; 2048]),
34    }
35}
36
37impl<R, W> Future for CopyInto<R, W>
38    where R: AsyncRead,
39          W: AsyncWrite,
40{
41    type Item = (u64, R, W);
42    type Error = io::Error;
43
44    fn poll(&mut self, cx: &mut task::Context) -> Poll<(u64, R, W), io::Error> {
45        loop {
46            // If our buffer is empty, then we need to read some data to
47            // continue.
48            if self.pos == self.cap && !self.read_done {
49                let reader = self.reader.as_mut().unwrap();
50                let n = try_ready!(reader.poll_read(cx, &mut self.buf));
51                if n == 0 {
52                    self.read_done = true;
53                } else {
54                    self.pos = 0;
55                    self.cap = n;
56                }
57            }
58
59            // If our buffer has some data, let's write it out!
60            while self.pos < self.cap {
61                let writer = self.writer.as_mut().unwrap();
62                let i = try_ready!(writer.poll_write(cx, &self.buf[self.pos..self.cap]));
63                if i == 0 {
64                    return Err(io::Error::new(io::ErrorKind::WriteZero,
65                                              "write zero byte into writer"));
66                } else {
67                    self.pos += i;
68                    self.amt += i as u64;
69                }
70            }
71
72            // If we've written al the data and we've seen EOF, flush out the
73            // data and finish the transfer.
74            // done with the entire transfer.
75            if self.pos == self.cap && self.read_done {
76                try_ready!(self.writer.as_mut().unwrap().poll_flush(cx));
77                let reader = self.reader.take().unwrap();
78                let writer = self.writer.take().unwrap();
79                return Ok((self.amt, reader, writer).into())
80            }
81        }
82    }
83}