1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::hint::unreachable_unchecked;
use bytes::{Bytes, BytesMut};
use http::{Extensions, HeaderMap, HeaderValue, StatusCode, Version};
use monoio::io::stream::Stream;
use monoio_http::h1::payload::Payload;
/// An HTTP response held by the client: the head fields plus the body payload.
///
/// Built by [`ClientResponse::new`] from an `http::Response<Payload>`; the head
/// is split into individual fields so they can be read or mutated independently
/// of the body.
pub struct ClientResponse {
/// Status code copied from the response head.
status: StatusCode,
/// HTTP version copied from the response head.
version: Version,
/// Header map moved out of the response head.
headers: HeaderMap<HeaderValue>,
/// Extensions moved out of the response head.
extensions: Extensions,
/// Body payload; consumed by `bytes`/`json`/`raw_body`, drained by `chunk`.
body: Payload,
}
impl ClientResponse {
    /// Builds a `ClientResponse` by splitting `inner` into its head fields and body.
    pub fn new(inner: http::Response<Payload>) -> Self {
        let (head, body) = inner.into_parts();
        Self {
            status: head.status,
            version: head.version,
            headers: head.headers,
            extensions: head.extensions,
            body,
        }
    }

    /// Returns the response status code.
    #[inline]
    pub fn status(&self) -> StatusCode {
        self.status
    }

    /// Returns the HTTP version of the response.
    #[inline]
    pub fn version(&self) -> Version {
        self.version
    }

    /// Returns a shared reference to the response headers.
    #[inline]
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }

    /// Returns a mutable reference to the response headers.
    #[inline]
    pub fn headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.headers
    }

    /// Returns a shared reference to the response extensions.
    #[inline]
    pub fn extensions(&self) -> &http::Extensions {
        &self.extensions
    }

    /// Returns a mutable reference to the response extensions.
    #[inline]
    pub fn extensions_mut(&mut self) -> &mut http::Extensions {
        &mut self.extensions
    }

    /// Consumes the response and collects the whole body into a single `Bytes`.
    ///
    /// An absent body yields an empty `Bytes`; a streamed body is concatenated
    /// chunk by chunk in arrival order.
    ///
    /// # Errors
    ///
    /// Returns the (converted) payload error if reading the fixed body or any
    /// stream chunk fails. Data collected before a failing chunk is discarded.
    pub async fn bytes(self) -> crate::Result<Bytes> {
        match self.body {
            Payload::None => Ok(Bytes::new()),
            Payload::Fixed(p) => p.get().await.map_err(Into::into),
            Payload::Stream(mut s) => {
                let mut ret = BytesMut::new();
                while let Some(payload_result) = s.next().await {
                    let data = payload_result?;
                    // Bulk-copy the chunk. `ret.extend(data)` would iterate the
                    // chunk and append it one byte at a time.
                    ret.extend_from_slice(&data);
                }
                Ok(ret.freeze())
            }
        }
    }

    /// Returns the next piece of the body, or `Ok(None)` once it is exhausted.
    ///
    /// * No body: always `Ok(None)`.
    /// * Fixed body: the first call yields the whole body and leaves
    ///   `Payload::None` behind, so later calls yield `Ok(None)`.
    /// * Streamed body: each call yields the next item of the stream.
    ///
    /// # Errors
    ///
    /// Returns the (converted) payload error if reading the body fails.
    pub async fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
        match &mut self.body {
            Payload::None => Ok(None),
            Payload::Fixed(_) => {
                // Move the fixed payload out so it can be consumed by value,
                // leaving `Payload::None` in its place.
                let p = match std::mem::replace(&mut self.body, Payload::None) {
                    Payload::Fixed(p) => p,
                    // SAFETY: the enclosing arm just matched `self.body` as
                    // `Payload::Fixed`, and nothing (no await point, no other
                    // access through the exclusive `&mut self`) ran between that
                    // match and the `replace`, so the value moved out is `Fixed`.
                    Payload::None | Payload::Stream(_) => unsafe { unreachable_unchecked() },
                };
                p.get().await.map_err(Into::into).map(Option::Some)
            }
            Payload::Stream(s) => s.next().await.transpose().map_err(Into::into),
        }
    }

    /// Consumes the response and hands back the body payload without reading it.
    pub fn raw_body(self) -> Payload {
        self.body
    }

    /// Consumes the response, reads the whole body and deserializes it as JSON.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the body fails (see [`ClientResponse::bytes`])
    /// or if `serde_json` cannot deserialize the bytes into `T`.
    pub async fn json<T: serde::de::DeserializeOwned>(self) -> crate::Result<T> {
        let bytes = self.bytes().await?;
        let d = serde_json::from_slice(&bytes)?;
        Ok(d)
    }
}