1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
use std::io;
use futures::{Future, Poll};
use crate::{AsyncRead, AsyncWrite};
#[derive(Debug, Copy, Clone)]
pub struct CopyOptions {
pub stop_on_reader_zero_read: bool,
pub once: bool,
pub buffer_size: usize,
/// Because of -u or -U
pub skip: bool,
pub max_ops: Option<usize>,
}
/// A future which will copy all data from a reader into a writer.
/// A modified version of tokio_io::copy::Copy.
///
/// Created by the [`copy`] function, this future will resolve to the number of
/// bytes copied or an error if one happens.
///
/// [`copy`]: fn.copy.html
#[derive(Debug)]
pub struct Copy<R, W> {
reader: Option<R>,
read_done: bool,
writer: Option<W>,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
opts: CopyOptions,
read_occurred: bool,
remaining_ops: Option<usize>,
preamble: Vec<String>,
preamble_index: usize,
}
/// Creates a future which represents copying all the bytes from one object to
/// another.
///
/// The returned future will copy all the bytes read from `reader` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned and the `reader` and `writer` are
/// consumed. On error the error is returned and the I/O objects are consumed as
/// well.
///
/// Unlike original tokio_io::copy::copy, it does not always stop on zero length
/// reads , handles BrokenPipe error kind as EOF and flushes after every write
pub fn copy<R, W>(reader: R, writer: W, opts: CopyOptions, preamble: Vec<String>) -> Copy<R, W>
where
R: AsyncRead,
W: AsyncWrite,
{
Copy {
reader: Some(reader),
read_done: false,
writer: Some(writer),
amt: 0,
pos: 0,
cap: 0,
// TODO - de-hardcode buffer size
buf: vec![0; opts.buffer_size].into_boxed_slice(),
opts,
read_occurred: false,
remaining_ops: opts.max_ops,
preamble,
preamble_index: 0,
}
}
impl<R, W> Future for Copy<R, W>
where
R: AsyncRead,
W: AsyncWrite,
{
type Item = (u64, R, W);
type Error = io::Error;
fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
loop {
// First ensure that preamble messages got drained
if self.preamble_index < self.preamble.len() {
let writer = self.writer.as_mut().unwrap();
let i = try_nb!(writer.write(&self.preamble[self.preamble_index].as_bytes()));
if i == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"write zero byte into writer",
));
} else {
trace!("preamble write {}", i);
if i != self.preamble[self.preamble_index].len() {
warn!("Short write of a preamble. Expect trimmed data.")
}
self.preamble_index += 1;
}
try_nb!(writer.flush());
continue;
}
// Handle inhibiting options only after preamble is drained.
if self.opts.skip {
debug!("copy skipped");
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
return Ok((0, reader, writer).into());
}
// If our buffer is empty, then we need to read some data to
// continue.
trace!("poll");
if self.pos == self.cap && !self.read_done {
if self.read_occurred && self.opts.once {
debug!("Once mode requested, so aborting copy");
self.read_done = true;
continue;
}
if self.remaining_ops == Some(0) {
debug!("Maximum number of messages to copy exceed, so aborting copy");
self.read_done = true;
continue;
}
let reader = self.reader.as_mut().unwrap();
let rr = reader.read(&mut self.buf);
if let Err(ref e) = rr {
if e.kind() == io::ErrorKind::BrokenPipe {
debug!("BrokenPipe: read_done");
self.read_done = true;
continue;
}
}
let n = try_nb!(rr);
trace!("read {}", n);
if let Some(ref mut maxops) = self.remaining_ops {
*maxops -= 1;
}
if n == 0 {
debug!("zero len");
if self.opts.stop_on_reader_zero_read {
debug!("read_done");
self.read_done = true;
}
continue;
} else {
self.pos = 0;
self.cap = n;
self.read_occurred = true;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let writer = self.writer.as_mut().unwrap();
let i = try_nb!(writer.write(&self.buf[self.pos..self.cap]));
if i == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"write zero byte into writer",
));
} else {
trace!("write {}", i);
self.pos += i;
self.amt += i as u64;
}
try_nb!(writer.flush());
}
// If we've written al the data and we've seen EOF, flush out the
// data and finish the transfer.
// done with the entire transfer.
if self.pos == self.cap && self.read_done {
try_nb!(self.writer.as_mut().unwrap().flush());
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
debug!("done");
return Ok((self.amt, reader, writer).into());
}
}
}
}