futures_bufio/
read.rs

1use futures::Future;
2use futures::future::{Either, ok};
3use futures_cpupool::CpuPool;
4
5use std::ops::DerefMut;
6
7use std::io::{self, Read};
8use std::mem;
9
10use common::EXP_POOL;
11
12/// Adds buffering to any reader, similar to the [standard `BufReader`], but performs non-buffer
13/// reads in a thread pool.
14///
15/// [standard `BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
16///
17/// All reads are returned as futures.
18///
19/// This reader is most useful for wrapping readers that never block or cannot return EWOULDBLOCK,
/// but are slow. Notably, this is useful for wrapping `fs::File`.
21///
22/// All reads must take and own the `BufReader` and the buffer being written for the duration of
23/// the read.
24///
25/// # Examples
26/// ```
27/// # extern crate futures_cpupool;
28/// # extern crate futures;
29/// # extern crate futures_bufio;
30/// #
31/// # use futures::Future;
32/// # use futures_cpupool::CpuPool;
33/// # use futures_bufio::BufReader;
34/// # use std::io;
35/// # fn main() {
36/// let f = io::Cursor::new(b"normally, we would open a file here here".to_vec());
37/// let pool = CpuPool::new(1);
38/// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
39///
40/// let buf = vec![0; 10];
41/// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
42///     // in real usage, we have the option to deconstruct our BufReader or reuse buf here
43///     panic!("unable to read full: {}", e);
44/// });
45/// assert_eq!(n, 10);
46/// assert_eq!(&buf[..n], b"normally, ");
47/// # }
48/// ```
pub struct BufReader<R> {
    // The wrapped reader; every read that cannot be served from `buf` is issued against it.
    inner: R,
    // Internal buffer. `buf[pos..cap]` holds bytes already read from `inner` but not yet handed
    // to a caller.
    buf: Box<[u8]>,
    // Offset in `buf` of the next byte to hand out; `pos == cap` means the buffer is drained.
    pos: usize,
    // End of the valid bytes in `buf`. Starts at `buf.len()` and is reset to the number of bytes
    // actually read each time the buffer is refilled, so it can be shorter than `buf.len()`.
    cap: usize,
    // Pool that reads of `inner` run on. It is taken out while a pooled read is in flight and put
    // back when that read's future resolves.
    pool: Option<CpuPool>,
}
56
/// Success value of a read: the reader `R`, the original buffer `B` that was read into, and the
/// number of bytes read.
type OkRead<R, B> = (R, B, usize);
/// Failure value of a read: the reader `R`, the original buffer `B` that was being read into, and
/// the error encountered while reading.
type ErrRead<R, B> = (R, B, io::Error);
61
62impl<R: Read + Send + 'static> BufReader<R> {
63    /// Creates and returns a new `BufReader` with an internal buffer of size `cap`.
64    ///
65    /// # Examples
66    /// ```
67    /// # extern crate futures_cpupool;
68    /// # extern crate futures_bufio;
69    /// #
70    /// # use futures_cpupool::CpuPool;
71    /// # use futures_bufio::BufReader;
72    /// # use std::io;
73    /// # fn main() {
74    /// let f = io::Cursor::new(b"foo text".to_vec());
75    /// let pool = CpuPool::new(1);
76    /// let reader = BufReader::with_pool_and_capacity(pool, 4<<10, f);
77    /// # }
78    /// ```
79    pub fn with_pool_and_capacity(pool: CpuPool, cap: usize, inner: R) -> BufReader<R> {
80        let mut buf = Vec::with_capacity(cap);
81        unsafe {
82            buf.set_len(cap);
83        }
84        BufReader::with_pool_and_buf(pool, buf.into_boxed_slice(), inner)
85    }
86
87    /// Creates and returns a new `BufReader` with `buf` as the internal buffer.
88    ///
89    /// # Examples
90    /// ```
91    /// # extern crate futures_cpupool;
92    /// # extern crate futures_bufio;
93    /// #
94    /// # use futures_cpupool::CpuPool;
95    /// # use futures_bufio::BufReader;
96    /// # use std::io;
97    /// # fn main() {
98    /// let f = io::Cursor::new(b"foo text".to_vec());
99    /// let pool = CpuPool::new(1);
100    /// let buf = vec![0; 4096].into_boxed_slice();
101    /// let reader = BufReader::with_pool_and_buf(pool, buf, f);
102    /// # }
103    /// ```
104    pub fn with_pool_and_buf(pool: CpuPool, buf: Box<[u8]>, inner: R) -> BufReader<R> {
105        let cap = buf.len();
106        BufReader {
107            inner: inner,
108            buf: buf,
109            pos: cap,
110            cap: cap,
111            pool: Some(pool),
112        }
113    }
114
115    /// Gets a reference to the underlying reader.
116    ///
117    /// It is likely invalid to read directly from the underlying reader and then use the `BufReader`
118    /// again.
119    pub fn get_ref(&self) -> &R {
120        &self.inner
121    }
122
123    /// Gets a mutable reference to the underlying reader.
124    ///
125    /// It is likely invalid to read directly from the underlying reader and then use the `BufReader`
126    /// again.
127    pub fn get_mut(&mut self) -> &R {
128        &mut self.inner
129    }
130
    /// Sets the `BufReader`'s internal buffer position to `pos`.
