rama_http_core/server/conn/
http1.rs

1//! HTTP/1 Server Connections
2
3use std::fmt;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use crate::upgrade::Upgraded;
9use bytes::Bytes;
10use futures_util::ready;
11use httparse::ParserConfig;
12use tokio::io::{AsyncRead, AsyncWrite};
13
14use crate::body::Incoming as IncomingBody;
15use crate::proto;
16use crate::service::HttpService;
17
/// Shorthand for the HTTP/1 protocol dispatcher used by server connections:
/// a `Server`-side dispatch wrapping the user's service `S`, driving
/// transport `T` with outgoing body type `B` through a `ServerTransaction`.
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
    proto::h1::dispatch::Server<S, IncomingBody>,
    B,
    T,
    proto::ServerTransaction,
>;
24
pin_project_lite::pin_project! {
    /// A [`Future`](core::future::Future) representing an HTTP/1 connection, bound to a
    /// [`Service`](crate::service::Service), returned from
    /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        // The protocol dispatcher owning the IO object, read buffer, and service.
        conn: Http1Dispatcher<T, rama_http_types::Body, S>,
    }
}
40
/// A configuration builder for HTTP/1 server connections.
///
/// **Note**: The default values of options are *not considered stable*. They
/// are subject to change at any time.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use rama_http_core::server::conn::http1::Builder;
/// # fn main() {
/// let mut http = Builder::new();
/// // Set options one at a time
/// http.half_close(false);
///
/// // Or, chain multiple options
/// http.keep_alive(false).title_case_headers(true).max_buf_size(8192);
///
/// # }
/// ```
///
/// Use [`Builder::serve_connection`](struct.Builder.html#method.serve_connection)
/// to bind the built connection to a service.
#[derive(Clone, Debug)]
pub struct Builder {
    // Low-level parse options forwarded to `httparse`.
    h1_parser_config: ParserConfig,
    // Allow clients to half-close their write side (see `half_close`).
    h1_half_close: bool,
    // Whether connections are kept alive between requests (see `keep_alive`).
    h1_keep_alive: bool,
    // Write header names in Title-Case on the wire (see `title_case_headers`).
    h1_title_case_headers: bool,
    // Optional cap on the number of request headers; `None` uses the
    // default of 100 (see `max_headers`).
    h1_max_headers: Option<usize>,
    // How long to wait for a complete request head before closing.
    h1_header_read_timeout: Duration,
    // `Some(true)` forces the queued (vectored) write strategy,
    // `Some(false)` forces flattening; `None` lets the connection decide.
    h1_writev: Option<bool>,
    // Maximum read/write buffer size (see `max_buf_size`).
    max_buf_size: Option<usize>,
    // Aggregate flushes to support pipelined responses (experimental).
    pipeline_flush: bool,
    // Automatically add a `date` header to responses (see `auto_date_header`).
    date_header: bool,
}
77
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
}
99
100// ===== impl Connection =====
101
102impl<I, S> fmt::Debug for Connection<I, S>
103where
104    S: HttpService<IncomingBody>,
105{
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        f.debug_struct("Connection").finish()
108    }
109}
110
impl<I, S> Connection<I, S>
where
    S: HttpService<IncomingBody>,
    I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    /// Start a graceful shutdown process for this connection.
    ///
    /// This disables keep-alive, so the connection finishes its in-flight
    /// exchange and then closes. This `Connection` should continue to be
    /// polled until shutdown can finish.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still
    /// pending. If called after `Connection::poll` has resolved, this does
    /// nothing.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        self.conn.disable_keep_alive();
    }

    /// Return the inner IO object, and additional information.
    ///
    /// If the IO object has been "rewound" the io will not contain those bytes rewound.
    /// This should only be called after `poll_without_shutdown` signals
    /// that the connection is "done". Otherwise, it may not have finished
    /// flushing all necessary HTTP bytes.
    pub fn into_parts(self) -> Parts<I, S> {
        // Deconstruct the dispatcher into its IO, leftover read bytes, and
        // the dispatch wrapper that still owns the user's service.
        let (io, read_buf, dispatch) = self.conn.into_inner();
        Parts {
            io,
            read_buf,
            service: dispatch.into_service(),
        }
    }

    /// Poll the connection for completion, but without calling `shutdown`
    /// on the underlying IO.
    ///
    /// This is useful to allow running a connection while doing an HTTP
    /// upgrade. Once the upgrade is completed, the connection would be "done",
    /// but it is not desired to actually shutdown the IO object. Instead you
    /// would take it back using `into_parts`.
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
    {
        self.conn.poll_without_shutdown(cx)
    }

    /// Prevent shutdown of the underlying IO object at the end of service the request,
    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
    ///
    /// The returned future resolves once the connection is "done", yielding
    /// the deconstructed [`Parts`]. Any error from driving the connection is
    /// returned as-is.
    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
        // `self` is kept in an `Option` so it can be moved out of the
        // `FnMut` closure once the connection reports completion.
        let mut zelf = Some(self);
        futures_util::future::poll_fn(move |cx| {
            ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
            Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
        })
    }

    /// Enable this connection to support higher-level HTTP upgrades.
    ///
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> UpgradeableConnection<I, S>
    where
        I: Send,
    {
        UpgradeableConnection { inner: Some(self) }
    }
}
186
187impl<I, S> Future for Connection<I, S>
188where
189    S: HttpService<IncomingBody>,
190    I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
191{
192    type Output = crate::Result<()>;
193
194    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        match ready!(Pin::new(&mut self.conn).poll(cx)) {
196            Ok(done) => {
197                match done {
198                    proto::Dispatched::Shutdown => {}
199                    proto::Dispatched::Upgrade(pending) => {
200                        // With no `Send` bound on `I`, we can't try to do
201                        // upgrades here. In case a user was trying to use
202                        // `Body::on_upgrade` with this API, send a special
203                        // error letting them know about that.
204                        pending.manual();
205                    }
206                };
207                Poll::Ready(Ok(()))
208            }
209            Err(e) => Poll::Ready(Err(e)),
210        }
211    }
212}
213
214// ===== impl Builder =====
215
216impl Default for Builder {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
222impl Builder {
223    /// Create a new connection builder.
224    pub fn new() -> Self {
225        Self {
226            h1_parser_config: Default::default(),
227            h1_half_close: false,
228            h1_keep_alive: true,
229            h1_title_case_headers: false,
230            h1_max_headers: None,
231            h1_header_read_timeout: Duration::from_secs(30),
232            h1_writev: None,
233            max_buf_size: None,
234            pipeline_flush: false,
235            date_header: true,
236        }
237    }
238    /// Set whether HTTP/1 connections should support half-closures.
239    ///
240    /// Clients can chose to shutdown their write-side while waiting
241    /// for the server to respond. Setting this to `true` will
242    /// prevent closing the connection immediately if `read`
243    /// detects an EOF in the middle of a request.
244    ///
245    /// Default is `false`.
246    pub fn half_close(&mut self, val: bool) -> &mut Self {
247        self.h1_half_close = val;
248        self
249    }
250
251    /// Enables or disables HTTP/1 keep-alive.
252    ///
253    /// Default is true.
254    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
255        self.h1_keep_alive = val;
256        self
257    }
258
259    /// Set whether HTTP/1 connections will write header names as title case at
260    /// the socket level.
261    ///
262    /// Default is false.
263    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
264        self.h1_title_case_headers = enabled;
265        self
266    }
267
268    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
269    ///
270    /// If this is enabled and a header line does not start with a valid header
271    /// name, or does not include a colon at all, the line will be silently ignored
272    /// and no error will be reported.
273    ///
274    /// Default is false.
275    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Builder {
276        self.h1_parser_config
277            .ignore_invalid_headers_in_requests(enabled);
278        self
279    }
280
281    /// Set the maximum number of headers.
282    ///
283    /// When a request is received, the parser will reserve a buffer to store headers for optimal
284    /// performance.
285    ///
286    /// If server receives more headers than the buffer size, it responds to the client with
287    /// "431 Request Header Fields Too Large".
288    ///
289    /// Note that headers is allocated on the stack by default, which has higher performance. After
290    /// setting this value, headers will be allocated in heap memory, that is, heap memory
291    /// allocation will occur for each request, and there will be a performance drop of about 5%.
292    ///
293    /// Default is 100.
294    pub fn max_headers(&mut self, val: usize) -> &mut Self {
295        self.h1_max_headers = Some(val);
296        self
297    }
298
299    /// Set a timeout for reading client request headers. If a client does not
300    /// transmit the entire header within this time, the connection is closed.
301    ///
302    /// Requires a [`Timer`] set by [`Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
303    /// without a [`Timer`].
304    ///
305    /// Pass `None` to disable.
306    ///
307    /// Default is 30 seconds.
308    pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
309        self.h1_header_read_timeout = read_timeout;
310        self
311    }
312
313    /// Set whether HTTP/1 connections should try to use vectored writes,
314    /// or always flatten into a single buffer.
315    ///
316    /// Note that setting this to false may mean more copies of body data,
317    /// but may also improve performance when an IO transport doesn't
318    /// support vectored writes well, such as most TLS implementations.
319    ///
320    /// Setting this to true will force rama_http_core to use queued strategy
321    /// which may eliminate unnecessary cloning on some TLS backends
322    ///
323    /// Default is `auto`. In this mode rama_http_core will try to guess which
324    /// mode to use
325    pub fn writev(&mut self, val: bool) -> &mut Self {
326        self.h1_writev = Some(val);
327        self
328    }
329
330    /// Set the maximum buffer size for the connection.
331    ///
332    /// Default is ~400kb.
333    ///
334    /// # Panics
335    ///
336    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
337    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
338        assert!(
339            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
340            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
341        );
342        self.max_buf_size = Some(max);
343        self
344    }
345
346    /// Set whether the `date` header should be included in HTTP responses.
347    ///
348    /// Note that including the `date` header is recommended by RFC 7231.
349    ///
350    /// Default is true.
351    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
352        self.date_header = enabled;
353        self
354    }
355
356    /// Aggregates flushes to better support pipelined responses.
357    ///
358    /// Experimental, may have bugs.
359    ///
360    /// Default is false.
361    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
362        self.pipeline_flush = enabled;
363        self
364    }
365
366    /// Bind a connection together with a [`Service`](crate::service::Service).
367    ///
368    /// This returns a Future that must be polled in order for HTTP to be
369    /// driven on the connection.
370    ///
371    /// # Panics
372    ///
373    /// If a timeout option has been configured, but a `timer` has not been
374    /// provided, calling `serve_connection` will panic.
375    pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
376    where
377        S: HttpService<IncomingBody>,
378        I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
379    {
380        let mut conn = proto::Conn::new(io);
381        conn.set_h1_parser_config(self.h1_parser_config.clone());
382        if !self.h1_keep_alive {
383            conn.disable_keep_alive();
384        }
385        if self.h1_half_close {
386            conn.set_allow_half_close();
387        }
388        if self.h1_title_case_headers {
389            conn.set_title_case_headers();
390        }
391        if let Some(max_headers) = self.h1_max_headers {
392            conn.set_http1_max_headers(max_headers);
393        }
394        conn.set_http1_header_read_timeout(self.h1_header_read_timeout);
395        if let Some(writev) = self.h1_writev {
396            if writev {
397                conn.set_write_strategy_queue();
398            } else {
399                conn.set_write_strategy_flatten();
400            }
401        }
402        conn.set_flush_pipeline(self.pipeline_flush);
403        if let Some(max) = self.max_buf_size {
404            conn.set_max_buf_size(max);
405        }
406        if !self.date_header {
407            conn.disable_date_header();
408        }
409        let sd = proto::h1::dispatch::Server::new(service);
410        let proto = proto::h1::Dispatcher::new(sd, conn);
411        Connection { conn: proto }
412    }
413}
414
/// A future binding a connection with a Service with Upgrade support.
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S>
where
    S: HttpService<IncomingBody>,
{
    // `Some` while the connection is being driven; taken (set to `None`)
    // once an upgrade has been fulfilled, after which polling is a no-op.
    pub(super) inner: Option<Connection<T, S>>,
}
424
425impl<I, S> UpgradeableConnection<I, S>
426where
427    S: HttpService<IncomingBody>,
428    I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
429{
430    /// Start a graceful shutdown process for this connection.
431    ///
432    /// This `Connection` should continue to be polled until shutdown
433    /// can finish.
434    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
435        // Connection (`inner`) is `None` if it was upgraded (and `poll` is `Ready`).
436        // In that case, we don't need to call `graceful_shutdown`.
437        if let Some(conn) = self.inner.as_mut() {
438            Pin::new(conn).graceful_shutdown()
439        }
440    }
441}
442
443impl<I, S> Future for UpgradeableConnection<I, S>
444where
445    S: HttpService<IncomingBody>,
446    I: AsyncRead + AsyncWrite + Send + Unpin + 'static + Send + 'static,
447{
448    type Output = crate::Result<()>;
449
450    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451        if let Some(conn) = self.inner.as_mut() {
452            match ready!(Pin::new(&mut conn.conn).poll(cx)) {
453                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
454                Ok(proto::Dispatched::Upgrade(pending)) => {
455                    let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
456                    pending.fulfill(Upgraded::new(io, buf));
457                    Poll::Ready(Ok(()))
458                }
459                Err(e) => Poll::Ready(Err(e)),
460            }
461        } else {
462            // inner is `None`, meaning the connection was upgraded, thus it's `Poll::Ready(Ok(()))`
463            Poll::Ready(Ok(()))
464        }
465    }
466}
467
#[cfg(test)]
mod tests {
    use crate::service::VoidHttpService;
    use tokio::net::TcpStream;

    use super::*;

    /// Compile-time check that connection futures can be spawned on a
    /// multi-threaded executor (i.e. they are `Send + 'static`).
    #[test]
    fn test_assert_send_static() {
        fn assert_send_static<T: Send + 'static>() {}

        assert_send_static::<Connection<TcpStream, VoidHttpService>>();
        assert_send_static::<UpgradeableConnection<TcpStream, VoidHttpService>>();
    }
}
481}