1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//! Utilities for http server-side application.
//!
//!

use std::io::{Error, ErrorKind, Result};

use futures::{io::WriteHalf, AsyncRead, AsyncReadExt, AsyncWrite, Stream, StreamExt};
use http::{Request, Response, StatusCode};

use crate::{body::BodyReader, reader::Requester, writer::HttpWriter};

pub struct HttpServer<I> {
    /// debug information.
    label: Option<String>,
    /// incoming http connection stream.
    incoming: I,
}

impl<I> HttpServer<I> {
    /// Start http server with provided http incoming connection stream.
    pub fn on(label: Option<&str>, incoming: I) -> Self {
        Self {
            label: label.map(|label| label.to_owned()),
            incoming,
        }
    }

    /// Accept new incoming http connection.
    pub async fn accept<S, E>(&mut self) -> Result<(Request<BodyReader>, WriteHalf<S>)>
    where
        I: Stream<Item = std::result::Result<S, E>> + Unpin,
        S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
        E: std::error::Error,
    {
        loop {
            match self
                .incoming
                .next()
                .await
                .ok_or(Error::new(ErrorKind::BrokenPipe, "http server shutdown."))?
            {
                Ok(stream) => {
                    let (read, mut write) = stream.split();

                    let request = match Requester::new(read).parse().await {
                        Ok(request) => request,
                        Err(err) => {
                            log::error!(
                                "{}, parse request error,{}",
                                self.label.as_deref().unwrap_or("Unknown"),
                                err
                            );

                            if let Err(err) = write
                                .write_response(
                                    Response::builder()
                                        .status(StatusCode::BAD_REQUEST)
                                        .body(BodyReader::empty())
                                        .unwrap(),
                                )
                                .await
                            {
                                log::error!(
                                    "{}, send BAD_REQUEST to client,{}",
                                    self.label.as_deref().unwrap_or("Unknown"),
                                    err
                                );
                            }

                            continue;
                        }
                    };

                    return Ok((request, write));
                }
                Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
            }
        }
    }

    pub fn into_incoming<S, E>(
        self,
    ) -> impl Stream<Item = Result<(Request<BodyReader>, WriteHalf<S>)>> + Unpin
    where
        I: Stream<Item = std::result::Result<S, E>> + Unpin,
        S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
        E: std::error::Error,
    {
        Box::pin(futures::stream::unfold(self, |mut listener| async move {
            let res = listener.accept().await;
            Some((res, listener))
        }))
    }
}