1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use failure::{bail, Error as Flare};
use log::{debug};
use body_image::{BodySink, Tunables};
use hyper;
use tokio_threadpool;
use futures::{Async, AsyncSink, Poll, Sink, StartSend};
/// Adaptor that wraps a `BodySink` together with the `Tunables` used while
/// filling it, so that the pair can be driven through the `futures::Sink`
/// trait (see the `Sink` implementation for this type).
#[derive(Debug)]
pub struct AsyncBodySink {
    /// The wrapped sink that receives every accepted chunk.
    body: BodySink,

    /// Settings consulted on each send: the maximum total body length, the
    /// maximum length to keep in RAM, and the temporary directory used when
    /// the body is written back to a file.
    tune: Tunables,
}
impl AsyncBodySink {
pub fn new(body: BodySink, tune: Tunables) -> AsyncBodySink {
AsyncBodySink { body, tune }
}
pub fn body(&self) -> &BodySink {
&self.body
}
pub fn body_mut(&mut self) -> &mut BodySink {
&mut self.body
}
pub fn into_inner(self) -> BodySink {
self.body
}
}
// Run the given closure block through `tokio_threadpool::blocking` and
// translate the outcome for use inside `Sink::start_send`:
//
// * closure ran and returned `Ok(_)`: fall through, evaluating to `()`;
// * `blocking` reported `NotReady`: log it and *return* from the enclosing
//   function with `Ok(AsyncSink::NotReady($item))`, handing the unsent item
//   back to the caller so the send can be retried later;
// * closure returned `Err`, or `blocking` itself returned `Err`: *return*
//   the error from the enclosing function, converted with `Into`.
//
// Because the macro expands to `return` expressions, it may only be used in
// a function whose return type is compatible with `StartSend`.
macro_rules! unblock {
    ($item:ident, || $work:block) => (
        match tokio_threadpool::blocking(|| $work) {
            // The blocking section completed successfully; carry on.
            Ok(Async::Ready(Ok(_))) => {}

            // No capacity to block right now; give the item back untouched.
            Ok(Async::NotReady) => {
                debug!("No blocking backup thread available -> NotReady");
                return Ok(AsyncSink::NotReady($item));
            }

            // The blocking section ran but the operation itself failed.
            Ok(Async::Ready(Err(op_err))) => return Err(op_err.into()),

            // `blocking` itself could not be entered.
            Err(blocking_err) => return Err(blocking_err.into()),
        }
    )
}
/// `Sink` implementation accepting `hyper::Chunk` items and storing them in
/// the wrapped `BodySink`, reporting failures as `failure::Error`.
impl Sink for AsyncBodySink {
    type SinkItem = hyper::Chunk;
    type SinkError = Flare;

    /// Attempt to append `chunk` to the body.
    ///
    /// The steps, in order:
    ///
    /// 1. If the body length plus this chunk would exceed
    ///    `Tunables::max_body`, fail with an error.
    /// 2. If the body is still in RAM and the new length would exceed
    ///    `Tunables::max_body_ram`, write the body back to a file in
    ///    `Tunables::temp_dir` (a blocking operation).
    /// 3. Save the chunk in RAM if the body is still in RAM, otherwise
    ///    write it out (a blocking operation).
    ///
    /// Blocking operations go through the `unblock!` macro; when it cannot
    /// enter a blocking section this returns `Ok(AsyncSink::NotReady(chunk))`
    /// so the caller can retry with the same chunk.
    ///
    /// # Errors
    ///
    /// Returns an error when the length limit is exceeded, when any of the
    /// `BodySink` operations fail, or when the blocking section cannot be
    /// entered at all.
    fn start_send(&mut self, chunk: hyper::Chunk)
        -> StartSend<hyper::Chunk, Flare>
    {
        let chunk_len = chunk.len();
        let total = self.body.len() + (chunk_len as u64);

        if total > self.tune.max_body() {
            bail!("Response stream too long: {}+", total);
        }

        // Move the body out of RAM first if this chunk would push it past
        // the in-memory limit.
        let over_ram_limit =
            self.body.is_ram() && total > self.tune.max_body_ram();
        if over_ram_limit {
            unblock!(chunk, || {
                debug!("to write back file (blocking, len: {})", total);
                self.body.write_back(self.tune.temp_dir())
            })
        }

        // Re-check the state: the write back above may have just changed it.
        if !self.body.is_ram() {
            unblock!(chunk, || {
                debug!("to write chunk (blocking, len: {})", chunk_len);
                self.body.write_all(&chunk)
            })
        } else {
            debug!("to save chunk (len: {})", chunk_len);
            self.body.save(chunk)?;
        }

        Ok(AsyncSink::Ready)
    }

    /// Always ready: nothing is buffered by this adaptor between calls to
    /// `start_send`.
    fn poll_complete(&mut self) -> Poll<(), Flare> {
        Ok(Async::Ready(()))
    }

    /// Always ready: closing performs no additional work here.
    fn close(&mut self) -> Poll<(), Flare> {
        Ok(Async::Ready(()))
    }
}