132    ///
133    ///
134    /// This is _highly_ unsafe for the following reasons:
135    ///
136    ///   - the internal buffer may have uninitialized memory, and moving the read position back
137    ///     will mean reading uninitialized memory
138    ///
139    ///   - the pos is not validated, meaning it is possible to move the pos past the end of the
140    ///     internal buffer. This will cause a panic on the next use of `try_read_full`.
141    ///
142    ///   - it is possible to move _past_ the internal "capacity" end position, meaning a read may
143    ///     panic due to the beginning of a read being after its end.
144    ///
145    /// This function should only be used for setting up a new `BufReader` with a buffer that
146    /// contains known, existing contents.
147    ///
148    /// # Examples
149    /// ```
150    /// # extern crate futures_cpupool;
151    /// # extern crate futures;
152    /// # extern crate futures_bufio;
153    /// #
154    /// # use futures::Future;
155    /// # use futures_cpupool::CpuPool;
156    /// # use futures_bufio::BufReader;
157    /// # use std::io;
158    /// # fn main() {
159    /// let f = io::Cursor::new(vec![]);
160    /// let pool = CpuPool::new(1);
161    /// let mut buf = vec![0; 4096].into_boxed_slice();
162    ///
163    /// let p = b"pre-existing text";
164    /// let buf_len = buf.len();
165    ///
166    /// // copy some known text to our buffer - note it must be at the end
167    /// &mut buf[buf_len-p.len()..].copy_from_slice(p);
168    ///
169    /// let mut reader = BufReader::with_pool_and_buf(pool, buf, f);
170    ///
171    /// // unsafely move the reader's position to the beginning of our known text, and read it
172    /// unsafe { reader.set_pos(buf_len-p.len()); }
173    /// let (_, b, _) = reader
174    ///     .try_read_full(vec![0; p.len()])
175    ///     .wait()
176    ///     .unwrap_or_else(|(_, _, e)| {
177    ///         panic!("unable to read: {}", e);
178    ///     });
179    ///
180    /// // our read should be all of our known contents
181    /// assert_eq!(&*b, p);
182    /// # }
183    /// ```
    pub unsafe fn set_pos(&mut self, pos: usize) {
        // Stored without any validation against `cap` or the buffer length; the caller is
        // responsible for `pos` being a valid read position.
        self.pos = pos;
    }
187
188    /// Returns the internal components of a `BufReader`, allowing reuse. This
189    /// is unsafe because it does not zero the memory of the buffer, meaning
    /// the buffer could contain uninitialized memory.
