rama_hyper/client/conn/http1.rs
1//! HTTP/1 client connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use http::{Request, Response};
12use httparse::ParserConfig;
13
14use super::super::dispatch;
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::proto;
17
18type Dispatcher<T, B> =
19 proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
20
21/// The sender side of an established connection.
22pub struct SendRequest<B> {
23 dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
24}
25
26/// Deconstructed parts of a `Connection`.
27///
28/// This allows taking apart a `Connection` at a later time, in order to
29/// reclaim the IO object, and additional related pieces.
30#[derive(Debug)]
31pub struct Parts<T> {
32 /// The original IO object used in the handshake.
33 pub io: T,
34 /// A buffer of bytes that have been read but not processed as HTTP.
35 ///
36 /// For instance, if the `Connection` is used for an HTTP upgrade request,
37 /// it is possible the server sent back the first bytes of the new protocol
38 /// along with the response upgrade.
39 ///
40 /// You will want to check for any existing bytes if you plan to continue
41 /// communicating on the IO object.
42 pub read_buf: Bytes,
43 _inner: (),
44}
45
46/// A future that processes all HTTP state for the IO object.
47///
48/// In most cases, this should just be spawned into an executor, so that it
49/// can process incoming and outgoing messages, notice hangups, and the like.
50#[must_use = "futures do nothing unless polled"]
51pub struct Connection<T, B>
52where
53 T: Read + Write + 'static,
54 B: Body + 'static,
55{
56 inner: Dispatcher<T, B>,
57}
58
59impl<T, B> Connection<T, B>
60where
61 T: Read + Write + Unpin + 'static,
62 B: Body + 'static,
63 B::Error: Into<Box<dyn StdError + Send + Sync>>,
64{
65 /// Return the inner IO object, and additional information.
66 ///
67 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
68 pub fn into_parts(self) -> Parts<T> {
69 let (io, read_buf, _) = self.inner.into_inner();
70 Parts {
71 io,
72 read_buf,
73 _inner: (),
74 }
75 }
76
77 /// Poll the connection for completion, but without calling `shutdown`
78 /// on the underlying IO.
79 ///
80 /// This is useful to allow running a connection while doing an HTTP
81 /// upgrade. Once the upgrade is completed, the connection would be "done",
82 /// but it is not desired to actually shutdown the IO object. Instead you
83 /// would take it back using `into_parts`.
84 ///
85 /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
86 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
87 /// to work with this function; or use the `without_shutdown` wrapper.
88 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89 self.inner.poll_without_shutdown(cx)
90 }
91
92 /// Prevent shutdown of the underlying IO object at the end of service the request,
93 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
94 pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
95 let mut conn = Some(self);
96 futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
97 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
98 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
99 })
100 .await
101 }
102}
103
104/// A builder to configure an HTTP connection.
105///
106/// After setting options, the builder is used to create a handshake future.
107///
108/// **Note**: The default values of options are *not considered stable*. They
109/// are subject to change at any time.
110#[derive(Clone, Debug)]
111pub struct Builder {
112 h09_responses: bool,
113 h1_parser_config: ParserConfig,
114 h1_writev: Option<bool>,
115 h1_title_case_headers: bool,
116 h1_preserve_header_case: bool,
117 #[cfg(feature = "ffi")]
118 h1_preserve_header_order: bool,
119 h1_read_buf_exact_size: Option<usize>,
120 h1_max_buf_size: Option<usize>,
121}
122
123/// Returns a handshake future over some IO.
124///
125/// This is a shortcut for `Builder::new().handshake(io)`.
126/// See [`client::conn`](crate::client::conn) for more.
127pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
128where
129 T: Read + Write + Unpin + 'static,
130 B: Body + 'static,
131 B::Data: Send,
132 B::Error: Into<Box<dyn StdError + Send + Sync>>,
133{
134 Builder::new().handshake(io).await
135}
136
137// ===== impl SendRequest
138
139impl<B> SendRequest<B> {
140 /// Polls to determine whether this sender can be used yet for a request.
141 ///
142 /// If the associated connection is closed, this returns an Error.
143 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
144 self.dispatch.poll_ready(cx)
145 }
146
147 /// Waits until the dispatcher is ready
148 ///
149 /// If the associated connection is closed, this returns an Error.
150 pub async fn ready(&mut self) -> crate::Result<()> {
151 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
152 }
153
154 /// Checks if the connection is currently ready to send a request.
155 ///
156 /// # Note
157 ///
158 /// This is mostly a hint. Due to inherent latency of networks, it is
159 /// possible that even after checking this is ready, sending a request
160 /// may still fail because the connection was closed in the meantime.
161 pub fn is_ready(&self) -> bool {
162 self.dispatch.is_ready()
163 }
164
165 /// Checks if the connection side has been closed.
166 pub fn is_closed(&self) -> bool {
167 self.dispatch.is_closed()
168 }
169}
170
171impl<B> SendRequest<B>
172where
173 B: Body + 'static,
174{
175 /// Sends a `Request` on the associated connection.
176 ///
177 /// Returns a future that if successful, yields the `Response`.
178 ///
179 /// # Note
180 ///
181 /// There are some key differences in what automatic things the `Client`
182 /// does for you that will not be done here:
183 ///
184 /// - `Client` requires absolute-form `Uri`s, since the scheme and
185 /// authority are needed to connect. They aren't required here.
186 /// - Since the `Client` requires absolute-form `Uri`s, it can add
187 /// the `Host` header based on it. You must add a `Host` header yourself
188 /// before calling this method.
189 /// - Since absolute-form `Uri`s are not required, if received, they will
190 /// be serialized as-is.
191 pub fn send_request(
192 &mut self,
193 req: Request<B>,
194 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
195 let sent = self.dispatch.send(req);
196
197 async move {
198 match sent {
199 Ok(rx) => match rx.await {
200 Ok(Ok(resp)) => Ok(resp),
201 Ok(Err(err)) => Err(err),
202 // this is definite bug if it happens, but it shouldn't happen!
203 Err(_canceled) => panic!("dispatch dropped without returning error"),
204 },
205 Err(_req) => {
206 debug!("connection was not ready");
207 Err(crate::Error::new_canceled().with("connection was not ready"))
208 }
209 }
210 }
211 }
212
213 /*
214 pub(super) fn send_request_retryable(
215 &mut self,
216 req: Request<B>,
217 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
218 where
219 B: Send,
220 {
221 match self.dispatch.try_send(req) {
222 Ok(rx) => {
223 Either::Left(rx.then(move |res| {
224 match res {
225 Ok(Ok(res)) => future::ok(res),
226 Ok(Err(err)) => future::err(err),
227 // this is definite bug if it happens, but it shouldn't happen!
228 Err(_) => panic!("dispatch dropped without returning error"),
229 }
230 }))
231 }
232 Err(req) => {
233 debug!("connection was not ready");
234 let err = crate::Error::new_canceled().with("connection was not ready");
235 Either::Right(future::err((err, Some(req))))
236 }
237 }
238 }
239 */
240}
241
242impl<B> fmt::Debug for SendRequest<B> {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 f.debug_struct("SendRequest").finish()
245 }
246}
247
248// ===== impl Connection
249
250impl<T, B> Connection<T, B>
251where
252 T: Read + Write + Unpin + Send + 'static,
253 B: Body + 'static,
254 B::Error: Into<Box<dyn StdError + Send + Sync>>,
255{
256 /// Enable this connection to support higher-level HTTP upgrades.
257 ///
258 /// See [the `upgrade` module](crate::upgrade) for more.
259 pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
260 upgrades::UpgradeableConnection { inner: Some(self) }
261 }
262}
263
264impl<T, B> fmt::Debug for Connection<T, B>
265where
266 T: Read + Write + fmt::Debug + 'static,
267 B: Body + 'static,
268{
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 f.debug_struct("Connection").finish()
271 }
272}
273
274impl<T, B> Future for Connection<T, B>
275where
276 T: Read + Write + Unpin + 'static,
277 B: Body + 'static,
278 B::Data: Send,
279 B::Error: Into<Box<dyn StdError + Send + Sync>>,
280{
281 type Output = crate::Result<()>;
282
283 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
284 match ready!(Pin::new(&mut self.inner).poll(cx))? {
285 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
286 proto::Dispatched::Upgrade(pending) => {
287 // With no `Send` bound on `I`, we can't try to do
288 // upgrades here. In case a user was trying to use
289 // `upgrade` with this API, send a special
290 // error letting them know about that.
291 pending.manual();
292 Poll::Ready(Ok(()))
293 }
294 }
295 }
296}
297
298// ===== impl Builder
299
300impl Builder {
301 /// Creates a new connection builder.
302 #[inline]
303 pub fn new() -> Builder {
304 Builder {
305 h09_responses: false,
306 h1_writev: None,
307 h1_read_buf_exact_size: None,
308 h1_parser_config: Default::default(),
309 h1_title_case_headers: false,
310 h1_preserve_header_case: false,
311 #[cfg(feature = "ffi")]
312 h1_preserve_header_order: false,
313 h1_max_buf_size: None,
314 }
315 }
316
317 /// Set whether HTTP/0.9 responses should be tolerated.
318 ///
319 /// Default is false.
320 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
321 self.h09_responses = enabled;
322 self
323 }
324
325 /// Set whether HTTP/1 connections will accept spaces between header names
326 /// and the colon that follow them in responses.
327 ///
328 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
329 /// to say about it:
330 ///
331 /// > No whitespace is allowed between the header field-name and colon. In
332 /// > the past, differences in the handling of such whitespace have led to
333 /// > security vulnerabilities in request routing and response handling. A
334 /// > server MUST reject any received request message that contains
335 /// > whitespace between a header field-name and colon with a response code
336 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
337 /// > response message before forwarding the message downstream.
338 ///
339 /// Default is false.
340 ///
341 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
342 pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
343 self.h1_parser_config
344 .allow_spaces_after_header_name_in_responses(enabled);
345 self
346 }
347
348 /// Set whether HTTP/1 connections will accept obsolete line folding for
349 /// header values.
350 ///
351 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
352 /// parsing.
353 ///
354 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
355 /// to say about it:
356 ///
357 /// > A server that receives an obs-fold in a request message that is not
358 /// > within a message/http container MUST either reject the message by
359 /// > sending a 400 (Bad Request), preferably with a representation
360 /// > explaining that obsolete line folding is unacceptable, or replace
361 /// > each received obs-fold with one or more SP octets prior to
362 /// > interpreting the field value or forwarding the message downstream.
363 ///
364 /// > A proxy or gateway that receives an obs-fold in a response message
365 /// > that is not within a message/http container MUST either discard the
366 /// > message and replace it with a 502 (Bad Gateway) response, preferably
367 /// > with a representation explaining that unacceptable line folding was
368 /// > received, or replace each received obs-fold with one or more SP
369 /// > octets prior to interpreting the field value or forwarding the
370 /// > message downstream.
371 ///
372 /// > A user agent that receives an obs-fold in a response message that is
373 /// > not within a message/http container MUST replace each received
374 /// > obs-fold with one or more SP octets prior to interpreting the field
375 /// > value.
376 ///
377 /// Default is false.
378 ///
379 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
380 pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
381 self.h1_parser_config
382 .allow_obsolete_multiline_headers_in_responses(enabled);
383 self
384 }
385
386 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
387 ///
388 /// If this is enabled and and a header line does not start with a valid header
389 /// name, or does not include a colon at all, the line will be silently ignored
390 /// and no error will be reported.
391 ///
392 /// Default is false.
393 pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
394 self.h1_parser_config
395 .ignore_invalid_headers_in_responses(enabled);
396 self
397 }
398
399 /// Set whether HTTP/1 connections should try to use vectored writes,
400 /// or always flatten into a single buffer.
401 ///
402 /// Note that setting this to false may mean more copies of body data,
403 /// but may also improve performance when an IO transport doesn't
404 /// support vectored writes well, such as most TLS implementations.
405 ///
406 /// Setting this to true will force hyper to use queued strategy
407 /// which may eliminate unnecessary cloning on some TLS backends
408 ///
409 /// Default is `auto`. In this mode hyper will try to guess which
410 /// mode to use
411 pub fn writev(&mut self, enabled: bool) -> &mut Builder {
412 self.h1_writev = Some(enabled);
413 self
414 }
415
416 /// Set whether HTTP/1 connections will write header names as title case at
417 /// the socket level.
418 ///
419 /// Default is false.
420 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
421 self.h1_title_case_headers = enabled;
422 self
423 }
424
425 /// Set whether to support preserving original header cases.
426 ///
427 /// Currently, this will record the original cases received, and store them
428 /// in a private extension on the `Response`. It will also look for and use
429 /// such an extension in any provided `Request`.
430 ///
431 /// Since the relevant extension is still private, there is no way to
432 /// interact with the original cases. The only effect this can have now is
433 /// to forward the cases in a proxy-like fashion.
434 ///
435 /// Default is false.
436 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
437 self.h1_preserve_header_case = enabled;
438 self
439 }
440
441 /// Set whether to support preserving original header order.
442 ///
443 /// Currently, this will record the order in which headers are received, and store this
444 /// ordering in a private extension on the `Response`. It will also look for and use
445 /// such an extension in any provided `Request`.
446 ///
447 /// Default is false.
448 #[cfg(feature = "ffi")]
449 pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
450 self.h1_preserve_header_order = enabled;
451 self
452 }
453
454 /// Sets the exact size of the read buffer to *always* use.
455 ///
456 /// Note that setting this option unsets the `max_buf_size` option.
457 ///
458 /// Default is an adaptive read buffer.
459 pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
460 self.h1_read_buf_exact_size = sz;
461 self.h1_max_buf_size = None;
462 self
463 }
464
465 /// Set the maximum buffer size for the connection.
466 ///
467 /// Default is ~400kb.
468 ///
469 /// Note that setting this option unsets the `read_exact_buf_size` option.
470 ///
471 /// # Panics
472 ///
473 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
474 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
475 assert!(
476 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
477 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
478 );
479
480 self.h1_max_buf_size = Some(max);
481 self.h1_read_buf_exact_size = None;
482 self
483 }
484
485 /// Constructs a connection with the configured options and IO.
486 /// See [`client::conn`](crate::client::conn) for more.
487 ///
488 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
489 /// do nothing.
490 pub fn handshake<T, B>(
491 &self,
492 io: T,
493 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
494 where
495 T: Read + Write + Unpin + 'static,
496 B: Body + 'static,
497 B::Data: Send,
498 B::Error: Into<Box<dyn StdError + Send + Sync>>,
499 {
500 let opts = self.clone();
501
502 async move {
503 trace!("client handshake HTTP/1");
504
505 let (tx, rx) = dispatch::channel();
506 let mut conn = proto::Conn::new(io);
507 conn.set_h1_parser_config(opts.h1_parser_config);
508 if let Some(writev) = opts.h1_writev {
509 if writev {
510 conn.set_write_strategy_queue();
511 } else {
512 conn.set_write_strategy_flatten();
513 }
514 }
515 if opts.h1_title_case_headers {
516 conn.set_title_case_headers();
517 }
518 if opts.h1_preserve_header_case {
519 conn.set_preserve_header_case();
520 }
521 #[cfg(feature = "ffi")]
522 if opts.h1_preserve_header_order {
523 conn.set_preserve_header_order();
524 }
525
526 if opts.h09_responses {
527 conn.set_h09_responses();
528 }
529
530 if let Some(sz) = opts.h1_read_buf_exact_size {
531 conn.set_read_buf_exact_size(sz);
532 }
533 if let Some(max) = opts.h1_max_buf_size {
534 conn.set_max_buf_size(max);
535 }
536 let cd = proto::h1::dispatch::Client::new(rx);
537 let proto = proto::h1::Dispatcher::new(cd, conn);
538
539 Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
540 }
541 }
542}
543
544mod upgrades {
545 use crate::upgrade::Upgraded;
546
547 use super::*;
548
549 // A future binding a connection with a Service with Upgrade support.
550 //
551 // This type is unnameable outside the crate.
552 #[must_use = "futures do nothing unless polled"]
553 #[allow(missing_debug_implementations)]
554 pub struct UpgradeableConnection<T, B>
555 where
556 T: Read + Write + Unpin + Send + 'static,
557 B: Body + 'static,
558 B::Error: Into<Box<dyn StdError + Send + Sync>>,
559 {
560 pub(super) inner: Option<Connection<T, B>>,
561 }
562
563 impl<I, B> Future for UpgradeableConnection<I, B>
564 where
565 I: Read + Write + Unpin + Send + 'static,
566 B: Body + 'static,
567 B::Data: Send,
568 B::Error: Into<Box<dyn StdError + Send + Sync>>,
569 {
570 type Output = crate::Result<()>;
571
572 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
573 match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
574 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
575 Ok(proto::Dispatched::Upgrade(pending)) => {
576 let Parts {
577 io,
578 read_buf,
579 _inner,
580 } = self.inner.take().unwrap().into_parts();
581 pending.fulfill(Upgraded::new(io, read_buf));
582 Poll::Ready(Ok(()))
583 }
584 Err(e) => Poll::Ready(Err(e)),
585 }
586 }
587 }
588}