1use std::{cmp, fmt, mem};
2use std::fs::{File, Metadata};
3use std::io::{self, Read};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use bytes::{BufMut, Bytes, BytesMut};
8use futures::{Async, Future, Poll, Stream};
9use futures::future::lazy;
10use futures::sync::oneshot;
11
12use FsPool;
13use FsFuture;
14
15const BUF_SIZE: usize = 8192;
16
17#[derive(Debug, Default)]
21pub struct ReadOptions {
22 buffer_size: Option<usize>,
26}
27
28impl ReadOptions {
29 pub fn buffer_size(mut self, buffer_size: usize) -> Self {
37 assert!(buffer_size > 0, "buffer size must be larger than 0");
38 self.buffer_size = Some(buffer_size);
39 self
40 }
41}
42
43pub(crate) fn new<P>(pool: &FsPool, path: P, opts: ReadOptions) -> FsReadStream
44where
45 P: AsRef<Path> + Send + 'static,
46{
47 FsReadStream {
48 buffer: BytesMut::with_capacity(0),
49 path: Arc::new(path.as_ref().to_owned()),
51 pool: pool.clone(),
52 state: State::Init(opts.buffer_size),
53 }
54}
55
56pub(crate) fn new_from_file(pool: &FsPool, file: File, opts: ReadOptions) -> FsReadStream {
57 let final_buf_size = finalize_buf_size(opts.buffer_size, &file);
58 FsReadStream {
59 buffer: BytesMut::with_capacity(0),
60 path: Arc::new(PathBuf::new()),
62 pool: pool.clone(),
63 state: State::Ready(file, final_buf_size),
64 }
65}
66
67pub struct FsReadStream {
69 buffer: BytesMut,
70 path: Arc<PathBuf>,
71 pool: FsPool,
72 state: State,
73}
74
75enum State {
76 Init(Option<usize>),
77 Opening(FsFuture<(File, BytesMut)>),
78 Working(FsFuture<(File, BytesMut)>, usize),
79 Ready(File, usize),
80 Eof,
81 Swapping,
82}
83
84impl FsReadStream {
85 fn handle_read(
86 &mut self,
87 file: File,
88 chunk: BytesMut,
89 buf_size: usize,
90 ) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
91 if chunk.is_empty() {
92 self.state = State::Eof;
93 return Ok(Async::Ready(None));
94 } else {
95 self.buffer = chunk;
96 self.state = State::Ready(file, buf_size);
97 return Ok(Async::Ready(Some(self.buffer.take().freeze())));
98 }
99 }
100}
101
102impl Stream for FsReadStream {
103 type Item = Bytes;
104 type Error = io::Error;
105
106 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
107 loop {
108 match mem::replace(&mut self.state, State::Swapping) {
109 State::Init(buf_size) => {
110 let path = self.path.clone();
111
112 let (tx, rx) = oneshot::channel();
113
114 let fut = Box::new(lazy(move || {
115 let res = open_and_read(&path, buf_size).map_err(From::from);
116
117 tx.send(res).map_err(|_| ())
118 }));
119
120 self.pool.executor.execute(fut).unwrap();
121
122 self.state = State::Opening(super::fs(rx));
123 }
124 State::Opening(mut rx) => {
125 let polled = rx.poll();
126 self.state = State::Opening(rx);
127 let (file, chunk) = try_ready!(polled);
128 let buf_size = chunk.capacity();
129
130 return self.handle_read(file, chunk, buf_size);
131 }
132 State::Working(mut rx, buf_size) => {
133 let polled = rx.poll();
134 self.state = State::Working(rx, buf_size);
135 let (file, chunk) = try_ready!(polled);
136
137 return self.handle_read(file, chunk, buf_size);
138 }
139 State::Ready(file, buf_size) => {
140 let buf = self.buffer.split_off(0);
141
142 let (tx, rx) = oneshot::channel();
143
144 let fut = Box::new(lazy(move || {
145 let res = read(file, buf_size, buf).map_err(From::from);
146
147 tx.send(res).map_err(|_| ())
148 }));
149
150 self.pool.executor.execute(fut).unwrap();
151
152 self.state = State::Working(super::fs(rx), buf_size);
153 }
154 State::Eof => {
155 self.state = State::Eof;
156 return Ok(Async::Ready(None));
157 }
158 State::Swapping => unreachable!(),
159 }
160 }
161 }
162}
163
164impl fmt::Debug for FsReadStream {
165 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166 f.debug_struct("FsReadStream")
167 .field("path", &self.path)
168 .finish()
169 }
170}
171
172fn read(mut file: File, buf_size: usize, mut buf: BytesMut) -> io::Result<(File, BytesMut)> {
173 if !buf.has_remaining_mut() {
174 buf.reserve(buf_size);
175 }
176 let n = file.read(unsafe { buf.bytes_mut() })?;
177 unsafe { buf.advance_mut(n) };
178 Ok((file, buf))
179}
180
181fn finalize_buf_size(buf_size: Option<usize>, file: &File) -> usize {
182 match file.metadata() {
183 Ok(metadata) => {
184 let buf_size = buf_size.unwrap_or_else(|| get_block_size(&metadata));
186
187 cmp::min(metadata.len() as usize, buf_size)
189 }
190 _ => buf_size.unwrap_or(BUF_SIZE),
191 }
192}
193
194fn open_and_read(path: &Path, buf_size: Option<usize>) -> io::Result<(File, BytesMut)> {
195 let file = File::open(path)?;
196 let final_buf_size = finalize_buf_size(buf_size, &file);
197 read(
198 file,
199 final_buf_size,
200 BytesMut::with_capacity(final_buf_size),
201 )
202}
203
204#[cfg(unix)]
205fn get_block_size(metadata: &Metadata) -> usize {
206 use std::os::unix::fs::MetadataExt;
207 metadata.blksize() as usize
208}
209
210#[cfg(not(unix))]
211fn get_block_size(_metadata: &Metadata) -> usize {
212 BUF_SIZE
213}