futures_fs/
write.rs

1use std::fmt;
2use std::fs::{File, OpenOptions};
3use std::io::{self, Write};
4use std::path::Path;
5
6use bytes::Bytes;
7use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend};
8use futures::future::lazy;
9use futures::sync::oneshot;
10
11use FsPool;
12use FsFuture;
13
14pub(crate) fn new<P>(pool: &FsPool, path: P, opts: WriteOptions) -> FsWriteSink
15where
16    P: AsRef<Path> + Send + 'static,
17{
18    let (tx, rx) = oneshot::channel();
19
20    let fut = Box::new(lazy(move || {
21        let res = opts.open.open(path).map_err(From::from);
22
23        tx.send(res).map_err(|_| ())
24    }));
25
26    pool.executor.execute(fut).unwrap();
27
28    FsWriteSink {
29        pool: pool.clone(),
30        state: State::Working(super::fs(rx)),
31    }
32}
33
34pub(crate) fn new_from_file(pool: &FsPool, file: File) -> FsWriteSink {
35    FsWriteSink {
36        pool: pool.clone(),
37        state: State::Ready(file),
38    }
39}
40
41/// A `Sink` to send bytes to be written to a target file.
42pub struct FsWriteSink {
43    pool: FsPool,
44    state: State,
45}
46
/// Options for how to write to the target file.
///
/// The default opens the path for writing and creates the file if it does not
/// exist yet. No other `OpenOptions` flag (such as truncate or append) is set
/// by the default.
///
/// Any `std::fs::OpenOptions` can be converted into this type through `From`.
#[derive(Debug)]
pub struct WriteOptions {
    /// The underlying options used to open the file.
    open: OpenOptions,
}

impl Default for WriteOptions {
    /// Returns options with `write(true)` and `create(true)` set.
    fn default() -> WriteOptions {
        let mut open = OpenOptions::new();
        open.write(true);
        open.create(true);
        WriteOptions { open }
    }
}

impl From<OpenOptions> for WriteOptions {
    /// Wraps the given `OpenOptions` unchanged.
    fn from(open: OpenOptions) -> WriteOptions {
        WriteOptions { open }
    }
}
70
71enum State {
72    Working(FsFuture<File>),
73    Ready(File),
74    Swapping,
75}
76
77impl FsWriteSink {
78    fn poll_working(&mut self) -> Poll<(), io::Error> {
79        let state = match self.state {
80            State::Working(ref mut rx) => {
81                let file = try_ready!(rx.poll());
82                State::Ready(file)
83            }
84            State::Ready(_) => {
85                return Ok(Async::Ready(()));
86            }
87            State::Swapping => unreachable!(),
88        };
89        self.state = state;
90        Ok(Async::Ready(()))
91    }
92}
93
94impl Sink for FsWriteSink {
95    type SinkItem = Bytes;
96    type SinkError = io::Error;
97
98    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
99        let state = self.poll_working()?;
100        if state.is_ready() {
101            let mut file = match ::std::mem::replace(&mut self.state, State::Swapping) {
102                State::Ready(file) => file,
103                _ => unreachable!(),
104            };
105
106            let (tx, rx) = oneshot::channel();
107
108            let fut = Box::new(lazy(move || {
109                let res = file.write_all(item.as_ref())
110                    .map(|_| file)
111                    .map_err(From::from);
112
113                tx.send(res).map_err(|_| ())
114            }));
115
116            self.pool.executor.execute(fut).unwrap();
117
118            self.state = State::Working(super::fs(rx));
119            Ok(AsyncSink::Ready)
120        } else {
121            Ok(AsyncSink::NotReady(item))
122        }
123    }
124
125    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
126        self.poll_working()
127    }
128}
129
130impl fmt::Debug for FsWriteSink {
131    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132        f.debug_struct("FsWriteSink").finish()
133    }
134}