rama_http_core/server/conn/http1.rs
1//! HTTP/1 Server Connections
2
3use std::fmt;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use crate::upgrade::Upgraded;
9use bytes::Bytes;
10use futures_util::ready;
11use httparse::ParserConfig;
12use tokio::io::{AsyncRead, AsyncWrite};
13
14use crate::body::Incoming as IncomingBody;
15use crate::proto;
16use crate::service::HttpService;
17
18type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
19 proto::h1::dispatch::Server<S, IncomingBody>,
20 B,
21 T,
22 proto::ServerTransaction,
23>;
24
25pin_project_lite::pin_project! {
26 /// A [`Future`](core::future::Future) representing an HTTP/1 connection, bound to a
27 /// [`Service`](crate::service::Service), returned from
28 /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
29 ///
30 /// To drive HTTP on this connection this future **must be polled**, typically with
31 /// `.await`. If it isn't polled, no progress will be made on this connection.
32 #[must_use = "futures do nothing unless polled"]
33 pub struct Connection<T, S>
34 where
35 S: HttpService<IncomingBody>,
36 {
37 conn: Http1Dispatcher<T, rama_http_types::Body, S>,
38 }
39}
40
41/// A configuration builder for HTTP/1 server connections.
42///
43/// **Note**: The default values of options are *not considered stable*. They
44/// are subject to change at any time.
45///
46/// # Example
47///
48/// ```
49/// # use std::time::Duration;
50/// # use rama_http_core::server::conn::http1::Builder;
51/// # fn main() {
52/// let mut http = Builder::new();
53/// // Set options one at a time
54/// http.half_close(false);
55///
56/// // Or, chain multiple options
57/// http.keep_alive(false).title_case_headers(true).max_buf_size(8192);
58///
59/// # }
60/// ```
61///
62/// Use [`Builder::serve_connection`](struct.Builder.html#method.serve_connection)
63/// to bind the built connection to a service.
64#[derive(Clone, Debug)]
65pub struct Builder {
66 h1_parser_config: ParserConfig,
67 h1_half_close: bool,
68 h1_keep_alive: bool,
69 h1_title_case_headers: bool,
70 h1_max_headers: Option<usize>,
71 h1_header_read_timeout: Duration,
72 h1_writev: Option<bool>,
73 max_buf_size: Option<usize>,
74 pipeline_flush: bool,
75 date_header: bool,
76}
77
78/// Deconstructed parts of a `Connection`.
79///
80/// This allows taking apart a `Connection` at a later time, in order to
81/// reclaim the IO object, and additional related pieces.
82#[derive(Debug)]
83#[non_exhaustive]
84pub struct Parts<T, S> {
85 /// The original IO object used in the handshake.
86 pub io: T,
87 /// A buffer of bytes that have been read but not processed as HTTP.
88 ///
89 /// If the client sent additional bytes after its last request, and
90 /// this connection "ended" with an upgrade, the read buffer will contain
91 /// those bytes.
92 ///
93 /// You will want to check for any existing bytes if you plan to continue
94 /// communicating on the IO object.
95 pub read_buf: Bytes,
96 /// The `Service` used to serve this connection.
97 pub service: S,
98}
99
100// ===== impl Connection =====
101
102impl<I, S> fmt::Debug for Connection<I, S>
103where
104 S: HttpService<IncomingBody>,
105{
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 f.debug_struct("Connection").finish()
108 }
109}
110
111impl<I, S> Connection<I, S>
112where
113 S: HttpService<IncomingBody>,
114 I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
115{
116 /// Start a graceful shutdown process for this connection.
117 ///
118 /// This `Connection` should continue to be polled until shutdown
119 /// can finish.
120 ///
121 /// # Note
122 ///
123 /// This should only be called while the `Connection` future is still
124 /// pending. If called after `Connection::poll` has resolved, this does
125 /// nothing.
126 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
127 self.conn.disable_keep_alive();
128 }
129
130 /// Return the inner IO object, and additional information.
131 ///
132 /// If the IO object has been "rewound" the io will not contain those bytes rewound.
133 /// This should only be called after `poll_without_shutdown` signals
134 /// that the connection is "done". Otherwise, it may not have finished
135 /// flushing all necessary HTTP bytes.
136 ///
137 /// # Panics
138 /// This method will panic if this connection is using an h2 protocol.
139 pub fn into_parts(self) -> Parts<I, S> {
140 let (io, read_buf, dispatch) = self.conn.into_inner();
141 Parts {
142 io,
143 read_buf,
144 service: dispatch.into_service(),
145 }
146 }
147
148 /// Poll the connection for completion, but without calling `shutdown`
149 /// on the underlying IO.
150 ///
151 /// This is useful to allow running a connection while doing an HTTP
152 /// upgrade. Once the upgrade is completed, the connection would be "done",
153 /// but it is not desired to actually shutdown the IO object. Instead you
154 /// would take it back using `into_parts`.
155 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
156 where
157 S: Unpin,
158 {
159 self.conn.poll_without_shutdown(cx)
160 }
161
162 /// Prevent shutdown of the underlying IO object at the end of service the request,
163 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
164 ///
165 /// # Error
166 ///
167 /// This errors if the underlying connection protocol is not HTTP/1.
168 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
169 let mut zelf = Some(self);
170 futures_util::future::poll_fn(move |cx| {
171 ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
172 Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
173 })
174 }
175
176 /// Enable this connection to support higher-level HTTP upgrades.
177 ///
178 /// See [the `upgrade` module](crate::upgrade) for more.
179 pub fn with_upgrades(self) -> UpgradeableConnection<I, S>
180 where
181 I: Send,
182 {
183 UpgradeableConnection { inner: Some(self) }
184 }
185}
186
187impl<I, S> Future for Connection<I, S>
188where
189 S: HttpService<IncomingBody>,
190 I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
191{
192 type Output = crate::Result<()>;
193
194 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 match ready!(Pin::new(&mut self.conn).poll(cx)) {
196 Ok(done) => {
197 match done {
198 proto::Dispatched::Shutdown => {}
199 proto::Dispatched::Upgrade(pending) => {
200 // With no `Send` bound on `I`, we can't try to do
201 // upgrades here. In case a user was trying to use
202 // `Body::on_upgrade` with this API, send a special
203 // error letting them know about that.
204 pending.manual();
205 }
206 };
207 Poll::Ready(Ok(()))
208 }
209 Err(e) => Poll::Ready(Err(e)),
210 }
211 }
212}
213
214// ===== impl Builder =====
215
216impl Default for Builder {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
222impl Builder {
223 /// Create a new connection builder.
224 pub fn new() -> Self {
225 Self {
226 h1_parser_config: Default::default(),
227 h1_half_close: false,
228 h1_keep_alive: true,
229 h1_title_case_headers: false,
230 h1_max_headers: None,
231 h1_header_read_timeout: Duration::from_secs(30),
232 h1_writev: None,
233 max_buf_size: None,
234 pipeline_flush: false,
235 date_header: true,
236 }
237 }
238 /// Set whether HTTP/1 connections should support half-closures.
239 ///
240 /// Clients can chose to shutdown their write-side while waiting
241 /// for the server to respond. Setting this to `true` will
242 /// prevent closing the connection immediately if `read`
243 /// detects an EOF in the middle of a request.
244 ///
245 /// Default is `false`.
246 pub fn half_close(&mut self, val: bool) -> &mut Self {
247 self.h1_half_close = val;
248 self
249 }
250
251 /// Enables or disables HTTP/1 keep-alive.
252 ///
253 /// Default is true.
254 pub fn keep_alive(&mut self, val: bool) -> &mut Self {
255 self.h1_keep_alive = val;
256 self
257 }
258
259 /// Set whether HTTP/1 connections will write header names as title case at
260 /// the socket level.
261 ///
262 /// Default is false.
263 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
264 self.h1_title_case_headers = enabled;
265 self
266 }
267
268 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
269 ///
270 /// If this is enabled and a header line does not start with a valid header
271 /// name, or does not include a colon at all, the line will be silently ignored
272 /// and no error will be reported.
273 ///
274 /// Default is false.
275 pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Builder {
276 self.h1_parser_config
277 .ignore_invalid_headers_in_requests(enabled);
278 self
279 }
280
281 /// Set the maximum number of headers.
282 ///
283 /// When a request is received, the parser will reserve a buffer to store headers for optimal
284 /// performance.
285 ///
286 /// If server receives more headers than the buffer size, it responds to the client with
287 /// "431 Request Header Fields Too Large".
288 ///
289 /// Note that headers is allocated on the stack by default, which has higher performance. After
290 /// setting this value, headers will be allocated in heap memory, that is, heap memory
291 /// allocation will occur for each request, and there will be a performance drop of about 5%.
292 ///
293 /// Default is 100.
294 pub fn max_headers(&mut self, val: usize) -> &mut Self {
295 self.h1_max_headers = Some(val);
296 self
297 }
298
299 /// Set a timeout for reading client request headers. If a client does not
300 /// transmit the entire header within this time, the connection is closed.
301 ///
302 /// Requires a [`Timer`] set by [`Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
303 /// without a [`Timer`].
304 ///
305 /// Pass `None` to disable.
306 ///
307 /// Default is 30 seconds.
308 pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
309 self.h1_header_read_timeout = read_timeout;
310 self
311 }
312
313 /// Set whether HTTP/1 connections should try to use vectored writes,
314 /// or always flatten into a single buffer.
315 ///
316 /// Note that setting this to false may mean more copies of body data,
317 /// but may also improve performance when an IO transport doesn't
318 /// support vectored writes well, such as most TLS implementations.
319 ///
320 /// Setting this to true will force rama_http_core to use queued strategy
321 /// which may eliminate unnecessary cloning on some TLS backends
322 ///
323 /// Default is `auto`. In this mode rama_http_core will try to guess which
324 /// mode to use
325 pub fn writev(&mut self, val: bool) -> &mut Self {
326 self.h1_writev = Some(val);
327 self
328 }
329
330 /// Set the maximum buffer size for the connection.
331 ///
332 /// Default is ~400kb.
333 ///
334 /// # Panics
335 ///
336 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
337 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
338 assert!(
339 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
340 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
341 );
342 self.max_buf_size = Some(max);
343 self
344 }
345
346 /// Set whether the `date` header should be included in HTTP responses.
347 ///
348 /// Note that including the `date` header is recommended by RFC 7231.
349 ///
350 /// Default is true.
351 pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
352 self.date_header = enabled;
353 self
354 }
355
356 /// Aggregates flushes to better support pipelined responses.
357 ///
358 /// Experimental, may have bugs.
359 ///
360 /// Default is false.
361 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
362 self.pipeline_flush = enabled;
363 self
364 }
365
366 /// Bind a connection together with a [`Service`](crate::service::Service).
367 ///
368 /// This returns a Future that must be polled in order for HTTP to be
369 /// driven on the connection.
370 ///
371 /// # Panics
372 ///
373 /// If a timeout option has been configured, but a `timer` has not been
374 /// provided, calling `serve_connection` will panic.
375 pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
376 where
377 S: HttpService<IncomingBody>,
378 I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
379 {
380 let mut conn = proto::Conn::new(io);
381 conn.set_h1_parser_config(self.h1_parser_config.clone());
382 if !self.h1_keep_alive {
383 conn.disable_keep_alive();
384 }
385 if self.h1_half_close {
386 conn.set_allow_half_close();
387 }
388 if self.h1_title_case_headers {
389 conn.set_title_case_headers();
390 }
391 if let Some(max_headers) = self.h1_max_headers {
392 conn.set_http1_max_headers(max_headers);
393 }
394 conn.set_http1_header_read_timeout(self.h1_header_read_timeout);
395 if let Some(writev) = self.h1_writev {
396 if writev {
397 conn.set_write_strategy_queue();
398 } else {
399 conn.set_write_strategy_flatten();
400 }
401 }
402 conn.set_flush_pipeline(self.pipeline_flush);
403 if let Some(max) = self.max_buf_size {
404 conn.set_max_buf_size(max);
405 }
406 if !self.date_header {
407 conn.disable_date_header();
408 }
409 let sd = proto::h1::dispatch::Server::new(service);
410 let proto = proto::h1::Dispatcher::new(sd, conn);
411 Connection { conn: proto }
412 }
413}
414
415/// A future binding a connection with a Service with Upgrade support.
416#[must_use = "futures do nothing unless polled"]
417#[allow(missing_debug_implementations)]
418pub struct UpgradeableConnection<T, S>
419where
420 S: HttpService<IncomingBody>,
421{
422 pub(super) inner: Option<Connection<T, S>>,
423}
424
425impl<I, S> UpgradeableConnection<I, S>
426where
427 S: HttpService<IncomingBody>,
428 I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
429{
430 /// Start a graceful shutdown process for this connection.
431 ///
432 /// This `Connection` should continue to be polled until shutdown
433 /// can finish.
434 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
435 // Connection (`inner`) is `None` if it was upgraded (and `poll` is `Ready`).
436 // In that case, we don't need to call `graceful_shutdown`.
437 if let Some(conn) = self.inner.as_mut() {
438 Pin::new(conn).graceful_shutdown()
439 }
440 }
441}
442
443impl<I, S> Future for UpgradeableConnection<I, S>
444where
445 S: HttpService<IncomingBody>,
446 I: AsyncRead + AsyncWrite + Send + Unpin + 'static + Send + 'static,
447{
448 type Output = crate::Result<()>;
449
450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451 if let Some(conn) = self.inner.as_mut() {
452 match ready!(Pin::new(&mut conn.conn).poll(cx)) {
453 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
454 Ok(proto::Dispatched::Upgrade(pending)) => {
455 let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
456 pending.fulfill(Upgraded::new(io, buf));
457 Poll::Ready(Ok(()))
458 }
459 Err(e) => Poll::Ready(Err(e)),
460 }
461 } else {
462 // inner is `None`, meaning the connection was upgraded, thus it's `Poll::Ready(Ok(()))`
463 Poll::Ready(Ok(()))
464 }
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use crate::service::VoidHttpService;
471 use tokio::net::TcpStream;
472
473 use super::*;
474
475 #[test]
476 fn test_assert_send_static() {
477 fn g<T: Send + 'static>() {}
478 g::<Connection<TcpStream, VoidHttpService>>();
479 g::<UpgradeableConnection<TcpStream, VoidHttpService>>();
480 }
481}