pingora_core/protocols/http/
server.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP server session APIs
16
17use super::error_resp;
18use super::v1::server::HttpSession as SessionV1;
19use super::v2::server::HttpSession as SessionV2;
20use super::HttpTask;
21use crate::protocols::{Digest, SocketAddr, Stream};
22use bytes::Bytes;
23use http::HeaderValue;
24use http::{header::AsHeaderName, HeaderMap};
25use log::error;
26use pingora_error::Result;
27use pingora_http::{RequestHeader, ResponseHeader};
28use std::time::Duration;
29
30/// HTTP server session object for both HTTP/1.x and HTTP/2
31pub enum Session {
32    H1(SessionV1),
33    H2(SessionV2),
34}
35
36impl Session {
37    /// Create a new [`Session`] from an established connection for HTTP/1.x
38    pub fn new_http1(stream: Stream) -> Self {
39        Self::H1(SessionV1::new(stream))
40    }
41
42    /// Create a new [`Session`] from an established HTTP/2 stream
43    pub fn new_http2(session: SessionV2) -> Self {
44        Self::H2(session)
45    }
46
47    /// Whether the session is HTTP/2. If not it is HTTP/1.x
48    pub fn is_http2(&self) -> bool {
49        matches!(self, Self::H2(_))
50    }
51
52    /// Read the request header. This method is required to be called first before doing anything
53    /// else with the session.
54    /// - `Ok(true)`: successful
55    /// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
56    ///   In this case the user should give up this session.
57    pub async fn read_request(&mut self) -> Result<bool> {
58        match self {
59            Self::H1(s) => {
60                let read = s.read_request().await?;
61                Ok(read.is_some())
62            }
63            // This call will always return `Ok(true)` for Http2 because the request is already read
64            Self::H2(_) => Ok(true),
65        }
66    }
67
68    /// Return the request header it just read.
69    /// # Panic
70    /// This function will panic if [`Self::read_request()`] is not called.
71    pub fn req_header(&self) -> &RequestHeader {
72        match self {
73            Self::H1(s) => s.req_header(),
74            Self::H2(s) => s.req_header(),
75        }
76    }
77
78    /// Return a mutable reference to request header it just read.
79    /// # Panic
80    /// This function will panic if [`Self::read_request()`] is not called.
81    pub fn req_header_mut(&mut self) -> &mut RequestHeader {
82        match self {
83            Self::H1(s) => s.req_header_mut(),
84            Self::H2(s) => s.req_header_mut(),
85        }
86    }
87
88    /// Return the header by name. None if the header doesn't exist.
89    ///
90    /// In case there are multiple headers under the same name, the first one will be returned. To
91    /// get all the headers: use `self.req_header().headers.get_all()`.
92    pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
93        self.req_header().headers.get(key)
94    }
95
96    /// Get the header value in its raw format.
97    /// If the header doesn't exist, return an empty slice.
98    pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
99        self.get_header(key).map_or(b"", |v| v.as_bytes())
100    }
101
102    /// Read the request body. Ok(None) if no (more) body to read
103    pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
104        match self {
105            Self::H1(s) => s.read_body_bytes().await,
106            Self::H2(s) => s.read_body_bytes().await,
107        }
108    }
109
110    /// Write the response header to client
111    /// Informational headers (status code 100-199, excluding 101) can be written multiple times the final
112    /// response header (status code 200+ or 101) is written.
113    pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
114        match self {
115            Self::H1(s) => {
116                s.write_response_header(resp).await?;
117                Ok(())
118            }
119            Self::H2(s) => s.write_response_header(resp, false),
120        }
121    }
122
123    /// Similar to `write_response_header()`, this fn will clone the `resp` internally
124    pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
125        match self {
126            Self::H1(s) => {
127                s.write_response_header_ref(resp).await?;
128                Ok(())
129            }
130            Self::H2(s) => s.write_response_header_ref(resp, false),
131        }
132    }
133
134    /// Write the response body to client
135    pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
136        if data.is_empty() && !end {
137            // writing 0 byte to a chunked encoding h1 would finish the stream
138            // writing 0 bytes to h2 is noop
139            // we don't want to actually write in either cases
140            return Ok(());
141        }
142        match self {
143            Self::H1(s) => {
144                s.write_body(&data).await?;
145                Ok(())
146            }
147            Self::H2(s) => s.write_body(data, end),
148        }
149    }
150
151    /// Write the response trailers to client
152    pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
153        match self {
154            Self::H1(_) => Ok(()), // TODO: support trailers for h1
155            Self::H2(s) => s.write_trailers(trailers),
156        }
157    }
158
159    /// Finish the life of this request.
160    /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
161    /// For H2, always return None because H2 stream is not reusable.
162    pub async fn finish(self) -> Result<Option<Stream>> {
163        match self {
164            Self::H1(mut s) => {
165                // need to flush body due to buffering
166                s.finish_body().await?;
167                Ok(s.reuse().await)
168            }
169            Self::H2(mut s) => {
170                s.finish()?;
171                Ok(None)
172            }
173        }
174    }
175
176    pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
177        match self {
178            Self::H1(s) => s.response_duplex_vec(tasks).await,
179            Self::H2(s) => s.response_duplex_vec(tasks),
180        }
181    }
182
183    /// Set connection reuse. `duration` defines how long the connection is kept open for the next
184    /// request to reuse. Noop for h2
185    pub fn set_keepalive(&mut self, duration: Option<u64>) {
186        match self {
187            Self::H1(s) => s.set_server_keepalive(duration),
188            Self::H2(_) => {}
189        }
190    }
191
192    /// Sets the downstream write timeout. This will trigger if we're unable
193    /// to write to the stream after `duration`. If a `min_send_rate` is
194    /// configured then the `min_send_rate` calculated timeout has higher priority.
195    ///
196    /// This is a noop for h2.
197    pub fn set_write_timeout(&mut self, timeout: Duration) {
198        match self {
199            Self::H1(s) => s.set_write_timeout(timeout),
200            Self::H2(_) => {}
201        }
202    }
203
204    /// Sets the minimum downstream send rate in bytes per second. This
205    /// is used to calculate a write timeout in seconds based on the size
206    /// of the buffer being written. If a `min_send_rate` is configured it
207    /// has higher priority over a set `write_timeout`. The minimum send
208    /// rate must be greater than zero.
209    ///
210    /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
211    /// is greater than zero, a send rate of zero is a noop.
212    ///
213    /// This is a noop for h2.
214    pub fn set_min_send_rate(&mut self, rate: usize) {
215        match self {
216            Self::H1(s) => s.set_min_send_rate(rate),
217            Self::H2(_) => {}
218        }
219    }
220
221    /// Sets whether we ignore writing informational responses downstream.
222    ///
223    /// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
224    /// Expect: 100-continue was set on the request.
225    ///
226    /// This is a noop for h2 because informational responses are always ignored.
227    pub fn set_ignore_info_resp(&mut self, ignore: bool) {
228        match self {
229            Self::H1(s) => s.set_ignore_info_resp(ignore),
230            Self::H2(_) => {} // always ignored
231        }
232    }
233
234    /// Return a digest of the request including the method, path and Host header
235    // TODO: make this use a `Formatter`
236    pub fn request_summary(&self) -> String {
237        match self {
238            Self::H1(s) => s.request_summary(),
239            Self::H2(s) => s.request_summary(),
240        }
241    }
242
243    /// Return the written response header. `None` if it is not written yet.
244    /// Only the final (status code >= 200 or 101) response header will be returned
245    pub fn response_written(&self) -> Option<&ResponseHeader> {
246        match self {
247            Self::H1(s) => s.response_written(),
248            Self::H2(s) => s.response_written(),
249        }
250    }
251
252    /// Give up the http session abruptly.
253    /// For H1 this will close the underlying connection
254    /// For H2 this will send RESET frame to end this stream without impacting the connection
255    pub async fn shutdown(&mut self) {
256        match self {
257            Self::H1(s) => s.shutdown().await,
258            Self::H2(s) => s.shutdown(),
259        }
260    }
261
262    pub fn to_h1_raw(&self) -> Bytes {
263        match self {
264            Self::H1(s) => s.get_headers_raw_bytes(),
265            Self::H2(s) => s.pseudo_raw_h1_request_header(),
266        }
267    }
268
269    /// Whether the whole request body is sent
270    pub fn is_body_done(&mut self) -> bool {
271        match self {
272            Self::H1(s) => s.is_body_done(),
273            Self::H2(s) => s.is_body_done(),
274        }
275    }
276
277    /// Notify the client that the entire body is sent
278    /// for H1 chunked encoding, this will end the last empty chunk
279    /// for H1 content-length, this has no effect.
280    /// for H2, this will send an empty DATA frame with END_STREAM flag
281    pub async fn finish_body(&mut self) -> Result<()> {
282        match self {
283            Self::H1(s) => s.finish_body().await.map(|_| ()),
284            Self::H2(s) => s.finish(),
285        }
286    }
287
288    pub fn generate_error(error: u16) -> ResponseHeader {
289        match error {
290            /* common error responses are pre-generated */
291            502 => error_resp::HTTP_502_RESPONSE.clone(),
292            400 => error_resp::HTTP_400_RESPONSE.clone(),
293            _ => error_resp::gen_error_response(error),
294        }
295    }
296
297    /// Send error response to client
298    pub async fn respond_error(&mut self, error: u16) {
299        let resp = Self::generate_error(error);
300
301        // TODO: we shouldn't be closing downstream connections on internally generated errors
302        // and possibly other upstream connect() errors (connection refused, timeout, etc)
303        //
304        // This change is only here because we DO NOT re-use downstream connections
305        // today on these errors and we should signal to the client that pingora is dropping it
306        // rather than a misleading the client with 'keep-alive'
307        self.set_keepalive(None);
308
309        self.write_response_header(Box::new(resp))
310            .await
311            .unwrap_or_else(|e| {
312                error!("failed to send error response to downstream: {e}");
313            });
314    }
315
316    /// Whether there is no request body
317    pub fn is_body_empty(&mut self) -> bool {
318        match self {
319            Self::H1(s) => s.is_body_empty(),
320            Self::H2(s) => s.is_body_empty(),
321        }
322    }
323
324    pub fn retry_buffer_truncated(&self) -> bool {
325        match self {
326            Self::H1(s) => s.retry_buffer_truncated(),
327            Self::H2(s) => s.retry_buffer_truncated(),
328        }
329    }
330
331    pub fn enable_retry_buffering(&mut self) {
332        match self {
333            Self::H1(s) => s.enable_retry_buffering(),
334            Self::H2(s) => s.enable_retry_buffering(),
335        }
336    }
337
338    pub fn get_retry_buffer(&self) -> Option<Bytes> {
339        match self {
340            Self::H1(s) => s.get_retry_buffer(),
341            Self::H2(s) => s.get_retry_buffer(),
342        }
343    }
344
345    /// Read body (same as `read_request_body()`) or pending forever until downstream
346    /// terminates the session.
347    pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
348        match self {
349            Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
350            Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
351        }
352    }
353
354    pub fn as_http1(&self) -> Option<&SessionV1> {
355        match self {
356            Self::H1(s) => Some(s),
357            Self::H2(_) => None,
358        }
359    }
360
361    pub fn as_http2(&self) -> Option<&SessionV2> {
362        match self {
363            Self::H1(_) => None,
364            Self::H2(s) => Some(s),
365        }
366    }
367
368    /// Write a 100 Continue response to the client.
369    pub async fn write_continue_response(&mut self) -> Result<()> {
370        match self {
371            Self::H1(s) => s.write_continue_response().await,
372            Self::H2(s) => s.write_response_header(
373                Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
374                false,
375            ),
376        }
377    }
378
379    /// Whether this request is for upgrade (e.g., websocket)
380    pub fn is_upgrade_req(&self) -> bool {
381        match self {
382            Self::H1(s) => s.is_upgrade_req(),
383            Self::H2(_) => false,
384        }
385    }
386
387    /// Return how many response body bytes (application, not wire) already sent downstream
388    pub fn body_bytes_sent(&self) -> usize {
389        match self {
390            Self::H1(s) => s.body_bytes_sent(),
391            Self::H2(s) => s.body_bytes_sent(),
392        }
393    }
394
395    /// Return how many request body bytes (application, not wire) already read from downstream
396    pub fn body_bytes_read(&self) -> usize {
397        match self {
398            Self::H1(s) => s.body_bytes_read(),
399            Self::H2(s) => s.body_bytes_read(),
400        }
401    }
402
403    /// Return the [Digest] for the connection.
404    pub fn digest(&self) -> Option<&Digest> {
405        match self {
406            Self::H1(s) => Some(s.digest()),
407            Self::H2(s) => s.digest(),
408        }
409    }
410
411    /// Return a mutable [Digest] reference for the connection.
412    ///
413    /// Will return `None` if multiple H2 streams are open.
414    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
415        match self {
416            Self::H1(s) => Some(s.digest_mut()),
417            Self::H2(s) => s.digest_mut(),
418        }
419    }
420
421    /// Return the client (peer) address of the connection.
422    pub fn client_addr(&self) -> Option<&SocketAddr> {
423        match self {
424            Self::H1(s) => s.client_addr(),
425            Self::H2(s) => s.client_addr(),
426        }
427    }
428
429    /// Return the server (local) address of the connection.
430    pub fn server_addr(&self) -> Option<&SocketAddr> {
431        match self {
432            Self::H1(s) => s.server_addr(),
433            Self::H2(s) => s.server_addr(),
434        }
435    }
436
437    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
438    /// None if the HTTP session is over H2
439    pub fn stream(&self) -> Option<&Stream> {
440        match self {
441            Self::H1(s) => Some(s.stream()),
442            Self::H2(_) => None,
443        }
444    }
445}