191    ///
192    /// # Examples
193    /// ```
194    /// # extern crate futures_cpupool;
195    /// # extern crate futures_bufio;
196    /// #
197    /// # use futures_cpupool::CpuPool;
198    /// # use futures_bufio::BufReader;
199    /// # use std::io;
200    /// # fn main() {
201    /// let f = io::Cursor::new(b"foo text".to_vec());
202    /// let pool = CpuPool::new(1);
203    /// let reader = BufReader::with_pool_and_capacity(pool, 4<<10, f);
204    ///
205    /// let (f, buf, pool) = unsafe { reader.components() };
206    /// assert_eq!(f.get_ref(), b"foo text");
207    /// assert_eq!(buf.len(), 4<<10);
208    /// # }
209    /// ```
210    pub unsafe fn components(mut self) -> (R, Box<[u8]>, CpuPool) {
211        let r = mem::replace(&mut self.inner, mem::uninitialized());
212        let buf = mem::replace(&mut self.buf, mem::uninitialized());
213        let mut pool = mem::replace(&mut self.pool, mem::uninitialized());
214        let pool = pool.take().expect(EXP_POOL);
215        mem::forget(self);
216        (r, buf, pool)
217    }
218
219    /// Reads into `buf` until `buf` is filled or the underlying reader returns a zero read (hits
220    /// EOF).
221    ///
    /// This returns the buffer and the number of bytes read. The buffer may need to be sized down
    /// on use if this returns with a short read.
224    ///
    /// If used on `fs::File`s, `BufReader` could be valuable for performing page-aligned reads.
    /// In this case, once this function returns a short read, we reached EOF and any future
    /// reads may be un-aligned.
