jsonrpsee_core/
http_helpers.rs1use crate::BoxError;
30use bytes::{Buf, Bytes};
31use http_body::Frame;
32use http_body_util::{BodyExt, Limited};
33use std::{
34 pin::Pin,
35 task::{Context, Poll},
36};
37
/// HTTP request type; an alias of [`http::Request`] whose body defaults to [`Body`].
pub type Request<T = Body> = http::Request<T>;
/// HTTP response type; an alias of [`http::Response`] whose body defaults to [`Body`].
pub type Response<T = Body> = http::Response<T>;
42
/// Body type used as the default for [`Request`] and [`Response`].
///
/// A newtype around a boxed body that yields [`Bytes`] data and reports
/// failures as [`BoxError`].
#[derive(Debug, Default)]
pub struct Body(http_body_util::combinators::UnsyncBoxBody<Bytes, BoxError>);
46
47impl Body {
48 pub fn empty() -> Self {
50 Self::default()
51 }
52
53 pub fn new<B>(body: B) -> Self
55 where
56 B: http_body::Body<Data = Bytes> + Send + 'static,
57 B::Data: Send + 'static,
58 B::Error: Into<BoxError>,
59 {
60 Self(body.map_err(|e| e.into()).boxed_unsync())
61 }
62}
63
64impl From<String> for Body {
65 fn from(s: String) -> Self {
66 let body = http_body_util::Full::from(s);
67 Self::new(body)
68 }
69}
70
71impl From<&'static str> for Body {
72 fn from(s: &'static str) -> Self {
73 let body = http_body_util::Full::from(s);
74 Self::new(body)
75 }
76}
77
78impl From<Vec<u8>> for Body {
79 fn from(bytes: Vec<u8>) -> Self {
80 let body = http_body_util::Full::from(bytes);
81 Self::new(body)
82 }
83}
84
85impl http_body::Body for Body {
86 type Data = Bytes;
87 type Error = BoxError;
88
89 #[inline]
90 fn poll_frame(
91 mut self: Pin<&mut Self>,
92 cx: &mut Context<'_>,
93 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
94 Pin::new(&mut self.0).poll_frame(cx)
95 }
96
97 #[inline]
98 fn size_hint(&self) -> http_body::SizeHint {
99 self.0.size_hint()
100 }
101
102 #[inline]
103 fn is_end_stream(&self) -> bool {
104 self.0.is_end_stream()
105 }
106}
107
/// Errors returned by the HTTP helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum HttpError {
    /// The HTTP message exceeded the maximum allowed size.
    #[error("The HTTP message was too big")]
    TooLarge,
    /// The request was malformed, e.g. [`read_body`] found a body that does not begin with `{` or `[`.
    #[error("Malformed request")]
    Malformed,
    /// An error produced while reading the body stream; displayed transparently as the wrapped [`BoxError`].
    #[error(transparent)]
    Stream(#[from] BoxError),
}
121
122pub async fn read_body<B>(headers: &http::HeaderMap, body: B, max_body_size: u32) -> Result<(Vec<u8>, bool), HttpError>
128where
129 B: http_body::Body<Data = Bytes> + Send + 'static,
130 B::Data: Send,
131 B::Error: Into<BoxError>,
132{
133 let body_size = read_header_content_length(headers).unwrap_or(0);
136
137 if body_size > max_body_size {
138 return Err(HttpError::TooLarge);
139 }
140
141 futures_util::pin_mut!(body);
142
143 let mut received_data = Vec::with_capacity(std::cmp::min(body_size as usize, 16 * 1024));
145 let mut limited_body = Limited::new(body, max_body_size as usize);
146
147 let mut is_single = None;
148
149 while let Some(frame_or_err) = limited_body.frame().await {
150 let frame = frame_or_err.map_err(HttpError::Stream)?;
151 let Some(data) = frame.data_ref() else {
152 continue;
153 };
154
155 if received_data.is_empty() {
157 let first_non_whitespace =
158 data.chunk().iter().enumerate().take(128).find(|(_, byte)| !byte.is_ascii_whitespace());
159
160 let skip = match first_non_whitespace {
161 Some((idx, b'{')) => {
162 is_single = Some(true);
163 idx
164 }
165 Some((idx, b'[')) => {
166 is_single = Some(false);
167 idx
168 }
169 _ => return Err(HttpError::Malformed),
170 };
171
172 received_data.extend_from_slice(&data.chunk()[skip..]);
174 } else {
175 received_data.extend_from_slice(data.chunk());
176 }
177 }
178
179 match is_single {
180 Some(single) if !received_data.is_empty() => {
181 tracing::trace!(
182 target: "jsonrpsee-http",
183 "HTTP body: {}",
184 std::str::from_utf8(&received_data).unwrap_or("Invalid UTF-8 data")
185 );
186 Ok((received_data, single))
187 }
188 _ => Err(HttpError::Malformed),
189 }
190}
191
192fn read_header_content_length(headers: &http::header::HeaderMap) -> Option<u32> {
197 let length = read_header_value(headers, http::header::CONTENT_LENGTH)?;
198 length.parse::<u32>().ok()
200}
201
202pub fn read_header_value(headers: &http::header::HeaderMap, header_name: http::header::HeaderName) -> Option<&str> {
204 let mut values = headers.get_all(header_name).iter();
205 let val = values.next()?;
206 if values.next().is_none() {
207 val.to_str().ok()
208 } else {
209 None
210 }
211}
212
213pub fn read_header_values<'a>(
215 headers: &'a http::header::HeaderMap,
216 header_name: &str,
217) -> http::header::GetAll<'a, http::header::HeaderValue> {
218 headers.get_all(header_name)
219}
220
#[cfg(test)]
mod tests {
    use super::{read_body, read_header_content_length, HttpError};
    use http_body_util::BodyExt;

    type Body = http_body_util::Full<bytes::Bytes>;

    /// A 128-byte body must be rejected when only 127 bytes are allowed.
    #[tokio::test]
    async fn body_to_bytes_size_limit_works() {
        let no_headers = http::header::HeaderMap::new();
        let oversized = Body::from(vec![0; 128]).map_err(|e| HttpError::Stream(e.into()));
        let outcome = read_body(&no_headers, oversized, 127).await;
        assert!(outcome.is_err());
    }

    /// A single Content-Length is parsed; a duplicated one is refused.
    #[test]
    fn read_content_length_works() {
        let mut header_map = http::header::HeaderMap::new();

        header_map.insert(http::header::CONTENT_LENGTH, "177".parse().unwrap());
        assert_eq!(read_header_content_length(&header_map), Some(177));

        header_map.append(http::header::CONTENT_LENGTH, "999".parse().unwrap());
        assert_eq!(read_header_content_length(&header_map), None);
    }

    /// A Content-Length that does not fit in a `u32` yields `None`.
    #[test]
    fn read_content_length_too_big_value() {
        let mut header_map = http::header::HeaderMap::new();
        header_map.insert(http::header::CONTENT_LENGTH, "18446744073709551616".parse().unwrap());
        assert_eq!(read_header_content_length(&header_map), None);
    }
}