1#![deny(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![deny(missing_debug_implementations)]
4#![doc(html_root_url = "https://docs.rs/futures-fs/0.0.5")]
5
6extern crate bytes;
35#[macro_use]
36extern crate futures;
37extern crate futures_cpupool;
38
39use std::{fmt, fs, io};
40use std::path::Path;
41use std::sync::Arc;
42
43use futures::{Async, Future, Poll};
44use futures::future::{lazy, Executor};
45use futures::sync::oneshot::{self, Receiver};
46use futures_cpupool::CpuPool;
47
48pub use self::read::{FsReadStream, ReadOptions};
49pub use self::write::{FsWriteSink, WriteOptions};
50
51mod read;
52mod write;
53
/// A cloneable handle to an executor on which filesystem work is run.
///
/// The executor is stored behind an `Arc`, so every clone of an `FsPool`
/// submits its tasks to the same underlying executor.
#[derive(Clone)]
pub struct FsPool {
    // Type-erased executor accepting boxed, `Send` futures that produce `()`
    // and fail with `()`; results are reported through other channels.
    executor: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync>,
}
59
60impl FsPool {
63 pub fn new(threads: usize) -> Self {
65 FsPool {
66 executor: Arc::new(CpuPool::new(threads)),
67 }
68 }
69
70 pub fn with_executor<E>(executor: E) -> Self
82 where
83 E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
84 {
85 FsPool {
86 executor: Arc::new(executor),
87 }
88 }
89
90 #[doc(hidden)]
91 #[deprecated(note = "renamed to with_executor")]
92 pub fn from_executor<E>(executor: E) -> Self
93 where
94 E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
95 {
96 FsPool {
97 executor: Arc::new(executor),
98 }
99 }
100
101 pub fn read<P>(&self, path: P, opts: ReadOptions) -> FsReadStream
103 where
104 P: AsRef<Path> + Send + 'static,
105 {
106 ::read::new(self, path, opts)
107 }
108
109 pub fn read_file(&self, file: fs::File, opts: ReadOptions) -> FsReadStream {
111 ::read::new_from_file(self, file, opts)
112 }
113
114 pub fn write<P>(&self, path: P, opts: WriteOptions) -> FsWriteSink
116 where
117 P: AsRef<Path> + Send + 'static,
118 {
119 ::write::new(self, path, opts)
120 }
121
122 pub fn write_file(&self, file: fs::File) -> FsWriteSink {
124 ::write::new_from_file(self, file)
125 }
126
127 pub fn delete<P>(&self, path: P) -> FsFuture<()>
129 where
130 P: AsRef<Path> + Send + 'static,
131 {
132 let (tx, rx) = oneshot::channel();
133
134 let fut = Box::new(lazy(move || {
135 tx.send(fs::remove_file(path).map_err(From::from))
136 .map_err(|_| ())
137 }));
138
139 self.executor.execute(fut).unwrap();
140
141 fs(rx)
142 }
143}
144
145impl Default for FsPool {
146 fn default() -> FsPool {
147 FsPool::new(4)
148 }
149}
150
151impl fmt::Debug for FsPool {
152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153 f.debug_struct("FsPool").finish()
154 }
155}
156
157
/// A future that resolves with the `io::Result<T>` delivered over a oneshot
/// channel.
pub struct FsFuture<T> {
    // Receiving half of the oneshot channel that carries the result.
    inner: Receiver<io::Result<T>>,
}
164
165fn fs<T: Send>(rx: Receiver<io::Result<T>>) -> FsFuture<T> {
166 FsFuture { inner: rx }
167}
168
169impl<T: Send + 'static> Future for FsFuture<T> {
170 type Item = T;
171 type Error = io::Error;
172
173 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
174 match self.inner.poll().unwrap() {
175 Async::Ready(Ok(item)) => Ok(Async::Ready(item)),
176 Async::Ready(Err(e)) => Err(e),
177 Async::NotReady => Ok(Async::NotReady),
178 }
179 }
180}
181
182impl<T> fmt::Debug for FsFuture<T> {
183 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
184 f.debug_struct("FsFuture").finish()
185 }
186}
187
188fn _assert_kinds() {
189 fn assert_send<T: Send>() {}
190 fn assert_sync<T: Sync>() {}
191 fn assert_clone<T: Clone>() {}
192
193 assert_send::<FsPool>();
194 assert_sync::<FsPool>();
195 assert_clone::<FsPool>();
196
197 assert_send::<FsFuture<()>>();
198}