#![deny(missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![deny(missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-fs/0.0.5")]
extern crate bytes;
#[macro_use]
extern crate futures;
extern crate futures_cpupool;
use std::{fmt, fs, io};
use std::path::Path;
use std::sync::Arc;
use futures::{Async, Future, Poll};
use futures::future::{lazy, Executor};
use futures::sync::oneshot::{self, Receiver};
use futures_cpupool::CpuPool;
pub use self::read::{FsReadStream, ReadOptions};
pub use self::write::{FsWriteSink, WriteOptions};
mod read;
mod write;
/// A handle to an executor on which file system operations are run.
///
/// The handle is cheap to clone: clones share the same underlying executor
/// through an `Arc`.
#[derive(Clone)]
pub struct FsPool {
// Type-erased executor that accepts boxed `Future<Item = (), Error = ()>` tasks.
executor: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync>,
}
impl FsPool {
pub fn new(threads: usize) -> Self {
FsPool {
executor: Arc::new(CpuPool::new(threads)),
}
}
pub fn with_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
FsPool {
executor: Arc::new(executor),
}
}
#[doc(hidden)]
#[deprecated(note = "renamed to with_executor")]
pub fn from_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
FsPool {
executor: Arc::new(executor),
}
}
pub fn read<P>(&self, path: P, opts: ReadOptions) -> FsReadStream
where
P: AsRef<Path> + Send + 'static,
{
::read::new(self, path, opts)
}
pub fn read_file(&self, file: fs::File, opts: ReadOptions) -> FsReadStream {
::read::new_from_file(self, file, opts)
}
pub fn write<P>(&self, path: P, opts: WriteOptions) -> FsWriteSink
where
P: AsRef<Path> + Send + 'static,
{
::write::new(self, path, opts)
}
pub fn write_file(&self, file: fs::File) -> FsWriteSink {
::write::new_from_file(self, file)
}
pub fn delete<P>(&self, path: P) -> FsFuture<()>
where
P: AsRef<Path> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
let fut = Box::new(lazy(move || {
tx.send(fs::remove_file(path).map_err(From::from))
.map_err(|_| ())
}));
self.executor.execute(fut).unwrap();
fs(rx)
}
}
impl Default for FsPool {
fn default() -> FsPool {
FsPool::new(4)
}
}
impl fmt::Debug for FsPool {
    /// Formats the pool as the bare struct name `FsPool`; the executor field
    /// is not included in the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("FsPool");
        builder.finish()
    }
}
/// A future representing work submitted through an `FsPool`.
///
/// It resolves with the `io::Result<T>` delivered over an internal oneshot
/// channel.
pub struct FsFuture<T> {
// Receiving half of the oneshot channel that carries the operation's result.
inner: Receiver<io::Result<T>>,
}
/// Wraps the receiving half of a oneshot channel in an `FsFuture`.
fn fs<T>(rx: Receiver<io::Result<T>>) -> FsFuture<T>
where
    T: Send,
{
    FsFuture { inner: rx }
}
impl<T: Send + 'static> Future for FsFuture<T> {
    type Item = T;
    type Error = io::Error;

    /// Polls the underlying oneshot receiver for the operation's result.
    ///
    /// * A received `Ok(item)` resolves the future with `item`.
    /// * A received `Err(e)` fails the future with `e`.
    /// * If the sending half was dropped without sending a result, the
    ///   future fails with an `io::Error` of kind `Other` instead of
    ///   panicking the polling thread.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.inner.poll() {
            Ok(Async::Ready(Ok(item))) => Ok(Async::Ready(item)),
            Ok(Async::Ready(Err(err))) => Err(err),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            // The receiver's only error is cancellation: the sender was
            // dropped before a result was sent.
            Err(_canceled) => Err(io::Error::new(
                io::ErrorKind::Other,
                "filesystem task was dropped before it produced a result",
            )),
        }
    }
}
impl<T> fmt::Debug for FsFuture<T> {
    /// Formats the future as the bare struct name `FsFuture`; the inner
    /// receiver is not included, so no `Debug` bound on `T` is needed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("FsFuture");
        builder.finish()
    }
}
/// Compile-time checks of the auto traits of the public types.
///
/// This function only needs to type-check: it fails to compile if `FsPool`
/// stops being `Send + Sync + Clone` or `FsFuture<()>` stops being `Send`.
fn _assert_kinds() {
    fn requires_send<T>()
    where
        T: Send,
    {
    }
    fn requires_send_sync_clone<T>()
    where
        T: Send + Sync + Clone,
    {
    }

    requires_send_sync_clone::<FsPool>();
    requires_send::<FsFuture<()>>();
}