futures_fs/
lib.rs

1#![deny(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![deny(missing_debug_implementations)]
4#![doc(html_root_url = "https://docs.rs/futures-fs/0.0.5")]
5
6//! A thread pool to handle file IO operations.
7//!
8//! # Examples
9//!
10//! ```rust
11//! extern crate futures;
12//! extern crate futures_fs;
13//!
14//! use futures::{Future, Stream};
15//! use futures_fs::FsPool;
16//!
17//! # fn run() {
18//! let fs = FsPool::default();
19//!
20//! // our source file
21//! let read = fs.read("/home/sean/foo.txt", Default::default());
22//!
//! // default write options, which create a new file
24//! let write = fs.write("/home/sean/out.txt", Default::default());
25//!
26//! // block this thread!
27//! // the reading and writing however will happen off-thread
28//! read.forward(write).wait()
29//!     .expect("IO error piping foo.txt to out.txt");
30//! # }
31//! # fn main() {}
32//! ```
33
34extern crate bytes;
35#[macro_use]
36extern crate futures;
37extern crate futures_cpupool;
38
39use std::{fmt, fs, io};
40use std::path::Path;
41use std::sync::Arc;
42
43use futures::{Async, Future, Poll};
44use futures::future::{lazy, Executor};
45use futures::sync::oneshot::{self, Receiver};
46use futures_cpupool::CpuPool;
47
48pub use self::read::{FsReadStream, ReadOptions};
49pub use self::write::{FsWriteSink, WriteOptions};
50
51mod read;
52mod write;
53
54/// A pool of threads to handle file IO.
55#[derive(Clone)]
56pub struct FsPool {
57    executor: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync>,
58}
59
60// ===== impl FsPool ======
61
62impl FsPool {
63    /// Creates a new `FsPool`, with the supplied number of threads.
64    pub fn new(threads: usize) -> Self {
65        FsPool {
66            executor: Arc::new(CpuPool::new(threads)),
67        }
68    }
69
70    /// Creates a new `FsPool`, from an existing `Executor`.
71    ///
72    /// # Note
73    ///
74    /// The executor will be used to spawn tasks that can block the thread.
75    /// It likely should not be an executor that is also handling light-weight
76    /// tasks, but a dedicated thread pool.
77    ///
78    /// The most common use of this constructor is to allow creating a single
79    /// `CpuPool` for your application for blocking tasks, and sharing it with
80    /// `FsPool` and any other things needing a thread pool.
81    pub fn with_executor<E>(executor: E) -> Self
82    where
83        E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
84    {
85        FsPool {
86            executor: Arc::new(executor),
87        }
88    }
89
90    #[doc(hidden)]
91    #[deprecated(note = "renamed to with_executor")]
92    pub fn from_executor<E>(executor: E) -> Self
93    where
94        E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
95    {
96        FsPool {
97            executor: Arc::new(executor),
98        }
99    }
100
101    /// Returns a `Stream` of the contents of the file at the supplied path.
102    pub fn read<P>(&self, path: P, opts: ReadOptions) -> FsReadStream
103    where
104        P: AsRef<Path> + Send + 'static,
105    {
106        ::read::new(self, path, opts)
107    }
108
109    /// Returns a `Stream` of the contents of the supplied file.
110    pub fn read_file(&self, file: fs::File, opts: ReadOptions) -> FsReadStream {
111        ::read::new_from_file(self, file, opts)
112    }
113
114    /// Returns a `Sink` to send bytes to be written to the file at the supplied path.
115    pub fn write<P>(&self, path: P, opts: WriteOptions) -> FsWriteSink
116    where
117        P: AsRef<Path> + Send + 'static,
118    {
119        ::write::new(self, path, opts)
120    }
121
122    /// Returns a `Sink` to send bytes to be written to the supplied file.
123    pub fn write_file(&self, file: fs::File) -> FsWriteSink {
124        ::write::new_from_file(self, file)
125    }
126
127    /// Returns a `Future` that resolves when the target file is deleted.
128    pub fn delete<P>(&self, path: P) -> FsFuture<()>
129    where
130        P: AsRef<Path> + Send + 'static,
131    {
132        let (tx, rx) = oneshot::channel();
133
134        let fut = Box::new(lazy(move || {
135            tx.send(fs::remove_file(path).map_err(From::from))
136                .map_err(|_| ())
137        }));
138
139        self.executor.execute(fut).unwrap();
140
141        fs(rx)
142    }
143}
144
145impl Default for FsPool {
146    fn default() -> FsPool {
147        FsPool::new(4)
148    }
149}
150
151impl fmt::Debug for FsPool {
152    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153        f.debug_struct("FsPool").finish()
154    }
155}
156
157
158// ===== impl FsFuture =====
159
160/// A future representing work in the `FsPool`.
161pub struct FsFuture<T> {
162    inner: Receiver<io::Result<T>>,
163}
164
165fn fs<T: Send>(rx: Receiver<io::Result<T>>) -> FsFuture<T> {
166    FsFuture { inner: rx }
167}
168
169impl<T: Send + 'static> Future for FsFuture<T> {
170    type Item = T;
171    type Error = io::Error;
172
173    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
174        match self.inner.poll().unwrap() {
175            Async::Ready(Ok(item)) => Ok(Async::Ready(item)),
176            Async::Ready(Err(e)) => Err(e),
177            Async::NotReady => Ok(Async::NotReady),
178        }
179    }
180}
181
182impl<T> fmt::Debug for FsFuture<T> {
183    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
184        f.debug_struct("FsFuture").finish()
185    }
186}
187
188fn _assert_kinds() {
189    fn assert_send<T: Send>() {}
190    fn assert_sync<T: Sync>() {}
191    fn assert_clone<T: Clone>() {}
192
193    assert_send::<FsPool>();
194    assert_sync::<FsPool>();
195    assert_clone::<FsPool>();
196
197    assert_send::<FsFuture<()>>();
198}