Skip to main content

pingora_core/protocols/http/
server.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP server session APIs
16
17use super::custom::server::Session as SessionCustom;
18use super::error_resp;
19use super::subrequest::server::HttpSession as SessionSubrequest;
20use super::v1::server::HttpSession as SessionV1;
21use super::v2::server::HttpSession as SessionV2;
22use super::HttpTask;
23use crate::custom_session;
24use crate::protocols::{Digest, SocketAddr, Stream};
25use bytes::Bytes;
26use http::HeaderValue;
27use http::{header::AsHeaderName, HeaderMap};
28use pingora_error::{Error, Result};
29use pingora_http::{RequestHeader, ResponseHeader};
30use std::time::Duration;
31
32/// HTTP server session object for both HTTP/1.x and HTTP/2
33pub enum Session {
34    H1(SessionV1),
35    H2(SessionV2),
36    Subrequest(SessionSubrequest),
37    Custom(Box<dyn SessionCustom>),
38}
39
40impl Session {
41    /// Create a new [`Session`] from an established connection for HTTP/1.x
42    pub fn new_http1(stream: Stream) -> Self {
43        Self::H1(SessionV1::new(stream))
44    }
45
46    /// Create a new [`Session`] from an established HTTP/2 stream
47    pub fn new_http2(session: SessionV2) -> Self {
48        Self::H2(session)
49    }
50
51    /// Create a new [`Session`] from a subrequest session
52    pub fn new_subrequest(session: SessionSubrequest) -> Self {
53        Self::Subrequest(session)
54    }
55
56    /// Create a new [`Session`] from a custom session
57    pub fn new_custom(session: Box<dyn SessionCustom>) -> Self {
58        Self::Custom(session)
59    }
60
61    /// Whether the session is HTTP/2. If not it is HTTP/1.x
62    pub fn is_http2(&self) -> bool {
63        matches!(self, Self::H2(_))
64    }
65
66    /// Whether the session is for a subrequest.
67    pub fn is_subrequest(&self) -> bool {
68        matches!(self, Self::Subrequest(_))
69    }
70
71    /// Whether the session is Custom
72    pub fn is_custom(&self) -> bool {
73        matches!(self, Self::Custom(_))
74    }
75
76    /// Read the request header. This method is required to be called first before doing anything
77    /// else with the session.
78    /// - `Ok(true)`: successful
79    /// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
80    ///   In this case the user should give up this session.
81    pub async fn read_request(&mut self) -> Result<bool> {
82        match self {
83            Self::H1(s) => {
84                let read = s.read_request().await?;
85                Ok(read.is_some())
86            }
87            // This call will always return `Ok(true)` for Http2 because the request is already read
88            Self::H2(_) => Ok(true),
89            Self::Subrequest(s) => {
90                let read = s.read_request().await?;
91                Ok(read.is_some())
92            }
93            Self::Custom(_) => Ok(true),
94        }
95    }
96
97    /// Return the request header it just read.
98    /// # Panic
99    /// This function will panic if [`Self::read_request()`] is not called.
100    pub fn req_header(&self) -> &RequestHeader {
101        match self {
102            Self::H1(s) => s.req_header(),
103            Self::H2(s) => s.req_header(),
104            Self::Subrequest(s) => s.req_header(),
105            Self::Custom(s) => s.req_header(),
106        }
107    }
108
109    /// Return a mutable reference to request header it just read.
110    /// # Panic
111    /// This function will panic if [`Self::read_request()`] is not called.
112    pub fn req_header_mut(&mut self) -> &mut RequestHeader {
113        match self {
114            Self::H1(s) => s.req_header_mut(),
115            Self::H2(s) => s.req_header_mut(),
116            Self::Subrequest(s) => s.req_header_mut(),
117            Self::Custom(s) => s.req_header_mut(),
118        }
119    }
120
121    /// Return the header by name. None if the header doesn't exist.
122    ///
123    /// In case there are multiple headers under the same name, the first one will be returned. To
124    /// get all the headers: use `self.req_header().headers.get_all()`.
125    pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
126        self.req_header().headers.get(key)
127    }
128
129    /// Get the header value in its raw format.
130    /// If the header doesn't exist, return an empty slice.
131    pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
132        self.get_header(key).map_or(b"", |v| v.as_bytes())
133    }
134
135    /// Read the request body. Ok(None) if no (more) body to read
136    pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
137        match self {
138            Self::H1(s) => s.read_body_bytes().await,
139            Self::H2(s) => s.read_body_bytes().await,
140            Self::Subrequest(s) => s.read_body_bytes().await,
141            Self::Custom(s) => s.read_body_bytes().await,
142        }
143    }
144
145    /// Discard the request body by reading it until completion.
146    ///
147    /// This is useful for making streams reusable (in particular for HTTP/1.1) after returning an
148    /// error before the whole body has been read.
149    pub async fn drain_request_body(&mut self) -> Result<()> {
150        match self {
151            Self::H1(s) => s.drain_request_body().await,
152            Self::H2(s) => s.drain_request_body().await,
153            Self::Subrequest(s) => s.drain_request_body().await,
154            Self::Custom(s) => s.drain_request_body().await,
155        }
156    }
157
158    /// Write the response header to client
159    /// Informational headers (status code 100-199, excluding 101) can be written multiple times the final
160    /// response header (status code 200+ or 101) is written.
161    pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
162        match self {
163            Self::H1(s) => {
164                s.write_response_header(resp).await?;
165                Ok(())
166            }
167            Self::H2(s) => s.write_response_header(resp, false),
168            Self::Subrequest(s) => {
169                s.write_response_header(resp).await?;
170                Ok(())
171            }
172            Self::Custom(s) => s.write_response_header(resp, false).await,
173        }
174    }
175
176    /// Similar to `write_response_header()`, this fn will clone the `resp` internally
177    pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
178        match self {
179            Self::H1(s) => {
180                s.write_response_header_ref(resp).await?;
181                Ok(())
182            }
183            Self::H2(s) => s.write_response_header_ref(resp, false),
184            Self::Subrequest(s) => {
185                s.write_response_header_ref(resp).await?;
186                Ok(())
187            }
188            Self::Custom(s) => s.write_response_header_ref(resp, false).await,
189        }
190    }
191
192    /// Write the response body to client
193    pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
194        if data.is_empty() && !end {
195            // writing 0 byte to a chunked encoding h1 would finish the stream
196            // writing 0 bytes to h2 is noop
197            // we don't want to actually write in either cases
198            return Ok(());
199        }
200        match self {
201            Self::H1(s) => {
202                if !data.is_empty() {
203                    s.write_body(&data).await?;
204                }
205                if end {
206                    s.finish_body().await?;
207                }
208                Ok(())
209            }
210            Self::H2(s) => s.write_body(data, end).await,
211            Self::Subrequest(s) => {
212                s.write_body(data).await?;
213                Ok(())
214            }
215            Self::Custom(s) => s.write_body(data, end).await,
216        }
217    }
218
219    /// Write the response trailers to client
220    pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
221        match self {
222            Self::H1(_) => Ok(()), // TODO: support trailers for h1
223            Self::H2(s) => s.write_trailers(trailers),
224            Self::Subrequest(s) => s.write_trailers(Some(Box::new(trailers))).await,
225            Self::Custom(s) => s.write_trailers(trailers).await,
226        }
227    }
228
229    /// Finish the life of this request.
230    /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
231    /// For H2, always return None because H2 stream is not reusable.
232    /// For subrequests, there is no true underlying stream to return.
233    pub async fn finish(self) -> Result<Option<Stream>> {
234        match self {
235            Self::H1(mut s) => {
236                // need to flush body due to buffering
237                s.finish_body().await?;
238                s.reuse().await
239            }
240            Self::H2(mut s) => {
241                s.finish()?;
242                Ok(None)
243            }
244            Self::Subrequest(mut s) => {
245                s.finish().await?;
246                Ok(None)
247            }
248            Self::Custom(mut s) => {
249                s.finish().await?;
250                Ok(None)
251            }
252        }
253    }
254
255    /// Callback for cleanup logic on downstream specifically when we fail to proxy the session
256    /// other than cleanup via finish().
257    ///
258    /// If caching the downstream failure may be independent of (and precede) an upstream error in
259    /// which case this function may be called more than once.
260    pub fn on_proxy_failure(&mut self, e: Box<Error>) {
261        match self {
262            Self::H1(_) | Self::H2(_) | Self::Custom(_) => {
263                // all cleanup logic handled in finish(),
264                // stream and resources dropped when session dropped
265            }
266            Self::Subrequest(ref mut s) => s.on_proxy_failure(e),
267        }
268    }
269
270    pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
271        match self {
272            Self::H1(s) => s.response_duplex_vec(tasks).await,
273            Self::H2(s) => s.response_duplex_vec(tasks).await,
274            Self::Subrequest(s) => s.response_duplex_vec(tasks).await,
275            Self::Custom(s) => s.response_duplex_vec(tasks).await,
276        }
277    }
278
279    /// Set connection reuse. `duration` defines how long the connection is kept open for the next
280    /// request to reuse. Noop for h2 and subrequest
281    pub fn set_keepalive(&mut self, duration: Option<u64>) {
282        match self {
283            Self::H1(s) => s.set_server_keepalive(duration),
284            Self::H2(_) => {}
285            Self::Subrequest(_) => {}
286            Self::Custom(_) => {}
287        }
288    }
289
290    /// Get the keepalive timeout. None if keepalive is disabled. Not applicable for h2 or
291    /// subrequest
292    pub fn get_keepalive(&self) -> Option<u64> {
293        match self {
294            Self::H1(s) => s.get_keepalive_timeout(),
295            Self::H2(_) => None,
296            Self::Subrequest(_) => None,
297            Self::Custom(_) => None,
298        }
299    }
300
301    /// Set the number of times the upstream connection connection for this
302    /// session can be reused via keepalive. Noop for h2 and subrequest
303    pub fn set_keepalive_reuses_remaining(&mut self, reuses: Option<u32>) {
304        if let Self::H1(s) = self {
305            s.set_keepalive_reuses_remaining(reuses);
306        }
307    }
308
309    /// Get the number of times the upstream connection connection for this
310    /// session can be reused via keepalive. Not applicable for h2 or
311    /// subrequest
312    pub fn get_keepalive_reuses_remaining(&self) -> Option<u32> {
313        if let Self::H1(s) = self {
314            s.get_keepalive_reuses_remaining()
315        } else {
316            None
317        }
318    }
319
320    /// Sets the downstream read timeout. This will trigger if we're unable
321    /// to read from the stream after `timeout`.
322    ///
323    /// This is a noop for h2.
324    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
325        match self {
326            Self::H1(s) => s.set_read_timeout(timeout),
327            Self::H2(_) => {}
328            Self::Subrequest(s) => s.set_read_timeout(timeout),
329            Self::Custom(c) => c.set_read_timeout(timeout),
330        }
331    }
332
333    /// Gets the downstream read timeout if set.
334    pub fn get_read_timeout(&self) -> Option<Duration> {
335        match self {
336            Self::H1(s) => s.get_read_timeout(),
337            Self::H2(_) => None,
338            Self::Subrequest(s) => s.get_read_timeout(),
339            Self::Custom(s) => s.get_read_timeout(),
340        }
341    }
342
343    /// Sets the downstream write timeout. This will trigger if we're unable
344    /// to write to the stream after `timeout`. If a `min_send_rate` is
345    /// configured then the `min_send_rate` calculated timeout has higher priority.
346    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
347        match self {
348            Self::H1(s) => s.set_write_timeout(timeout),
349            Self::H2(s) => s.set_write_timeout(timeout),
350            Self::Subrequest(s) => s.set_write_timeout(timeout),
351            Self::Custom(c) => c.set_write_timeout(timeout),
352        }
353    }
354
355    /// Gets the downstream write timeout if set.
356    pub fn get_write_timeout(&self) -> Option<Duration> {
357        match self {
358            Self::H1(s) => s.get_write_timeout(),
359            Self::H2(s) => s.get_write_timeout(),
360            Self::Subrequest(s) => s.get_write_timeout(),
361            Self::Custom(s) => s.get_write_timeout(),
362        }
363    }
364
365    /// Sets the total drain timeout, which will be applied while discarding the
366    /// request body using `drain_request_body`.
367    ///
368    /// For HTTP/1.1, reusing a session requires ensuring that the request body
369    /// is consumed. If the timeout is exceeded, the caller should give up on
370    /// trying to reuse the session.
371    pub fn set_total_drain_timeout(&mut self, timeout: Option<Duration>) {
372        match self {
373            Self::H1(s) => s.set_total_drain_timeout(timeout),
374            Self::H2(s) => s.set_total_drain_timeout(timeout),
375            Self::Subrequest(s) => s.set_total_drain_timeout(timeout),
376            Self::Custom(c) => c.set_total_drain_timeout(timeout),
377        }
378    }
379
380    /// Gets the total drain timeout if set.
381    pub fn get_total_drain_timeout(&self) -> Option<Duration> {
382        match self {
383            Self::H1(s) => s.get_total_drain_timeout(),
384            Self::H2(s) => s.get_total_drain_timeout(),
385            Self::Subrequest(s) => s.get_total_drain_timeout(),
386            Self::Custom(s) => s.get_total_drain_timeout(),
387        }
388    }
389
390    /// Sets the minimum downstream send rate in bytes per second. This
391    /// is used to calculate a write timeout in seconds based on the size
392    /// of the buffer being written. If a `min_send_rate` is configured it
393    /// has higher priority over a set `write_timeout`. The minimum send
394    /// rate must be greater than zero.
395    ///
396    /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
397    /// is greater than zero, a send rate of zero is equivalent to disabling.
398    ///
399    /// This is a noop for h2.
400    pub fn set_min_send_rate(&mut self, rate: Option<usize>) {
401        match self {
402            Self::H1(s) => s.set_min_send_rate(rate),
403            Self::H2(_) => {}
404            Self::Subrequest(_) => {}
405            Self::Custom(_) => {}
406        }
407    }
408
409    /// Sets whether we ignore writing informational responses downstream.
410    ///
411    /// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
412    /// Expect: 100-continue was set on the request.
413    ///
414    /// This is a noop for h2 because informational responses are always ignored.
415    /// Subrequests will always proxy the info response and let the true downstream
416    /// decide to ignore or not.
417    pub fn set_ignore_info_resp(&mut self, ignore: bool) {
418        match self {
419            Self::H1(s) => s.set_ignore_info_resp(ignore),
420            Self::H2(_) => {} // always ignored
421            Self::Subrequest(_) => {}
422            Self::Custom(_) => {} // always ignored
423        }
424    }
425
426    /// Sets whether keepalive should be disabled if response is written prior to
427    /// downstream body finishing.
428    ///
429    /// This is a noop for h2.
430    pub fn set_close_on_response_before_downstream_finish(&mut self, close: bool) {
431        match self {
432            Self::H1(s) => s.set_close_on_response_before_downstream_finish(close),
433            Self::H2(_) => {}         // always ignored
434            Self::Subrequest(_) => {} // always ignored
435            Self::Custom(_) => {}     // always ignored
436        }
437    }
438
439    /// Return a digest of the request including the method, path and Host header
440    // TODO: make this use a `Formatter`
441    pub fn request_summary(&self) -> String {
442        match self {
443            Self::H1(s) => s.request_summary(),
444            Self::H2(s) => s.request_summary(),
445            Self::Subrequest(s) => s.request_summary(),
446            Self::Custom(s) => s.request_summary(),
447        }
448    }
449
450    /// Return the written response header. `None` if it is not written yet.
451    /// Only the final (status code >= 200 or 101) response header will be returned
452    pub fn response_written(&self) -> Option<&ResponseHeader> {
453        match self {
454            Self::H1(s) => s.response_written(),
455            Self::H2(s) => s.response_written(),
456            Self::Subrequest(s) => s.response_written(),
457            Self::Custom(s) => s.response_written(),
458        }
459    }
460
461    /// Give up the http session abruptly.
462    /// For H1 this will close the underlying connection
463    /// For H2 this will send RESET frame to end this stream without impacting the connection
464    /// For subrequests, this will drop task senders and receivers.
465    pub async fn shutdown(&mut self) {
466        match self {
467            Self::H1(s) => s.shutdown().await,
468            Self::H2(s) => s.shutdown(),
469            Self::Subrequest(s) => s.shutdown(),
470            Self::Custom(s) => s.shutdown(0, "shutdown").await,
471        }
472    }
473
474    pub fn to_h1_raw(&self) -> Bytes {
475        match self {
476            Self::H1(s) => s.get_headers_raw_bytes(),
477            Self::H2(s) => s.pseudo_raw_h1_request_header(),
478            Self::Subrequest(s) => s.get_headers_raw_bytes(),
479            Self::Custom(c) => c.pseudo_raw_h1_request_header(),
480        }
481    }
482
483    /// Whether the whole request body is sent
484    pub fn is_body_done(&mut self) -> bool {
485        match self {
486            Self::H1(s) => s.is_body_done(),
487            Self::H2(s) => s.is_body_done(),
488            Self::Subrequest(s) => s.is_body_done(),
489            Self::Custom(s) => s.is_body_done(),
490        }
491    }
492
493    /// Notify the client that the entire body is sent
494    /// for H1 chunked encoding, this will end the last empty chunk
495    /// for H1 content-length, this has no effect.
496    /// for H2, this will send an empty DATA frame with END_STREAM flag
497    /// for subrequest, this will send a Done http task
498    pub async fn finish_body(&mut self) -> Result<()> {
499        match self {
500            Self::H1(s) => s.finish_body().await.map(|_| ()),
501            Self::H2(s) => s.finish(),
502            Self::Subrequest(s) => s.finish().await.map(|_| ()),
503            Self::Custom(s) => s.finish().await,
504        }
505    }
506
507    pub fn generate_error(error: u16) -> ResponseHeader {
508        match error {
509            /* common error responses are pre-generated */
510            502 => error_resp::HTTP_502_RESPONSE.clone(),
511            400 => error_resp::HTTP_400_RESPONSE.clone(),
512            _ => error_resp::gen_error_response(error),
513        }
514    }
515
516    /// Send error response to client using a pre-generated error message.
517    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
518        self.respond_error_with_body(error, Bytes::default()).await
519    }
520
521    /// Send error response to client using a pre-generated error message and custom body.
522    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
523        let mut resp = Self::generate_error(error);
524        if !body.is_empty() {
525            // error responses have a default content-length of zero
526            resp.set_content_length(body.len())?
527        }
528        self.write_error_response(resp, body).await
529    }
530
531    /// Send an error response to a client with a response header and body.
532    pub async fn write_error_response(&mut self, resp: ResponseHeader, body: Bytes) -> Result<()> {
533        // TODO: we shouldn't be closing downstream connections on internally generated errors
534        // and possibly other upstream connect() errors (connection refused, timeout, etc)
535        //
536        // This change is only here because we DO NOT re-use downstream connections
537        // today on these errors and we should signal to the client that pingora is dropping it
538        // rather than a misleading the client with 'keep-alive'
539        self.set_keepalive(None);
540
541        // If a response was already written and it's not informational 1xx, return.
542        // The only exception is an informational 101 Switching Protocols, which is treated
543        // as final response https://www.rfc-editor.org/rfc/rfc9110#section-15.2.2.
544        if let Some(resp_written) = self.response_written().as_ref() {
545            if !resp_written.status.is_informational() || resp_written.status == 101 {
546                return Ok(());
547            }
548        }
549
550        self.write_response_header(Box::new(resp)).await?;
551
552        if !body.is_empty() {
553            self.write_response_body(body, true).await?;
554        } else {
555            self.finish_body().await?;
556        }
557
558        custom_session!(self.finish_custom().await?);
559
560        Ok(())
561    }
562
563    /// Whether there is no request body
564    pub fn is_body_empty(&mut self) -> bool {
565        match self {
566            Self::H1(s) => s.is_body_empty(),
567            Self::H2(s) => s.is_body_empty(),
568            Self::Subrequest(s) => s.is_body_empty(),
569            Self::Custom(s) => s.is_body_empty(),
570        }
571    }
572
573    pub fn retry_buffer_truncated(&self) -> bool {
574        match self {
575            Self::H1(s) => s.retry_buffer_truncated(),
576            Self::H2(s) => s.retry_buffer_truncated(),
577            Self::Subrequest(s) => s.retry_buffer_truncated(),
578            Self::Custom(s) => s.retry_buffer_truncated(),
579        }
580    }
581
582    pub fn enable_retry_buffering(&mut self) {
583        match self {
584            Self::H1(s) => s.enable_retry_buffering(),
585            Self::H2(s) => s.enable_retry_buffering(),
586            Self::Subrequest(s) => s.enable_retry_buffering(),
587            Self::Custom(s) => s.enable_retry_buffering(),
588        }
589    }
590
591    pub fn get_retry_buffer(&self) -> Option<Bytes> {
592        match self {
593            Self::H1(s) => s.get_retry_buffer(),
594            Self::H2(s) => s.get_retry_buffer(),
595            Self::Subrequest(s) => s.get_retry_buffer(),
596            Self::Custom(s) => s.get_retry_buffer(),
597        }
598    }
599
600    /// Read body (same as `read_request_body()`) or pending forever until downstream
601    /// terminates the session.
602    pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
603        match self {
604            Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
605            Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
606            Self::Subrequest(s) => s.read_body_or_idle(no_body_expected).await,
607            Self::Custom(s) => s.read_body_or_idle(no_body_expected).await,
608        }
609    }
610
611    pub fn as_http1(&self) -> Option<&SessionV1> {
612        match self {
613            Self::H1(s) => Some(s),
614            Self::H2(_) => None,
615            Self::Subrequest(_) => None,
616            Self::Custom(_) => None,
617        }
618    }
619
620    pub fn as_http2(&self) -> Option<&SessionV2> {
621        match self {
622            Self::H1(_) => None,
623            Self::H2(s) => Some(s),
624            Self::Subrequest(_) => None,
625            Self::Custom(_) => None,
626        }
627    }
628
629    pub fn as_subrequest(&self) -> Option<&SessionSubrequest> {
630        match self {
631            Self::H1(_) => None,
632            Self::H2(_) => None,
633            Self::Subrequest(s) => Some(s),
634            Self::Custom(_) => None,
635        }
636    }
637
638    pub fn as_subrequest_mut(&mut self) -> Option<&mut SessionSubrequest> {
639        match self {
640            Self::H1(_) => None,
641            Self::H2(_) => None,
642            Self::Subrequest(s) => Some(s),
643            Self::Custom(_) => None,
644        }
645    }
646
647    pub fn as_custom(&self) -> Option<&dyn SessionCustom> {
648        match self {
649            Self::H1(_) => None,
650            Self::H2(_) => None,
651            Self::Subrequest(_) => None,
652            Self::Custom(c) => Some(c.as_ref()),
653        }
654    }
655
656    pub fn as_custom_mut(&mut self) -> Option<&mut Box<dyn SessionCustom>> {
657        match self {
658            Self::H1(_) => None,
659            Self::H2(_) => None,
660            Self::Subrequest(_) => None,
661            Self::Custom(c) => Some(c),
662        }
663    }
664
665    /// Write a 100 Continue response to the client.
666    pub async fn write_continue_response(&mut self) -> Result<()> {
667        match self {
668            Self::H1(s) => s.write_continue_response().await,
669            Self::H2(s) => s.write_response_header(
670                Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
671                false,
672            ),
673            Self::Subrequest(s) => s.write_continue_response().await,
674            // TODO(slava): is there any write_continue_response calls?
675            Self::Custom(s) => {
676                s.write_response_header(
677                    Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
678                    false,
679                )
680                .await
681            }
682        }
683    }
684
685    /// Whether this request is for upgrade (e.g., websocket).
686    pub fn is_upgrade_req(&self) -> bool {
687        match self {
688            Self::H1(s) => s.is_upgrade_req(),
689            Self::H2(_) => false,
690            Self::Subrequest(s) => s.is_upgrade_req(),
691            Self::Custom(s) => s.is_upgrade_req(),
692        }
693    }
694
695    /// Whether this session was fully upgraded (completed Upgrade handshake).
696    pub fn was_upgraded(&self) -> bool {
697        match self {
698            Self::H1(s) => s.was_upgraded(),
699            Self::H2(_) => false,
700            Self::Subrequest(s) => s.was_upgraded(),
701            Self::Custom(s) => s.was_upgraded(),
702        }
703    }
704
705    /// Return how many response body bytes (application, not wire) already sent downstream
706    pub fn body_bytes_sent(&self) -> usize {
707        match self {
708            Self::H1(s) => s.body_bytes_sent(),
709            Self::H2(s) => s.body_bytes_sent(),
710            Self::Subrequest(s) => s.body_bytes_sent(),
711            Self::Custom(s) => s.body_bytes_sent(),
712        }
713    }
714
715    /// Return how many request body bytes (application, not wire) already read from downstream
716    pub fn body_bytes_read(&self) -> usize {
717        match self {
718            Self::H1(s) => s.body_bytes_read(),
719            Self::H2(s) => s.body_bytes_read(),
720            Self::Subrequest(s) => s.body_bytes_read(),
721            Self::Custom(s) => s.body_bytes_read(),
722        }
723    }
724
725    /// Return the [Digest] for the connection.
726    pub fn digest(&self) -> Option<&Digest> {
727        match self {
728            Self::H1(s) => Some(s.digest()),
729            Self::H2(s) => s.digest(),
730            Self::Subrequest(s) => s.digest(),
731            Self::Custom(s) => s.digest(),
732        }
733    }
734
735    /// Return a mutable [Digest] reference for the connection.
736    ///
737    /// Will return `None` if multiple H2 streams are open.
738    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
739        match self {
740            Self::H1(s) => Some(s.digest_mut()),
741            Self::H2(s) => s.digest_mut(),
742            Self::Subrequest(s) => s.digest_mut(),
743            Self::Custom(s) => s.digest_mut(),
744        }
745    }
746
747    /// Return the client (peer) address of the connection.
748    pub fn client_addr(&self) -> Option<&SocketAddr> {
749        match self {
750            Self::H1(s) => s.client_addr(),
751            Self::H2(s) => s.client_addr(),
752            Self::Subrequest(s) => s.client_addr(),
753            Self::Custom(s) => s.client_addr(),
754        }
755    }
756
757    /// Return the server (local) address of the connection.
758    pub fn server_addr(&self) -> Option<&SocketAddr> {
759        match self {
760            Self::H1(s) => s.server_addr(),
761            Self::H2(s) => s.server_addr(),
762            Self::Subrequest(s) => s.server_addr(),
763            Self::Custom(s) => s.server_addr(),
764        }
765    }
766
767    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
768    /// None if the HTTP session is over H2, or a subrequest
769    pub fn stream(&self) -> Option<&Stream> {
770        match self {
771            Self::H1(s) => Some(s.stream()),
772            Self::H2(_) => None,
773            Self::Subrequest(_) => None,
774            Self::Custom(_) => None,
775        }
776    }
777}