1use std::fmt;
2use std::fs::{File, OpenOptions};
3use std::io::{self, Write};
4use std::path::Path;
5
6use bytes::Bytes;
7use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend};
8use futures::future::lazy;
9use futures::sync::oneshot;
10
11use FsPool;
12use FsFuture;
13
14pub(crate) fn new<P>(pool: &FsPool, path: P, opts: WriteOptions) -> FsWriteSink
15where
16 P: AsRef<Path> + Send + 'static,
17{
18 let (tx, rx) = oneshot::channel();
19
20 let fut = Box::new(lazy(move || {
21 let res = opts.open.open(path).map_err(From::from);
22
23 tx.send(res).map_err(|_| ())
24 }));
25
26 pool.executor.execute(fut).unwrap();
27
28 FsWriteSink {
29 pool: pool.clone(),
30 state: State::Working(super::fs(rx)),
31 }
32}
33
34pub(crate) fn new_from_file(pool: &FsPool, file: File) -> FsWriteSink {
35 FsWriteSink {
36 pool: pool.clone(),
37 state: State::Ready(file),
38 }
39}
40
41pub struct FsWriteSink {
43 pool: FsPool,
44 state: State,
45}
46
/// Options describing how the target file of a write sink is opened.
///
/// The default enables `write` and `create`. Any `std::fs::OpenOptions` value
/// can be converted into a `WriteOptions` through `From`.
#[derive(Debug)]
pub struct WriteOptions {
    /// The underlying open options, stored exactly as supplied.
    open: OpenOptions,
}

impl Default for WriteOptions {
    /// Returns options with `write(true)` and `create(true)` set.
    fn default() -> WriteOptions {
        let mut open = OpenOptions::new();
        open.write(true).create(true);
        WriteOptions { open }
    }
}

impl From<OpenOptions> for WriteOptions {
    /// Wraps `open` without modifying it.
    fn from(open: OpenOptions) -> WriteOptions {
        WriteOptions { open }
    }
}
70
71enum State {
72 Working(FsFuture<File>),
73 Ready(File),
74 Swapping,
75}
76
77impl FsWriteSink {
78 fn poll_working(&mut self) -> Poll<(), io::Error> {
79 let state = match self.state {
80 State::Working(ref mut rx) => {
81 let file = try_ready!(rx.poll());
82 State::Ready(file)
83 }
84 State::Ready(_) => {
85 return Ok(Async::Ready(()));
86 }
87 State::Swapping => unreachable!(),
88 };
89 self.state = state;
90 Ok(Async::Ready(()))
91 }
92}
93
94impl Sink for FsWriteSink {
95 type SinkItem = Bytes;
96 type SinkError = io::Error;
97
98 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
99 let state = self.poll_working()?;
100 if state.is_ready() {
101 let mut file = match ::std::mem::replace(&mut self.state, State::Swapping) {
102 State::Ready(file) => file,
103 _ => unreachable!(),
104 };
105
106 let (tx, rx) = oneshot::channel();
107
108 let fut = Box::new(lazy(move || {
109 let res = file.write_all(item.as_ref())
110 .map(|_| file)
111 .map_err(From::from);
112
113 tx.send(res).map_err(|_| ())
114 }));
115
116 self.pool.executor.execute(fut).unwrap();
117
118 self.state = State::Working(super::fs(rx));
119 Ok(AsyncSink::Ready)
120 } else {
121 Ok(AsyncSink::NotReady(item))
122 }
123 }
124
125 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
126 self.poll_working()
127 }
128}
129
130impl fmt::Debug for FsWriteSink {
131 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132 f.debug_struct("FsWriteSink").finish()
133 }
134}