1use std::{
4 future::Future,
5 pin::Pin,
6 task::{ready, Context, Poll},
7};
8
9use bytes::Bytes;
10use http::{Request, Response};
11use http_body::Body;
12use httparse::ParserConfig;
13use tokio::io::{AsyncRead, AsyncWrite};
14
15use crate::{
16 body::Incoming,
17 dispatch::{self, TrySendError},
18 error::BoxError,
19 proto::{
20 self,
21 http1::{self, conn::Conn, role::Client, Http1Options},
22 },
23 Error, Result,
24};
25
/// Request-sending handle for an HTTP/1 client connection.
///
/// Wraps a [`dispatch::Sender`] that accepts `Request<B>` values and yields
/// `Response<Incoming>` values in return.
pub struct SendRequest<B> {
    // Sending end of the dispatch channel; every method on this type goes through it.
    dispatch: dispatch::Sender<Request<B>, Response<Incoming>>,
}
30
/// The pieces handed back when a [`Connection`] is taken apart.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal or destructure it exhaustively.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
    /// The I/O object returned by the connection.
    pub io: T,
    /// Bytes returned together with `io`.
    ///
    /// NOTE(review): presumably data that was already read from `io` but not yet
    /// consumed by the HTTP/1 parser — confirm against the dispatcher's `into_inner`.
    pub read_buf: Bytes,
}
50
/// Future that drives an HTTP/1 client connection over the I/O object `T`.
///
/// As the `#[must_use]` message states, nothing happens unless the value is polled.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    B: Body + 'static,
{
    // HTTP/1 dispatcher specialised for the client role, with request body type `B`.
    inner: http1::dispatch::Dispatcher<http1::dispatch::Client<B>, B, T, Client>,
}
63
64impl<T, B> Connection<T, B>
65where
66 T: AsyncRead + AsyncWrite + Unpin,
67 B: Body + 'static,
68 B::Error: Into<BoxError>,
69{
70 #[inline]
74 pub fn into_parts(self) -> Parts<T> {
75 let (io, read_buf, _) = self.inner.into_inner();
76 Parts { io, read_buf }
77 }
78}
79
/// Builder for HTTP/1 client connections.
///
/// Holds the [`Http1Options`] that `handshake` applies to the connection it creates.
#[derive(Debug, Default, Clone)]
pub struct Builder {
    // Options applied to the connection during `handshake`.
    opts: Http1Options,
}
90
91impl<B> SendRequest<B> {
94 #[inline]
98 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
99 self.dispatch.poll_ready(cx)
100 }
101
102 #[inline]
106 pub async fn ready(&mut self) -> Result<()> {
107 std::future::poll_fn(|cx| self.poll_ready(cx)).await
108 }
109
110 #[inline]
118 pub fn is_ready(&self) -> bool {
119 self.dispatch.is_ready()
120 }
121}
122
123impl<B> SendRequest<B>
124where
125 B: Body + 'static,
126{
127 pub fn try_send_request(
136 &mut self,
137 req: Request<B>,
138 ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<B>>>> {
139 let sent = self.dispatch.try_send(req);
140 async move {
141 match sent {
142 Ok(rx) => match rx.await {
143 Ok(res) => res,
144 Err(_) => panic!("dispatch dropped without returning error"),
146 },
147 Err(req) => {
148 debug!("connection was not ready");
149 Err(TrySendError {
150 error: Error::new_canceled().with("connection was not ready"),
151 message: Some(req),
152 })
153 }
154 }
155 }
156 }
157}
158
159impl<T, B> Connection<T, B>
162where
163 T: AsyncRead + AsyncWrite + Unpin + Send,
164 B: Body + 'static,
165 B::Error: Into<BoxError>,
166{
167 #[inline]
169 pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
170 upgrades::UpgradeableConnection { inner: Some(self) }
171 }
172
173 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
185 self.inner.poll_without_shutdown(cx)
186 }
187
188 pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
191 let mut conn = Some(self);
192 std::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
193 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
194 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
195 })
196 .await
197 }
198}
199
200impl<T, B> Future for Connection<T, B>
201where
202 T: AsyncRead + AsyncWrite + Unpin,
203 B: Body + 'static,
204 B::Data: Send,
205 B::Error: Into<BoxError>,
206{
207 type Output = Result<()>;
208
209 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210 match ready!(Pin::new(&mut self.inner).poll(cx))? {
211 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
212 proto::Dispatched::Upgrade(pending) => {
213 pending.manual();
218 Poll::Ready(Ok(()))
219 }
220 }
221 }
222}
223
224impl Builder {
227 #[inline]
229 pub fn options(mut self, opts: Http1Options) -> Self {
230 self.opts = opts;
231 self
232 }
233
234 pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B>)>
239 where
240 T: AsyncRead + AsyncWrite + Unpin,
241 B: Body + 'static,
242 B::Data: Send,
243 B::Error: Into<BoxError>,
244 {
245 trace!("client handshake HTTP/1");
246
247 let (tx, rx) = dispatch::channel();
248 let mut conn = Conn::new(io);
249
250 let h1_parser_config = {
252 let mut h1_parser_config = ParserConfig::default();
253 h1_parser_config
254 .ignore_invalid_headers_in_responses(self.opts.ignore_invalid_headers_in_responses)
255 .allow_spaces_after_header_name_in_responses(
256 self.opts.allow_spaces_after_header_name_in_responses,
257 )
258 .allow_obsolete_multiline_headers_in_responses(
259 self.opts.allow_obsolete_multiline_headers_in_responses,
260 );
261 h1_parser_config
262 };
263 conn.set_h1_parser_config(h1_parser_config);
264
265 if let Some(writev) = self.opts.h1_writev {
267 if writev {
268 conn.set_write_strategy_queue();
269 } else {
270 conn.set_write_strategy_flatten();
271 }
272 }
273
274 if let Some(max_headers) = self.opts.h1_max_headers {
276 conn.set_http1_max_headers(max_headers);
277 }
278
279 if self.opts.h09_responses {
281 conn.set_h09_responses();
282 }
283
284 if let Some(sz) = self.opts.h1_read_buf_exact_size {
286 conn.set_read_buf_exact_size(sz);
287 }
288
289 if let Some(max) = self.opts.h1_max_buf_size {
291 conn.set_max_buf_size(max);
292 }
293
294 let cd = http1::dispatch::Client::new(rx);
295 let proto = http1::dispatch::Dispatcher::new(cd, conn);
296
297 Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
298 }
299}
300
301mod upgrades {
302 use super::*;
303 use crate::upgrade::Upgraded;
304
305 #[must_use = "futures do nothing unless polled"]
309 pub struct UpgradeableConnection<T, B>
310 where
311 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
312 B: Body + 'static,
313 B::Error: Into<BoxError>,
314 {
315 pub(super) inner: Option<Connection<T, B>>,
316 }
317
318 impl<I, B> Future for UpgradeableConnection<I, B>
319 where
320 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
321 B: Body + 'static,
322 B::Data: Send,
323 B::Error: Into<BoxError>,
324 {
325 type Output = Result<()>;
326
327 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328 match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
329 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
330 Ok(proto::Dispatched::Upgrade(pending)) => {
331 let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
332 pending.fulfill(Upgraded::new(io, read_buf));
333 Poll::Ready(Ok(()))
334 }
335 Err(e) => Poll::Ready(Err(e)),
336 }
337 }
338 }
339}