futures_bufio/read.rs
1use futures::Future;
2use futures::future::{Either, ok};
3use futures_cpupool::CpuPool;
4
5use std::ops::DerefMut;
6
7use std::io::{self, Read};
8use std::mem;
9
10use common::EXP_POOL;
11
12/// Adds buffering to any reader, similar to the [standard `BufReader`], but performs non-buffer
13/// reads in a thread pool.
14///
15/// [standard `BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
16///
17/// All reads are returned as futures.
18///
19/// This reader is most useful for wrapping readers that never block or cannot return EWOULDBLOCK,
20/// but are slow. Notably, this is useful for wrapping `io::File`.
21///
22/// All reads must take and own the `BufReader` and the buffer being written for the duration of
23/// the read.
24///
25/// # Examples
26/// ```
27/// # extern crate futures_cpupool;
28/// # extern crate futures;
29/// # extern crate futures_bufio;
30/// #
31/// # use futures::Future;
32/// # use futures_cpupool::CpuPool;
33/// # use futures_bufio::BufReader;
34/// # use std::io;
35/// # fn main() {
36/// let f = io::Cursor::new(b"normally, we would open a file here here".to_vec());
37/// let pool = CpuPool::new(1);
38/// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
39///
40/// let buf = vec![0; 10];
41/// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
42/// // in real usage, we have the option to deconstruct our BufReader or reuse buf here
43/// panic!("unable to read full: {}", e);
44/// });
45/// assert_eq!(n, 10);
46/// assert_eq!(&buf[..n], b"normally, ");
47/// # }
48/// ```
49pub struct BufReader<R> {
50 inner: R,
51 buf: Box<[u8]>,
52 pos: usize,
53 cap: usize,
54 pool: Option<CpuPool>,
55}
56
57/// Wraps `R` with the original buffer being read into and the number of bytes read.
58type OkRead<R, B> = (R, B, usize);
59/// Wraps `R` with the original buffer being read into and the error encountered while reading.
60type ErrRead<R, B> = (R, B, io::Error);
61
62impl<R: Read + Send + 'static> BufReader<R> {
63 /// Creates and returns a new `BufReader` with an internal buffer of size `cap`.
64 ///
65 /// # Examples
66 /// ```
67 /// # extern crate futures_cpupool;
68 /// # extern crate futures_bufio;
69 /// #
70 /// # use futures_cpupool::CpuPool;
71 /// # use futures_bufio::BufReader;
72 /// # use std::io;
73 /// # fn main() {
74 /// let f = io::Cursor::new(b"foo text".to_vec());
75 /// let pool = CpuPool::new(1);
76 /// let reader = BufReader::with_pool_and_capacity(pool, 4<<10, f);
77 /// # }
78 /// ```
79 pub fn with_pool_and_capacity(pool: CpuPool, cap: usize, inner: R) -> BufReader<R> {
80 let mut buf = Vec::with_capacity(cap);
81 unsafe {
82 buf.set_len(cap);
83 }
84 BufReader::with_pool_and_buf(pool, buf.into_boxed_slice(), inner)
85 }
86
87 /// Creates and returns a new `BufReader` with `buf` as the internal buffer.
88 ///
89 /// # Examples
90 /// ```
91 /// # extern crate futures_cpupool;
92 /// # extern crate futures_bufio;
93 /// #
94 /// # use futures_cpupool::CpuPool;
95 /// # use futures_bufio::BufReader;
96 /// # use std::io;
97 /// # fn main() {
98 /// let f = io::Cursor::new(b"foo text".to_vec());
99 /// let pool = CpuPool::new(1);
100 /// let buf = vec![0; 4096].into_boxed_slice();
101 /// let reader = BufReader::with_pool_and_buf(pool, buf, f);
102 /// # }
103 /// ```
104 pub fn with_pool_and_buf(pool: CpuPool, buf: Box<[u8]>, inner: R) -> BufReader<R> {
105 let cap = buf.len();
106 BufReader {
107 inner: inner,
108 buf: buf,
109 pos: cap,
110 cap: cap,
111 pool: Some(pool),
112 }
113 }
114
115 /// Gets a reference to the underlying reader.
116 ///
117 /// It is likely invalid to read directly from the underlying reader and then use the `BufReader`
118 /// again.
119 pub fn get_ref(&self) -> &R {
120 &self.inner
121 }
122
123 /// Gets a mutable reference to the underlying reader.
124 ///
125 /// It is likely invalid to read directly from the underlying reader and then use the `BufReader`
126 /// again.
127 pub fn get_mut(&mut self) -> &R {
128 &mut self.inner
129 }
130
131 /// Sets the `BufReader`s internal buffer position to `pos`.
132 ///
133 ///
134 /// This is _highly_ unsafe for the following reasons:
135 ///
136 /// - the internal buffer may have uninitialized memory, and moving the read position back
137 /// will mean reading uninitialized memory
138 ///
139 /// - the pos is not validated, meaning it is possible to move the pos past the end of the
140 /// internal buffer. This will cause a panic on the next use of `try_read_full`.
141 ///
142 /// - it is possible to move _past_ the internal "capacity" end position, meaning a read may
143 /// panic due to the beginning of a read being after its end.
144 ///
145 /// This function should only be used for setting up a new `BufReader` with a buffer that
146 /// contains known, existing contents.
147 ///
148 /// # Examples
149 /// ```
150 /// # extern crate futures_cpupool;
151 /// # extern crate futures;
152 /// # extern crate futures_bufio;
153 /// #
154 /// # use futures::Future;
155 /// # use futures_cpupool::CpuPool;
156 /// # use futures_bufio::BufReader;
157 /// # use std::io;
158 /// # fn main() {
159 /// let f = io::Cursor::new(vec![]);
160 /// let pool = CpuPool::new(1);
161 /// let mut buf = vec![0; 4096].into_boxed_slice();
162 ///
163 /// let p = b"pre-existing text";
164 /// let buf_len = buf.len();
165 ///
166 /// // copy some known text to our buffer - note it must be at the end
167 /// &mut buf[buf_len-p.len()..].copy_from_slice(p);
168 ///
169 /// let mut reader = BufReader::with_pool_and_buf(pool, buf, f);
170 ///
171 /// // unsafely move the reader's position to the beginning of our known text, and read it
172 /// unsafe { reader.set_pos(buf_len-p.len()); }
173 /// let (_, b, _) = reader
174 /// .try_read_full(vec![0; p.len()])
175 /// .wait()
176 /// .unwrap_or_else(|(_, _, e)| {
177 /// panic!("unable to read: {}", e);
178 /// });
179 ///
180 /// // our read should be all of our known contents
181 /// assert_eq!(&*b, p);
182 /// # }
183 /// ```
184 pub unsafe fn set_pos(&mut self, pos: usize) {
185 self.pos = pos;
186 }
187
188 /// Returns the internal components of a `BufReader`, allowing reuse. This
189 /// is unsafe because it does not zero the memory of the buffer, meaning
190 /// the buffer could countain uninitialized memory.
191 ///
192 /// # Examples
193 /// ```
194 /// # extern crate futures_cpupool;
195 /// # extern crate futures_bufio;
196 /// #
197 /// # use futures_cpupool::CpuPool;
198 /// # use futures_bufio::BufReader;
199 /// # use std::io;
200 /// # fn main() {
201 /// let f = io::Cursor::new(b"foo text".to_vec());
202 /// let pool = CpuPool::new(1);
203 /// let reader = BufReader::with_pool_and_capacity(pool, 4<<10, f);
204 ///
205 /// let (f, buf, pool) = unsafe { reader.components() };
206 /// assert_eq!(f.get_ref(), b"foo text");
207 /// assert_eq!(buf.len(), 4<<10);
208 /// # }
209 /// ```
210 pub unsafe fn components(mut self) -> (R, Box<[u8]>, CpuPool) {
211 let r = mem::replace(&mut self.inner, mem::uninitialized());
212 let buf = mem::replace(&mut self.buf, mem::uninitialized());
213 let mut pool = mem::replace(&mut self.pool, mem::uninitialized());
214 let pool = pool.take().expect(EXP_POOL);
215 mem::forget(self);
216 (r, buf, pool)
217 }
218
219 /// Reads into `buf` until `buf` is filled or the underlying reader returns a zero read (hits
220 /// EOF).
221 ///
222 /// This returns the buffer and the number of bytes read. The buffer may need sized down on use
223 /// if this returns with a short read.
224 ///
225 /// If used on `io::File`'s, `BufReader` could be valuable for performing page-aligned reads.
226 /// In this case, once this function returns a short read, we reached EOF and any futures
227 /// reads may be un-aligned.
228 ///
229 /// # Examples
230 /// ```
231 /// # extern crate futures_cpupool;
232 /// # extern crate futures;
233 /// # extern crate futures_bufio;
234 /// #
235 /// # use futures::Future;
236 /// # use futures_cpupool::CpuPool;
237 /// # use futures_bufio::BufReader;
238 /// # use std::io;
239 /// # fn main() {
240 /// let f = io::Cursor::new(b"foo text".to_vec());
241 /// let pool = CpuPool::new(1);
242 /// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
243 ///
244 /// let buf = vec![0; 10];
245 /// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
246 /// // in real usage, we have the option to deconstruct our BufReader or reuse buf here
247 /// panic!("unable to read full: {}", e);
248 /// });
249 /// assert_eq!(n, 8);
250 /// assert_eq!(&*buf, b"foo text\0\0");
251 /// assert_eq!(&buf[..n], b"foo text");
252 /// # }
253 /// ```
254 pub fn try_read_full<B>(
255 mut self,
256 mut buf: B,
257 ) -> impl Future<Item = OkRead<Self, B>, Error = ErrRead<Self, B>>
258 where
259 B: DerefMut<Target = [u8]> + Send + 'static,
260 {
261 const U8READ: &str = "&[u8] reads never error";
262 let mut rem = buf.len();
263 let mut at = 0;
264
265 if self.pos != self.cap {
266 at = (&self.buf[self.pos..self.cap]).read(&mut buf).expect(
267 U8READ,
268 );
269 rem -= at;
270 self.pos += at;
271
272 if rem == 0 {
273 return Either::A(ok::<OkRead<Self, B>, ErrRead<Self, B>>((self, buf, at)));
274 }
275 }
276 // self.pos == self.cap
277
278 let pool = self.pool.take().expect(EXP_POOL);
279
280 let block = if self.cap > 0 {
281 rem - rem % self.cap
282 } else {
283 rem
284 };
285
286 let fut = pool.spawn_fn(move || {
287 if block > 0 {
288 let (block_read, err) = try_read_full(&mut self.inner, &mut buf[at..at + block]);
289 if let Some(e) = err {
290 return Err((self, buf, e));
291 }
292
293 at += block_read;
294 rem -= block_read;
295 if rem == 0 {
296 return Ok((self, buf, at));
297 }
298 }
299
300 let (buf_read, err) = try_read_full(&mut self.inner, &mut self.buf);
301 match err {
302 Some(e) => Err((self, buf, e)),
303 None => {
304 self.cap = buf_read;
305 self.pos = (&self.buf[..self.cap]).read(&mut buf[at..]).expect(U8READ);
306 at += self.pos;
307 Ok((self, buf, at))
308 }
309 }
310 });
311
312 Either::B(fut.then(|res| match res {
313 Ok(mut x) => {
314 x.0.pool = Some(pool);
315 Ok(x)
316 }
317 Err(mut x) => {
318 x.0.pool = Some(pool);
319 Err(x)
320 }
321 }))
322 }
323}
324
325fn try_read_full<R: Read>(r: &mut R, mut buf: &mut [u8]) -> (usize, Option<io::Error>) {
326 let mut nn: usize = 0;
327 while !buf.is_empty() {
328 match r.read(buf) {
329 Ok(0) => break,
330 Ok(n) => {
331 let tmp = buf;
332 buf = &mut tmp[n..];
333 nn += n;
334 }
335 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
336 Err(e) => return (nn, Some(e)),
337 }
338 }
339 (nn, None)
340}
341
342#[test]
343fn test_read() {
344 use std::fs;
345 use std::io::Write;
346
347 // create the test file
348 fs::OpenOptions::new()
349 .write(true)
350 .create_new(true)
351 .open("bar.txt")
352 .expect("unable to exclusively create foo")
353 .write_all(
354 "Strapped down to my bed, feet cold, eyes red. I'm out of my head. Am I alive? Am I \
355 dead?"
356 .as_bytes(),
357 )
358 .expect("unable to write all");
359
360
361 // create our buffered reader
362 let f = BufReader::with_pool_and_capacity(
363 CpuPool::new(1),
364 10,
365 fs::OpenOptions::new().read(true).open("bar.txt").expect(
366 "foo does not exist?",
367 ),
368 );
369 assert_eq!(f.pos, 10);
370
371 // disk read, no blocks
372 let (f, buf, n) = f.try_read_full(vec![0; 5]).wait().unwrap_or_else(
373 |(_, _, e)| {
374 panic!("unable to read: {}", e)
375 },
376 );
377 assert_eq!(n, 5);
378 assert_eq!(&*buf, b"Strap");
379 assert_eq!(f.pos, 5);
380 assert_eq!(f.cap, 10);
381 assert_eq!(&*f.buf, b"Strapped d");
382
383 // mem read only
384 let (f, buf, n) = f.try_read_full(vec![0; 2]).wait().unwrap_or_else(
385 |(_, _, e)| {
386 panic!("unable to read: {}", e)
387 },
388 );
389 assert_eq!(n, 2);
390 assert_eq!(&*buf, b"pe");
391 assert_eq!(f.pos, 7);
392 assert_eq!(f.cap, 10);
393 assert_eq!(&*f.buf, b"Strapped d");
394
395 // mem (3) + disk blocks (20) + more mem (2)
396 let (f, buf, n) = f.try_read_full(vec![0; 25]).wait().unwrap_or_else(
397 |(_, _, e)| {
398 panic!("unable to read: {}", e)
399 },
400 );
401 assert_eq!(n, 25);
402 assert_eq!(&*buf, b"d down to my bed, feet co");
403 assert_eq!(f.pos, 2);
404 assert_eq!(f.cap, 10);
405 assert_eq!(&*f.buf, b"cold, eyes");
406
407 // mem (8) + disk block (10)
408 let (f, buf, n) = f.try_read_full(vec![0; 18]).wait().unwrap_or_else(
409 |(_, _, e)| {
410 panic!("unable to read: {}", e)
411 },
412 );
413 assert_eq!(n, 18);
414 assert_eq!(&*buf, b"ld, eyes red. I'm ");
415 assert_eq!(f.pos, 10);
416 assert_eq!(f.cap, 10);
417 assert_eq!(&*f.buf, b"cold, eyes"); // non-reset buf
418
419 // disk block (10)
420 let (f, buf, n) = f.try_read_full(vec![0; 10]).wait().unwrap_or_else(
421 |(_, _, e)| {
422 panic!("unable to read: {}", e)
423 },
424 );
425 assert_eq!(n, 10);
426 assert_eq!(&*buf, b"out of my ");
427 assert_eq!(f.pos, 10);
428 assert_eq!(f.cap, 10);
429 assert_eq!(&*f.buf, b"cold, eyes");
430
431 // disk block (20) + mem (9) (over-read by one byte)
432 let (f, buf, n) = f.try_read_full(vec![0; 29]).wait().unwrap_or_else(
433 |(_, _, e)| {
434 panic!("unable to read: {}", e)
435 },
436 );
437 assert_eq!(n, 28);
438 assert_eq!(&*buf, b"head. Am I alive? Am I dead?\0");
439 assert_eq!(f.pos, 8);
440 assert_eq!(f.cap, 8);
441 assert_eq!(&*f.buf, b" I dead?es");
442
443 let (f, buf, n) = f.try_read_full(vec![0; 2]).wait().unwrap_or_else(
444 |(_, _, e)| {
445 panic!("unable to read: {}", e)
446 },
447 );
448 assert_eq!(n, 0);
449 assert_eq!(&*buf, b"\0\0");
450 assert_eq!(f.pos, 0);
451 assert_eq!(f.cap, 0);
452 assert_eq!(&*f.buf, b" I dead?es");
453
454 fs::remove_file("bar.txt").expect("expected file to be removed");
455}