#![warn(missing_docs)]
extern crate libc;
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate futures_cpupool;
use std::io;
use std::mem;
use std::cmp::min;
use std::fs::File;
use std::path::PathBuf;
#[cfg(windows)]
use std::sync::Mutex;
use futures::{Future, Poll, Async, BoxFuture, finished, failed};
use futures_cpupool::{CpuPool, CpuFuture};
use tokio_io::AsyncWrite;
use tokio_core::net::TcpStream;
#[derive(Clone)]
pub struct DiskPool {
pool: CpuPool,
}
pub trait FileOpener: Send + 'static {
fn from_cache(&mut self) -> Option<Result<&[u8], io::Error>> {
None
}
fn open(&mut self) -> Result<(&FileReader, u64), io::Error>;
}
pub trait FileReader {
fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize>;
}
pub trait IntoFileOpener: Send {
type Opener: FileOpener + Send + 'static;
fn into_file_opener(self) -> Self::Opener;
}
#[derive(Debug)]
#[cfg(unix)]
pub struct PathOpener(PathBuf, Option<(File, u64)>);
#[derive(Debug)]
#[cfg(windows)]
pub struct PathOpener(PathBuf, Option<(Mutex<File>, u64)>);
pub trait Destination: AsyncWrite + Send {
fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
-> Result<usize, io::Error>;
fn poll_write(&self) -> Async<()>;
}
pub struct Sendfile<O: FileOpener + Send + 'static> {
file: O,
pool: DiskPool,
cached: bool,
offset: u64,
size: u64,
}
pub struct WriteFile<F: FileOpener, D: Destination>(DiskPool, WriteState<F, D>)
where F: Send + 'static, D: Send + 'static;
enum WriteState<F: FileOpener, D: Destination> {
Mem(Sendfile<F>, D),
WaitSend(CpuFuture<(Sendfile<F>, D), io::Error>),
WaitWrite(Sendfile<F>, D),
Empty,
}
impl<T: Into<PathBuf> + Send> IntoFileOpener for T {
type Opener = PathOpener;
fn into_file_opener(self) -> PathOpener {
PathOpener(self.into(), None)
}
}
#[cfg(unix)]
impl FileOpener for PathOpener {
fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
if self.1.is_none() {
let file = File::open(&self.0)?;
let meta = file.metadata()?;
if !meta.file_type().is_file() {
return Err(io::Error::new(io::ErrorKind::Other,
"Not a regular file"));
}
self.1 = Some((file, meta.len()));
}
Ok(self.1.as_ref().map(|&(ref f, s)| (f as &FileReader, s)).unwrap())
}
}
#[cfg(windows)]
impl FileOpener for PathOpener {
fn open(&mut self) -> Result<(&FileReader, u64), io::Error> {
if self.1.is_none() {
let file = File::open(&self.0)?;
let meta = file.metadata()?;
if !meta.file_type().is_file() {
return Err(io::Error::new(io::ErrorKind::Other,
"Not a regular file"));
}
self.1 = Some((Mutex::new(file), meta.len()));
}
Ok(self.1.as_ref().map(|&(ref f, s)| (f as &FileReader, s)).unwrap())
}
}
impl DiskPool {
pub fn new(pool: CpuPool) -> DiskPool {
DiskPool {
pool: pool,
}
}
pub fn open<F>(&self, file: F)
-> BoxFuture<Sendfile<F::Opener>, io::Error>
where F: IntoFileOpener + Send + Sized + 'static,
{
let mut file = file.into_file_opener();
let cached_size = match file.from_cache() {
Some(Ok(cache_ref)) => {
Some(cache_ref.len() as u64)
}
Some(Err(e)) => {
return failed(e).boxed();
}
None => None,
};
let pool = self.clone();
if let Some(size) = cached_size {
finished(Sendfile {
file: file,
pool: pool,
cached: true,
offset: 0,
size: size,
}).boxed()
} else {
self.pool.spawn_fn(move || {
let (_, size) = file.open()?;
let file = Sendfile {
file: file,
pool: pool,
cached: false,
offset: 0,
size: size,
};
Ok(file)
}).boxed()
}
}
pub fn send<F, D>(&self, file: F, destination: D)
-> futures::BoxFuture<D, io::Error>
where F: IntoFileOpener + Send + Sized + 'static,
D: Destination + Send + Sized + 'static,
{
self.open(file).and_then(|file| file.write_into(destination)).boxed()
}
}
impl Destination for TcpStream {
fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>)
-> Result<usize, io::Error>
{
let (file_ref, size) = file.file.open()?;
let mut buf = [0u8; 65536];
let max_bytes = min(size.saturating_sub(file.offset), 65536) as usize;
let nbytes = file_ref.read_at(file.offset, &mut buf[..max_bytes])?;
if nbytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into())
}
io::Write::write(self, &buf[..nbytes])
}
fn poll_write(&self) -> Async<()> {
<TcpStream>::poll_write(self)
}
}
impl<O: FileOpener> Sendfile<O> {
pub fn size(&self) -> u64 {
return self.size;
}
pub fn write_into<D: Destination>(self, dest: D) -> WriteFile<O, D> {
if self.cached {
WriteFile(self.pool.clone(), WriteState::Mem(self, dest))
} else {
WriteFile(self.pool.clone(), WriteState::WaitWrite(self, dest))
}
}
pub fn get_inner(&self) -> &O {
return &self.file;
}
pub fn get_mut(&mut self) -> &mut O {
return &mut self.file;
}
}
impl<F: FileOpener, D: Destination> Future for WriteFile<F, D>
where F: Send + 'static, D: Send + 'static,
{
type Item = D;
type Error = io::Error;
fn poll(&mut self) -> Poll<D, io::Error> {
use self::WriteState::*;
loop {
let (newstate, cont) = match mem::replace(&mut self.1, Empty) {
Mem(mut file, mut dest) => {
let need_switch = match file.file.from_cache() {
Some(Ok(slice)) => {
if (slice.len() as u64) < file.size {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"cached file truncated during writing"));
}
let target_slice = &slice[file.offset as usize..];
if target_slice.len() == 0 {
return Ok(Async::Ready(dest));
}
match dest.write(target_slice) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"connection closed while sending \
file from cache"));
}
Ok(bytes) => {
file.offset += bytes as u64;
if file.offset >= file.size {
return Ok(Async::Ready(dest));
}
}
Err(e) => {
return Err(e);
}
}
false
}
Some(Err(e)) => {
return Err(e);
}
None => {
true
}
};
if need_switch {
(WaitWrite(file, dest), true)
} else {
(Mem(file, dest), false)
}
}
WaitSend(mut future) => {
match future.poll() {
Ok(Async::Ready((file, dest))) => {
if file.size <= file.offset {
return Ok(Async::Ready(dest));
} else {
(WaitWrite(file, dest), true)
}
}
Ok(Async::NotReady) => (WaitSend(future), false),
Err(e) => return Err(e),
}
}
WaitWrite(mut file, mut dest) => {
match dest.poll_write() {
Async::Ready(()) => {
(WaitSend(self.0.pool.spawn_fn(move || {
loop {
match dest.write_file(&mut file) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"connection closed while \
sending a file"))
}
Ok(bytes_sent) => {
file.offset += bytes_sent as u64;
if file.offset >= file.size {
return Ok((file, dest));
} else {
continue;
}
}
Err(ref e)
if e.kind() == io::ErrorKind::WouldBlock
=> {
return Ok((file, dest))
}
Err(e) => return Err(e),
}
}
})), true)
}
Async::NotReady => {
(WaitWrite(file, dest), false)
}
}
}
Empty => unreachable!(),
};
self.1 = newstate;
if !cont {
return Ok(Async::NotReady);
}
}
}
}
#[cfg(unix)]
impl FileReader for File {
fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
use libc::{pread, c_void};
use std::os::unix::io::AsRawFd;
let rc = unsafe { pread(self.as_raw_fd(),
buf.as_ptr() as *mut c_void,
buf.len(), offset as i64) };
if rc < 0 {
Err(io::Error::last_os_error())
} else {
Ok(rc as usize)
}
}
}
#[cfg(windows)]
impl FileReader for Mutex<File> {
fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
use std::io::{Read, Seek};
let mut real_file = self.lock().expect("mutex is not poisoned");
real_file.seek(io::SeekFrom::Start(offset))?;
real_file.read(buf)
}
}