1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::io;
use std::panic;
use futures::Poll;
use futures::stream;
use futures::stream::Stream;
use bytes::Bytes;
use error;
use solicit::header::Headers;
use solicit_async::*;
use misc::any_to_string;
/// Content carried by a single part of an HTTP stream:
/// either a block of headers or a chunk of data bytes.
#[derive(Debug)]
pub enum HttpStreamPartContent {
/// A set of headers.
Headers(Headers),
/// A chunk of body data.
Data(Bytes),
}
/// One part of an HTTP stream: the content together with a flag
/// telling whether this part is the final one of the stream.
///
/// `Debug` is derived so that parts can be logged and used in assertions;
/// both field types already implement `Debug`.
#[derive(Debug)]
pub struct HttpStreamPart {
    /// Headers or data carried by this part.
    pub content: HttpStreamPartContent,
    /// `true` when this part is marked as the last part of the stream.
    pub last: bool,
}
impl HttpStreamPart {
    /// Builds a headers part with the `last` flag set.
    pub fn last_headers(headers: Headers) -> Self {
        let content = HttpStreamPartContent::Headers(headers);
        HttpStreamPart { content, last: true }
    }

    /// Builds a headers part with the `last` flag cleared.
    pub fn intermediate_headers(headers: Headers) -> Self {
        let content = HttpStreamPartContent::Headers(headers);
        HttpStreamPart { content, last: false }
    }

    /// Builds a data part with the `last` flag cleared.
    pub fn intermediate_data(data: Bytes) -> Self {
        let content = HttpStreamPartContent::Data(data);
        HttpStreamPart { content, last: false }
    }

    /// Builds a data part with the `last` flag set.
    pub fn last_data(data: Bytes) -> Self {
        let content = HttpStreamPartContent::Data(data);
        HttpStreamPart { content, last: true }
    }
}
/// Newtype wrapper around a boxed stream of `HttpStreamPart` items.
///
/// The inner field is public, so callers can reach the wrapped stream directly.
pub struct HttpPartStream(
pub HttpFutureStreamSend<HttpStreamPart>
);
impl HttpPartStream {
pub fn new<S>(s: S) -> HttpPartStream
where S : Stream<Item=HttpStreamPart, Error=error::Error> + Send + 'static
{
HttpPartStream(Box::new(s))
}
pub fn empty() -> HttpPartStream {
HttpPartStream::new(stream::empty())
}
pub fn bytes<S>(bytes: S) -> HttpPartStream
where S : Stream<Item=Bytes, Error=error::Error> + Send + 'static
{
HttpPartStream::new(bytes.map(HttpStreamPart::intermediate_data))
}
pub fn once(part: HttpStreamPartContent) -> HttpPartStream {
HttpPartStream::new(stream::once(Ok(HttpStreamPart { content: part, last: true })))
}
pub fn once_bytes<B>(bytes: B) -> HttpPartStream
where B : Into<Bytes>
{
HttpPartStream::once(HttpStreamPartContent::Data(bytes.into()))
}
pub fn drop_last_flag(self) -> HttpFutureStreamSend<HttpStreamPartContent> {
Box::new(self.map(|HttpStreamPart { content, .. }| content))
}
pub fn filter_data(self) -> HttpFutureStreamSend<Bytes> {
Box::new(self.filter_map(|HttpStreamPart { content, .. }| {
match content {
HttpStreamPartContent::Data(data) => Some(data),
_ => None,
}
}))
}
pub fn check_only_data(self) -> HttpFutureStreamSend<Bytes> {
Box::new(self.and_then(|HttpStreamPart { content, .. }| {
match content {
HttpStreamPartContent::Data(data) => {
Ok(data)
},
HttpStreamPartContent::Headers(..) => {
Err(error::Error::from(io::Error::new(io::ErrorKind::Other, "expecting only DATA frames")))
},
}
}))
}
pub fn catch_unwind(self) -> HttpPartStream {
HttpPartStream::new(panic::AssertUnwindSafe(self.0).catch_unwind().then(|r| {
match r {
Ok(r) => r,
Err(e) => {
let e = any_to_string(e);
warn!("handler panicked: {}", e);
Err(error::Error::HandlerPanicked(e))
},
}
}))
}
}
/// `HttpPartStream` is itself a stream: polling is forwarded to the wrapped boxed stream.
impl Stream for HttpPartStream {
    type Item = HttpStreamPart;
    type Error = error::Error;

    /// Polls the inner stream and returns its result unchanged.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let inner = &mut self.0;
        inner.poll()
    }
}