rama_hyper/server/conn/
http1.rs

1//! HTTP/1 Server Connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::marker::Unpin;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use crate::rt::{Read, Write};
13use crate::upgrade::Upgraded;
14use bytes::Bytes;
15
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18use crate::service::HttpService;
19use crate::{
20    common::time::{Dur, Time},
21    rt::Timer,
22};
23
// The HTTP/1 dispatcher type that drives a server connection in this module.
//
// `T` is the IO transport, `B` the response body type, and `S` the service;
// `Connection` instantiates it as `Http1Dispatcher<T, S::ResBody, S>`.
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
    proto::h1::dispatch::Server<S, IncomingBody>,
    B,
    T,
    proto::ServerTransaction,
>;
30
pin_project_lite::pin_project! {
    /// A [`Future`](core::future::Future) representing an HTTP/1 connection, bound to a
    /// [`Service`](crate::service::Service), returned from
    /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        // The HTTP/1 dispatcher that owns the IO object and the service; every
        // method on `Connection` delegates to it.
        conn: Http1Dispatcher<T, S::ResBody, S>,
    }
}
46
/// A configuration builder for HTTP/1 server connections.
///
/// **Note**: The default values of options are *not considered stable*. They
/// are subject to change at any time.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use hyper::server::conn::http1::Builder;
/// # fn main() {
/// let mut http = Builder::new();
/// // Set options one at a time
/// http.header_read_timeout(Duration::from_millis(200));
///
/// // Or, chain multiple options
/// http.keep_alive(false).title_case_headers(true).max_buf_size(8192);
///
/// # }
/// ```
///
/// Use [`Builder::serve_connection`](struct.Builder.html#method.serve_connection)
/// to bind the built connection to a service.
#[derive(Clone, Debug)]
pub struct Builder {
    // Timer handle; cloned into every connection and used to validate the
    // header read timeout in `serve_connection`.
    timer: Time,
    // When `true`, `serve_connection` enables half-close support on the connection.
    h1_half_close: bool,
    // When `false`, `serve_connection` disables keep-alive on the connection.
    h1_keep_alive: bool,
    // When `true`, `serve_connection` enables title-case header names.
    h1_title_case_headers: bool,
    // When `true`, `serve_connection` enables preserving the original header case.
    h1_preserve_header_case: bool,
    // Header read timeout; distinguishes the built-in default from a value
    // explicitly configured through `header_read_timeout`.
    h1_header_read_timeout: Dur,
    // `Some(true)` selects the queue write strategy, `Some(false)` the flatten
    // strategy; `None` leaves the connection's own choice untouched.
    h1_writev: Option<bool>,
    // Maximum buffer size to apply; `None` leaves the connection's default in place.
    max_buf_size: Option<usize>,
    // Forwarded to the connection's `set_flush_pipeline`.
    pipeline_flush: bool,
}
82
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::without_shutdown`.
#[derive(Debug)]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
    // Private unit field: keeps `Parts` from being built with a struct literal
    // outside this module.
    _inner: (),
}
104
105// ===== impl Connection =====
106
impl<I, S> fmt::Debug for Connection<I, S>
where
    S: HttpService<IncomingBody>,
{
    /// Formats the connection as an opaque `Connection` struct; no fields are
    /// printed, so neither the IO type nor the service needs to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Connection");
        repr.finish()
    }
}
115
impl<I, B, S> Connection<I, S>
where
    S: HttpService<IncomingBody, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    /// Begin gracefully shutting this connection down.
    ///
    /// Keep-alive is disabled on the underlying dispatcher; the `Connection`
    /// must still be polled afterwards so the shutdown can run to completion.
    ///
    /// # Note
    ///
    /// Call this only while the `Connection` future is still pending. Once
    /// `Connection::poll` has resolved, calling it has no effect.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        self.conn.disable_keep_alive();
    }

    /// Take the connection apart, returning the IO object together with the
    /// unprocessed read buffer and the service.
    ///
    /// If the IO object has been "rewound", the rewound bytes are not part of
    /// the returned IO object. Only call this once `poll_without_shutdown` has
    /// reported the connection as "done"; before that point the connection may
    /// not have flushed all of the HTTP bytes it needs to write.
    pub fn into_parts(self) -> Parts<I, S> {
        let (transport, leftover, dispatcher) = self.conn.into_inner();
        let service = dispatcher.into_service();
        Parts {
            io: transport,
            read_buf: leftover,
            service,
            _inner: (),
        }
    }

    /// Drive the connection to completion without calling `shutdown` on the
    /// underlying IO when it finishes.
    ///
    /// This is what makes HTTP upgrades possible: when the upgrade completes
    /// the connection is "done", yet the IO object must stay open so that it
    /// can be recovered with `into_parts`.
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        self.conn.poll_without_shutdown(cx)
    }

    /// Run the connection until it is done without shutting down the IO
    /// object, then resolve to its `Parts`.
    ///
    /// This is a convenience wrapper that combines `poll_without_shutdown`
    /// with `into_parts`.
    ///
    /// # Errors
    ///
    /// The returned future resolves to whatever error `poll_without_shutdown`
    /// reports.
    ///
    /// # Panics
    ///
    /// The returned future panics if it is polled again after it has resolved
    /// successfully, because the connection has already been consumed.
    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        // The connection lives in an `Option` so it can be moved out of the
        // closure once the dispatcher reports completion.
        let mut slot = Some(self);
        futures_util::future::poll_fn(move |cx| {
            let active = slot.as_mut().unwrap();
            match active.conn.poll_without_shutdown(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
                Poll::Ready(Ok(())) => Poll::Ready(Ok(slot.take().unwrap().into_parts())),
            }
        })
    }

    /// Wrap this connection so that higher-level HTTP upgrades are supported.
    ///
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> UpgradeableConnection<I, S>
    where
        I: Send,
    {
        UpgradeableConnection { inner: Some(self) }
    }
}
202
impl<I, B, S> Future for Connection<I, S>
where
    S: HttpService<IncomingBody, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<()>;

    /// Polls the underlying dispatcher. Resolves to `Ok(())` once it has
    /// finished (whether by shutdown or by a pending upgrade) and to the
    /// dispatcher's error otherwise.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `ready!` returns early on `Pending`; `?` returns early on an error.
        match ready!(Pin::new(&mut self.conn).poll(cx))? {
            proto::Dispatched::Shutdown => {}
            proto::Dispatched::Upgrade(pending) => {
                // `I` is not required to be `Send` here, so the upgrade cannot
                // be carried out by this future. Someone may still be waiting
                // via `Body::on_upgrade`, so flag the pending upgrade as
                // manual to give them a dedicated error explaining that.
                pending.manual();
            }
        }
        Poll::Ready(Ok(()))
    }
}
232
233// ===== impl Builder =====
234
impl Builder {
    /// Create a new connection builder with the default options.
    ///
    /// No timer is installed, keep-alive is on, and the header read timeout
    /// defaults to 30 seconds.
    pub fn new() -> Self {
        let default_header_read_timeout = Dur::Default(Some(Duration::from_secs(30)));
        Self {
            timer: Time::Empty,
            h1_half_close: false,
            h1_keep_alive: true,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            h1_header_read_timeout: default_header_read_timeout,
            h1_writev: None,
            max_buf_size: None,
            pipeline_flush: false,
        }
    }

    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.h1_half_close = val;
        self
    }

    /// Turn HTTP/1 keep-alive on or off.
    ///
    /// Default is true.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }

    /// Choose whether HTTP/1 connections write header names in title case
    /// at the socket level.
    ///
    /// Default is false.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.h1_title_case_headers = enabled;
        self
    }

    /// Choose whether the original case of header names is preserved.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Default is false.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.h1_preserve_header_case = enabled;
        self
    }

    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Pass `None` to disable.
    ///
    /// Default is 30 seconds.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        let configured: Option<Duration> = read_timeout.into();
        self.h1_header_read_timeout = Dur::Configured(configured);
        self
    }

    /// Choose whether HTTP/1 connections try to use vectored writes or
    /// always flatten their output into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.h1_writev = Some(val);
        self
    }

    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        let floor = proto::h1::MINIMUM_MAX_BUFFER_SIZE;
        assert!(
            max >= floor,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }

    /// Aggregate flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is false.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.pipeline_flush = enabled;
        self
    }

    /// Set the timer used in background tasks.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        let shared = Arc::new(timer);
        self.timer = Time::Timer(shared);
        self
    }

    /// Bind a connection together with a [`Service`](crate::service::Service).
    ///
    /// The returned future must be polled in order for HTTP to be driven on
    /// the connection.
    ///
    /// # Panics
    ///
    /// If a timeout option has been configured, but a `timer` has not been
    /// provided, calling `serve_connection` will panic.
    ///
    /// # Example
    ///
    /// ```
    /// # use hyper::{body::Incoming, Request, Response};
    /// # use hyper::service::Service;
    /// # use hyper::server::conn::http1::Builder;
    /// # use hyper::rt::{Read, Write};
    /// # async fn run<I, S>(some_io: I, some_service: S)
    /// # where
    /// #     I: Read + Write + Unpin + Send + 'static,
    /// #     S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
    /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    /// #     S::Future: Send,
    /// # {
    /// let http = Builder::new();
    /// let conn = http.serve_connection(some_io, some_service);
    ///
    /// if let Err(e) = conn.await {
    ///     eprintln!("server connection error: {}", e);
    /// }
    /// # }
    /// # fn main() {}
    /// ```
    pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
    where
        S: HttpService<IncomingBody>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::ResBody: 'static,
        <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin,
    {
        // Each option is applied in a fixed order; keep it, because a later
        // setter may depend on what an earlier one configured.
        let mut conn = proto::Conn::new(io);
        conn.set_timer(self.timer.clone());

        if !self.h1_keep_alive {
            conn.disable_keep_alive();
        }
        if self.h1_half_close {
            conn.set_allow_half_close();
        }
        if self.h1_title_case_headers {
            conn.set_title_case_headers();
        }
        if self.h1_preserve_header_case {
            conn.set_preserve_header_case();
        }

        // Validate the timeout against the installed timer before applying it.
        let header_timeout = self
            .timer
            .check(self.h1_header_read_timeout, "header_read_timeout");
        if let Some(dur) = header_timeout {
            conn.set_http1_header_read_timeout(dur);
        }

        // `None` means "auto": leave the write strategy as the connection picked it.
        match self.h1_writev {
            Some(true) => {
                conn.set_write_strategy_queue();
            }
            Some(false) => {
                conn.set_write_strategy_flatten();
            }
            None => {}
        }

        conn.set_flush_pipeline(self.pipeline_flush);
        if let Some(max) = self.max_buf_size {
            conn.set_max_buf_size(max);
        }

        let server = proto::h1::dispatch::Server::new(service);
        let dispatcher = proto::h1::Dispatcher::new(server, conn);
        Connection { conn: dispatcher }
    }
}
436
/// A future binding a connection with a Service with Upgrade support.
///
/// Created by `Connection::with_upgrades`.
#[must_use = "futures do nothing unless polled"]
// No `Debug` impl is provided for this type, so the lint is silenced here.
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S>
where
    S: HttpService<IncomingBody>,
{
    // The wrapped connection. It starts out as `Some` and is taken (leaving
    // `None`) by `poll` when the connection is handed off as an upgrade.
    pub(super) inner: Option<Connection<T, S>>,
}
446
447impl<I, B, S> UpgradeableConnection<I, S>
448where
449    S: HttpService<IncomingBody, ResBody = B>,
450    S::Error: Into<Box<dyn StdError + Send + Sync>>,
451    I: Read + Write + Unpin,
452    B: Body + 'static,
453    B::Error: Into<Box<dyn StdError + Send + Sync>>,
454{
455    /// Start a graceful shutdown process for this connection.
456    ///
457    /// This `Connection` should continue to be polled until shutdown
458    /// can finish.
459    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
460        Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
461    }
462}
463
464impl<I, B, S> Future for UpgradeableConnection<I, S>
465where
466    S: HttpService<IncomingBody, ResBody = B>,
467    S::Error: Into<Box<dyn StdError + Send + Sync>>,
468    I: Read + Write + Unpin + Send + 'static,
469    B: Body + 'static,
470    B::Error: Into<Box<dyn StdError + Send + Sync>>,
471{
472    type Output = crate::Result<()>;
473
474    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475        match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
476            Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
477            Ok(proto::Dispatched::Upgrade(pending)) => {
478                let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
479                pending.fulfill(Upgraded::new(io, buf));
480                Poll::Ready(Ok(()))
481            }
482            Err(e) => Poll::Ready(Err(e)),
483        }
484    }
485}