futures_http/
server.rs

1//! Utilities for http server-side application.
2//!
3//!
4
5use std::io::{Error, ErrorKind, Result};
6
7use futures::{io::WriteHalf, AsyncRead, AsyncReadExt, AsyncWrite, Stream, StreamExt};
8use http::{Request, Response, StatusCode};
9
10use crate::{body::BodyReader, reader::Requester, writer::HttpWriter};
11
/// Server-side http endpoint built on top of a stream of incoming connections.
///
/// `I` is the connection source. The methods on this type require it to be a
/// `Stream` yielding `Result<S, E>`, where `S` is a bidirectional async byte stream.
#[derive(Debug)]
pub struct HttpServer<I> {
    /// debug information.
    label: Option<String>,
    /// incoming http connection stream.
    incoming: I,
}
18
19impl<I> HttpServer<I> {
20    /// Start http server with provided http incoming connection stream.
21    pub fn on(label: Option<&str>, incoming: I) -> Self {
22        Self {
23            label: label.map(|label| label.to_owned()),
24            incoming,
25        }
26    }
27
28    /// Accept new incoming http connection.
29    pub async fn accept<S, E>(&mut self) -> Result<(Request<BodyReader>, WriteHalf<S>)>
30    where
31        I: Stream<Item = std::result::Result<S, E>> + Unpin,
32        S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
33        E: std::error::Error,
34    {
35        loop {
36            match self
37                .incoming
38                .next()
39                .await
40                .ok_or(Error::new(ErrorKind::BrokenPipe, "http server shutdown."))?
41            {
42                Ok(stream) => {
43                    let (read, mut write) = stream.split();
44
45                    let request = match Requester::new(read).parse().await {
46                        Ok(request) => request,
47                        Err(err) => {
48                            log::error!(
49                                "{}, parse request error,{}",
50                                self.label.as_deref().unwrap_or("Unknown"),
51                                err
52                            );
53
54                            if let Err(err) = write
55                                .write_response(
56                                    Response::builder()
57                                        .status(StatusCode::BAD_REQUEST)
58                                        .body(BodyReader::empty())
59                                        .unwrap(),
60                                )
61                                .await
62                            {
63                                log::error!(
64                                    "{}, send BAD_REQUEST to client,{}",
65                                    self.label.as_deref().unwrap_or("Unknown"),
66                                    err
67                                );
68                            }
69
70                            continue;
71                        }
72                    };
73
74                    return Ok((request, write));
75                }
76                Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
77            }
78        }
79    }
80
81    pub fn into_incoming<S, E>(
82        self,
83    ) -> impl Stream<Item = Result<(Request<BodyReader>, WriteHalf<S>)>> + Unpin
84    where
85        I: Stream<Item = std::result::Result<S, E>> + Unpin,
86        S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
87        E: std::error::Error,
88    {
89        Box::pin(futures::stream::unfold(self, |mut listener| async move {
90            let res = listener.accept().await;
91            Some((res, listener))
92        }))
93    }
94}