1use std::io::{Error, ErrorKind, Result};
6
7use futures::{io::WriteHalf, AsyncRead, AsyncReadExt, AsyncWrite, Stream, StreamExt};
8use http::{Request, Response, StatusCode};
9
10use crate::{body::BodyReader, reader::Requester, writer::HttpWriter};
11
/// An HTTP server front-end layered over a source of incoming connections.
///
/// `I` is the connection source. The methods of this type require it to be a
/// `Stream` yielding `Result<S, E>`, where `S` is a bidirectional async byte stream.
pub struct HttpServer<I> {
    /// Optional name used to tag log messages emitted by this server.
    label: Option<String>,
    /// Source of incoming connections.
    incoming: I,
}
18
19impl<I> HttpServer<I> {
20 pub fn on(label: Option<&str>, incoming: I) -> Self {
22 Self {
23 label: label.map(|label| label.to_owned()),
24 incoming,
25 }
26 }
27
28 pub async fn accept<S, E>(&mut self) -> Result<(Request<BodyReader>, WriteHalf<S>)>
30 where
31 I: Stream<Item = std::result::Result<S, E>> + Unpin,
32 S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
33 E: std::error::Error,
34 {
35 loop {
36 match self
37 .incoming
38 .next()
39 .await
40 .ok_or(Error::new(ErrorKind::BrokenPipe, "http server shutdown."))?
41 {
42 Ok(stream) => {
43 let (read, mut write) = stream.split();
44
45 let request = match Requester::new(read).parse().await {
46 Ok(request) => request,
47 Err(err) => {
48 log::error!(
49 "{}, parse request error,{}",
50 self.label.as_deref().unwrap_or("Unknown"),
51 err
52 );
53
54 if let Err(err) = write
55 .write_response(
56 Response::builder()
57 .status(StatusCode::BAD_REQUEST)
58 .body(BodyReader::empty())
59 .unwrap(),
60 )
61 .await
62 {
63 log::error!(
64 "{}, send BAD_REQUEST to client,{}",
65 self.label.as_deref().unwrap_or("Unknown"),
66 err
67 );
68 }
69
70 continue;
71 }
72 };
73
74 return Ok((request, write));
75 }
76 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
77 }
78 }
79 }
80
81 pub fn into_incoming<S, E>(
82 self,
83 ) -> impl Stream<Item = Result<(Request<BodyReader>, WriteHalf<S>)>> + Unpin
84 where
85 I: Stream<Item = std::result::Result<S, E>> + Unpin,
86 S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
87 E: std::error::Error,
88 {
89 Box::pin(futures::stream::unfold(self, |mut listener| async move {
90 let res = listener.accept().await;
91 Some((res, listener))
92 }))
93 }
94}