pingora_core/protocols/http/server.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP server session APIs
16
17use super::error_resp;
18use super::v1::server::HttpSession as SessionV1;
19use super::v2::server::HttpSession as SessionV2;
20use super::HttpTask;
21use crate::protocols::{Digest, SocketAddr, Stream};
22use bytes::Bytes;
23use http::HeaderValue;
24use http::{header::AsHeaderName, HeaderMap};
25use pingora_error::Result;
26use pingora_http::{RequestHeader, ResponseHeader};
27use std::time::Duration;
28
29/// HTTP server session object for both HTTP/1.x and HTTP/2
30pub enum Session {
31 H1(SessionV1),
32 H2(SessionV2),
33}
34
35impl Session {
36 /// Create a new [`Session`] from an established connection for HTTP/1.x
37 pub fn new_http1(stream: Stream) -> Self {
38 Self::H1(SessionV1::new(stream))
39 }
40
41 /// Create a new [`Session`] from an established HTTP/2 stream
42 pub fn new_http2(session: SessionV2) -> Self {
43 Self::H2(session)
44 }
45
46 /// Whether the session is HTTP/2. If not it is HTTP/1.x
47 pub fn is_http2(&self) -> bool {
48 matches!(self, Self::H2(_))
49 }
50
51 /// Read the request header. This method is required to be called first before doing anything
52 /// else with the session.
53 /// - `Ok(true)`: successful
54 /// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
55 /// In this case the user should give up this session.
56 pub async fn read_request(&mut self) -> Result<bool> {
57 match self {
58 Self::H1(s) => {
59 let read = s.read_request().await?;
60 Ok(read.is_some())
61 }
62 // This call will always return `Ok(true)` for Http2 because the request is already read
63 Self::H2(_) => Ok(true),
64 }
65 }
66
67 /// Return the request header it just read.
68 /// # Panic
69 /// This function will panic if [`Self::read_request()`] is not called.
70 pub fn req_header(&self) -> &RequestHeader {
71 match self {
72 Self::H1(s) => s.req_header(),
73 Self::H2(s) => s.req_header(),
74 }
75 }
76
77 /// Return a mutable reference to request header it just read.
78 /// # Panic
79 /// This function will panic if [`Self::read_request()`] is not called.
80 pub fn req_header_mut(&mut self) -> &mut RequestHeader {
81 match self {
82 Self::H1(s) => s.req_header_mut(),
83 Self::H2(s) => s.req_header_mut(),
84 }
85 }
86
87 /// Return the header by name. None if the header doesn't exist.
88 ///
89 /// In case there are multiple headers under the same name, the first one will be returned. To
90 /// get all the headers: use `self.req_header().headers.get_all()`.
91 pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
92 self.req_header().headers.get(key)
93 }
94
95 /// Get the header value in its raw format.
96 /// If the header doesn't exist, return an empty slice.
97 pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
98 self.get_header(key).map_or(b"", |v| v.as_bytes())
99 }
100
101 /// Read the request body. Ok(None) if no (more) body to read
102 pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
103 match self {
104 Self::H1(s) => s.read_body_bytes().await,
105 Self::H2(s) => s.read_body_bytes().await,
106 }
107 }
108
109 /// Discard the request body by reading it until completion.
110 ///
111 /// This is useful for making streams reusable (in particular for HTTP/1.1) after returning an
112 /// error before the whole body has been read.
113 pub async fn drain_request_body(&mut self) -> Result<()> {
114 match self {
115 Self::H1(s) => s.drain_request_body().await,
116 Self::H2(s) => s.drain_request_body().await,
117 }
118 }
119
120 /// Write the response header to client
121 /// Informational headers (status code 100-199, excluding 101) can be written multiple times the final
122 /// response header (status code 200+ or 101) is written.
123 pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
124 match self {
125 Self::H1(s) => {
126 s.write_response_header(resp).await?;
127 Ok(())
128 }
129 Self::H2(s) => s.write_response_header(resp, false),
130 }
131 }
132
133 /// Similar to `write_response_header()`, this fn will clone the `resp` internally
134 pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
135 match self {
136 Self::H1(s) => {
137 s.write_response_header_ref(resp).await?;
138 Ok(())
139 }
140 Self::H2(s) => s.write_response_header_ref(resp, false),
141 }
142 }
143
144 /// Write the response body to client
145 pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
146 if data.is_empty() && !end {
147 // writing 0 byte to a chunked encoding h1 would finish the stream
148 // writing 0 bytes to h2 is noop
149 // we don't want to actually write in either cases
150 return Ok(());
151 }
152 match self {
153 Self::H1(s) => {
154 s.write_body(&data).await?;
155 Ok(())
156 }
157 Self::H2(s) => s.write_body(data, end).await,
158 }
159 }
160
161 /// Write the response trailers to client
162 pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
163 match self {
164 Self::H1(_) => Ok(()), // TODO: support trailers for h1
165 Self::H2(s) => s.write_trailers(trailers),
166 }
167 }
168
169 /// Finish the life of this request.
170 /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
171 /// For H2, always return None because H2 stream is not reusable.
172 pub async fn finish(self) -> Result<Option<Stream>> {
173 match self {
174 Self::H1(mut s) => {
175 // need to flush body due to buffering
176 s.finish_body().await?;
177 s.reuse().await
178 }
179 Self::H2(mut s) => {
180 s.finish()?;
181 Ok(None)
182 }
183 }
184 }
185
186 pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
187 match self {
188 Self::H1(s) => s.response_duplex_vec(tasks).await,
189 Self::H2(s) => s.response_duplex_vec(tasks).await,
190 }
191 }
192
193 /// Set connection reuse. `duration` defines how long the connection is kept open for the next
194 /// request to reuse. Noop for h2
195 pub fn set_keepalive(&mut self, duration: Option<u64>) {
196 match self {
197 Self::H1(s) => s.set_server_keepalive(duration),
198 Self::H2(_) => {}
199 }
200 }
201
202 /// Sets the downstream read timeout. This will trigger if we're unable
203 /// to read from the stream after `timeout`.
204 ///
205 /// This is a noop for h2.
206 pub fn set_read_timeout(&mut self, timeout: Duration) {
207 match self {
208 Self::H1(s) => s.set_read_timeout(timeout),
209 Self::H2(_) => {}
210 }
211 }
212
213 /// Sets the downstream write timeout. This will trigger if we're unable
214 /// to write to the stream after `timeout`. If a `min_send_rate` is
215 /// configured then the `min_send_rate` calculated timeout has higher priority.
216 ///
217 /// This is a noop for h2.
218 pub fn set_write_timeout(&mut self, timeout: Duration) {
219 match self {
220 Self::H1(s) => s.set_write_timeout(timeout),
221 Self::H2(_) => {}
222 }
223 }
224
225 /// Sets the total drain timeout, which will be applied while discarding the
226 /// request body using `drain_request_body`.
227 ///
228 /// For HTTP/1.1, reusing a session requires ensuring that the request body
229 /// is consumed. If the timeout is exceeded, the caller should give up on
230 /// trying to reuse the session.
231 pub fn set_total_drain_timeout(&mut self, timeout: Duration) {
232 match self {
233 Self::H1(s) => s.set_total_drain_timeout(timeout),
234 Self::H2(s) => s.set_total_drain_timeout(timeout),
235 }
236 }
237
238 /// Sets the minimum downstream send rate in bytes per second. This
239 /// is used to calculate a write timeout in seconds based on the size
240 /// of the buffer being written. If a `min_send_rate` is configured it
241 /// has higher priority over a set `write_timeout`. The minimum send
242 /// rate must be greater than zero.
243 ///
244 /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
245 /// is greater than zero, a send rate of zero is a noop.
246 ///
247 /// This is a noop for h2.
248 pub fn set_min_send_rate(&mut self, rate: usize) {
249 match self {
250 Self::H1(s) => s.set_min_send_rate(rate),
251 Self::H2(_) => {}
252 }
253 }
254
255 /// Sets whether we ignore writing informational responses downstream.
256 ///
257 /// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
258 /// Expect: 100-continue was set on the request.
259 ///
260 /// This is a noop for h2 because informational responses are always ignored.
261 pub fn set_ignore_info_resp(&mut self, ignore: bool) {
262 match self {
263 Self::H1(s) => s.set_ignore_info_resp(ignore),
264 Self::H2(_) => {} // always ignored
265 }
266 }
267
268 /// Return a digest of the request including the method, path and Host header
269 // TODO: make this use a `Formatter`
270 pub fn request_summary(&self) -> String {
271 match self {
272 Self::H1(s) => s.request_summary(),
273 Self::H2(s) => s.request_summary(),
274 }
275 }
276
277 /// Return the written response header. `None` if it is not written yet.
278 /// Only the final (status code >= 200 or 101) response header will be returned
279 pub fn response_written(&self) -> Option<&ResponseHeader> {
280 match self {
281 Self::H1(s) => s.response_written(),
282 Self::H2(s) => s.response_written(),
283 }
284 }
285
286 /// Give up the http session abruptly.
287 /// For H1 this will close the underlying connection
288 /// For H2 this will send RESET frame to end this stream without impacting the connection
289 pub async fn shutdown(&mut self) {
290 match self {
291 Self::H1(s) => s.shutdown().await,
292 Self::H2(s) => s.shutdown(),
293 }
294 }
295
296 pub fn to_h1_raw(&self) -> Bytes {
297 match self {
298 Self::H1(s) => s.get_headers_raw_bytes(),
299 Self::H2(s) => s.pseudo_raw_h1_request_header(),
300 }
301 }
302
303 /// Whether the whole request body is sent
304 pub fn is_body_done(&mut self) -> bool {
305 match self {
306 Self::H1(s) => s.is_body_done(),
307 Self::H2(s) => s.is_body_done(),
308 }
309 }
310
311 /// Notify the client that the entire body is sent
312 /// for H1 chunked encoding, this will end the last empty chunk
313 /// for H1 content-length, this has no effect.
314 /// for H2, this will send an empty DATA frame with END_STREAM flag
315 pub async fn finish_body(&mut self) -> Result<()> {
316 match self {
317 Self::H1(s) => s.finish_body().await.map(|_| ()),
318 Self::H2(s) => s.finish(),
319 }
320 }
321
322 pub fn generate_error(error: u16) -> ResponseHeader {
323 match error {
324 /* common error responses are pre-generated */
325 502 => error_resp::HTTP_502_RESPONSE.clone(),
326 400 => error_resp::HTTP_400_RESPONSE.clone(),
327 _ => error_resp::gen_error_response(error),
328 }
329 }
330
331 /// Send error response to client using a pre-generated error message.
332 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
333 self.respond_error_with_body(error, Bytes::default()).await
334 }
335
336 /// Send error response to client using a pre-generated error message and custom body.
337 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
338 let mut resp = Self::generate_error(error);
339 if !body.is_empty() {
340 // error responses have a default content-length of zero
341 resp.set_content_length(body.len())?
342 }
343 self.write_error_response(resp, body).await
344 }
345
346 /// Send an error response to a client with a response header and body.
347 pub async fn write_error_response(&mut self, resp: ResponseHeader, body: Bytes) -> Result<()> {
348 // TODO: we shouldn't be closing downstream connections on internally generated errors
349 // and possibly other upstream connect() errors (connection refused, timeout, etc)
350 //
351 // This change is only here because we DO NOT re-use downstream connections
352 // today on these errors and we should signal to the client that pingora is dropping it
353 // rather than a misleading the client with 'keep-alive'
354 self.set_keepalive(None);
355
356 // If a response was already written and it's not informational 1xx, return.
357 // The only exception is an informational 101 Switching Protocols, which is treated
358 // as final response https://www.rfc-editor.org/rfc/rfc9110#section-15.2.2.
359 if let Some(resp_written) = self.response_written().as_ref() {
360 if !resp_written.status.is_informational() || resp_written.status == 101 {
361 return Ok(());
362 }
363 }
364
365 self.write_response_header(Box::new(resp)).await?;
366
367 if !body.is_empty() {
368 self.write_response_body(body, true).await?;
369 } else {
370 self.finish_body().await?;
371 }
372
373 Ok(())
374 }
375
376 /// Whether there is no request body
377 pub fn is_body_empty(&mut self) -> bool {
378 match self {
379 Self::H1(s) => s.is_body_empty(),
380 Self::H2(s) => s.is_body_empty(),
381 }
382 }
383
384 pub fn retry_buffer_truncated(&self) -> bool {
385 match self {
386 Self::H1(s) => s.retry_buffer_truncated(),
387 Self::H2(s) => s.retry_buffer_truncated(),
388 }
389 }
390
391 pub fn enable_retry_buffering(&mut self) {
392 match self {
393 Self::H1(s) => s.enable_retry_buffering(),
394 Self::H2(s) => s.enable_retry_buffering(),
395 }
396 }
397
398 pub fn get_retry_buffer(&self) -> Option<Bytes> {
399 match self {
400 Self::H1(s) => s.get_retry_buffer(),
401 Self::H2(s) => s.get_retry_buffer(),
402 }
403 }
404
405 /// Read body (same as `read_request_body()`) or pending forever until downstream
406 /// terminates the session.
407 pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
408 match self {
409 Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
410 Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
411 }
412 }
413
414 pub fn as_http1(&self) -> Option<&SessionV1> {
415 match self {
416 Self::H1(s) => Some(s),
417 Self::H2(_) => None,
418 }
419 }
420
421 pub fn as_http2(&self) -> Option<&SessionV2> {
422 match self {
423 Self::H1(_) => None,
424 Self::H2(s) => Some(s),
425 }
426 }
427
428 /// Write a 100 Continue response to the client.
429 pub async fn write_continue_response(&mut self) -> Result<()> {
430 match self {
431 Self::H1(s) => s.write_continue_response().await,
432 Self::H2(s) => s.write_response_header(
433 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
434 false,
435 ),
436 }
437 }
438
439 /// Whether this request is for upgrade (e.g., websocket)
440 pub fn is_upgrade_req(&self) -> bool {
441 match self {
442 Self::H1(s) => s.is_upgrade_req(),
443 Self::H2(_) => false,
444 }
445 }
446
447 /// Return how many response body bytes (application, not wire) already sent downstream
448 pub fn body_bytes_sent(&self) -> usize {
449 match self {
450 Self::H1(s) => s.body_bytes_sent(),
451 Self::H2(s) => s.body_bytes_sent(),
452 }
453 }
454
455 /// Return how many request body bytes (application, not wire) already read from downstream
456 pub fn body_bytes_read(&self) -> usize {
457 match self {
458 Self::H1(s) => s.body_bytes_read(),
459 Self::H2(s) => s.body_bytes_read(),
460 }
461 }
462
463 /// Return the [Digest] for the connection.
464 pub fn digest(&self) -> Option<&Digest> {
465 match self {
466 Self::H1(s) => Some(s.digest()),
467 Self::H2(s) => s.digest(),
468 }
469 }
470
471 /// Return a mutable [Digest] reference for the connection.
472 ///
473 /// Will return `None` if multiple H2 streams are open.
474 pub fn digest_mut(&mut self) -> Option<&mut Digest> {
475 match self {
476 Self::H1(s) => Some(s.digest_mut()),
477 Self::H2(s) => s.digest_mut(),
478 }
479 }
480
481 /// Return the client (peer) address of the connection.
482 pub fn client_addr(&self) -> Option<&SocketAddr> {
483 match self {
484 Self::H1(s) => s.client_addr(),
485 Self::H2(s) => s.client_addr(),
486 }
487 }
488
489 /// Return the server (local) address of the connection.
490 pub fn server_addr(&self) -> Option<&SocketAddr> {
491 match self {
492 Self::H1(s) => s.server_addr(),
493 Self::H2(s) => s.server_addr(),
494 }
495 }
496
497 /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
498 /// None if the HTTP session is over H2
499 pub fn stream(&self) -> Option<&Stream> {
500 match self {
501 Self::H1(s) => Some(s.stream()),
502 Self::H2(_) => None,
503 }
504 }
505}