// micro_http/connection/http_connection.rs

1use std::error::Error;
2use std::fmt::Display;
3
4use bytes::Bytes;
5use std::sync::Arc;
6
7use futures::{SinkExt, StreamExt};
8use http::header::EXPECT;
9use http::{Response, StatusCode};
10use http_body::Body;
11use http_body_util::{BodyExt, Empty};
12use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
13use tokio::select;
14
15use crate::codec::{RequestDecoder, ResponseEncoder};
16use crate::handler::Handler;
17use crate::protocol::body::ReqBody;
18use crate::protocol::{HttpError, Message, ParseError, PayloadItem, PayloadSize, RequestHeader, ResponseHead, SendError};
19
20use tokio_util::codec::{FramedRead, FramedWrite};
21use tracing::{error, info};
22
23/// An HTTP connection that manages request processing and response streaming
24///
25/// `HttpConnection` handles the full lifecycle of an HTTP connection, including:
26/// - Reading and decoding requests
27/// - Processing request headers and bodies
28/// - Handling expect-continue mechanism
29/// - Streaming responses back to clients
30///
31/// # Type Parameters
32///
33/// * `R`: The async readable stream type
34/// * `W`: The async writable stream type
35///
pub struct HttpConnection<R, W> {
    /// Decodes raw bytes read from `R` into request `Message`s (header + payload frames).
    framed_read: FramedRead<R, RequestDecoder>,
    /// Encodes response `Message`s and writes them to `W`.
    framed_write: FramedWrite<W, ResponseEncoder>,
}
40
41impl<R, W> HttpConnection<R, W>
42where
43    R: AsyncRead + Unpin,
44    W: AsyncWrite + Unpin,
45{
46    pub fn new(reader: R, writer: W) -> Self {
47        Self {
48            framed_read: FramedRead::with_capacity(reader, RequestDecoder::new(), 8 * 1024),
49            framed_write: FramedWrite::new(writer, ResponseEncoder::new()),
50        }
51    }
52
53    pub async fn process<H>(mut self, mut handler: Arc<H>) -> Result<(), HttpError>
54    where
55        H: Handler,
56        H::RespBody: Body<Data = Bytes> + Unpin,
57        <H::RespBody as Body>::Error: Display,
58    {
59        loop {
60            match self.framed_read.next().await {
61                Some(Ok(Message::Header(header))) => {
62                    self.do_process(header, &mut handler).await?;
63                }
64
65                Some(Ok(Message::Payload(_))) => {
66                    error!("error status because chunked has read in do_process");
67                    let error_response = build_error_response(StatusCode::BAD_REQUEST);
68                    self.do_send_response(error_response).await?;
69                    return Err(ParseError::invalid_body("need header while receive body").into());
70                }
71
72                Some(Err(e)) => {
73                    error!("can't receive next request, cause {}", e);
74                    let error_response = build_error_response(StatusCode::BAD_REQUEST);
75                    self.do_send_response(error_response).await?;
76                    return Err(e.into());
77                }
78
79                None => {
80                    info!("cant read more request, break this connection down");
81                    return Ok(());
82                }
83            }
84        }
85    }
86
87    async fn do_process<H>(&mut self, header: RequestHeader, handler: &mut Arc<H>) -> Result<(), HttpError>
88    where
89        H: Handler,
90        H::RespBody: Body<Data = Bytes> + Unpin,
91        <H::RespBody as Body>::Error: Display,
92    {
93        // Check if the request header contains the "Expect: 100-continue" field.
94        if let Some(value) = header.headers().get(EXPECT) {
95            let slice = value.as_bytes();
96            // Verify if the value of the "Expect" field is "100-continue".
97            if slice.len() >= 4 && &slice[0..4] == b"100-" {
98                let writer = self.framed_write.get_mut();
99                // Send a "100 Continue" response to the client.
100                let _ = writer.write(b"HTTP/1.1 100 Continue\r\n\r\n").await.map_err(SendError::io)?;
101                writer.flush().await.map_err(SendError::io)?;
102                // Log the event of sending a "100 Continue" response.
103                info!("receive expect request header, sent continue response");
104            }
105        }
106
107        let (req_body, mut body_sender) = ReqBody::body_channel(&mut self.framed_read);
108
109        let request = header.body(req_body);
110
111        // This block handles concurrent processing of the request handler and request body streaming.
112        // We need this concurrent processing because:
113        // 1. The request handler may not read the entire body, but we still need to drain the body
114        //    from the underlying TCP stream to maintain protocol correctness
115        // 2. The request handler and body streaming need to happen simultaneously to avoid deadlocks,
116        //    since the handler may be waiting for body data while the body sender is waiting to send
117        let response_result = {
118            // Pin both futures to the stack since they are used in select! macro
119            // The futures are lazy and won't start executing until polled
120            tokio::pin! {
121                let request_handle_future = handler.call(request);
122                let body_sender_future = body_sender.send_body();
123            }
124
125            // Store the handler result to return after body is fully processed
126            #[allow(unused_assignments)]
127            let mut result = Option::<Result<_, _>>::None;
128
129            // Keep processing until handler completes
130            loop {
131                select! {
132                    // biased ensures we prioritize handling the response
133                    biased;
134                    // When handler completes, store result and break
135                    response = &mut request_handle_future => {
136                        result = Some(response);
137                        break;
138                    }
139                    // Keep processing body chunks in background
140                    _ = &mut body_sender_future => {
141                        // No action needed - just keep streaming body
142                    }
143                }
144            }
145            // Safe: result is Some if handler completed
146            result.unwrap()
147        };
148
149        // skip body if request handler don't read body
150        body_sender.skip_body().await;
151
152        self.send_response(response_result).await?;
153
154        Ok(())
155    }
156
157    async fn send_response<T, E>(&mut self, response_result: Result<Response<T>, E>) -> Result<(), HttpError>
158    where
159        T: Body + Unpin,
160        T::Error: Display,
161        E: Into<Box<dyn Error + Send + Sync>>,
162    {
163        match response_result {
164            Ok(response) => self.do_send_response(response).await,
165            Err(e) => {
166                error!("handle response error, cause: {}", e.into());
167                let error_response = build_error_response(StatusCode::INTERNAL_SERVER_ERROR);
168                self.do_send_response(error_response).await
169            }
170        }
171    }
172
173    async fn do_send_response<T>(&mut self, response: Response<T>) -> Result<(), HttpError>
174    where
175        T: Body + Unpin,
176        T::Error: Display,
177    {
178        let (header_parts, mut body) = response.into_parts();
179
180        let payload_size = {
181            let size_hint = body.size_hint();
182            match size_hint.exact() {
183                Some(0) => PayloadSize::Empty,
184                Some(length) => PayloadSize::Length(length),
185                None => PayloadSize::Chunked,
186            }
187        };
188
189        let header = Message::<_, T::Data>::Header((ResponseHead::from_parts(header_parts, ()), payload_size));
190        if !payload_size.is_empty() {
191            self.framed_write.feed(header).await?;
192        } else {
193            // using send instead of feed, because we want to flush the underlying IO
194            // when response only has header, we need to send header,
195            // otherwise, we just feed header to the buffer
196            self.framed_write.send(header).await?;
197        }
198
199        loop {
200            match body.frame().await {
201                Some(Ok(frame)) => {
202                    let payload_item =
203                        frame.into_data().map(PayloadItem::Chunk).map_err(|_e| SendError::invalid_body("resolve body response error"))?;
204
205                    self.framed_write
206                        .send(Message::Payload(payload_item))
207                        .await
208                        .map_err(|_e| SendError::invalid_body("can't send response"))?;
209                }
210                Some(Err(e)) => return Err(SendError::invalid_body(format!("resolve response body error: {e}")).into()),
211                None => {
212                    self.framed_write
213                        // using feed instead of send, because we don't want to flush the underlying IO
214                        .feed(Message::Payload(PayloadItem::<T::Data>::Eof))
215                        .await
216                        .map_err(|e| SendError::invalid_body(format!("can't send eof response: {}", e)))?;
217                    return Ok(());
218                }
219            }
220        }
221    }
222}
223
224fn build_error_response(status_code: StatusCode) -> Response<Empty<Bytes>> {
225    Response::builder().status(status_code).body(Empty::<Bytes>::new()).unwrap()
226}