netio/adapt/
mod.rs

1use std::{self, cmp};
2
3use {Result, Error, ErrorKind, Read, BufRead, BufReadGrow, Write};
4
5mod compat;
6
/// An adapter that transparently retries interrupted I/O operations.
///
/// Whenever an operation on the wrapped reader or writer fails with
/// `ErrorKind::Interrupted`, it is issued again until it produces any other
/// outcome.
///
/// This struct is generally created by calling `retry()` on a reader or writer.
/// Please see the documentation of [`Stream::retry`] for more details.
///
/// [`Stream::retry`]: ./trait.Stream.html#method.retry
pub struct Retry<I> {
    /// The wrapped reader or writer whose operations are retried.
    inner: I,
}

impl<I> Retry<I> {
    /// Wraps `inner` in a retrying adapter.
    #[inline]
    pub fn new(inner: I) -> Retry<I> {
        Retry { inner }
    }
}
21
22impl<I: Read> Read for Retry<I> {
23    #[inline]
24    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
25        loop {
26            match self.inner.read(buf) {
27                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
28                other => return other,
29            }
30        }
31    }
32}
33
34impl<I: Write> Write for Retry<I> {
35    #[inline]
36    fn write(&mut self, buf: &[u8]) -> Result<usize> {
37        loop {
38            match self.inner.write(buf) {
39                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
40                other => return other,
41            }
42        }
43    }
44
45    #[inline]
46    fn flush(&mut self) -> Result<()> {
47        loop {
48            match self.inner.flush() {
49                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
50                other => return other,
51            }
52        }
53    }
54}
55
56impl<R: BufRead> BufRead for Retry<R> {
57    #[inline]
58    fn fill_buf(&mut self) -> Result<&[u8]> {
59        while let Err(e) = self.inner.fill_buf() {
60            if e.kind() != ErrorKind::Interrupted {
61                return Err(e);
62            }
63        }
64        self.inner.fill_buf()
65    }
66
67    #[inline]
68    fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
69}
70
71impl<R: BufReadGrow> BufReadGrow for Retry<R> {
72    #[inline]
73    fn grow_buf(&mut self) -> Result<&[u8]> {
74        while let Err(e) = self.inner.grow_buf() {
75            if e.kind() != ErrorKind::Interrupted {
76                return Err(e);
77            }
78        }
79        self.inner.fill_buf()
80    }
81}
82
/// An adapter that restarts from the beginning after EOF is reached.
///
/// Restarting is done by seeking the wrapped reader back to offset zero.
///
/// This struct is generally created by calling `repeat()` on a reader.
/// Please see the documentation of [`Read::repeat`] for more details.
///
/// [`Read::repeat`]: ./trait.Read.html#method.repeat
pub struct Repeat<R> {
    /// The wrapped reader that gets rewound when it runs out of data.
    inner: R,
}

impl<I> Repeat<I> {
    /// Wraps `inner` in a repeating adapter.
    #[inline]
    pub fn new(inner: I) -> Repeat<I> {
        Repeat { inner }
    }
}
97
98impl<I: Read + std::io::Seek> Read for Repeat<I> {
99    #[inline]
100    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
101        match self.inner.read(buf) {
102            Ok(0) => {
103                try!(self.inner.seek(std::io::SeekFrom::Start(0)));
104                self.inner.read(buf)
105            }
106            Ok(n) => Ok(n),
107            Err(e) => Err(e),
108        }
109    }
110}
111
112impl<I: BufRead + std::io::Seek> BufRead for Repeat<I> {
113    #[inline]
114    fn fill_buf(&mut self) -> Result<&[u8]> {
115        if try!(self.inner.fill_buf()).is_empty() {
116            try!(self.inner.seek(std::io::SeekFrom::Start(0)));
117        }
118        self.inner.fill_buf()
119    }
120
121    #[inline]
122    fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
123}
124
/// Adapter which limits the bytes read from / written to an underlying reader / writer.
///
/// This struct is generally created by calling `take()` on a reader/writer.
/// Please see the documentation of [`Stream::take`] for more details.
///
/// [`Stream::take`]: ./trait.Stream.html#method.take
pub struct Take<T> {
    /// The wrapped reader or writer.
    inner: T,
    /// Number of bytes that may still pass through this adapter.
    limit: u64,
}

