1#![warn(missing_docs)]
24
25extern crate libc;
26extern crate futures;
27extern crate tokio_io;
28extern crate tokio_core;
29extern crate futures_cpupool;
30
31use std::io;
32use std::mem;
33use std::cmp::min;
34use std::fs::File;
35use std::path::PathBuf;
36
37#[cfg(windows)]
38use std::sync::Mutex;
39
40use futures::{Future, Poll, Async, BoxFuture, finished, failed};
41use futures_cpupool::{CpuPool, CpuFuture};
42use tokio_io::AsyncWrite;
43use tokio_core::net::TcpStream;
44
45#[derive(Clone)]
47pub struct DiskPool {
48 pool: CpuPool,
49}
50
51pub trait FileOpener: Send + 'static {
63 fn from_cache(&mut self) -> Option<Result<&[u8], io::Error>> {
70 None
71 }
72 fn open(&mut self) -> Result<(&FileReader, u64), io::Error>;
76}
77
78
/// Positional read access to an opened file.
pub trait FileReader {
    /// Reads bytes starting at byte `offset` of the file into `buf` and
    /// returns how many bytes were read.
    ///
    /// The method takes `&self`, so it does not require exclusive access
    /// to the reader.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize>;
}
89
90pub trait IntoFileOpener: Send {
99 type Opener: FileOpener + Send + 'static;
101 fn into_file_opener(self) -> Self::Opener;
103}
104
/// A `FileOpener` that opens a file by its filesystem path.
///
/// The first field is the path. The second field is `None` until `open`
/// succeeds, after which it holds the opened file and its length.
#[cfg(unix)]
#[derive(Debug)]
pub struct PathOpener(PathBuf, Option<(File, u64)>);
109
/// A `FileOpener` that opens a file by its filesystem path.
///
/// The first field is the path. The second field is `None` until `open`
/// succeeds, after which it holds the opened file and its length. The file
/// sits behind a `Mutex` because the windows `FileReader` implementation
/// seeks and then reads, which needs mutable access to the file.
#[cfg(windows)]
#[derive(Debug)]
pub struct PathOpener(PathBuf, Option<(Mutex<File>, u64)>);
114
115pub trait Destination: AsyncWrite + Send {
121
122 fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
126 -> Result<usize, io::Error>;
127
128 fn poll_write(&self) -> Async<()>;
133}
134
135pub struct Sendfile<O: FileOpener + Send + 'static> {
137 file: O,
138 pool: DiskPool,
139 cached: bool,
140 offset: u64,
141 size: u64,
142}
143
144pub struct WriteFile<F: FileOpener, D: Destination>(DiskPool, WriteState<F, D>)
150 where F: Send + 'static, D: Send + 'static;
151
152enum WriteState<F: FileOpener, D: Destination> {
153 Mem(Sendfile<F>, D),
154 WaitSend(CpuFuture<(Sendfile<F>, D), io::Error>),
155 WaitWrite(Sendfile<F>, D),
156 Empty,
157}
158
159impl<T: Into<PathBuf> + Send> IntoFileOpener for T {
160 type Opener = PathOpener;
161 fn into_file_opener(self) -> PathOpener {
162 PathOpener(self.into(), None)
163 }
164}
165
166#[cfg(unix)]
167impl FileOpener for PathOpener {
168 fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
169 if self.1.is_none() {
170 let file = File::open(&self.0)?;
171 let meta = file.metadata()?;
172 if !meta.file_type().is_file() {
173 return Err(io::Error::new(io::ErrorKind::Other,
174 "Not a regular file"));
175 }
176 self.1 = Some((file, meta.len()));
177 }
178 Ok(self.1.as_ref().map(|&(ref f, s)| (f as &FileReader, s)).unwrap())
179 }
180}
181
#[cfg(windows)]
impl FileOpener for PathOpener {
    /// Opens the file on first use and returns a reader plus the file
    /// length taken from its metadata.
    ///
    /// The opened file is wrapped in a `Mutex` and stored in `self`, so
    /// later calls return the stored handle and length without touching
    /// the filesystem.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, its metadata cannot be read, or
    /// the path does not refer to a regular file.
    fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
        if self.1.is_none() {
            let handle = File::open(&self.0)?;
            let info = handle.metadata()?;
            if !info.is_file() {
                return Err(io::Error::new(io::ErrorKind::Other,
                                          "Not a regular file"));
            }
            let length = info.len();
            self.1 = Some((Mutex::new(handle), length));
        }
        // The slot was filled above if it had been empty.
        let &(ref locked, length) = self.1.as_ref().unwrap();
        Ok((locked as &FileReader, length))
    }
}
197
198impl DiskPool {
199 pub fn new(pool: CpuPool) -> DiskPool {
201 DiskPool {
202 pool: pool,
203 }
204 }
205 pub fn open<F>(&self, file: F)
207 -> BoxFuture<Sendfile<F::Opener>, io::Error>
209 where F: IntoFileOpener + Send + Sized + 'static,
210 {
211 let mut file = file.into_file_opener();
212 let cached_size = match file.from_cache() {
213 Some(Ok(cache_ref)) => {
214 Some(cache_ref.len() as u64)
215 }
216 Some(Err(e)) => {
217 return failed(e).boxed();
218 }
219 None => None,
220 };
221 let pool = self.clone();
222 if let Some(size) = cached_size {
223 finished(Sendfile {
224 file: file,
225 pool: pool,
226 cached: true,
227 offset: 0,
228 size: size,
229 }).boxed()
230 } else {
231 self.pool.spawn_fn(move || {
232 let (_, size) = file.open()?;
233 let file = Sendfile {
234 file: file,
235 pool: pool,
236 cached: false,
237 offset: 0,
238 size: size,
239 };
240 Ok(file)
241 }).boxed()
242 }
243 }
244 pub fn send<F, D>(&self, file: F, destination: D)
246 -> futures::BoxFuture<D, io::Error>
247 where F: IntoFileOpener + Send + Sized + 'static,
248 D: Destination + Send + Sized + 'static,
249 {
250 self.open(file).and_then(|file| file.write_into(destination)).boxed()
251 }
252}
253
254impl Destination for TcpStream {
255 fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
256 -> Result<usize, io::Error>
257 {
258 let (file_ref, size) = file.file.open()?;
259 let mut buf = [0u8; 65536];
260 let max_bytes = min(size.saturating_sub(file.offset), 65536) as usize;
261 let nbytes = file_ref.read_at(file.offset, &mut buf[..max_bytes])?;
262 if nbytes == 0 {
263 return Err(io::ErrorKind::UnexpectedEof.into())
264 }
265 io::Write::write(self, &buf[..nbytes])
266 }
267 fn poll_write(&self) -> Async<()> {
268 <TcpStream>::poll_write(self)
269 }
270}
271
272impl<O: FileOpener> Sendfile<O> {
273 pub fn size(&self) -> u64 {
279 return self.size;
280 }
281 pub fn write_into<D: Destination>(self, dest: D) -> WriteFile<O, D> {
284 if self.cached {
285 WriteFile(self.pool.clone(), WriteState::Mem(self, dest))
286 } else {
287 WriteFile(self.pool.clone(), WriteState::WaitWrite(self, dest))
288 }
289 }
290 pub fn get_inner(&self) -> &O {
292 return &self.file;
293 }
294 pub fn get_mut(&mut self) -> &mut O {
296 return &mut self.file;
297 }
298}
299
300impl<F: FileOpener, D: Destination> Future for WriteFile<F, D>
301 where F: Send + 'static, D: Send + 'static,
302{
303 type Item = D;
304 type Error = io::Error;
305 fn poll(&mut self) -> Poll<D, io::Error> {
306 use self::WriteState::*;
307 loop {
308 let (newstate, cont) = match mem::replace(&mut self.1, Empty) {
309 Mem(mut file, mut dest) => {
310 let need_switch = match file.file.from_cache() {
311 Some(Ok(slice)) => {
312 if (slice.len() as u64) < file.size {
313 return Err(io::Error::new(
314 io::ErrorKind::WriteZero,
315 "cached file truncated during writing"));
316 }
317 let target_slice = &slice[file.offset as usize..];
318 if target_slice.len() == 0 {
320 return Ok(Async::Ready(dest));
321 }
322 match dest.write(target_slice) {
323 Ok(0) => {
324 return Err(io::Error::new(
325 io::ErrorKind::WriteZero,
326 "connection closed while sending \
327 file from cache"));
328 }
329 Ok(bytes) => {
330 file.offset += bytes as u64;
331 if file.offset >= file.size {
332 return Ok(Async::Ready(dest));
333 }
334 }
335 Err(e) => {
336 return Err(e);
337 }
338 }
339 false
340 }
341 Some(Err(e)) => {
342 return Err(e);
343 }
344 None => {
345 true
349 }
350 };
351 if need_switch {
352 (WaitWrite(file, dest), true)
353 } else {
354 (Mem(file, dest), false)
355 }
356 }
357 WaitSend(mut future) => {
358 match future.poll() {
359 Ok(Async::Ready((file, dest))) => {
360 if file.size <= file.offset {
361 return Ok(Async::Ready(dest));
362 } else {
363 (WaitWrite(file, dest), true)
364 }
365 }
366 Ok(Async::NotReady) => (WaitSend(future), false),
367 Err(e) => return Err(e),
368 }
369 }
370 WaitWrite(mut file, mut dest) => {
371 match dest.poll_write() {
372 Async::Ready(()) => {
373 (WaitSend(self.0.pool.spawn_fn(move || {
374 loop {
375 match dest.write_file(&mut file) {
376 Ok(0) => {
377 return Err(io::Error::new(
378 io::ErrorKind::WriteZero,
379 "connection closed while \
380 sending a file"))
381 }
382 Ok(bytes_sent) => {
383 file.offset += bytes_sent as u64;
384 if file.offset >= file.size {
385 return Ok((file, dest));
386 } else {
387 continue;
388 }
389 }
390 Err(ref e)
391 if e.kind() == io::ErrorKind::WouldBlock
392 => {
393 return Ok((file, dest))
394 }
395 Err(e) => return Err(e),
396 }
397 }
398 })), true)
399 }
400 Async::NotReady => {
401 (WaitWrite(file, dest), false)
402 }
403 }
404 }
405 Empty => unreachable!(),
406 };
407 self.1 = newstate;
408 if !cont {
409 return Ok(Async::NotReady);
410 }
411 }
412 }
413}
414
415#[cfg(unix)]
416impl FileReader for File {
417 fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
418 use libc::{pread, c_void};
419 use std::os::unix::io::AsRawFd;
420
421 let rc = unsafe { pread(self.as_raw_fd(),
422 buf.as_ptr() as *mut c_void,
423 buf.len(), offset as i64) };
424 if rc < 0 {
425 Err(io::Error::last_os_error())
426 } else {
427 Ok(rc as usize)
428 }
429 }
430}
431
#[cfg(windows)]
impl FileReader for Mutex<File> {
    /// Seeks the file to `offset` and reads into `buf`, returning the
    /// number of bytes read.
    ///
    /// The lock is held across the seek and the read, so both happen as
    /// one step with respect to other users of the mutex.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        use std::io::{Read, Seek};

        let mut file = self.lock().expect("mutex is not poisoned");
        let start = io::SeekFrom::Start(offset);
        file.seek(start)?;
        file.read(buf)
    }
}
441}