1use async_trait::async_trait;
2use bytes::Bytes;
3use bytes::BytesMut;
4use futures::Stream;
5use futures::StreamExt;
6use futures::TryStreamExt;
7use headers::Cookie;
8use headers::HeaderMapExt;
9use http::Extensions;
10use http_body_util::BodyExt;
11use hyper::body::Frame;
12use hyper::body::Incoming;
13use hyper::header::HeaderValue;
14use hyper::HeaderMap;
15use hyper::Method;
16use hyper::Uri;
17use hyper::Version;
18use hyper::{Request, Response};
19use pin_project::pin_project;
20use std::any::Any;
21use std::any::TypeId;
22use std::borrow::Cow;
23use std::collections::HashMap;
24use std::fmt::{Debug, Formatter};
25use std::net::SocketAddr;
26use std::pin::Pin;
27use std::task::Context;
28use std::task::Poll;
29use sync_wrapper::SyncStream;
30use tihu::SharedString;
31
32pub type BoxBody = http_body_util::combinators::BoxBody<Bytes, anyhow::Error>;
33
34#[derive(Default)]
36#[pin_project]
37pub struct Body(#[pin] pub(crate) BoxBody);
38
39impl From<Body> for BoxBody {
40 #[inline]
41 fn from(body: Body) -> Self {
42 body.0
43 }
44}
45
46impl From<BoxBody> for Body {
47 #[inline]
48 fn from(body: BoxBody) -> Self {
49 Body(body)
50 }
51}
52
53impl Debug for Body {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("Body").finish()
56 }
57}
58
59impl From<&'static [u8]> for Body {
60 #[inline]
61 fn from(data: &'static [u8]) -> Self {
62 Self(BoxBody::new(
63 http_body_util::Full::new(data.into()).map_err::<_, anyhow::Error>(|_| unreachable!()),
64 ))
65 }
66}
67
68impl From<&'static str> for Body {
69 #[inline]
70 fn from(data: &'static str) -> Self {
71 Self(BoxBody::new(
72 http_body_util::Full::new(data.into()).map_err::<_, anyhow::Error>(|_| unreachable!()),
73 ))
74 }
75}
76
77impl From<Bytes> for Body {
78 #[inline]
79 fn from(data: Bytes) -> Self {
80 Self(
81 http_body_util::Full::new(data)
82 .map_err::<_, anyhow::Error>(|_| unreachable!())
83 .boxed(),
84 )
85 }
86}
87
88impl From<Vec<u8>> for Body {
89 #[inline]
90 fn from(data: Vec<u8>) -> Self {
91 Self(
92 http_body_util::Full::new(data.into())
93 .map_err::<_, anyhow::Error>(|_| unreachable!())
94 .boxed(),
95 )
96 }
97}
98
99impl From<Cow<'static, [u8]>> for Body {
100 #[inline]
101 fn from(data: Cow<'static, [u8]>) -> Self {
102 Self(
103 http_body_util::Full::from(data)
104 .map_err::<_, anyhow::Error>(|_| unreachable!())
105 .boxed(),
106 )
107 }
108}
109
110impl From<String> for Body {
111 #[inline]
112 fn from(data: String) -> Self {
113 data.into_bytes().into()
114 }
115}
116
117impl From<SharedString> for Body {
118 #[inline]
119 fn from(data: SharedString) -> Self {
120 match data {
121 SharedString::Arc(data) => Body::from(data.to_string()),
122 SharedString::Static(data) => Body::from(data),
123 }
124 }
125}
126
127impl From<()> for Body {
128 #[inline]
129 fn from(_: ()) -> Self {
130 Body::empty()
131 }
132}
133
134impl Body {
135 #[inline]
137 pub fn from_bytes(data: Bytes) -> Self {
138 data.into()
139 }
140
141 #[inline]
143 pub fn from_string(data: String) -> Self {
144 data.into()
145 }
146
147 pub fn from_bytes_stream<S, O, E>(stream: S) -> Self
149 where
150 S: Stream<Item = Result<O, E>> + Send + 'static,
151 O: Into<Bytes> + 'static,
152 E: Into<anyhow::Error> + 'static,
153 {
154 Self(BoxBody::new(http_body_util::StreamBody::new(
155 SyncStream::new(
156 stream
157 .map_ok(|data| Frame::data(data.into()))
158 .map_err(Into::into),
159 ),
160 )))
161 }
162
163 #[inline]
165 pub fn from_vec(data: Vec<u8>) -> Self {
166 data.into()
167 }
168
169 #[inline]
171 pub fn empty() -> Self {
172 Self(
173 http_body_util::Empty::new()
174 .map_err::<_, anyhow::Error>(|_| unreachable!())
175 .boxed(),
176 )
177 }
178
179 #[inline]
180 pub fn into_inner(self) -> BoxBody {
181 self.0
182 }
183}
184
185impl hyper::body::Body for Body {
186 type Data = Bytes;
187 type Error = anyhow::Error;
188 fn poll_frame(
189 self: Pin<&mut Self>,
190 cx: &mut Context<'_>,
191 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
192 let this = self.project();
193 hyper::body::Body::poll_frame(this.0, cx)
194 }
195}
196
197pub fn body_to_stream<B>(
198 mut body: B,
199) -> impl Stream<Item = Result<hyper::body::Frame<Bytes>, anyhow::Error>>
200where
201 B: hyper::body::Body<Data = Bytes, Error = anyhow::Error> + Unpin,
202{
203 futures::stream::poll_fn(
204 move |cx| -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, anyhow::Error>>> {
205 hyper::body::Body::poll_frame(std::pin::Pin::new(&mut body), cx)
206 },
207 )
208}
209
210pub async fn read_body<B>(body: B) -> Result<Bytes, anyhow::Error>
211where
212 B: hyper::body::Body<Data = Bytes, Error = anyhow::Error> + Unpin,
213{
214 let mut bytes = BytesMut::new();
215 let mut stream = body_to_stream(body);
216 while let Some(frame) = stream.next().await {
217 let frame = frame?;
218 if let Some(frame) = frame.data_ref() {
219 bytes.extend_from_slice(frame);
220 }
221 }
222 return Ok(bytes.into());
223}
224
225#[async_trait]
226pub trait HttpHandler: Sync + Send + 'static {
227 fn namespace(&self) -> &[SharedString];
228 async fn handle(
229 &self,
230 request: Request<Incoming>,
231 remote_addr: SocketAddr,
232 request_data: &mut RequestData,
233 prefix: Option<&str>,
234 ) -> Result<Response<BoxBody>, anyhow::Error>;
235}
236
237#[async_trait]
238pub trait HttpAuthorizer: Sync + Send + 'static {
239 async fn authorize(
240 &self,
241 request: &Request<Incoming>,
242 remote_addr: SocketAddr,
243 request_data: &mut RequestData,
244 prefix: Option<&str>,
245 ) -> Result<bool, anyhow::Error>;
246}
247
248#[async_trait]
249pub trait FromRequest: Sync + Send + 'static {
250 async fn try_extract(
251 request: &Request<Incoming>,
252 remote_addr: SocketAddr,
253 request_data: &mut RequestData,
254 ) -> Result<Self, anyhow::Error>
255 where
256 Self: Sized;
257}
258
/// Per-request store holding at most one boxed value per key.
#[derive(Default)]
pub struct RequestData {
    /// Type-erased values keyed by a [`TypeId`].
    data_map: HashMap<TypeId, Box<dyn Any + Sync + Send>>,
}

