tk_sendfile/
lib.rs

//! A thread pool that can process file requests and send data to a socket,
//! intended for zero copy (using sendfile). Note that the built-in
//! `TcpStream` destination currently reads each chunk into a userspace
//! buffer and writes it out.
//!
//! Use the `DiskPool` structure to request file operations.
//!
//! # Example
//!
//! ```rust,ignore
//!     let pool = DiskPool::new(CpuPool::new(40));
//!     pool.send("file", socket)
//! ```
//!
//! # Settings
//!
//! It's recommended to use a large number of threads in the pool
//! for three reasons:
//!
//! 1. To make use of device parallelism
//! 2. To allow the kernel to merge some disk operations
//! 3. To avoid head-of-line blocking when some requests hit the disk while
//!    others could be served immediately from the cache (we don't know which
//!    ones are cached, so we run all of them in the pool)
23#![warn(missing_docs)]
24
25extern crate libc;
26extern crate futures;
27extern crate tokio_io;
28extern crate tokio_core;
29extern crate futures_cpupool;
30
31use std::io;
32use std::mem;
33use std::cmp::min;
34use std::fs::File;
35use std::path::PathBuf;
36
37#[cfg(windows)]
38use std::sync::Mutex;
39
40use futures::{Future, Poll, Async, BoxFuture, finished, failed};
41use futures_cpupool::{CpuPool, CpuFuture};
42use tokio_io::AsyncWrite;
43use tokio_core::net::TcpStream;
44
45/// A reference to a thread pool for disk operations
46#[derive(Clone)]
47pub struct DiskPool {
48    pool: CpuPool,
49}
50
51/// This trait represents anything that can open the file
52///
53/// You can convert anything that is `AsRef<Path>` to this trait and
54/// it will just open a file at specified path.
55/// But often you want some checks of permissions or visibility of the file
56/// while opening it. You can't do even `stat()` or `open()` in main loop
57/// because even such a simple operation can potentially block for indefinite
58/// period of time.
59///
60/// So file opener could be anything that validates a path,
61/// caches file descriptor, and in the result returns a file.
62pub trait FileOpener: Send + 'static {
63    /// Read file from cache
64    ///
65    /// Note: this can be both positive and negative cache
66    ///
67    /// You don't have to implement this method if you don't have in-memory
68    /// cache of files
69    fn from_cache(&mut self) -> Option<Result<&[u8], io::Error>> {
70        None
71    }
72    /// Open the file
73    ///
74    /// This function is called in disk thread
75    fn open(&mut self) -> Result<(&FileReader, u64), io::Error>;
76}
77
78
/// A file that can be read at an arbitrary position in a single call, so
/// that readers never race on a shared file cursor.
///
/// On unix it is implemented for `File` on top of `pread()`.
/// On windows it is implemented for `Mutex<File>` using seek + read.
pub trait FileReader {
    /// Reads into `buf` starting at byte `offset` of the file and returns
    /// the number of bytes read.
    ///
    /// Mirrors the `pread` system call found on most unix systems.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize>;
}
89
90/// Trait that represents something that can be converted into a file
91/// FileOpener
92///
93/// This is very similar to IntoIterator or IntoFuture and used in similar
94/// way.
95///
96/// Note unlike methods in FileOpener itself this trait is executed in
97/// caller thread, **not** in disk thread.
98pub trait IntoFileOpener: Send {
99    /// The final type returned after conversion
100    type Opener: FileOpener + Send + 'static;
101    /// Convert the type into a file opener
102    fn into_file_opener(self) -> Self::Opener;
103}
104
/// File opener that opens the given path directly.
///
/// Field `.0` is the path. Field `.1` is `None` until the file has been
/// opened; after that it holds the open file and its size in bytes.
#[cfg(unix)]
#[derive(Debug)]
pub struct PathOpener(PathBuf, Option<(File, u64)>);
109
/// File opener that opens the given path directly.
///
/// Field `.0` is the path. Field `.1` is `None` until the file has been
/// opened; after that it holds the open file (behind a mutex, because
/// reads on windows are seek + read) and its size in bytes.
#[cfg(windows)]
#[derive(Debug)]
pub struct PathOpener(PathBuf, Option<(Mutex<File>, u64)>);
114
115/// A trait that represents anything that file can be sent to
116///
117/// The trait is implemented for TcpStream right away but you might want
118/// to implement your own thing, for example to prepend the data with file
119/// length header
120pub trait Destination: AsyncWrite + Send {
121
122    /// This method does the actual sendfile call
123    ///
124    /// Note: this method is called in other thread
125    fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
126        -> Result<usize, io::Error>;
127
128    /// Test whether this socket is ready to be written to or not.
129    ///
130    /// If socket isn't writable current taks must be scheduled to get a
131    /// notification when socket does become writable.
132    fn poll_write(&self) -> Async<()>;
133}
134
135/// A structure that tracks progress of sending a file
136pub struct Sendfile<O: FileOpener + Send + 'static> {
137    file: O,
138    pool: DiskPool,
139    cached: bool,
140    offset: u64,
141    size: u64,
142}
143
144// Todo return non-boxed future
145// /// Future that is returned from `DiskPool::send`
146// type SendfileFuture<D> = futures_cpupool::CpuFuture<D, io::Error>;
147
148/// Future returned by `Sendfile::write_into()`
149pub struct WriteFile<F: FileOpener, D: Destination>(DiskPool, WriteState<F, D>)
150    where F: Send + 'static, D: Send + 'static;
151
152enum WriteState<F: FileOpener, D: Destination> {
153    Mem(Sendfile<F>, D),
154    WaitSend(CpuFuture<(Sendfile<F>, D), io::Error>),
155    WaitWrite(Sendfile<F>, D),
156    Empty,
157}
158
159impl<T: Into<PathBuf> + Send> IntoFileOpener for T {
160    type Opener = PathOpener;
161    fn into_file_opener(self) -> PathOpener {
162        PathOpener(self.into(), None)
163    }
164}
165
166#[cfg(unix)]
167impl FileOpener for PathOpener {
168    fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
169        if self.1.is_none() {
170            let file = File::open(&self.0)?;
171            let meta = file.metadata()?;
172            if !meta.file_type().is_file() {
173                return Err(io::Error::new(io::ErrorKind::Other,
174                    "Not a regular file"));
175            }
176            self.1 = Some((file, meta.len()));
177        }
178        Ok(self.1.as_ref().map(|&(ref f, s)| (f as &FileReader, s)).unwrap())
179    }
180}
181
#[cfg(windows)]
impl FileOpener for PathOpener {
    /// Opens the path on the first call and reuses the open file afterwards.
    ///
    /// The file is wrapped in a `Mutex` because reads on windows are done
    /// with seek + read. Fails if the path cannot be opened, its metadata
    /// cannot be read, or it does not refer to a regular file.
    fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
        let needs_open = self.1.is_none();
        if needs_open {
            let handle = File::open(&self.0)?;
            let metadata = handle.metadata()?;
            if metadata.file_type().is_file() {
                self.1 = Some((Mutex::new(handle), metadata.len()));
            } else {
                return Err(io::Error::new(io::ErrorKind::Other,
                    "Not a regular file"));
            }
        }
        let &(ref handle, size) = self.1.as_ref().unwrap();
        Ok((handle as &FileReader, size))
    }
}
197
198impl DiskPool {
199    /// Create a disk pool that sends its tasks into the CpuPool
200    pub fn new(pool: CpuPool) -> DiskPool {
201        DiskPool {
202            pool: pool,
203        }
204    }
205    /// Start a file send operation
206    pub fn open<F>(&self, file: F)
207        // TODO(tailhook) unbox a future
208        -> BoxFuture<Sendfile<F::Opener>, io::Error>
209        where F: IntoFileOpener + Send + Sized + 'static,
210    {
211        let mut file = file.into_file_opener();
212        let cached_size = match file.from_cache() {
213            Some(Ok(cache_ref)) => {
214                Some(cache_ref.len() as u64)
215            }
216            Some(Err(e)) => {
217                return failed(e).boxed();
218            }
219            None => None,
220        };
221        let pool = self.clone();
222        if let Some(size) = cached_size {
223            finished(Sendfile {
224                file: file,
225                pool: pool,
226                cached: true,
227                offset: 0,
228                size: size,
229            }).boxed()
230        } else {
231            self.pool.spawn_fn(move || {
232                let (_, size) = file.open()?;
233                let file = Sendfile {
234                    file: file,
235                    pool: pool,
236                    cached: false,
237                    offset: 0,
238                    size: size,
239                };
240                Ok(file)
241            }).boxed()
242        }
243    }
244    /// A shortcut method to send whole file without headers
245    pub fn send<F, D>(&self, file: F, destination: D)
246        -> futures::BoxFuture<D, io::Error>
247        where F: IntoFileOpener + Send + Sized + 'static,
248              D: Destination + Send + Sized + 'static,
249    {
250        self.open(file).and_then(|file| file.write_into(destination)).boxed()
251    }
252}
253
254impl Destination for TcpStream {
255    fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
256        -> Result<usize, io::Error>
257    {
258        let (file_ref, size) = file.file.open()?;
259        let mut buf = [0u8; 65536];
260        let max_bytes = min(size.saturating_sub(file.offset), 65536) as usize;
261        let nbytes = file_ref.read_at(file.offset, &mut buf[..max_bytes])?;
262        if nbytes == 0 {
263            return Err(io::ErrorKind::UnexpectedEof.into())
264        }
265        io::Write::write(self, &buf[..nbytes])
266    }
267    fn poll_write(&self) -> Async<()> {
268        <TcpStream>::poll_write(self)
269    }
270}
271
272impl<O: FileOpener> Sendfile<O> {
273    /// Returns full size of the file
274    ///
275    /// Note that if file changes while we are reading it, we may not be
276    /// able to send this number of bytes. In this case we will return
277    /// `WriteZero` error however.
278    pub fn size(&self) -> u64 {
279        return self.size;
280    }
281    /// Returns a future which resolves to original socket when file has been
282    /// written into a file
283    pub fn write_into<D: Destination>(self, dest: D) -> WriteFile<O, D> {
284        if self.cached {
285            WriteFile(self.pool.clone(), WriteState::Mem(self, dest))
286        } else {
287            WriteFile(self.pool.clone(), WriteState::WaitWrite(self, dest))
288        }
289    }
290    /// Get inner file opener
291    pub fn get_inner(&self) -> &O {
292        return &self.file;
293    }
294    /// Get mutlable reference to inner file opener
295    pub fn get_mut(&mut self) -> &mut O {
296        return &mut self.file;
297    }
298}
299
300impl<F: FileOpener, D: Destination> Future for WriteFile<F, D>
301    where F: Send + 'static, D: Send + 'static,
302{
303    type Item = D;
304    type Error = io::Error;
305    fn poll(&mut self) -> Poll<D, io::Error> {
306        use self::WriteState::*;
307        loop {
308            let (newstate, cont) = match mem::replace(&mut self.1, Empty) {
309                Mem(mut file, mut dest) => {
310                    let need_switch = match file.file.from_cache() {
311                        Some(Ok(slice)) => {
312                            if (slice.len() as u64) < file.size {
313                                return Err(io::Error::new(
314                                    io::ErrorKind::WriteZero,
315                                    "cached file truncated during writing"));
316                            }
317                            let target_slice = &slice[file.offset as usize..];
318                            // Not sure why we can reach it, but it's safe
319                            if target_slice.len() == 0 {
320                                return Ok(Async::Ready(dest));
321                            }
322                            match dest.write(target_slice) {
323                                Ok(0) => {
324                                    return Err(io::Error::new(
325                                        io::ErrorKind::WriteZero,
326                                        "connection closed while sending \
327                                         file from cache"));
328                                }
329                                Ok(bytes) => {
330                                    file.offset += bytes as u64;
331                                    if file.offset >= file.size {
332                                        return Ok(Async::Ready(dest));
333                                    }
334                                }
335                                Err(e) => {
336                                    return Err(e);
337                                }
338                            }
339                            false
340                        }
341                        Some(Err(e)) => {
342                            return Err(e);
343                        }
344                        None => {
345                            // File evicted from cache in the middle of sending
346                            // Switch to non-cached variant
347                            // TODO(tailhook) should we log it?
348                            true
349                        }
350                    };
351                    if need_switch {
352                        (WaitWrite(file, dest), true)
353                    } else {
354                        (Mem(file, dest), false)
355                    }
356                }
357                WaitSend(mut future) => {
358                    match future.poll() {
359                        Ok(Async::Ready((file, dest))) => {
360                            if file.size <= file.offset {
361                                return Ok(Async::Ready(dest));
362                            } else {
363                                (WaitWrite(file, dest), true)
364                            }
365                        }
366                        Ok(Async::NotReady) => (WaitSend(future), false),
367                        Err(e) => return Err(e),
368                    }
369                }
370                WaitWrite(mut file, mut dest) => {
371                    match dest.poll_write() {
372                        Async::Ready(()) => {
373                            (WaitSend(self.0.pool.spawn_fn(move || {
374                                loop {
375                                    match dest.write_file(&mut file) {
376                                        Ok(0) => {
377                                            return Err(io::Error::new(
378                                                io::ErrorKind::WriteZero,
379                                                "connection closed while \
380                                                 sending a file"))
381                                        }
382                                        Ok(bytes_sent) => {
383                                            file.offset += bytes_sent as u64;
384                                            if file.offset >= file.size {
385                                                return Ok((file, dest));
386                                            } else {
387                                                continue;
388                                            }
389                                        }
390                                        Err(ref e)
391                                        if e.kind() == io::ErrorKind::WouldBlock
392                                        => {
393                                            return Ok((file, dest))
394                                        }
395                                        Err(e) => return Err(e),
396                                    }
397                                }
398                            })), true)
399                        }
400                        Async::NotReady => {
401                            (WaitWrite(file, dest), false)
402                        }
403                    }
404                }
405                Empty => unreachable!(),
406            };
407            self.1 = newstate;
408            if !cont {
409                return Ok(Async::NotReady);
410            }
411        }
412    }
413}
414
415#[cfg(unix)]
416impl FileReader for File {
417    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
418        use libc::{pread, c_void};
419        use std::os::unix::io::AsRawFd;
420
421        let rc = unsafe { pread(self.as_raw_fd(),
422            buf.as_ptr() as *mut c_void,
423            buf.len(), offset as i64) };
424        if rc < 0 {
425            Err(io::Error::last_os_error())
426        } else {
427            Ok(rc as usize)
428        }
429    }
430}
431
#[cfg(windows)]
impl FileReader for Mutex<File> {
    /// Emulates `pread` on windows: lock, seek to `offset`, then read.
    ///
    /// The lock is held across both the seek and the read, so callers that
    /// share this file cannot interleave between them.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        use std::io::{Read, Seek};

        let mut guarded = self.lock().expect("mutex is not poisoned");
        let target = io::SeekFrom::Start(offset);
        guarded.seek(target)?;
        Read::read(&mut *guarded, buf)
    }
}