tihu_native/
http.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use bytes::BytesMut;
4use futures::Stream;
5use futures::StreamExt;
6use futures::TryStreamExt;
7use headers::Cookie;
8use headers::HeaderMapExt;
9use http::Extensions;
10use http_body_util::BodyExt;
11use hyper::body::Frame;
12use hyper::body::Incoming;
13use hyper::header::HeaderValue;
14use hyper::HeaderMap;
15use hyper::Method;
16use hyper::Uri;
17use hyper::Version;
18use hyper::{Request, Response};
19use pin_project::pin_project;
20use std::any::Any;
21use std::any::TypeId;
22use std::borrow::Cow;
23use std::collections::HashMap;
24use std::fmt::{Debug, Formatter};
25use std::net::SocketAddr;
26use std::pin::Pin;
27use std::task::Context;
28use std::task::Poll;
29use sync_wrapper::SyncStream;
30use tihu::SharedString;
31
32pub type BoxBody = http_body_util::combinators::BoxBody<Bytes, anyhow::Error>;
33
34/// A body object for requests and responses.
35#[derive(Default)]
36#[pin_project]
37pub struct Body(#[pin] pub(crate) BoxBody);
38
39impl From<Body> for BoxBody {
40    #[inline]
41    fn from(body: Body) -> Self {
42        body.0
43    }
44}
45
46impl From<BoxBody> for Body {
47    #[inline]
48    fn from(body: BoxBody) -> Self {
49        Body(body)
50    }
51}
52
53impl Debug for Body {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("Body").finish()
56    }
57}
58
59impl From<&'static [u8]> for Body {
60    #[inline]
61    fn from(data: &'static [u8]) -> Self {
62        Self(BoxBody::new(
63            http_body_util::Full::new(data.into()).map_err::<_, anyhow::Error>(|_| unreachable!()),
64        ))
65    }
66}
67
68impl From<&'static str> for Body {
69    #[inline]
70    fn from(data: &'static str) -> Self {
71        Self(BoxBody::new(
72            http_body_util::Full::new(data.into()).map_err::<_, anyhow::Error>(|_| unreachable!()),
73        ))
74    }
75}
76
77impl From<Bytes> for Body {
78    #[inline]
79    fn from(data: Bytes) -> Self {
80        Self(
81            http_body_util::Full::new(data)
82                .map_err::<_, anyhow::Error>(|_| unreachable!())
83                .boxed(),
84        )
85    }
86}
87
88impl From<Vec<u8>> for Body {
89    #[inline]
90    fn from(data: Vec<u8>) -> Self {
91        Self(
92            http_body_util::Full::new(data.into())
93                .map_err::<_, anyhow::Error>(|_| unreachable!())
94                .boxed(),
95        )
96    }
97}
98
99impl From<Cow<'static, [u8]>> for Body {
100    #[inline]
101    fn from(data: Cow<'static, [u8]>) -> Self {
102        Self(
103            http_body_util::Full::from(data)
104                .map_err::<_, anyhow::Error>(|_| unreachable!())
105                .boxed(),
106        )
107    }
108}
109
110impl From<String> for Body {
111    #[inline]
112    fn from(data: String) -> Self {
113        data.into_bytes().into()
114    }
115}
116
117impl From<SharedString> for Body {
118    #[inline]
119    fn from(data: SharedString) -> Self {
120        match data {
121            SharedString::Arc(data) => Body::from(data.to_string()),
122            SharedString::Static(data) => Body::from(data),
123        }
124    }
125}
126
127impl From<()> for Body {
128    #[inline]
129    fn from(_: ()) -> Self {
130        Body::empty()
131    }
132}
133
134impl Body {
135    /// Create a body object from [`Bytes`].
136    #[inline]
137    pub fn from_bytes(data: Bytes) -> Self {
138        data.into()
139    }
140
141    /// Create a body object from [`String`].
142    #[inline]
143    pub fn from_string(data: String) -> Self {
144        data.into()
145    }
146
147    /// Create a body object from bytes stream.
148    pub fn from_bytes_stream<S, O, E>(stream: S) -> Self
149    where
150        S: Stream<Item = Result<O, E>> + Send + 'static,
151        O: Into<Bytes> + 'static,
152        E: Into<anyhow::Error> + 'static,
153    {
154        Self(BoxBody::new(http_body_util::StreamBody::new(
155            SyncStream::new(
156                stream
157                    .map_ok(|data| Frame::data(data.into()))
158                    .map_err(Into::into),
159            ),
160        )))
161    }
162
163    /// Create a body object from [`Vec<u8>`].
164    #[inline]
165    pub fn from_vec(data: Vec<u8>) -> Self {
166        data.into()
167    }
168
169    /// Create an empty body.
170    #[inline]
171    pub fn empty() -> Self {
172        Self(
173            http_body_util::Empty::new()
174                .map_err::<_, anyhow::Error>(|_| unreachable!())
175                .boxed(),
176        )
177    }
178
179    #[inline]
180    pub fn into_inner(self) -> BoxBody {
181        self.0
182    }
183}
184
185impl hyper::body::Body for Body {
186    type Data = Bytes;
187    type Error = anyhow::Error;
188    fn poll_frame(
189        self: Pin<&mut Self>,
190        cx: &mut Context<'_>,
191    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
192        let this = self.project();
193        hyper::body::Body::poll_frame(this.0, cx)
194    }
195}
196
197pub fn body_to_stream<B>(
198    mut body: B,
199) -> impl Stream<Item = Result<hyper::body::Frame<Bytes>, anyhow::Error>>
200where
201    B: hyper::body::Body<Data = Bytes, Error = anyhow::Error> + Unpin,
202{
203    futures::stream::poll_fn(
204        move |cx| -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, anyhow::Error>>> {
205            hyper::body::Body::poll_frame(std::pin::Pin::new(&mut body), cx)
206        },
207    )
208}
209
210pub async fn read_body<B>(body: B) -> Result<Bytes, anyhow::Error>
211where
212    B: hyper::body::Body<Data = Bytes, Error = anyhow::Error> + Unpin,
213{
214    let mut bytes = BytesMut::new();
215    let mut stream = body_to_stream(body);
216    while let Some(frame) = stream.next().await {
217        let frame = frame?;
218        if let Some(frame) = frame.data_ref() {
219            bytes.extend_from_slice(frame);
220        }
221    }
222    return Ok(bytes.into());
223}
224
225#[async_trait]
226pub trait HttpHandler: Sync + Send + 'static {
227    fn namespace(&self) -> &[SharedString];
228    async fn handle(
229        &self,
230        request: Request<Incoming>,
231        remote_addr: SocketAddr,
232        request_data: &mut RequestData,
233        prefix: Option<&str>,
234    ) -> Result<Response<BoxBody>, anyhow::Error>;
235}
236
237#[async_trait]
238pub trait HttpAuthorizer: Sync + Send + 'static {
239    async fn authorize(
240        &self,
241        request: &Request<Incoming>,
242        remote_addr: SocketAddr,
243        request_data: &mut RequestData,
244        prefix: Option<&str>,
245    ) -> Result<bool, anyhow::Error>;
246}
247
248#[async_trait]
249pub trait FromRequest: Sync + Send + 'static {
250    async fn try_extract(
251        request: &Request<Incoming>,
252        remote_addr: SocketAddr,
253        request_data: &mut RequestData,
254    ) -> Result<Self, anyhow::Error>
255    where
256        Self: Sized;
257}
258
/// Type-indexed store of values extracted from a request.
///
/// At most one value is kept per concrete type, keyed by that type's
/// [`TypeId`].
#[derive(Default)]
pub struct RequestData {
    /// Extracted values, keyed by the `TypeId` of their concrete type.
    data_map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
263
264impl RequestData {
265    pub fn new() -> Self {
266        Default::default()
267    }
268}
269
270impl RequestData {
271    pub async fn try_get<T>(
272        &mut self,
273        request: &Request<Incoming>,
274        remote_addr: SocketAddr,
275    ) -> Result<&T, anyhow::Error>
276    where
277        T: FromRequest,
278    {
279        let type_id = TypeId::of::<T>();
280        let exist = self.data_map.get(&type_id).is_some();
281        if !exist {
282            let data = T::try_extract(request, remote_addr, self).await?;
283            self.data_map.insert(type_id, Box::new(data));
284        }
285        let data = self
286            .data_map
287            .get(&type_id)
288            .ok_or_else(|| SharedString::from_static("Data is empty!"))?;
289        let data = data
290            .downcast_ref::<T>()
291            .ok_or_else(|| SharedString::from_static("Data not match the type!"))?;
292        return Ok(data);
293    }
294    pub fn remove<T>(&mut self) -> Result<Option<Box<T>>, anyhow::Error>
295    where
296        T: FromRequest,
297    {
298        let type_id = TypeId::of::<T>();
299        if let Some(data) = self.data_map.remove(&type_id) {
300            match data.downcast::<T>() {
301                Ok(data) => {
302                    return Ok(Some(data));
303                }
304                Err(data) => {
305                    self.data_map.insert(type_id, Box::new(data));
306                    return Err(SharedString::from_static("Data not match the type!").into());
307                }
308            }
309        } else {
310            return Ok(None);
311        }
312    }
313    pub async fn remove_or_get<T>(
314        &mut self,
315        request: &Request<Incoming>,
316        remote_addr: SocketAddr,
317    ) -> Result<T, anyhow::Error>
318    where
319        T: FromRequest,
320    {
321        let data_opt = self.remove::<T>()?;
322        if let Some(data) = data_opt {
323            return Ok(*data);
324        } else {
325            let data = T::try_extract(request, remote_addr, self).await?;
326            return Ok(data);
327        }
328    }
329}
330
331#[async_trait]
332impl FromRequest for Option<Cookie> {
333    async fn try_extract(
334        request: &Request<Incoming>,
335        _remote_addr: SocketAddr,
336        _request_data: &mut RequestData,
337    ) -> Result<Self, anyhow::Error> {
338        let cookie = request.headers().typed_get::<Cookie>();
339        return Ok(cookie);
340    }
341}
342
343#[async_trait]
344impl FromRequest for Method {
345    async fn try_extract(
346        request: &Request<Incoming>,
347        _remote_addr: SocketAddr,
348        _request_data: &mut RequestData,
349    ) -> Result<Self, anyhow::Error> {
350        return Ok(request.method().clone());
351    }
352}
353
354#[async_trait]
355impl FromRequest for Uri {
356    async fn try_extract(
357        request: &Request<Incoming>,
358        _remote_addr: SocketAddr,
359        _request_data: &mut RequestData,
360    ) -> Result<Self, anyhow::Error> {
361        return Ok(request.uri().clone());
362    }
363}
364
365#[async_trait]
366impl FromRequest for Version {
367    async fn try_extract(
368        request: &Request<Incoming>,
369        _remote_addr: SocketAddr,
370        _request_data: &mut RequestData,
371    ) -> Result<Self, anyhow::Error> {
372        return Ok(request.version());
373    }
374}
375
376#[async_trait]
377impl FromRequest for HeaderMap<HeaderValue> {
378    async fn try_extract(
379        request: &Request<Incoming>,
380        _remote_addr: SocketAddr,
381        _request_data: &mut RequestData,
382    ) -> Result<Self, anyhow::Error> {
383        return Ok(request.headers().clone());
384    }
385}
386
387#[async_trait]
388impl FromRequest for Extensions {
389    async fn try_extract(
390        request: &Request<Incoming>,
391        _remote_addr: SocketAddr,
392        _request_data: &mut RequestData,
393    ) -> Result<Self, anyhow::Error> {
394        return Ok(request.extensions().clone());
395    }
396}