impl RequestData {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            data_map: HashMap::new(),
        }
    }
}
269
270impl RequestData {
271 pub async fn try_get<T>(
272 &mut self,
273 request: &Request<Incoming>,
274 remote_addr: SocketAddr,
275 ) -> Result<&T, anyhow::Error>
276 where
277 T: FromRequest,
278 {
279 let type_id = TypeId::of::<T>();
280 let exist = self.data_map.get(&type_id).is_some();
281 if !exist {
282 let data = T::try_extract(request, remote_addr, self).await?;
283 self.data_map.insert(type_id, Box::new(data));
284 }
285 let data = self
286 .data_map
287 .get(&type_id)
288 .ok_or_else(|| SharedString::from_static("Data is empty!"))?;
289 let data = data
290 .downcast_ref::<T>()
291 .ok_or_else(|| SharedString::from_static("Data not match the type!"))?;
292 return Ok(data);
293 }
294 pub fn remove<T>(&mut self) -> Result<Option<Box<T>>, anyhow::Error>
295 where
296 T: FromRequest,
297 {
298 let type_id = TypeId::of::<T>();
299 if let Some(data) = self.data_map.remove(&type_id) {
300 match data.downcast::<T>() {
301 Ok(data) => {
302 return Ok(Some(data));
303 }
304 Err(data) => {
305 self.data_map.insert(type_id, Box::new(data));
306 return Err(SharedString::from_static("Data not match the type!").into());
307 }
308 }
309 } else {
310 return Ok(None);
311 }
312 }
313 pub async fn remove_or_get<T>(
314 &mut self,
315 request: &Request<Incoming>,
316 remote_addr: SocketAddr,
317 ) -> Result<T, anyhow::Error>
318 where
319 T: FromRequest,
320 {
321 let data_opt = self.remove::<T>()?;
322 if let Some(data) = data_opt {
323 return Ok(*data);
324 } else {
325 let data = T::try_extract(request, remote_addr, self).await?;
326 return Ok(data);
327 }
328 }
329}
330
331#[async_trait]
332impl FromRequest for Option<Cookie> {
333 async fn try_extract(
334 request: &Request<Incoming>,
335 _remote_addr: SocketAddr,
336 _request_data: &mut RequestData,
337 ) -> Result<Self, anyhow::Error> {
338 let cookie = request.headers().typed_get::<Cookie>();
339 return Ok(cookie);
340 }
341}
342
343#[async_trait]
344impl FromRequest for Method {
345 async fn try_extract(
346 request: &Request<Incoming>,
347 _remote_addr: SocketAddr,
348 _request_data: &mut RequestData,
349 ) -> Result<Self, anyhow::Error> {
350 return Ok(request.method().clone());
351 }
352}
353
354#[async_trait]
355impl FromRequest for Uri {
356 async fn try_extract(
357 request: &Request<Incoming>,
358 _remote_addr: SocketAddr,
359 _request_data: &mut RequestData,
360 ) -> Result<Self, anyhow::Error> {
361 return Ok(request.uri().clone());
362 }
363}
364
365#[async_trait]
366impl FromRequest for Version {
367 async fn try_extract(
368 request: &Request<Incoming>,
369 _remote_addr: SocketAddr,
370 _request_data: &mut RequestData,
371 ) -> Result<Self, anyhow::Error> {
372 return Ok(request.version());
373 }
374}
375
376#[async_trait]
377impl FromRequest for HeaderMap<HeaderValue> {
378 async fn try_extract(
379 request: &Request<Incoming>,
380 _remote_addr: SocketAddr,
381 _request_data: &mut RequestData,
382 ) -> Result<Self, anyhow::Error> {
383 return Ok(request.headers().clone());
384 }
385}
386
387#[async_trait]
388impl FromRequest for Extensions {
389 async fn try_extract(
390 request: &Request<Incoming>,
391 _remote_addr: SocketAddr,
392 _request_data: &mut RequestData,
393 ) -> Result<Self, anyhow::Error> {
394 return Ok(request.extensions().clone());
395 }
396}