impl<T> Take<T> {
    /// Wraps `inner`, allowing at most `limit` bytes to pass through.
    #[inline]
    pub fn new(inner: T, limit: u64) -> Take<T> {
        Take { inner, limit }
    }

    /// Returns the number of bytes that can be read before this instance will return EOF.
    ///
    /// # Note
    ///
    /// This instance may reach EOF after reading fewer bytes than indicated by
    /// this method if the underlying `Read` instance reaches EOF.
    #[inline]
    pub fn limit(&self) -> u64 {
        self.limit
    }
}
149
150impl<T: Read> Read for Take<T> {
151    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
152        // Don't call into inner reader at all at EOF because it may still block
153        if self.limit == 0 {
154            return Ok(0);
155        }
156
157        let max = cmp::min(buf.len() as u64, self.limit) as usize;
158        let n = try!(self.inner.read(&mut buf[..max]));
159        self.limit -= n as u64;
160        Ok(n)
161    }
162}
163
164impl<T: BufRead> BufRead for Take<T> {
165    #[inline]
166    fn fill_buf(&mut self) -> Result<&[u8]> {
167        // Don't call into inner reader at all at EOF because it may still block
168        if self.limit == 0 {
169            return Ok(&[]);
170        }
171
172        let buf = try!(self.inner.fill_buf());
173        let cap = cmp::min(buf.len() as u64, self.limit) as usize;
174        Ok(&buf[..cap])
175    }
176
177    #[inline]
178    fn consume(&mut self, amt: usize) {
179        // Don't let callers reset the limit by passing an overlarge value
180        let amt = cmp::min(amt as u64, self.limit) as usize;
181        self.limit -= amt as u64;
182        self.inner.consume(amt);
183    }
184}
185
186impl<T: BufReadGrow> BufReadGrow for Take<T> {
187    fn grow_buf(&mut self) -> Result<&[u8]> {
188        // Don't call into inner reader at all at EOF because it may still block
189        if self.limit == 0 || self.limit == try!(self.fill_buf()).len() as u64 {
190            return Err(Error::new(ErrorKind::UnexpectedEof, "Stream is already at EOF"));
191        }
192
193        let buf = try!(self.inner.grow_buf());
194        let cap = cmp::min(buf.len() as u64, self.limit) as usize;
195        Ok(&buf[..cap])
196    }
197}
198
199impl<T: Write> Write for Take<T> {
200    fn write(&mut self, buf: &[u8]) -> Result<usize> {
201        if self.limit == 0 {
202            return Ok(0);
203        }
204
205        let amt = cmp::min(self.limit, buf.len() as u64) as usize;
206        let amt = try!(self.inner.write(&buf[..amt]));
207        self.limit -= amt as u64;
208        Ok(amt)
209    }
210
211    #[inline]
212    fn flush(&mut self) -> Result<()> { self.inner.flush() }
213}
214
/// Adapter to chain together two readers.
///
/// Data is taken from the first reader until it is exhausted, and from the
/// second reader afterwards.
///
/// This struct is generally created by calling `chain()` on a reader.
/// Please see the documentation of [`Read::chain`] for more details.
///
/// [`Read::chain`]: ./trait.Read.html#method.chain
pub struct Chain<T, U> {
    /// Reader that is drained first.
    first: T,
    /// Reader that is used once `first` is exhausted.
    second: U,
    /// Whether `first` has already been found to be exhausted.
    done_first: bool,
}

impl<T, U> Chain<T, U> {
    /// Creates a chain that yields `first` followed by `second`.
    #[inline]
    pub fn new(first: T, second: U) -> Chain<T, U> {
        Chain { first, second, done_first: false }
    }

    /// Consumes the chain and returns both underlying readers.
    pub fn into_inner(self) -> (T, U) {
        let Chain { first, second, .. } = self;
        (first, second)
    }

    /// Returns a shared reference to the first reader.
    pub fn get_first(&self) -> &T {
        &self.first
    }

    /// Returns a mutable reference to the first reader.
    pub fn get_mut_first(&mut self) -> &mut T {
        &mut self.first
    }

