pingora_core/protocols/http/
server.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP server session APIs
16
17use super::error_resp;
18use super::v1::server::HttpSession as SessionV1;
19use super::v2::server::HttpSession as SessionV2;
20use super::HttpTask;
21use crate::protocols::{Digest, SocketAddr, Stream};
22use bytes::Bytes;
23use http::HeaderValue;
24use http::{header::AsHeaderName, HeaderMap};
25use pingora_error::Result;
26use pingora_http::{RequestHeader, ResponseHeader};
27use std::time::Duration;
28
29/// HTTP server session object for both HTTP/1.x and HTTP/2
30pub enum Session {
31    H1(SessionV1),
32    H2(SessionV2),
33}
34
35impl Session {
36    /// Create a new [`Session`] from an established connection for HTTP/1.x
37    pub fn new_http1(stream: Stream) -> Self {
38        Self::H1(SessionV1::new(stream))
39    }
40
41    /// Create a new [`Session`] from an established HTTP/2 stream
42    pub fn new_http2(session: SessionV2) -> Self {
43        Self::H2(session)
44    }
45
46    /// Whether the session is HTTP/2. If not it is HTTP/1.x
47    pub fn is_http2(&self) -> bool {
48        matches!(self, Self::H2(_))
49    }
50
51    /// Read the request header. This method is required to be called first before doing anything
52    /// else with the session.
53    /// - `Ok(true)`: successful
54    /// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
55    ///   In this case the user should give up this session.
56    pub async fn read_request(&mut self) -> Result<bool> {
57        match self {
58            Self::H1(s) => {
59                let read = s.read_request().await?;
60                Ok(read.is_some())
61            }
62            // This call will always return `Ok(true)` for Http2 because the request is already read
63            Self::H2(_) => Ok(true),
64        }
65    }
66
67    /// Return the request header it just read.
68    /// # Panic
69    /// This function will panic if [`Self::read_request()`] is not called.
70    pub fn req_header(&self) -> &RequestHeader {
71        match self {
72            Self::H1(s) => s.req_header(),
73            Self::H2(s) => s.req_header(),
74        }
75    }
76
77    /// Return a mutable reference to request header it just read.
78    /// # Panic
79    /// This function will panic if [`Self::read_request()`] is not called.
80    pub fn req_header_mut(&mut self) -> &mut RequestHeader {
81        match self {
82            Self::H1(s) => s.req_header_mut(),
83            Self::H2(s) => s.req_header_mut(),
84        }
85    }
86
87    /// Return the header by name. None if the header doesn't exist.
88    ///
89    /// In case there are multiple headers under the same name, the first one will be returned. To
90    /// get all the headers: use `self.req_header().headers.get_all()`.
91    pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
92        self.req_header().headers.get(key)
93    }
94
95    /// Get the header value in its raw format.
96    /// If the header doesn't exist, return an empty slice.
97    pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
98        self.get_header(key).map_or(b"", |v| v.as_bytes())
99    }
100
101    /// Read the request body. Ok(None) if no (more) body to read
102    pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
103        match self {
104            Self::H1(s) => s.read_body_bytes().await,
105            Self::H2(s) => s.read_body_bytes().await,
106        }
107    }
108
109    /// Discard the request body by reading it until completion.
110    ///
111    /// This is useful for making streams reusable (in particular for HTTP/1.1) after returning an
112    /// error before the whole body has been read.
113    pub async fn drain_request_body(&mut self) -> Result<()> {
114        match self {
115            Self::H1(s) => s.drain_request_body().await,
116            Self::H2(s) => s.drain_request_body().await,
117        }
118    }
119
120    /// Write the response header to client
121    /// Informational headers (status code 100-199, excluding 101) can be written multiple times the final
122    /// response header (status code 200+ or 101) is written.
123    pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
124        match self {
125            Self::H1(s) => {
126                s.write_response_header(resp).await?;
127                Ok(())
128            }
129            Self::H2(s) => s.write_response_header(resp, false),
130        }
131    }
132
133    /// Similar to `write_response_header()`, this fn will clone the `resp` internally
134    pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
135        match self {
136            Self::H1(s) => {
137                s.write_response_header_ref(resp).await?;
138                Ok(())
139            }
140            Self::H2(s) => s.write_response_header_ref(resp, false),
141        }
142    }
143
144    /// Write the response body to client
145    pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
146        if data.is_empty() && !end {
147            // writing 0 byte to a chunked encoding h1 would finish the stream
148            // writing 0 bytes to h2 is noop
149            // we don't want to actually write in either cases
150            return Ok(());
151        }
152        match self {
153            Self::H1(s) => {
154                s.write_body(&data).await?;
155                Ok(())
156            }
157            Self::H2(s) => s.write_body(data, end).await,
158        }
159    }
160
161    /// Write the response trailers to client
162    pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
163        match self {
164            Self::H1(_) => Ok(()), // TODO: support trailers for h1
165            Self::H2(s) => s.write_trailers(trailers),
166        }
167    }
168
169    /// Finish the life of this request.
170    /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
171    /// For H2, always return None because H2 stream is not reusable.
172    pub async fn finish(self) -> Result<Option<Stream>> {
173        match self {
174            Self::H1(mut s) => {
175                // need to flush body due to buffering
176                s.finish_body().await?;
177                s.reuse().await
178            }
179            Self::H2(mut s) => {
180                s.finish()?;
181                Ok(None)
182            }
183        }
184    }
185
186    pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
187        match self {
188            Self::H1(s) => s.response_duplex_vec(tasks).await,
189            Self::H2(s) => s.response_duplex_vec(tasks).await,
190        }
191    }
192
193    /// Set connection reuse. `duration` defines how long the connection is kept open for the next
194    /// request to reuse. Noop for h2
195    pub fn set_keepalive(&mut self, duration: Option<u64>) {
196        match self {
197            Self::H1(s) => s.set_server_keepalive(duration),
198            Self::H2(_) => {}
199        }
200    }
201
202    /// Get the keepalive timeout. None if keepalive is disabled. Not applicable for h2
203    pub fn get_keepalive(&self) -> Option<u64> {
204        match self {
205            Self::H1(s) => s.get_keepalive_timeout(),
206            Self::H2(_) => None,
207        }
208    }
209
210    /// Sets the downstream read timeout. This will trigger if we're unable
211    /// to read from the stream after `timeout`.
212    ///
213    /// This is a noop for h2.
214    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
215        match self {
216            Self::H1(s) => s.set_read_timeout(timeout),
217            Self::H2(_) => {}
218        }
219    }
220
221    /// Sets the downstream write timeout. This will trigger if we're unable
222    /// to write to the stream after `timeout`. If a `min_send_rate` is
223    /// configured then the `min_send_rate` calculated timeout has higher priority.
224    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
225        match self {
226            Self::H1(s) => s.set_write_timeout(timeout),
227            Self::H2(s) => s.set_write_timeout(timeout),
228        }
229    }
230
231    /// Sets the total drain timeout, which will be applied while discarding the
232    /// request body using `drain_request_body`.
233    ///
234    /// For HTTP/1.1, reusing a session requires ensuring that the request body
235    /// is consumed. If the timeout is exceeded, the caller should give up on
236    /// trying to reuse the session.
237    pub fn set_total_drain_timeout(&mut self, timeout: Option<Duration>) {
238        match self {
239            Self::H1(s) => s.set_total_drain_timeout(timeout),
240            Self::H2(s) => s.set_total_drain_timeout(timeout),
241        }
242    }
243
244    /// Sets the minimum downstream send rate in bytes per second. This
245    /// is used to calculate a write timeout in seconds based on the size
246    /// of the buffer being written. If a `min_send_rate` is configured it
247    /// has higher priority over a set `write_timeout`. The minimum send
248    /// rate must be greater than zero.
249    ///
250    /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
251    /// is greater than zero, a send rate of zero is equivalent to disabling.
252    ///
253    /// This is a noop for h2.
254    pub fn set_min_send_rate(&mut self, rate: Option<usize>) {
255        match self {
256            Self::H1(s) => s.set_min_send_rate(rate),
257            Self::H2(_) => {}
258        }
259    }
260
261    /// Sets whether we ignore writing informational responses downstream.
262    ///
263    /// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
264    /// Expect: 100-continue was set on the request.
265    ///
266    /// This is a noop for h2 because informational responses are always ignored.
267    pub fn set_ignore_info_resp(&mut self, ignore: bool) {
268        match self {
269            Self::H1(s) => s.set_ignore_info_resp(ignore),
270            Self::H2(_) => {} // always ignored
271        }
272    }
273
274    /// Sets whether keepalive should be disabled if response is written prior to
275    /// downstream body finishing.
276    ///
277    /// This is a noop for h2.
278    pub fn set_close_on_response_before_downstream_finish(&mut self, close: bool) {
279        match self {
280            Self::H1(s) => s.set_close_on_response_before_downstream_finish(close),
281            Self::H2(_) => {} // always ignored
282        }
283    }
284
285    /// Return a digest of the request including the method, path and Host header
286    // TODO: make this use a `Formatter`
287    pub fn request_summary(&self) -> String {
288        match self {
289            Self::H1(s) => s.request_summary(),
290            Self::H2(s) => s.request_summary(),
291        }
292    }
293
294    /// Return the written response header. `None` if it is not written yet.
295    /// Only the final (status code >= 200 or 101) response header will be returned
296    pub fn response_written(&self) -> Option<&ResponseHeader> {
297        match self {
298            Self::H1(s) => s.response_written(),
299            Self::H2(s) => s.response_written(),
300        }
301    }
302
303    /// Give up the http session abruptly.
304    /// For H1 this will close the underlying connection
305    /// For H2 this will send RESET frame to end this stream without impacting the connection
306    pub async fn shutdown(&mut self) {
307        match self {
308            Self::H1(s) => s.shutdown().await,
309            Self::H2(s) => s.shutdown(),
310        }
311    }
312
313    pub fn to_h1_raw(&self) -> Bytes {
314        match self {
315            Self::H1(s) => s.get_headers_raw_bytes(),
316            Self::H2(s) => s.pseudo_raw_h1_request_header(),
317        }
318    }
319
320    /// Whether the whole request body is sent
321    pub fn is_body_done(&mut self) -> bool {
322        match self {
323            Self::H1(s) => s.is_body_done(),
324            Self::H2(s) => s.is_body_done(),
325        }
326    }
327
328    /// Notify the client that the entire body is sent
329    /// for H1 chunked encoding, this will end the last empty chunk
330    /// for H1 content-length, this has no effect.
331    /// for H2, this will send an empty DATA frame with END_STREAM flag
332    pub async fn finish_body(&mut self) -> Result<()> {
333        match self {
334            Self::H1(s) => s.finish_body().await.map(|_| ()),
335            Self::H2(s) => s.finish(),
336        }
337    }
338
339    pub fn generate_error(error: u16) -> ResponseHeader {
340        match error {
341            /* common error responses are pre-generated */
342            502 => error_resp::HTTP_502_RESPONSE.clone(),
343            400 => error_resp::HTTP_400_RESPONSE.clone(),
344            _ => error_resp::gen_error_response(error),
345        }
346    }
347
348    /// Send error response to client using a pre-generated error message.
349    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
350        self.respond_error_with_body(error, Bytes::default()).await
351    }
352
353    /// Send error response to client using a pre-generated error message and custom body.
354    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
355        let mut resp = Self::generate_error(error);
356        if !body.is_empty() {
357            // error responses have a default content-length of zero
358            resp.set_content_length(body.len())?
359        }
360        self.write_error_response(resp, body).await
361    }
362
363    /// Send an error response to a client with a response header and body.
364    pub async fn write_error_response(&mut self, resp: ResponseHeader, body: Bytes) -> Result<()> {
365        // TODO: we shouldn't be closing downstream connections on internally generated errors
366        // and possibly other upstream connect() errors (connection refused, timeout, etc)
367        //
368        // This change is only here because we DO NOT re-use downstream connections
369        // today on these errors and we should signal to the client that pingora is dropping it
370        // rather than a misleading the client with 'keep-alive'
371        self.set_keepalive(None);
372
373        // If a response was already written and it's not informational 1xx, return.
374        // The only exception is an informational 101 Switching Protocols, which is treated
375        // as final response https://www.rfc-editor.org/rfc/rfc9110#section-15.2.2.
376        if let Some(resp_written) = self.response_written().as_ref() {
377            if !resp_written.status.is_informational() || resp_written.status == 101 {
378                return Ok(());
379            }
380        }
381
382        self.write_response_header(Box::new(resp)).await?;
383
384        if !body.is_empty() {
385            self.write_response_body(body, true).await?;
386        } else {
387            self.finish_body().await?;
388        }
389
390        Ok(())
391    }
392
393    /// Whether there is no request body
394    pub fn is_body_empty(&mut self) -> bool {
395        match self {
396            Self::H1(s) => s.is_body_empty(),
397            Self::H2(s) => s.is_body_empty(),
398        }
399    }
400
401    pub fn retry_buffer_truncated(&self) -> bool {
402        match self {
403            Self::H1(s) => s.retry_buffer_truncated(),
404            Self::H2(s) => s.retry_buffer_truncated(),
405        }
406    }
407
408    pub fn enable_retry_buffering(&mut self) {
409        match self {
410            Self::H1(s) => s.enable_retry_buffering(),
411            Self::H2(s) => s.enable_retry_buffering(),
412        }
413    }
414
415    pub fn get_retry_buffer(&self) -> Option<Bytes> {
416        match self {
417            Self::H1(s) => s.get_retry_buffer(),
418            Self::H2(s) => s.get_retry_buffer(),
419        }
420    }
421
422    /// Read body (same as `read_request_body()`) or pending forever until downstream
423    /// terminates the session.
424    pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
425        match self {
426            Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
427            Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
428        }
429    }
430
431    pub fn as_http1(&self) -> Option<&SessionV1> {
432        match self {
433            Self::H1(s) => Some(s),
434            Self::H2(_) => None,
435        }
436    }
437
438    pub fn as_http2(&self) -> Option<&SessionV2> {
439        match self {
440            Self::H1(_) => None,
441            Self::H2(s) => Some(s),
442        }
443    }
444
445    /// Write a 100 Continue response to the client.
446    pub async fn write_continue_response(&mut self) -> Result<()> {
447        match self {
448            Self::H1(s) => s.write_continue_response().await,
449            Self::H2(s) => s.write_response_header(
450                Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
451                false,
452            ),
453        }
454    }
455
456    /// Whether this request is for upgrade (e.g., websocket)
457    pub fn is_upgrade_req(&self) -> bool {
458        match self {
459            Self::H1(s) => s.is_upgrade_req(),
460            Self::H2(_) => false,
461        }
462    }
463
464    /// Return how many response body bytes (application, not wire) already sent downstream
465    pub fn body_bytes_sent(&self) -> usize {
466        match self {
467            Self::H1(s) => s.body_bytes_sent(),
468            Self::H2(s) => s.body_bytes_sent(),
469        }
470    }
471
472    /// Return how many request body bytes (application, not wire) already read from downstream
473    pub fn body_bytes_read(&self) -> usize {
474        match self {
475            Self::H1(s) => s.body_bytes_read(),
476            Self::H2(s) => s.body_bytes_read(),
477        }
478    }
479
480    /// Return the [Digest] for the connection.
481    pub fn digest(&self) -> Option<&Digest> {
482        match self {
483            Self::H1(s) => Some(s.digest()),
484            Self::H2(s) => s.digest(),
485        }
486    }
487
488    /// Return a mutable [Digest] reference for the connection.
489    ///
490    /// Will return `None` if multiple H2 streams are open.
491    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
492        match self {
493            Self::H1(s) => Some(s.digest_mut()),
494            Self::H2(s) => s.digest_mut(),
495        }
496    }
497
498    /// Return the client (peer) address of the connection.
499    pub fn client_addr(&self) -> Option<&SocketAddr> {
500        match self {
501            Self::H1(s) => s.client_addr(),
502            Self::H2(s) => s.client_addr(),
503        }
504    }
505
506    /// Return the server (local) address of the connection.
507    pub fn server_addr(&self) -> Option<&SocketAddr> {
508        match self {
509            Self::H1(s) => s.server_addr(),
510            Self::H2(s) => s.server_addr(),
511        }
512    }
513
514    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
515    /// None if the HTTP session is over H2
516    pub fn stream(&self) -> Option<&Stream> {
517        match self {
518            Self::H1(s) => Some(s.stream()),
519            Self::H2(_) => None,
520        }
521    }
522}