228    ///
229    /// # Examples
230    /// ```
231    /// # extern crate futures_cpupool;
232    /// # extern crate futures;
233    /// # extern crate futures_bufio;
234    /// #
235    /// # use futures::Future;
236    /// # use futures_cpupool::CpuPool;
237    /// # use futures_bufio::BufReader;
238    /// # use std::io;
239    /// # fn main() {
240    /// let f = io::Cursor::new(b"foo text".to_vec());
241    /// let pool = CpuPool::new(1);
242    /// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
243    ///
244    /// let buf = vec![0; 10];
245    /// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
246    ///     // in real usage, we have the option to deconstruct our BufReader or reuse buf here
247    ///     panic!("unable to read full: {}", e);
248    /// });
249    /// assert_eq!(n, 8);
250    /// assert_eq!(&*buf, b"foo text\0\0");
251    /// assert_eq!(&buf[..n], b"foo text");
252    /// # }
253    /// ```
254    pub fn try_read_full<B>(
255        mut self,
256        mut buf: B,
257    ) -> impl Future<Item = OkRead<Self, B>, Error = ErrRead<Self, B>>
258    where
259        B: DerefMut<Target = [u8]> + Send + 'static,
260    {
261        const U8READ: &str = "&[u8] reads never error";
262        let mut rem = buf.len();
263        let mut at = 0;
264
265        if self.pos != self.cap {
266            at = (&self.buf[self.pos..self.cap]).read(&mut buf).expect(
267                U8READ,
268            );
269            rem -= at;
270            self.pos += at;
271
272            if rem == 0 {
273                return Either::A(ok::<OkRead<Self, B>, ErrRead<Self, B>>((self, buf, at)));
274            }
275        }
276        // self.pos == self.cap
277
278        let pool = self.pool.take().expect(EXP_POOL);
279
280        let block = if self.cap > 0 {
281            rem - rem % self.cap
282        } else {
283            rem
284        };
285
286        let fut = pool.spawn_fn(move || {
287            if block > 0 {
288                let (block_read, err) = try_read_full(&mut self.inner, &mut buf[at..at + block]);
289                if let Some(e) = err {
290                    return Err((self, buf, e));
291                }
292
293                at += block_read;
294                rem -= block_read;
295                if rem == 0 {
296                    return Ok((self, buf, at));
297                }
298            }
299
300            let (buf_read, err) = try_read_full(&mut self.inner, &mut self.buf);
301            match err {
302                Some(e) => Err((self, buf, e)),
303                None => {
304                    self.cap = buf_read;
305                    self.pos = (&self.buf[..self.cap]).read(&mut buf[at..]).expect(U8READ);
306                    at += self.pos;
307                    Ok((self, buf, at))
308                }
309            }
310        });
311
312        Either::B(fut.then(|res| match res {
313            Ok(mut x) => {
314                x.0.pool = Some(pool);
315                Ok(x)
316            }
317            Err(mut x) => {
318                x.0.pool = Some(pool);
319                Err(x)
320            }
321        }))
322    }
323}
324
/// Reads from `r` into `buf` until `buf` is full, `r` returns a zero-length read (EOF), or `r`
/// fails with an error other than `Interrupted`.
///
/// Returns the number of bytes written to the front of `buf`, paired with the error that stopped
/// the read, if any. `Interrupted` errors are retried.
fn try_read_full<R: Read>(r: &mut R, buf: &mut [u8]) -> (usize, Option<io::Error>) {
    let mut filled = 0;
    // `!=` rather than `<`: a reader that over-reports its byte count trips the slice bounds
    // check below instead of silently ending the loop.
    while filled != buf.len() {
        match r.read(&mut buf[filled..]) {
            Ok(0) => break,
            Ok(n) => filled += n,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return (filled, Some(e)),
        }
    }
    (filled, None)
}
341
#[test]
fn test_read() {
    use std::env;
    use std::fs;
    use std::io::Write;
    use std::process;

    // Use a per-process file in the system temp directory and create it with truncation, so a
    // file left behind by an earlier aborted run cannot make this test fail before it starts.
    let path = env::temp_dir().join(format!("futures_bufio_test_read_{}.txt", process::id()));

    // create the test file (88 bytes)
    fs::File::create(&path)
        .expect("unable to create test file")
        .write_all(
            b"Strapped down to my bed, feet cold, eyes red. I'm out of my head. Am I alive? Am I \
              dead?",
        )
        .expect("unable to write all");

    // create our buffered reader with a 10 byte internal buffer
    let f = BufReader::with_pool_and_capacity(
        CpuPool::new(1),
        10,
        fs::File::open(&path).expect("test file does not exist?"),
    );
    assert_eq!(f.pos, 10);

    // Issues one `len`-byte read and unwraps it, panicking with the I/O error on failure.
    let read = |f: BufReader<fs::File>, len: usize| {
        f.try_read_full(vec![0u8; len])
            .wait()
            .unwrap_or_else(|(_, _, e)| panic!("unable to read: {}", e))
    };

    // disk read, no blocks
    let (f, buf, n) = read(f, 5);
    assert_eq!(n, 5);
    assert_eq!(&*buf, b"Strap");
    assert_eq!(f.pos, 5);
    assert_eq!(f.cap, 10);
    assert_eq!(&*f.buf, b"Strapped d");

    // mem read only
    let (f, buf, n) = read(f, 2);
    assert_eq!(n, 2);
    assert_eq!(&*buf, b"pe");
    assert_eq!(f.pos, 7);
    assert_eq!(f.cap, 10);
    assert_eq!(&*f.buf, b"Strapped d");

    // mem (3) + disk blocks (20) + more mem (2)
    let (f, buf, n) = read(f, 25);
    assert_eq!(n, 25);
    assert_eq!(&*buf, b"d down to my bed, feet co");
    assert_eq!(f.pos, 2);
    assert_eq!(f.cap, 10);
    assert_eq!(&*f.buf, b"cold, eyes");

    // mem (8) + disk block (10)
    let (f, buf, n) = read(f, 18);
    assert_eq!(n, 18);
    assert_eq!(&*buf, b"ld, eyes red. I'm ");
    assert_eq!(f.pos, 10);
    assert_eq!(f.cap, 10);
    assert_eq!(&*f.buf, b"cold, eyes"); // non-reset buf

    // disk block (10)
    let (f, buf, n) = read(f, 10);
    assert_eq!(n, 10);
    assert_eq!(&*buf, b"out of my ");
    assert_eq!(f.pos, 10);
    assert_eq!(f.cap, 10);
    assert_eq!(&*f.buf, b"cold, eyes");

    // disk block (20) + mem (9) (over-read by one byte)
    let (f, buf, n) = read(f, 29);
    assert_eq!(n, 28);
    assert_eq!(&*buf, b"head. Am I alive? Am I dead?\0");
    assert_eq!(f.pos, 8);
    assert_eq!(f.cap, 8);
    assert_eq!(&*f.buf, b" I dead?es");

    // at EOF: nothing is read and the internal buffer is left empty
    let (f, buf, n) = read(f, 2);
    assert_eq!(n, 0);
    assert_eq!(&*buf, b"\0\0");
    assert_eq!(f.pos, 0);
    assert_eq!(f.cap, 0);
    assert_eq!(&*f.buf, b" I dead?es");

    fs::remove_file(&path).expect("expected file to be removed");
}
455}