futures_fs/
read.rs

1use std::{cmp, fmt, mem};
2use std::fs::{File, Metadata};
3use std::io::{self, Read};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use bytes::{BufMut, Bytes, BytesMut};
8use futures::{Async, Future, Poll, Stream};
9use futures::future::lazy;
10use futures::sync::oneshot;
11
12use FsPool;
13use FsFuture;
14
/// Fallback buffer size in bytes.
///
/// Used when no explicit size was requested and the operating system cannot
/// be asked for one: either the file's metadata could not be read, or the
/// target is not a unix platform.
const BUF_SIZE: usize = 8192;
16
/// Configuration for how a file is read.
///
/// The default value lets the buffer size be determined automatically.
#[derive(Debug, Default)]
pub struct ReadOptions {
    /// Requested buffer size.
    ///
    /// `None` means the size is determined automatically from the operating system.
    buffer_size: Option<usize>,
}

impl ReadOptions {
    /// Sets the buffer size to use when reading and returns the updated options.
    ///
    /// If this is never called, the size is determined automatically from the
    /// operating system.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_size` is `0`.
    pub fn buffer_size(self, buffer_size: usize) -> Self {
        if buffer_size == 0 {
            panic!("buffer size must be larger than 0");
        }
        ReadOptions {
            buffer_size: Some(buffer_size),
            ..self
        }
    }
}
42
43pub(crate) fn new<P>(pool: &FsPool, path: P, opts: ReadOptions) -> FsReadStream
44where
45    P: AsRef<Path> + Send + 'static,
46{
47    FsReadStream {
48        buffer: BytesMut::with_capacity(0),
49        //TODO: can we adjust bounds, since this is making an owned copy anyways?
50        path: Arc::new(path.as_ref().to_owned()),
51        pool: pool.clone(),
52        state: State::Init(opts.buffer_size),
53    }
54}
55
56pub(crate) fn new_from_file(pool: &FsPool, file: File, opts: ReadOptions) -> FsReadStream {
57    let final_buf_size = finalize_buf_size(opts.buffer_size, &file);
58    FsReadStream {
59        buffer: BytesMut::with_capacity(0),
60        //TODO: can we adjust bounds, since this is making an owned copy anyways?
61        path: Arc::new(PathBuf::new()),
62        pool: pool.clone(),
63        state: State::Ready(file, final_buf_size),
64    }
65}
66
/// A `Stream` of bytes from a target file.
pub struct FsReadStream {
    /// Holds the most recently read chunk; it is handed to the next read
    /// task when the stream is polled in the `Ready` state.
    buffer: BytesMut,
    /// Path of the file to open; empty when the stream was built from an
    /// already opened file.
    path: Arc<PathBuf>,
    /// Pool whose executor runs the open and read tasks.
    pool: FsPool,
    /// Current position in the open/read state machine driven by `poll`.
    state: State,
}
74
/// State machine driven by `FsReadStream::poll`.
enum State {
    /// Nothing has been started yet; carries the buffer size requested by
    /// the caller, if any.
    Init(Option<usize>),
    /// The task that opens the file and performs the first read has been
    /// submitted; the future resolves to the file and the first chunk.
    Opening(FsFuture<(File, BytesMut)>),
    /// A follow-up read task has been submitted; the `usize` is the buffer
    /// size used for reads.
    Working(FsFuture<(File, BytesMut)>, usize),
    /// The file is open and idle; the next poll submits a read using the
    /// given buffer size.
    Ready(File, usize),
    /// The end of the file was reached; the stream yields `None`.
    Eof,
    /// Placeholder left behind by `mem::replace` while `poll` handles the
    /// real state; `poll` treats seeing it as unreachable.
    Swapping,
}
83
84impl FsReadStream {
85    fn handle_read(
86        &mut self,
87        file: File,
88        chunk: BytesMut,
89        buf_size: usize,
90    ) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
91        if chunk.is_empty() {
92            self.state = State::Eof;
93            return Ok(Async::Ready(None));
94        } else {
95            self.buffer = chunk;
96            self.state = State::Ready(file, buf_size);
97            return Ok(Async::Ready(Some(self.buffer.take().freeze())));
98        }
99    }
100}
101
102impl Stream for FsReadStream {
103    type Item = Bytes;
104    type Error = io::Error;
105
106    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
107        loop {
108            match mem::replace(&mut self.state, State::Swapping) {
109                State::Init(buf_size) => {
110                    let path = self.path.clone();
111
112                    let (tx, rx) = oneshot::channel();
113
114                    let fut = Box::new(lazy(move || {
115                        let res = open_and_read(&path, buf_size).map_err(From::from);
116
117                        tx.send(res).map_err(|_| ())
118                    }));
119
120                    self.pool.executor.execute(fut).unwrap();
121
122                    self.state = State::Opening(super::fs(rx));
123                }
124                State::Opening(mut rx) => {
125                    let polled = rx.poll();
126                    self.state = State::Opening(rx);
127                    let (file, chunk) = try_ready!(polled);
128                    let buf_size = chunk.capacity();
129
130                    return self.handle_read(file, chunk, buf_size);
131                }
132                State::Working(mut rx, buf_size) => {
133                    let polled = rx.poll();
134                    self.state = State::Working(rx, buf_size);
135                    let (file, chunk) = try_ready!(polled);
136
137                    return self.handle_read(file, chunk, buf_size);
138                }
139                State::Ready(file, buf_size) => {
140                    let buf = self.buffer.split_off(0);
141
142                    let (tx, rx) = oneshot::channel();
143
144                    let fut = Box::new(lazy(move || {
145                        let res = read(file, buf_size, buf).map_err(From::from);
146
147                        tx.send(res).map_err(|_| ())
148                    }));
149
150                    self.pool.executor.execute(fut).unwrap();
151
152                    self.state = State::Working(super::fs(rx), buf_size);
153                }
154                State::Eof => {
155                    self.state = State::Eof;
156                    return Ok(Async::Ready(None));
157                }
158                State::Swapping => unreachable!(),
159            }
160        }
161    }
162}
163
164impl fmt::Debug for FsReadStream {
165    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166        f.debug_struct("FsReadStream")
167            .field("path", &self.path)
168            .finish()
169    }
170}
171
172fn read(mut file: File, buf_size: usize, mut buf: BytesMut) -> io::Result<(File, BytesMut)> {
173    if !buf.has_remaining_mut() {
174        buf.reserve(buf_size);
175    }
176    let n = file.read(unsafe { buf.bytes_mut() })?;
177    unsafe { buf.advance_mut(n) };
178    Ok((file, buf))
179}
180
181fn finalize_buf_size(buf_size: Option<usize>, file: &File) -> usize {
182    match file.metadata() {
183        Ok(metadata) => {
184            // try to get the buffer size from the OS if necessary
185            let buf_size = buf_size.unwrap_or_else(|| get_block_size(&metadata));
186
187            // if size is smaller than our chunk size, don't reserve wasted space
188            cmp::min(metadata.len() as usize, buf_size)
189        }
190        _ => buf_size.unwrap_or(BUF_SIZE),
191    }
192}
193
194fn open_and_read(path: &Path, buf_size: Option<usize>) -> io::Result<(File, BytesMut)> {
195    let file = File::open(path)?;
196    let final_buf_size = finalize_buf_size(buf_size, &file);
197    read(
198        file,
199        final_buf_size,
200        BytesMut::with_capacity(final_buf_size),
201    )
202}
203
/// Returns the block size the file system reports for I/O on this file.
#[cfg(unix)]
fn get_block_size(metadata: &Metadata) -> usize {
    use std::os::unix::fs::MetadataExt;

    let block_size = metadata.blksize();
    block_size as usize
}
209
/// Returns the buffer size to use when the platform is not unix.
///
/// The metadata is not consulted here; the fixed `BUF_SIZE` is returned.
#[cfg(not(unix))]
fn get_block_size(_metadata: &Metadata) -> usize {
    BUF_SIZE
}