1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::{cmp, fmt, mem};
use std::fs::{self, File};
use std::io::{self, Read};
use std::path::{PathBuf, Path};
use std::sync::Arc;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{Async, Future, Poll, Stream};
use futures_cpupool::CpuFuture;
use ::FsPool;
/// Chunk size in bytes: `read` reserves this much when the buffer has no spare
/// capacity, and `open_and_read` uses it as the upper bound for the initial
/// buffer capacity.
const BUF_SIZE: usize = 8192;
pub fn new<P: AsRef<Path> + Send + 'static>(pool: &FsPool, path: P) -> FsReadStream {
FsReadStream {
buffer: BytesMut::with_capacity(0),
path: Arc::new(path.as_ref().to_owned()),
pool: pool.clone(),
state: State::Init,
}
}
/// A `Stream` of `Bytes` chunks read from the file at `path`.
///
/// Created by `new`. The file is opened and read by jobs spawned on the
/// `FsPool`'s `cpu_pool`; the `Stream` implementation polls those jobs.
pub struct FsReadStream {
    // Holds the most recently read chunk until it is handed to the consumer,
    // and is then passed on to the next read job.
    buffer: BytesMut,
    // Path of the file to read; cloned into the job that opens the file.
    path: Arc<PathBuf>,
    // Pool whose `cpu_pool` runs the open/read jobs.
    pool: FsPool,
    // Current position in the read state machine (see `State`).
    state: State,
}
/// States of the `FsReadStream` state machine driven by `Stream::poll`.
enum State {
    // Nothing has been spawned yet; the next poll spawns `open_and_read`.
    Init,
    // A job is in flight on the pool; it resolves to the file handle and the buffer it read into.
    Working(CpuFuture<(File, BytesMut), io::Error>),
    // A chunk has been yielded and the file handle is idle; the next poll spawns another `read`.
    Ready(File),
    // A read produced no bytes; every further poll yields `None`.
    Eof,
    // Placeholder stored by `mem::replace` while `poll` owns the real state; hitting it in `poll` is `unreachable!()`.
    Swapping,
}
impl Stream for FsReadStream {
type Item = Bytes;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.state, State::Swapping) {
State::Init => {
let path = self.path.clone();
self.state = State::Working(self.pool.cpu_pool.spawn_fn(move || {
open_and_read(&path)
}));
},
State::Working(mut cpu) => {
let polled = cpu.poll();
self.state = State::Working(cpu);
let (file, chunk) = try_ready!(polled);
if chunk.is_empty() {
self.state = State::Eof;
return Ok(Async::Ready(None));
} else {
self.buffer = chunk;
self.state = State::Ready(file);
return Ok(Async::Ready(Some(self.buffer.take().freeze())));
}
},
State::Ready(file) => {
let buf = self.buffer.split_off(0);
self.state = State::Working(self.pool.cpu_pool.spawn_fn(move || {
read(file, buf)
}));
},
State::Eof => {
self.state = State::Eof;
return Ok(Async::Ready(None));
},
State::Swapping => unreachable!(),
}
}
}
}
impl fmt::Debug for FsReadStream {
    /// Formats the stream as `FsReadStream { path: .. }`; only the `path`
    /// field is included in the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("FsReadStream");
        out.field("path", &self.path);
        out.finish()
    }
}
fn read(mut file: File, mut buf: BytesMut) -> io::Result<(File, BytesMut)> {
if !buf.has_remaining_mut() {
buf.reserve(BUF_SIZE);
}
let n = try!(file.read(unsafe { buf.bytes_mut() }));
unsafe { buf.advance_mut(n) };
Ok((file, buf))
}
fn open_and_read(path: &Path) -> io::Result<(File, BytesMut)> {
let len = try!(fs::metadata(path)).len();
let file = try!(File::open(path));
let initial_cap = cmp::min(len as usize, BUF_SIZE);
read(file, BytesMut::with_capacity(initial_cap))
}