monoio_transports/http/
connection.rs

1use bytes::Bytes;
2use http::Response;
3use monoio::io::{
4    sink::{Sink, SinkExt},
5    stream::Stream,
6    AsyncReadRent, AsyncWriteRent,
7};
8use monoio_http::{
9    common::{
10        body::{Body, HttpBody},
11        error::HttpError,
12        request::{Request, RequestHead},
13        IntoParts,
14    },
15    h1::{
16        codec::{
17            decoder::{DecodeError, PayloadDecoder},
18            ClientCodec,
19        },
20        payload::{fixed_payload_pair, stream_payload_pair, Payload},
21    },
22    h2::client::SendRequest,
23};
24
25use crate::pool::{Key, Poolable, Pooled};
26
27/// A HTTP/1.1 connection.
28pub struct Http1Connection<IO: AsyncWriteRent> {
29    framed: ClientCodec<IO>,
30    using: bool,
31    open: bool,
32}
33
34impl<IO: AsyncWriteRent> Http1Connection<IO> {
35    pub fn new(framed: ClientCodec<IO>) -> Self {
36        Self {
37            framed,
38            using: false,
39            open: true,
40        }
41    }
42}
43
44impl<IO: AsyncWriteRent> Poolable for Http1Connection<IO> {
45    #[inline]
46    fn is_open(&self) -> bool {
47        match self {
48            Self { using, open, .. } => *open && !*using,
49        }
50    }
51}
52
53impl<IO: AsyncReadRent + AsyncWriteRent> Http1Connection<IO> {
54    pub async fn send_request<R, E>(
55        &mut self,
56        request: R,
57    ) -> (Result<Response<HttpBody>, HttpError>, bool)
58    where
59        ClientCodec<IO>: Sink<R, Error = E>,
60        E: std::fmt::Debug + Into<HttpError>,
61    {
62        let handle = &mut self.framed;
63
64        if let Err(e) = handle.send_and_flush(request).await {
65            #[cfg(feature = "logging")]
66            tracing::error!("send upstream request error {:?}", e);
67            self.open = false;
68            return (Err(e.into()), false);
69        }
70
71        match handle.next().await {
72            Some(Ok(resp)) => {
73                let (parts, payload_decoder) = resp.into_parts();
74                match payload_decoder {
75                    PayloadDecoder::None => {
76                        let payload = Payload::None;
77                        let response = Response::from_parts(parts, payload.into());
78                        (Ok(response), false)
79                    }
80                    PayloadDecoder::Fixed(_) => {
81                        let mut framed_payload = payload_decoder.with_io(handle);
82                        let (payload, payload_sender) = fixed_payload_pair();
83                        if let Some(data) = framed_payload.next_data().await {
84                            payload_sender.feed(data)
85                        }
86                        let payload = Payload::Fixed(payload);
87                        let response = Response::from_parts(parts, payload.into());
88                        (Ok(response), false)
89                    }
90                    PayloadDecoder::Streamed(_) => {
91                        let mut framed_payload = payload_decoder.with_io(handle);
92                        let (payload, mut payload_sender) = stream_payload_pair();
93                        loop {
94                            match framed_payload.next_data().await {
95                                Some(Ok(data)) => payload_sender.feed_data(Some(data)),
96                                Some(Err(e)) => {
97                                    #[cfg(feature = "logging")]
98                                    tracing::error!("decode upstream response error {:?}", e);
99                                    self.open = false;
100                                    return (Err(e), false);
101                                }
102                                None => {
103                                    payload_sender.feed_data(None);
104                                    break;
105                                }
106                            }
107                        }
108                        let payload = Payload::Stream(payload);
109                        let response = Response::from_parts(parts, payload.into());
110                        (Ok(response), false)
111                    }
112                }
113            }
114            Some(Err(e)) => {
115                #[cfg(feature = "logging")]
116                tracing::error!("decode upstream response error {:?}", e);
117                self.open = false;
118                (Err(e), false)
119            }
120            None => {
121                #[cfg(feature = "logging")]
122                tracing::error!("upstream return eof");
123                self.open = false;
124                (Err(DecodeError::UnexpectedEof.into()), false)
125            }
126        }
127    }
128}
129
130/// A HTTP/2 connection.
131#[derive(Clone, Debug)]
132pub struct Http2Connection {
133    tx: SendRequest<Bytes>,
134}
135
136impl Poolable for Http2Connection {
137    #[inline]
138    fn is_open(&self) -> bool {
139        !self.tx.has_conn_error()
140    }
141}
142
143impl Http2Connection {
144    pub fn new(tx: SendRequest<Bytes>) -> Self {
145        Self { tx }
146    }
147
148    #[allow(dead_code)]
149    fn to_owned(&self) -> Self {
150        Self {
151            tx: self.tx.clone(),
152        }
153    }
154
155    pub fn conn_error(&self) -> Option<HttpError> {
156        self.tx.conn_error()
157    }
158}
159
160impl Http2Connection {
161    pub async fn send_request<R>(
162        &mut self,
163        request: R,
164    ) -> (Result<Response<HttpBody>, HttpError>, bool)
165    where
166        R: IntoParts<Parts = RequestHead>,
167        R::Body: Body<Data = Bytes, Error = HttpError>,
168    {
169        let mut client = match self.tx.clone().ready().await {
170            Ok(client) => client,
171            Err(e) => {
172                return (Err(e.into()), false);
173            }
174        };
175
176        let (parts, mut body) = request.into_parts();
177        let h2_request = Request::from_parts(parts, ());
178
179        let (response, mut send_stream) = match client.send_request(h2_request, false) {
180            Ok((response, send_stream)) => (response, send_stream),
181            Err(e) => {
182                return (Err(e.into()), false);
183            }
184        };
185
186        while let Some(data) = body.next_data().await {
187            match data {
188                Ok(data) => {
189                    if let Err(e) = send_stream.send_data(data, false) {
190                        #[cfg(feature = "logging")]
191                        tracing::error!("H2 client body send error {:?}", e);
192                        return (Err(e.into()), false);
193                    }
194                }
195                Err(e) => {
196                    #[cfg(feature = "logging")]
197                    tracing::error!("H2 request body stream error {:?}", e);
198                    return (Err(e), false);
199                }
200            }
201        }
202        // Mark end of stream
203        let _ = send_stream.send_data(Bytes::new(), true);
204
205        let response = match response.await {
206            Ok(response) => response,
207            Err(e) => {
208                #[cfg(feature = "logging")]
209                tracing::error!("H2 client response error {:?}", e);
210                return (Err(e.into()), false);
211            }
212        };
213
214        let (parts, body) = response.into_parts();
215        (Ok(Response::from_parts(parts, body.into())), true)
216    }
217}
218
219/// A unified representation of an HTTP connection, supporting both HTTP/1.1 and HTTP/2 protocols.
220///
221/// This enum is designed to work with monoio's native IO traits, which are optimized for io_uring.
222/// It allows for efficient handling of both HTTP/1.1 and HTTP/2 connections within the same
223/// abstraction.
224pub enum HttpConnection<K: Key, IO: AsyncReadRent + AsyncWriteRent> {
225    Http1(Pooled<K, Http1Connection<IO>>),
226    Http2(Http2Connection),
227}
228
229impl<K: Key, IO: AsyncWriteRent + AsyncReadRent> Poolable for HttpConnection<K, IO> {
230    #[inline]
231    fn is_open(&self) -> bool {
232        match self {
233            Self::Http1(conn) => conn.is_open(),
234            Self::Http2(conn) => conn.is_open(),
235        }
236    }
237}
238
239impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> From<Pooled<K, Http1Connection<IO>>>
240    for HttpConnection<K, IO>
241{
242    fn from(pooled_conn: Pooled<K, Http1Connection<IO>>) -> Self {
243        Self::Http1(pooled_conn)
244    }
245}
246
247impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> From<Http2Connection> for HttpConnection<K, IO> {
248    fn from(conn: Http2Connection) -> Self {
249        Self::Http2(conn)
250    }
251}
252
253impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> HttpConnection<K, IO> {
254    /// Sends an HTTP request using the appropriate protocol (HTTP/1.1 or HTTP/2).
255    ///
256    /// This method automatically handles the differences between HTTP/1.1 and HTTP/2,
257    /// providing a unified interface for sending requests.
258    ///
259    /// # Arguments
260    ///
261    /// * `request` - The HTTP request to send.
262    ///
263    /// # Returns
264    ///
265    /// A tuple containing:
266    /// - `Result<Response<HttpBody>, HttpError>`: The HTTP response or an error.
267    /// - `bool`: Indicates whether the connection can be reused (true) or should be closed (false).
268    ///
269    /// # Type Parameters
270    ///
271    /// * `R`: The request type, which must be convertible into parts with a `RequestHead`.
272    /// * `E`: The error type for the `ClientCodec`, which must be convertible into `HttpError`.
273    ///
274    /// # Examples
275    ///
276    /// ```rust
277    /// # use crate::{HttpConnection, Request, Response, HttpBody, HttpError};
278    /// # async fn example<K: Key, IO: AsyncReadRent + AsyncWriteRent>(
279    /// #     mut conn: HttpConnection<K, IO>,
280    /// #     request: Request<Vec<u8>>
281    /// # ) -> Result<(), HttpError> {
282    /// let (response, can_reuse) = conn.send_request(request).await;
283    /// let response: Response<HttpBody> = response?;
284    ///  Ok(())
285    /// }
286    /// ```
287    pub async fn send_request<R, E>(
288        &mut self,
289        request: R,
290    ) -> (Result<Response<HttpBody>, HttpError>, bool)
291    where
292        ClientCodec<IO>: Sink<R, Error = E>,
293        E: std::fmt::Debug + Into<HttpError>,
294        R: IntoParts<Parts = RequestHead>,
295        R::Body: Body<Data = Bytes, Error = HttpError>,
296    {
297        match self {
298            Self::Http1(conn) => conn.send_request(request).await,
299            Self::Http2(conn) => conn.send_request(request).await,
300        }
301    }
302}