rama_hyper/server/conn/http1.rs
1//! HTTP/1 Server Connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::marker::Unpin;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use crate::rt::{Read, Write};
13use crate::upgrade::Upgraded;
14use bytes::Bytes;
15
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18use crate::service::HttpService;
19use crate::{
20 common::time::{Dur, Time},
21 rt::Timer,
22};
23
24type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
25 proto::h1::dispatch::Server<S, IncomingBody>,
26 B,
27 T,
28 proto::ServerTransaction,
29>;
30
31pin_project_lite::pin_project! {
32 /// A [`Future`](core::future::Future) representing an HTTP/1 connection, bound to a
33 /// [`Service`](crate::service::Service), returned from
34 /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
35 ///
36 /// To drive HTTP on this connection this future **must be polled**, typically with
37 /// `.await`. If it isn't polled, no progress will be made on this connection.
38 #[must_use = "futures do nothing unless polled"]
39 pub struct Connection<T, S>
40 where
41 S: HttpService<IncomingBody>,
42 {
43 conn: Http1Dispatcher<T, S::ResBody, S>,
44 }
45}
46
47/// A configuration builder for HTTP/1 server connections.
48///
49/// **Note**: The default values of options are *not considered stable*. They
50/// are subject to change at any time.
51///
52/// # Example
53///
54/// ```
55/// # use std::time::Duration;
56/// # use hyper::server::conn::http1::Builder;
57/// # fn main() {
58/// let mut http = Builder::new();
59/// // Set options one at a time
60/// http.header_read_timeout(Duration::from_millis(200));
61///
62/// // Or, chain multiple options
63/// http.keep_alive(false).title_case_headers(true).max_buf_size(8192);
64///
65/// # }
66/// ```
67///
68/// Use [`Builder::serve_connection`](struct.Builder.html#method.serve_connection)
69/// to bind the built connection to a service.
70#[derive(Clone, Debug)]
71pub struct Builder {
72 timer: Time,
73 h1_half_close: bool,
74 h1_keep_alive: bool,
75 h1_title_case_headers: bool,
76 h1_preserve_header_case: bool,
77 h1_header_read_timeout: Dur,
78 h1_writev: Option<bool>,
79 max_buf_size: Option<usize>,
80 pipeline_flush: bool,
81}
82
83/// Deconstructed parts of a `Connection`.
84///
85/// This allows taking apart a `Connection` at a later time, in order to
86/// reclaim the IO object, and additional related pieces.
87#[derive(Debug)]
88pub struct Parts<T, S> {
89 /// The original IO object used in the handshake.
90 pub io: T,
91 /// A buffer of bytes that have been read but not processed as HTTP.
92 ///
93 /// If the client sent additional bytes after its last request, and
94 /// this connection "ended" with an upgrade, the read buffer will contain
95 /// those bytes.
96 ///
97 /// You will want to check for any existing bytes if you plan to continue
98 /// communicating on the IO object.
99 pub read_buf: Bytes,
100 /// The `Service` used to serve this connection.
101 pub service: S,
102 _inner: (),
103}
104
105// ===== impl Connection =====
106
107impl<I, S> fmt::Debug for Connection<I, S>
108where
109 S: HttpService<IncomingBody>,
110{
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("Connection").finish()
113 }
114}
115
116impl<I, B, S> Connection<I, S>
117where
118 S: HttpService<IncomingBody, ResBody = B>,
119 S::Error: Into<Box<dyn StdError + Send + Sync>>,
120 I: Read + Write + Unpin,
121 B: Body + 'static,
122 B::Error: Into<Box<dyn StdError + Send + Sync>>,
123{
124 /// Start a graceful shutdown process for this connection.
125 ///
126 /// This `Connection` should continue to be polled until shutdown
127 /// can finish.
128 ///
129 /// # Note
130 ///
131 /// This should only be called while the `Connection` future is still
132 /// pending. If called after `Connection::poll` has resolved, this does
133 /// nothing.
134 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
135 self.conn.disable_keep_alive();
136 }
137
138 /// Return the inner IO object, and additional information.
139 ///
140 /// If the IO object has been "rewound" the io will not contain those bytes rewound.
141 /// This should only be called after `poll_without_shutdown` signals
142 /// that the connection is "done". Otherwise, it may not have finished
143 /// flushing all necessary HTTP bytes.
144 ///
145 /// # Panics
146 /// This method will panic if this connection is using an h2 protocol.
147 pub fn into_parts(self) -> Parts<I, S> {
148 let (io, read_buf, dispatch) = self.conn.into_inner();
149 Parts {
150 io,
151 read_buf,
152 service: dispatch.into_service(),
153 _inner: (),
154 }
155 }
156
157 /// Poll the connection for completion, but without calling `shutdown`
158 /// on the underlying IO.
159 ///
160 /// This is useful to allow running a connection while doing an HTTP
161 /// upgrade. Once the upgrade is completed, the connection would be "done",
162 /// but it is not desired to actually shutdown the IO object. Instead you
163 /// would take it back using `into_parts`.
164 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
165 where
166 S: Unpin,
167 S::Future: Unpin,
168 B: Unpin,
169 {
170 self.conn.poll_without_shutdown(cx)
171 }
172
173 /// Prevent shutdown of the underlying IO object at the end of service the request,
174 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
175 ///
176 /// # Error
177 ///
178 /// This errors if the underlying connection protocol is not HTTP/1.
179 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
180 where
181 S: Unpin,
182 S::Future: Unpin,
183 B: Unpin,
184 {
185 let mut zelf = Some(self);
186 futures_util::future::poll_fn(move |cx| {
187 ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
188 Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
189 })
190 }
191
192 /// Enable this connection to support higher-level HTTP upgrades.
193 ///
194 /// See [the `upgrade` module](crate::upgrade) for more.
195 pub fn with_upgrades(self) -> UpgradeableConnection<I, S>
196 where
197 I: Send,
198 {
199 UpgradeableConnection { inner: Some(self) }
200 }
201}
202
203impl<I, B, S> Future for Connection<I, S>
204where
205 S: HttpService<IncomingBody, ResBody = B>,
206 S::Error: Into<Box<dyn StdError + Send + Sync>>,
207 I: Read + Write + Unpin + 'static,
208 B: Body + 'static,
209 B::Error: Into<Box<dyn StdError + Send + Sync>>,
210{
211 type Output = crate::Result<()>;
212
213 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214 match ready!(Pin::new(&mut self.conn).poll(cx)) {
215 Ok(done) => {
216 match done {
217 proto::Dispatched::Shutdown => {}
218 proto::Dispatched::Upgrade(pending) => {
219 // With no `Send` bound on `I`, we can't try to do
220 // upgrades here. In case a user was trying to use
221 // `Body::on_upgrade` with this API, send a special
222 // error letting them know about that.
223 pending.manual();
224 }
225 };
226 Poll::Ready(Ok(()))
227 }
228 Err(e) => Poll::Ready(Err(e)),
229 }
230 }
231}
232
233// ===== impl Builder =====
234
235impl Builder {
236 /// Create a new connection builder.
237 pub fn new() -> Self {
238 Self {
239 timer: Time::Empty,
240 h1_half_close: false,
241 h1_keep_alive: true,
242 h1_title_case_headers: false,
243 h1_preserve_header_case: false,
244 h1_header_read_timeout: Dur::Default(Some(Duration::from_secs(30))),
245 h1_writev: None,
246 max_buf_size: None,
247 pipeline_flush: false,
248 }
249 }
250 /// Set whether HTTP/1 connections should support half-closures.
251 ///
252 /// Clients can chose to shutdown their write-side while waiting
253 /// for the server to respond. Setting this to `true` will
254 /// prevent closing the connection immediately if `read`
255 /// detects an EOF in the middle of a request.
256 ///
257 /// Default is `false`.
258 pub fn half_close(&mut self, val: bool) -> &mut Self {
259 self.h1_half_close = val;
260 self
261 }
262
263 /// Enables or disables HTTP/1 keep-alive.
264 ///
265 /// Default is true.
266 pub fn keep_alive(&mut self, val: bool) -> &mut Self {
267 self.h1_keep_alive = val;
268 self
269 }
270
271 /// Set whether HTTP/1 connections will write header names as title case at
272 /// the socket level.
273 ///
274 /// Default is false.
275 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
276 self.h1_title_case_headers = enabled;
277 self
278 }
279
280 /// Set whether to support preserving original header cases.
281 ///
282 /// Currently, this will record the original cases received, and store them
283 /// in a private extension on the `Request`. It will also look for and use
284 /// such an extension in any provided `Response`.
285 ///
286 /// Since the relevant extension is still private, there is no way to
287 /// interact with the original cases. The only effect this can have now is
288 /// to forward the cases in a proxy-like fashion.
289 ///
290 /// Default is false.
291 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
292 self.h1_preserve_header_case = enabled;
293 self
294 }
295
296 /// Set a timeout for reading client request headers. If a client does not
297 /// transmit the entire header within this time, the connection is closed.
298 ///
299 /// Pass `None` to disable.
300 ///
301 /// Default is 30 seconds.
302 pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
303 self.h1_header_read_timeout = Dur::Configured(read_timeout.into());
304 self
305 }
306
307 /// Set whether HTTP/1 connections should try to use vectored writes,
308 /// or always flatten into a single buffer.
309 ///
310 /// Note that setting this to false may mean more copies of body data,
311 /// but may also improve performance when an IO transport doesn't
312 /// support vectored writes well, such as most TLS implementations.
313 ///
314 /// Setting this to true will force hyper to use queued strategy
315 /// which may eliminate unnecessary cloning on some TLS backends
316 ///
317 /// Default is `auto`. In this mode hyper will try to guess which
318 /// mode to use
319 pub fn writev(&mut self, val: bool) -> &mut Self {
320 self.h1_writev = Some(val);
321 self
322 }
323
324 /// Set the maximum buffer size for the connection.
325 ///
326 /// Default is ~400kb.
327 ///
328 /// # Panics
329 ///
330 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
331 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
332 assert!(
333 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
334 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
335 );
336 self.max_buf_size = Some(max);
337 self
338 }
339
340 /// Aggregates flushes to better support pipelined responses.
341 ///
342 /// Experimental, may have bugs.
343 ///
344 /// Default is false.
345 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
346 self.pipeline_flush = enabled;
347 self
348 }
349
350 /// Set the timer used in background tasks.
351 pub fn timer<M>(&mut self, timer: M) -> &mut Self
352 where
353 M: Timer + Send + Sync + 'static,
354 {
355 self.timer = Time::Timer(Arc::new(timer));
356 self
357 }
358
359 /// Bind a connection together with a [`Service`](crate::service::Service).
360 ///
361 /// This returns a Future that must be polled in order for HTTP to be
362 /// driven on the connection.
363 ///
364 /// # Panics
365 ///
366 /// If a timeout option has been configured, but a `timer` has not been
367 /// provided, calling `serve_connection` will panic.
368 ///
369 /// # Example
370 ///
371 /// ```
372 /// # use hyper::{body::Incoming, Request, Response};
373 /// # use hyper::service::Service;
374 /// # use hyper::server::conn::http1::Builder;
375 /// # use hyper::rt::{Read, Write};
376 /// # async fn run<I, S>(some_io: I, some_service: S)
377 /// # where
378 /// # I: Read + Write + Unpin + Send + 'static,
379 /// # S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
380 /// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
381 /// # S::Future: Send,
382 /// # {
383 /// let http = Builder::new();
384 /// let conn = http.serve_connection(some_io, some_service);
385 ///
386 /// if let Err(e) = conn.await {
387 /// eprintln!("server connection error: {}", e);
388 /// }
389 /// # }
390 /// # fn main() {}
391 /// ```
392 pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
393 where
394 S: HttpService<IncomingBody>,
395 S::Error: Into<Box<dyn StdError + Send + Sync>>,
396 S::ResBody: 'static,
397 <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
398 I: Read + Write + Unpin,
399 {
400 let mut conn = proto::Conn::new(io);
401 conn.set_timer(self.timer.clone());
402 if !self.h1_keep_alive {
403 conn.disable_keep_alive();
404 }
405 if self.h1_half_close {
406 conn.set_allow_half_close();
407 }
408 if self.h1_title_case_headers {
409 conn.set_title_case_headers();
410 }
411 if self.h1_preserve_header_case {
412 conn.set_preserve_header_case();
413 }
414 if let Some(dur) = self
415 .timer
416 .check(self.h1_header_read_timeout, "header_read_timeout")
417 {
418 conn.set_http1_header_read_timeout(dur);
419 };
420 if let Some(writev) = self.h1_writev {
421 if writev {
422 conn.set_write_strategy_queue();
423 } else {
424 conn.set_write_strategy_flatten();
425 }
426 }
427 conn.set_flush_pipeline(self.pipeline_flush);
428 if let Some(max) = self.max_buf_size {
429 conn.set_max_buf_size(max);
430 }
431 let sd = proto::h1::dispatch::Server::new(service);
432 let proto = proto::h1::Dispatcher::new(sd, conn);
433 Connection { conn: proto }
434 }
435}
436
437/// A future binding a connection with a Service with Upgrade support.
438#[must_use = "futures do nothing unless polled"]
439#[allow(missing_debug_implementations)]
440pub struct UpgradeableConnection<T, S>
441where
442 S: HttpService<IncomingBody>,
443{
444 pub(super) inner: Option<Connection<T, S>>,
445}
446
447impl<I, B, S> UpgradeableConnection<I, S>
448where
449 S: HttpService<IncomingBody, ResBody = B>,
450 S::Error: Into<Box<dyn StdError + Send + Sync>>,
451 I: Read + Write + Unpin,
452 B: Body + 'static,
453 B::Error: Into<Box<dyn StdError + Send + Sync>>,
454{
455 /// Start a graceful shutdown process for this connection.
456 ///
457 /// This `Connection` should continue to be polled until shutdown
458 /// can finish.
459 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
460 Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
461 }
462}
463
464impl<I, B, S> Future for UpgradeableConnection<I, S>
465where
466 S: HttpService<IncomingBody, ResBody = B>,
467 S::Error: Into<Box<dyn StdError + Send + Sync>>,
468 I: Read + Write + Unpin + Send + 'static,
469 B: Body + 'static,
470 B::Error: Into<Box<dyn StdError + Send + Sync>>,
471{
472 type Output = crate::Result<()>;
473
474 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475 match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
476 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
477 Ok(proto::Dispatched::Upgrade(pending)) => {
478 let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
479 pending.fulfill(Upgraded::new(io, buf));
480 Poll::Ready(Ok(()))
481 }
482 Err(e) => Poll::Ready(Err(e)),
483 }
484 }
485}