1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#![deny(missing_docs)]
#![deny(warnings)]
#![deny(missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-fs/0.0.3")]

//! A thread pool to handle file IO operations.
//!
//! # Examples
//!
//! ```rust
//! extern crate futures;
//! extern crate futures_fs;
//!
//! use futures::{Future, Stream};
//! use futures_fs::FsPool;
//!
//! # fn run() {
//! let fs = FsPool::default();
//!
//! // our source file
//! let read = fs.read("/home/sean/foo.txt");
//!
//! // the default write options create a new file
//! let write = fs.write("/home/sean/out.txt", Default::default());
//!
//! // block this thread!
//! // the reading and writing however will happen off-thread
//! read.forward(write).wait()
//!     .expect("IO error piping foo.txt to out.txt");
//! # }
//! # fn main() {}
//! ```

extern crate bytes;
#[macro_use]
extern crate futures;
extern crate futures_cpupool;

use std::{fmt, fs, io};
use std::path::Path;

use futures::{Future, Poll};
use futures_cpupool::{CpuPool, CpuFuture};

pub use self::read::FsReadStream;
pub use self::write::FsWriteSink;
pub use self::write::WriteOptions;

mod read;
mod write;

/// A pool of threads to handle file IO.
///
/// The pool wraps a `CpuPool`. Handles for reading and writing files are
/// obtained through `read` and `write`, and `delete` spawns a file removal
/// onto the wrapped pool.
///
/// `Clone` is derived, so a cloned `FsPool` holds a clone of the inner
/// `CpuPool`.
#[derive(Clone)]
pub struct FsPool {
    // The thread pool that file operations are spawned onto.
    cpu_pool: CpuPool,
}

impl FsPool {
    /// Creates a new `FsPool`, with the supplied number of threads.
    pub fn new(threads: usize) -> FsPool {
        FsPool {
            cpu_pool: CpuPool::new(threads),
        }
    }

    /// Returns a `Stream` of the contents of the file at the supplied path.
    pub fn read<P: AsRef<Path> + Send + 'static>(&self, path: P) -> FsReadStream {
        ::read::new(self, path)
    }

    /// Returns a `Sink` to send bytes to be written to the file at the supplied path.
    pub fn write<P: AsRef<Path> + Send + 'static>(&self, path: P, opts: WriteOptions) -> FsWriteSink {
        ::write::new(self, path, opts)
    }

    /// Returns a `Future` that resolves when the target file is deleted.
    pub fn delete<P: AsRef<Path> + Send + 'static>(&self, path: P) -> FsFuture<()> {
        fs(self.cpu_pool.spawn_fn(move || {
            fs::remove_file(path)
        }))
    }
}

impl Default for FsPool {
    /// Creates an `FsPool` with four threads.
    fn default() -> FsPool {
        const DEFAULT_THREADS: usize = 4;
        FsPool::new(DEFAULT_THREADS)
    }
}

impl fmt::Debug for FsPool {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // No fields are added to the builder, so only the type name is printed.
        let mut builder = f.debug_struct("FsPool");
        builder.finish()
    }
}

/// A future representing work in the `FsPool`.
///
/// It wraps the `CpuFuture` returned when the work was spawned. Polling it
/// yields `T` on success or an `io::Error` on failure.
///
/// The value should not be discarded: according to the futures-cpupool
/// documentation, dropping a `CpuFuture` makes the pool attempt to cancel the
/// computation, so an ignored `FsFuture` may mean the file operation never
/// completes.
#[must_use = "dropping an `FsFuture` may cancel the pending file operation"]
pub struct FsFuture<T> {
    // Handle to the work spawned on the pool's `CpuPool`.
    inner: CpuFuture<T, io::Error>,
}

fn fs<T: Send>(cpu: CpuFuture<T, io::Error>) -> FsFuture<T> {
    FsFuture {
        inner: cpu,
    }
}

impl<T: Send + 'static> Future for FsFuture<T> {
    type Item = T;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.inner.poll()
    }
}

impl<T> fmt::Debug for FsFuture<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The inner future is not shown; only the type name is printed.
        let mut builder = f.debug_struct("FsFuture");
        builder.finish()
    }
}