futures_util/io/
copy_into.rs1use std::io;
2use std::boxed::Box;
3
4use {Future, Poll, task};
5
6use futures_io::{AsyncRead, AsyncWrite};
7
8#[derive(Debug)]
15pub struct CopyInto<R, W> {
16 reader: Option<R>,
17 read_done: bool,
18 writer: Option<W>,
19 pos: usize,
20 cap: usize,
21 amt: u64,
22 buf: Box<[u8]>,
23}
24
25pub fn copy_into<R, W>(reader: R, writer: W) -> CopyInto<R, W> {
26 CopyInto {
27 reader: Some(reader),
28 read_done: false,
29 writer: Some(writer),
30 amt: 0,
31 pos: 0,
32 cap: 0,
33 buf: Box::new([0; 2048]),
34 }
35}
36
37impl<R, W> Future for CopyInto<R, W>
38 where R: AsyncRead,
39 W: AsyncWrite,
40{
41 type Item = (u64, R, W);
42 type Error = io::Error;
43
44 fn poll(&mut self, cx: &mut task::Context) -> Poll<(u64, R, W), io::Error> {
45 loop {
46 if self.pos == self.cap && !self.read_done {
49 let reader = self.reader.as_mut().unwrap();
50 let n = try_ready!(reader.poll_read(cx, &mut self.buf));
51 if n == 0 {
52 self.read_done = true;
53 } else {
54 self.pos = 0;
55 self.cap = n;
56 }
57 }
58
59 while self.pos < self.cap {
61 let writer = self.writer.as_mut().unwrap();
62 let i = try_ready!(writer.poll_write(cx, &self.buf[self.pos..self.cap]));
63 if i == 0 {
64 return Err(io::Error::new(io::ErrorKind::WriteZero,
65 "write zero byte into writer"));
66 } else {
67 self.pos += i;
68 self.amt += i as u64;
69 }
70 }
71
72 if self.pos == self.cap && self.read_done {
76 try_ready!(self.writer.as_mut().unwrap().poll_flush(cx));
77 let reader = self.reader.take().unwrap();
78 let writer = self.writer.take().unwrap();
79 return Ok((self.amt, reader, writer).into())
80 }
81 }
82 }
83}