pingora_core/protocols/http/server.rs
1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP server session APIs
16
17use super::error_resp;
18use super::v1::server::HttpSession as SessionV1;
19use super::v2::server::HttpSession as SessionV2;
20use super::HttpTask;
21use crate::protocols::{Digest, SocketAddr, Stream};
22use bytes::Bytes;
23use http::HeaderValue;
24use http::{header::AsHeaderName, HeaderMap};
25use log::error;
26use pingora_error::Result;
27use pingora_http::{RequestHeader, ResponseHeader};
28use std::time::Duration;
29
30/// HTTP server session object for both HTTP/1.x and HTTP/2
31pub enum Session {
32 H1(SessionV1),
33 H2(SessionV2),
34}
35
36impl Session {
37 /// Create a new [`Session`] from an established connection for HTTP/1.x
38 pub fn new_http1(stream: Stream) -> Self {
39 Self::H1(SessionV1::new(stream))
40 }
41
42 /// Create a new [`Session`] from an established HTTP/2 stream
43 pub fn new_http2(session: SessionV2) -> Self {
44 Self::H2(session)
45 }
46
47 /// Whether the session is HTTP/2. If not it is HTTP/1.x
48 pub fn is_http2(&self) -> bool {
49 matches!(self, Self::H2(_))
50 }
51
52 /// Read the request header. This method is required to be called first before doing anything
53 /// else with the session.
54 /// - `Ok(true)`: successful
55 /// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
56 /// In this case the user should give up this session.
57 pub async fn read_request(&mut self) -> Result<bool> {
58 match self {
59 Self::H1(s) => {
60 let read = s.read_request().await?;
61 Ok(read.is_some())
62 }
63 // This call will always return `Ok(true)` for Http2 because the request is already read
64 Self::H2(_) => Ok(true),
65 }
66 }
67
68 /// Return the request header it just read.
69 /// # Panic
70 /// This function will panic if [`Self::read_request()`] is not called.
71 pub fn req_header(&self) -> &RequestHeader {
72 match self {
73 Self::H1(s) => s.req_header(),
74 Self::H2(s) => s.req_header(),
75 }
76 }
77
78 /// Return a mutable reference to request header it just read.
79 /// # Panic
80 /// This function will panic if [`Self::read_request()`] is not called.
81 pub fn req_header_mut(&mut self) -> &mut RequestHeader {
82 match self {
83 Self::H1(s) => s.req_header_mut(),
84 Self::H2(s) => s.req_header_mut(),
85 }
86 }
87
88 /// Return the header by name. None if the header doesn't exist.
89 ///
90 /// In case there are multiple headers under the same name, the first one will be returned. To
91 /// get all the headers: use `self.req_header().headers.get_all()`.
92 pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
93 self.req_header().headers.get(key)
94 }
95
96 /// Get the header value in its raw format.
97 /// If the header doesn't exist, return an empty slice.
98 pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
99 self.get_header(key).map_or(b"", |v| v.as_bytes())
100 }
101
102 /// Read the request body. Ok(None) if no (more) body to read
103 pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
104 match self {
105 Self::H1(s) => s.read_body_bytes().await,
106 Self::H2(s) => s.read_body_bytes().await,
107 }
108 }
109
110 /// Write the response header to client
111 /// Informational headers (status code 100-199, excluding 101) can be written multiple times the final
112 /// response header (status code 200+ or 101) is written.
113 pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
114 match self {
115 Self::H1(s) => {
116 s.write_response_header(resp).await?;
117 Ok(())
118 }
119 Self::H2(s) => s.write_response_header(resp, false),
120 }
121 }
122
123 /// Similar to `write_response_header()`, this fn will clone the `resp` internally
124 pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
125 match self {
126 Self::H1(s) => {
127 s.write_response_header_ref(resp).await?;
128 Ok(())
129 }
130 Self::H2(s) => s.write_response_header_ref(resp, false),
131 }
132 }
133
134 /// Write the response body to client
135 pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
136 if data.is_empty() && !end {
137 // writing 0 byte to a chunked encoding h1 would finish the stream
138 // writing 0 bytes to h2 is noop
139 // we don't want to actually write in either cases
140 return Ok(());
141 }
142 match self {
143 Self::H1(s) => {
144 s.write_body(&data).await?;
145 Ok(())
146 }
147 Self::H2(s) => s.write_body(data, end),
148 }
149 }
150
151 /// Write the response trailers to client
152 pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
153 match self {
154 Self::H1(_) => Ok(()), // TODO: support trailers for h1
155 Self::H2(s) => s.write_trailers(trailers),
156 }
157 }
158
159 /// Finish the life of this request.
160 /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
161 /// For H2, always return None because H2 stream is not reusable.
162 pub async fn finish(self) -> Result<Option<Stream>> {
163 match self {
164 Self::H1(mut s) => {
165 // need to flush body due to buffering
166 s.finish_body().await?;
167 Ok(s.reuse().await)
168 }
169 Self::H2(mut s) => {
170 s.finish()?;
171 Ok(None)
172 }
173 }
174 }
175
176 pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
177 match self {
178 Self::H1(s) => s.response_duplex_vec(tasks).await,
179 Self::H2(s) => s.response_duplex_vec(tasks),
180 }
181 }
182
183 /// Set connection reuse. `duration` defines how long the connection is kept open for the next
184 /// request to reuse. Noop for h2
185 pub fn set_keepalive(&mut self, duration: Option<u64>) {
186 match self {
187 Self::H1(s) => s.set_server_keepalive(duration),
188 Self::H2(_) => {}
189 }
190 }
191
192 /// Sets the downstream write timeout. This will trigger if we're unable
193 /// to write to the stream after `duration`. If a `min_send_rate` is
194 /// configured then the `min_send_rate` calculated timeout has higher priority.
195 ///
196 /// This is a noop for h2.
197 pub fn set_write_timeout(&mut self, timeout: Duration) {
198 match self {
199 Self::H1(s) => s.set_write_timeout(timeout),
200 Self::H2(_) => {}
201 }
202 }
203
204 /// Sets the minimum downstream send rate in bytes per second. This
205 /// is used to calculate a write timeout in seconds based on the size
206 /// of the buffer being written. If a `min_send_rate` is configured it
207 /// has higher priority over a set `write_timeout`. The minimum send
208 /// rate must be greater than zero.
209 ///
210 /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
211 /// is greater than zero, a send rate of zero is a noop.
212 ///
213 /// This is a noop for h2.
214 pub fn set_min_send_rate(&mut self, rate: usize) {
215 match self {
216 Self::H1(s) => s.set_min_send_rate(rate),
217 Self::H2(_) => {}
218 }
219 }
220
221 /// Sets whether we ignore writing informational responses downstream.
222 ///
223 /// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
224 /// Expect: 100-continue was set on the request.
225 ///
226 /// This is a noop for h2 because informational responses are always ignored.
227 pub fn set_ignore_info_resp(&mut self, ignore: bool) {
228 match self {
229 Self::H1(s) => s.set_ignore_info_resp(ignore),
230 Self::H2(_) => {} // always ignored
231 }
232 }
233
234 /// Return a digest of the request including the method, path and Host header
235 // TODO: make this use a `Formatter`
236 pub fn request_summary(&self) -> String {
237 match self {
238 Self::H1(s) => s.request_summary(),
239 Self::H2(s) => s.request_summary(),
240 }
241 }
242
243 /// Return the written response header. `None` if it is not written yet.
244 /// Only the final (status code >= 200 or 101) response header will be returned
245 pub fn response_written(&self) -> Option<&ResponseHeader> {
246 match self {
247 Self::H1(s) => s.response_written(),
248 Self::H2(s) => s.response_written(),
249 }
250 }
251
252 /// Give up the http session abruptly.
253 /// For H1 this will close the underlying connection
254 /// For H2 this will send RESET frame to end this stream without impacting the connection
255 pub async fn shutdown(&mut self) {
256 match self {
257 Self::H1(s) => s.shutdown().await,
258 Self::H2(s) => s.shutdown(),
259 }
260 }
261
262 pub fn to_h1_raw(&self) -> Bytes {
263 match self {
264 Self::H1(s) => s.get_headers_raw_bytes(),
265 Self::H2(s) => s.pseudo_raw_h1_request_header(),
266 }
267 }
268
269 /// Whether the whole request body is sent
270 pub fn is_body_done(&mut self) -> bool {
271 match self {
272 Self::H1(s) => s.is_body_done(),
273 Self::H2(s) => s.is_body_done(),
274 }
275 }
276
277 /// Notify the client that the entire body is sent
278 /// for H1 chunked encoding, this will end the last empty chunk
279 /// for H1 content-length, this has no effect.
280 /// for H2, this will send an empty DATA frame with END_STREAM flag
281 pub async fn finish_body(&mut self) -> Result<()> {
282 match self {
283 Self::H1(s) => s.finish_body().await.map(|_| ()),
284 Self::H2(s) => s.finish(),
285 }
286 }
287
288 pub fn generate_error(error: u16) -> ResponseHeader {
289 match error {
290 /* common error responses are pre-generated */
291 502 => error_resp::HTTP_502_RESPONSE.clone(),
292 400 => error_resp::HTTP_400_RESPONSE.clone(),
293 _ => error_resp::gen_error_response(error),
294 }
295 }
296
297 /// Send error response to client
298 pub async fn respond_error(&mut self, error: u16) {
299 let resp = Self::generate_error(error);
300
301 // TODO: we shouldn't be closing downstream connections on internally generated errors
302 // and possibly other upstream connect() errors (connection refused, timeout, etc)
303 //
304 // This change is only here because we DO NOT re-use downstream connections
305 // today on these errors and we should signal to the client that pingora is dropping it
306 // rather than a misleading the client with 'keep-alive'
307 self.set_keepalive(None);
308
309 self.write_response_header(Box::new(resp))
310 .await
311 .unwrap_or_else(|e| {
312 error!("failed to send error response to downstream: {e}");
313 });
314 }
315
316 /// Whether there is no request body
317 pub fn is_body_empty(&mut self) -> bool {
318 match self {
319 Self::H1(s) => s.is_body_empty(),
320 Self::H2(s) => s.is_body_empty(),
321 }
322 }
323
324 pub fn retry_buffer_truncated(&self) -> bool {
325 match self {
326 Self::H1(s) => s.retry_buffer_truncated(),
327 Self::H2(s) => s.retry_buffer_truncated(),
328 }
329 }
330
331 pub fn enable_retry_buffering(&mut self) {
332 match self {
333 Self::H1(s) => s.enable_retry_buffering(),
334 Self::H2(s) => s.enable_retry_buffering(),
335 }
336 }
337
338 pub fn get_retry_buffer(&self) -> Option<Bytes> {
339 match self {
340 Self::H1(s) => s.get_retry_buffer(),
341 Self::H2(s) => s.get_retry_buffer(),
342 }
343 }
344
345 /// Read body (same as `read_request_body()`) or pending forever until downstream
346 /// terminates the session.
347 pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
348 match self {
349 Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
350 Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
351 }
352 }
353
354 pub fn as_http1(&self) -> Option<&SessionV1> {
355 match self {
356 Self::H1(s) => Some(s),
357 Self::H2(_) => None,
358 }
359 }
360
361 pub fn as_http2(&self) -> Option<&SessionV2> {
362 match self {
363 Self::H1(_) => None,
364 Self::H2(s) => Some(s),
365 }
366 }
367
368 /// Write a 100 Continue response to the client.
369 pub async fn write_continue_response(&mut self) -> Result<()> {
370 match self {
371 Self::H1(s) => s.write_continue_response().await,
372 Self::H2(s) => s.write_response_header(
373 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
374 false,
375 ),
376 }
377 }
378
379 /// Whether this request is for upgrade (e.g., websocket)
380 pub fn is_upgrade_req(&self) -> bool {
381 match self {
382 Self::H1(s) => s.is_upgrade_req(),
383 Self::H2(_) => false,
384 }
385 }
386
387 /// Return how many response body bytes (application, not wire) already sent downstream
388 pub fn body_bytes_sent(&self) -> usize {
389 match self {
390 Self::H1(s) => s.body_bytes_sent(),
391 Self::H2(s) => s.body_bytes_sent(),
392 }
393 }
394
395 /// Return how many request body bytes (application, not wire) already read from downstream
396 pub fn body_bytes_read(&self) -> usize {
397 match self {
398 Self::H1(s) => s.body_bytes_read(),
399 Self::H2(s) => s.body_bytes_read(),
400 }
401 }
402
403 /// Return the [Digest] for the connection.
404 pub fn digest(&self) -> Option<&Digest> {
405 match self {
406 Self::H1(s) => Some(s.digest()),
407 Self::H2(s) => s.digest(),
408 }
409 }
410
411 /// Return a mutable [Digest] reference for the connection.
412 ///
413 /// Will return `None` if multiple H2 streams are open.
414 pub fn digest_mut(&mut self) -> Option<&mut Digest> {
415 match self {
416 Self::H1(s) => Some(s.digest_mut()),
417 Self::H2(s) => s.digest_mut(),
418 }
419 }
420
421 /// Return the client (peer) address of the connection.
422 pub fn client_addr(&self) -> Option<&SocketAddr> {
423 match self {
424 Self::H1(s) => s.client_addr(),
425 Self::H2(s) => s.client_addr(),
426 }
427 }
428
429 /// Return the server (local) address of the connection.
430 pub fn server_addr(&self) -> Option<&SocketAddr> {
431 match self {
432 Self::H1(s) => s.server_addr(),
433 Self::H2(s) => s.server_addr(),
434 }
435 }
436
437 /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
438 /// None if the HTTP session is over H2
439 pub fn stream(&self) -> Option<&Stream> {
440 match self {
441 Self::H1(s) => Some(s.stream()),
442 Self::H2(_) => None,
443 }
444 }
445}