1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend};
use futures_cpupool::CpuFuture;
use ::FsPool;
pub fn new<P: AsRef<Path> + Send + 'static>(pool: &FsPool, path: P, opts: WriteOptions) -> FsWriteSink {
let open = pool.cpu_pool.spawn_fn(move || {
opts.open.open(path)
});
FsWriteSink {
pool: pool.clone(),
state: State::Working(open),
}
}
pub struct FsWriteSink {
pool: FsPool,
state: State,
}
#[derive(Debug)]
pub struct WriteOptions {
open: OpenOptions,
}
impl Default for WriteOptions {
fn default() -> WriteOptions {
let mut opts = OpenOptions::new();
opts.write(true)
.create(true);
WriteOptions {
open: opts,
}
}
}
impl From<OpenOptions> for WriteOptions {
fn from(open: OpenOptions) -> WriteOptions {
WriteOptions {
open: open,
}
}
}
enum State {
Working(CpuFuture<File, io::Error>),
Ready(File),
Swapping,
}
impl FsWriteSink {
fn poll_working(&mut self) -> Poll<(), io::Error> {
let state = match self.state {
State::Working(ref mut cpu) => {
let file = try_ready!(cpu.poll());
State::Ready(file)
}
State::Ready(_) => {
return Ok(Async::Ready(()));
},
State::Swapping => unreachable!(),
};
self.state = state;
Ok(Async::Ready(()))
}
}
impl Sink for FsWriteSink {
type SinkItem = Bytes;
type SinkError = io::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let state = try!(self.poll_working());
if state.is_ready() {
let mut file = match ::std::mem::replace(&mut self.state, State::Swapping) {
State::Ready(file) => file,
_ => unreachable!(),
};
self.state = State::Working(self.pool.cpu_pool.spawn_fn(move || {
try!(file.write_all(item.as_ref()));
Ok(file)
}));
Ok(AsyncSink::Ready)
} else {
Ok(AsyncSink::NotReady(item))
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.poll_working()
}
}