micro_http/connection/
http_connection.rs1use std::error::Error;
2use std::fmt::Display;
3
4use bytes::Bytes;
5use std::sync::Arc;
6
7use futures::{SinkExt, StreamExt};
8use http::header::EXPECT;
9use http::{Response, StatusCode};
10use http_body::Body;
11use http_body_util::{BodyExt, Empty};
12use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
13use tokio::select;
14
15use crate::codec::{RequestDecoder, ResponseEncoder};
16use crate::handler::Handler;
17use crate::protocol::body::ReqBody;
18use crate::protocol::{HttpError, Message, ParseError, PayloadItem, PayloadSize, RequestHeader, ResponseHead, SendError};
19
20use tokio_util::codec::{FramedRead, FramedWrite};
21use tracing::{error, info};
22
23pub struct HttpConnection<R, W> {
37 framed_read: FramedRead<R, RequestDecoder>,
38 framed_write: FramedWrite<W, ResponseEncoder>,
39}
40
41impl<R, W> HttpConnection<R, W>
42where
43 R: AsyncRead + Unpin,
44 W: AsyncWrite + Unpin,
45{
46 pub fn new(reader: R, writer: W) -> Self {
47 Self {
48 framed_read: FramedRead::with_capacity(reader, RequestDecoder::new(), 8 * 1024),
49 framed_write: FramedWrite::new(writer, ResponseEncoder::new()),
50 }
51 }
52
53 pub async fn process<H>(mut self, mut handler: Arc<H>) -> Result<(), HttpError>
54 where
55 H: Handler,
56 H::RespBody: Body<Data = Bytes> + Unpin,
57 <H::RespBody as Body>::Error: Display,
58 {
59 loop {
60 match self.framed_read.next().await {
61 Some(Ok(Message::Header(header))) => {
62 self.do_process(header, &mut handler).await?;
63 }
64
65 Some(Ok(Message::Payload(_))) => {
66 error!("error status because chunked has read in do_process");
67 let error_response = build_error_response(StatusCode::BAD_REQUEST);
68 self.do_send_response(error_response).await?;
69 return Err(ParseError::invalid_body("need header while receive body").into());
70 }
71
72 Some(Err(e)) => {
73 error!("can't receive next request, cause {}", e);
74 let error_response = build_error_response(StatusCode::BAD_REQUEST);
75 self.do_send_response(error_response).await?;
76 return Err(e.into());
77 }
78
79 None => {
80 info!("cant read more request, break this connection down");
81 return Ok(());
82 }
83 }
84 }
85 }
86
87 async fn do_process<H>(&mut self, header: RequestHeader, handler: &mut Arc<H>) -> Result<(), HttpError>
88 where
89 H: Handler,
90 H::RespBody: Body<Data = Bytes> + Unpin,
91 <H::RespBody as Body>::Error: Display,
92 {
93 if let Some(value) = header.headers().get(EXPECT) {
95 let slice = value.as_bytes();
96 if slice.len() >= 4 && &slice[0..4] == b"100-" {
98 let writer = self.framed_write.get_mut();
99 let _ = writer.write(b"HTTP/1.1 100 Continue\r\n\r\n").await.map_err(SendError::io)?;
101 writer.flush().await.map_err(SendError::io)?;
102 info!("receive expect request header, sent continue response");
104 }
105 }
106
107 let (req_body, mut body_sender) = ReqBody::body_channel(&mut self.framed_read);
108
109 let request = header.body(req_body);
110
111 let response_result = {
118 tokio::pin! {
121 let request_handle_future = handler.call(request);
122 let body_sender_future = body_sender.send_body();
123 }
124
125 #[allow(unused_assignments)]
127 let mut result = Option::<Result<_, _>>::None;
128
129 loop {
131 select! {
132 biased;
134 response = &mut request_handle_future => {
136 result = Some(response);
137 break;
138 }
139 _ = &mut body_sender_future => {
141 }
143 }
144 }
145 result.unwrap()
147 };
148
149 body_sender.skip_body().await;
151
152 self.send_response(response_result).await?;
153
154 Ok(())
155 }
156
157 async fn send_response<T, E>(&mut self, response_result: Result<Response<T>, E>) -> Result<(), HttpError>
158 where
159 T: Body + Unpin,
160 T::Error: Display,
161 E: Into<Box<dyn Error + Send + Sync>>,
162 {
163 match response_result {
164 Ok(response) => self.do_send_response(response).await,
165 Err(e) => {
166 error!("handle response error, cause: {}", e.into());
167 let error_response = build_error_response(StatusCode::INTERNAL_SERVER_ERROR);
168 self.do_send_response(error_response).await
169 }
170 }
171 }
172
173 async fn do_send_response<T>(&mut self, response: Response<T>) -> Result<(), HttpError>
174 where
175 T: Body + Unpin,
176 T::Error: Display,
177 {
178 let (header_parts, mut body) = response.into_parts();
179
180 let payload_size = {
181 let size_hint = body.size_hint();
182 match size_hint.exact() {
183 Some(0) => PayloadSize::Empty,
184 Some(length) => PayloadSize::Length(length),
185 None => PayloadSize::Chunked,
186 }
187 };
188
189 let header = Message::<_, T::Data>::Header((ResponseHead::from_parts(header_parts, ()), payload_size));
190 if !payload_size.is_empty() {
191 self.framed_write.feed(header).await?;
192 } else {
193 self.framed_write.send(header).await?;
197 }
198
199 loop {
200 match body.frame().await {
201 Some(Ok(frame)) => {
202 let payload_item =
203 frame.into_data().map(PayloadItem::Chunk).map_err(|_e| SendError::invalid_body("resolve body response error"))?;
204
205 self.framed_write
206 .send(Message::Payload(payload_item))
207 .await
208 .map_err(|_e| SendError::invalid_body("can't send response"))?;
209 }
210 Some(Err(e)) => return Err(SendError::invalid_body(format!("resolve response body error: {e}")).into()),
211 None => {
212 self.framed_write
213 .feed(Message::Payload(PayloadItem::<T::Data>::Eof))
215 .await
216 .map_err(|e| SendError::invalid_body(format!("can't send eof response: {}", e)))?;
217 return Ok(());
218 }
219 }
220 }
221 }
222}
223
224fn build_error_response(status_code: StatusCode) -> Response<Empty<Bytes>> {
225 Response::builder().status(status_code).body(Empty::<Bytes>::new()).unwrap()
226}