1use bytes::Bytes;
2use http::Response;
3use monoio::io::{
4 sink::{Sink, SinkExt},
5 stream::Stream,
6 AsyncReadRent, AsyncWriteRent,
7};
8use monoio_http::{
9 common::{
10 body::{Body, HttpBody},
11 error::HttpError,
12 request::{Request, RequestHead},
13 IntoParts,
14 },
15 h1::{
16 codec::{
17 decoder::{DecodeError, PayloadDecoder},
18 ClientCodec,
19 },
20 payload::{fixed_payload_pair, stream_payload_pair, Payload},
21 },
22 h2::client::SendRequest,
23};
24
25use crate::pool::{Key, Poolable, Pooled};
26
27pub struct Http1Connection<IO: AsyncWriteRent> {
29 framed: ClientCodec<IO>,
30 using: bool,
31 open: bool,
32}
33
34impl<IO: AsyncWriteRent> Http1Connection<IO> {
35 pub fn new(framed: ClientCodec<IO>) -> Self {
36 Self {
37 framed,
38 using: false,
39 open: true,
40 }
41 }
42}
43
44impl<IO: AsyncWriteRent> Poolable for Http1Connection<IO> {
45 #[inline]
46 fn is_open(&self) -> bool {
47 match self {
48 Self { using, open, .. } => *open && !*using,
49 }
50 }
51}
52
53impl<IO: AsyncReadRent + AsyncWriteRent> Http1Connection<IO> {
54 pub async fn send_request<R, E>(
55 &mut self,
56 request: R,
57 ) -> (Result<Response<HttpBody>, HttpError>, bool)
58 where
59 ClientCodec<IO>: Sink<R, Error = E>,
60 E: std::fmt::Debug + Into<HttpError>,
61 {
62 let handle = &mut self.framed;
63
64 if let Err(e) = handle.send_and_flush(request).await {
65 #[cfg(feature = "logging")]
66 tracing::error!("send upstream request error {:?}", e);
67 self.open = false;
68 return (Err(e.into()), false);
69 }
70
71 match handle.next().await {
72 Some(Ok(resp)) => {
73 let (parts, payload_decoder) = resp.into_parts();
74 match payload_decoder {
75 PayloadDecoder::None => {
76 let payload = Payload::None;
77 let response = Response::from_parts(parts, payload.into());
78 (Ok(response), false)
79 }
80 PayloadDecoder::Fixed(_) => {
81 let mut framed_payload = payload_decoder.with_io(handle);
82 let (payload, payload_sender) = fixed_payload_pair();
83 if let Some(data) = framed_payload.next_data().await {
84 payload_sender.feed(data)
85 }
86 let payload = Payload::Fixed(payload);
87 let response = Response::from_parts(parts, payload.into());
88 (Ok(response), false)
89 }
90 PayloadDecoder::Streamed(_) => {
91 let mut framed_payload = payload_decoder.with_io(handle);
92 let (payload, mut payload_sender) = stream_payload_pair();
93 loop {
94 match framed_payload.next_data().await {
95 Some(Ok(data)) => payload_sender.feed_data(Some(data)),
96 Some(Err(e)) => {
97 #[cfg(feature = "logging")]
98 tracing::error!("decode upstream response error {:?}", e);
99 self.open = false;
100 return (Err(e), false);
101 }
102 None => {
103 payload_sender.feed_data(None);
104 break;
105 }
106 }
107 }
108 let payload = Payload::Stream(payload);
109 let response = Response::from_parts(parts, payload.into());
110 (Ok(response), false)
111 }
112 }
113 }
114 Some(Err(e)) => {
115 #[cfg(feature = "logging")]
116 tracing::error!("decode upstream response error {:?}", e);
117 self.open = false;
118 (Err(e), false)
119 }
120 None => {
121 #[cfg(feature = "logging")]
122 tracing::error!("upstream return eof");
123 self.open = false;
124 (Err(DecodeError::UnexpectedEof.into()), false)
125 }
126 }
127 }
128}
129
130#[derive(Clone, Debug)]
132pub struct Http2Connection {
133 tx: SendRequest<Bytes>,
134}
135
136impl Poolable for Http2Connection {
137 #[inline]
138 fn is_open(&self) -> bool {
139 !self.tx.has_conn_error()
140 }
141}
142
143impl Http2Connection {
144 pub fn new(tx: SendRequest<Bytes>) -> Self {
145 Self { tx }
146 }
147
148 #[allow(dead_code)]
149 fn to_owned(&self) -> Self {
150 Self {
151 tx: self.tx.clone(),
152 }
153 }
154
155 pub fn conn_error(&self) -> Option<HttpError> {
156 self.tx.conn_error()
157 }
158}
159
160impl Http2Connection {
161 pub async fn send_request<R>(
162 &mut self,
163 request: R,
164 ) -> (Result<Response<HttpBody>, HttpError>, bool)
165 where
166 R: IntoParts<Parts = RequestHead>,
167 R::Body: Body<Data = Bytes, Error = HttpError>,
168 {
169 let mut client = match self.tx.clone().ready().await {
170 Ok(client) => client,
171 Err(e) => {
172 return (Err(e.into()), false);
173 }
174 };
175
176 let (parts, mut body) = request.into_parts();
177 let h2_request = Request::from_parts(parts, ());
178
179 let (response, mut send_stream) = match client.send_request(h2_request, false) {
180 Ok((response, send_stream)) => (response, send_stream),
181 Err(e) => {
182 return (Err(e.into()), false);
183 }
184 };
185
186 while let Some(data) = body.next_data().await {
187 match data {
188 Ok(data) => {
189 if let Err(e) = send_stream.send_data(data, false) {
190 #[cfg(feature = "logging")]
191 tracing::error!("H2 client body send error {:?}", e);
192 return (Err(e.into()), false);
193 }
194 }
195 Err(e) => {
196 #[cfg(feature = "logging")]
197 tracing::error!("H2 request body stream error {:?}", e);
198 return (Err(e), false);
199 }
200 }
201 }
202 let _ = send_stream.send_data(Bytes::new(), true);
204
205 let response = match response.await {
206 Ok(response) => response,
207 Err(e) => {
208 #[cfg(feature = "logging")]
209 tracing::error!("H2 client response error {:?}", e);
210 return (Err(e.into()), false);
211 }
212 };
213
214 let (parts, body) = response.into_parts();
215 (Ok(Response::from_parts(parts, body.into())), true)
216 }
217}
218
219pub enum HttpConnection<K: Key, IO: AsyncReadRent + AsyncWriteRent> {
225 Http1(Pooled<K, Http1Connection<IO>>),
226 Http2(Http2Connection),
227}
228
229impl<K: Key, IO: AsyncWriteRent + AsyncReadRent> Poolable for HttpConnection<K, IO> {
230 #[inline]
231 fn is_open(&self) -> bool {
232 match self {
233 Self::Http1(conn) => conn.is_open(),
234 Self::Http2(conn) => conn.is_open(),
235 }
236 }
237}
238
239impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> From<Pooled<K, Http1Connection<IO>>>
240 for HttpConnection<K, IO>
241{
242 fn from(pooled_conn: Pooled<K, Http1Connection<IO>>) -> Self {
243 Self::Http1(pooled_conn)
244 }
245}
246
247impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> From<Http2Connection> for HttpConnection<K, IO> {
248 fn from(conn: Http2Connection) -> Self {
249 Self::Http2(conn)
250 }
251}
252
253impl<K: Key, IO: AsyncReadRent + AsyncWriteRent> HttpConnection<K, IO> {
254 pub async fn send_request<R, E>(
288 &mut self,
289 request: R,
290 ) -> (Result<Response<HttpBody>, HttpError>, bool)
291 where
292 ClientCodec<IO>: Sink<R, Error = E>,
293 E: std::fmt::Debug + Into<HttpError>,
294 R: IntoParts<Parts = RequestHead>,
295 R::Body: Body<Data = Bytes, Error = HttpError>,
296 {
297 match self {
298 Self::Http1(conn) => conn.send_request(request).await,
299 Self::Http2(conn) => conn.send_request(request).await,
300 }
301 }
302}