    /// Returns a shared reference to the second reader.
    pub fn get_second(&self) -> &U {
        &self.second
    }

    /// Returns a mutable reference to the second reader.
    pub fn get_mut_second(&mut self) -> &mut U {
        &mut self.second
    }
}
243
244impl<T: Read, U: Read> Read for Chain<T, U> {
245    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
246        if !self.done_first {
247            match try!(self.first.read(buf)) {
248                0 => {
249                    self.done_first = true;
250                }
251                n => return Ok(n),
252            }
253        }
254        self.second.read(buf)
255    }
256}
257
258impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
259    fn fill_buf(&mut self) -> Result<&[u8]> {
260        if !self.done_first {
261            match try!(self.first.fill_buf()) {
262                buf if buf.len() == 0 => {
263                    self.done_first = true;
264                }
265                buf => return Ok(buf),
266            }
267        }
268        self.second.fill_buf()
269    }
270
271    fn consume(&mut self, amt: usize) {
272        if !self.done_first {
273            self.first.consume(amt)
274        } else {
275            self.second.consume(amt)
276        }
277    }
278}
279
/// Wrapper around a `std::io` reader or writer.
///
/// The wrapper implements this crate's I/O traits as well as the matching
/// `std::io` traits by delegating to the wrapped value.
pub struct Compat<I> {
    /// The wrapped `std::io` reader or writer.
    inner: I,
}

impl<I> Compat<I> {
    /// Wraps `inner` in a compatibility adapter.
    pub fn new(inner: I) -> Compat<I> {
        Compat { inner }
    }
}
287
288impl<R: std::io::Read> Read for Compat<R> {
289    #[inline]
290    fn read(&mut self, buf: &mut [u8]) -> Result<usize> { self.inner.read(buf) }
291}
292
293impl<R: std::io::Read> std::io::Read for Compat<R> {
294    #[inline]
295    fn read(&mut self, buf: &mut [u8]) -> Result<usize> { self.inner.read(buf) }
296}
297
298impl<R: std::io::BufRead> BufRead for Compat<R> {
299    #[inline]
300    fn fill_buf(&mut self) -> Result<&[u8]> { self.inner.fill_buf() }
301    #[inline]
302    fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
303}
304
305impl<R: std::io::BufRead> std::io::BufRead for Compat<R> {
306    #[inline]
307    fn fill_buf(&mut self) -> Result<&[u8]> { self.inner.fill_buf() }
308    #[inline]
309    fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
310}
311
312impl<R: std::io::Write> Write for Compat<R> {
313    #[inline]
314    fn write(&mut self, buf: &[u8]) -> Result<usize> { self.inner.write(buf) }
315    #[inline]
316    fn flush(&mut self) -> Result<()> { self.inner.flush() }
317}
318
319impl<R: std::io::Write> std::io::Write for Compat<R> {
320    #[inline]
321    fn write(&mut self, buf: &[u8]) -> Result<usize> { self.inner.write(buf) }
322    #[inline]
323    fn flush(&mut self) -> Result<()> { self.inner.flush() }
324}
325
326
#[cfg(test)]
mod tests {
    use std::io;

    use {ErrorKind, Stream};

    /// Copies a ten-byte source into a `Vec` wrapped in `take(limit)` and
    /// checks both the outcome of the copy and the bytes that reached the
    /// vector, for limits below, at and above the source length.
    #[test]
    fn take_write() {
        // (limit, bytes expected in the sink, whether the copy must fail)
        let cases = [
            (0, &b""[..], true),
            (9, &b"012345678"[..], true),
            (10, &b"0123456789"[..], false),
            (11, &b"0123456789"[..], false),
        ];

        for &(limit, expected, must_fail) in cases.iter() {
            let mut source = io::Cursor::new(&b"0123456789"[..]);
            let mut sink = Vec::new();
            let outcome = ::copy(&mut source, &mut sink.by_ref().take(limit));
            if must_fail {
                assert_eq!(outcome.unwrap_err().kind(), ErrorKind::WriteZero);
            } else {
                outcome.unwrap();
            }
            assert_eq!(sink, expected);
        }
    }
}
357}