pingora_core/protocols/http/
client.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use pingora_error::Result;
17use pingora_http::{RequestHeader, ResponseHeader};
18use std::time::Duration;
19
20use super::v1::client::HttpSession as Http1Session;
21use super::v2::client::Http2Session;
22use crate::protocols::{Digest, SocketAddr, Stream};
23
24/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
25pub enum HttpSession {
26    H1(Http1Session),
27    H2(Http2Session),
28}
29
30impl HttpSession {
31    pub fn as_http1(&self) -> Option<&Http1Session> {
32        match self {
33            Self::H1(s) => Some(s),
34            Self::H2(_) => None,
35        }
36    }
37
38    pub fn as_http2(&self) -> Option<&Http2Session> {
39        match self {
40            Self::H1(_) => None,
41            Self::H2(s) => Some(s),
42        }
43    }
44    /// Write the request header to the server
45    /// After the request header is sent. The caller can either start reading the response or
46    /// sending request body if any.
47    pub async fn write_request_header(&mut self, req: Box<RequestHeader>) -> Result<()> {
48        match self {
49            HttpSession::H1(h1) => {
50                h1.write_request_header(req).await?;
51                Ok(())
52            }
53            HttpSession::H2(h2) => h2.write_request_header(req, false),
54        }
55    }
56
57    /// Write a chunk of the request body.
58    pub async fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
59        match self {
60            HttpSession::H1(h1) => {
61                // TODO: maybe h1 should also have the concept of `end`
62                h1.write_body(&data).await?;
63                Ok(())
64            }
65            HttpSession::H2(h2) => h2.write_request_body(data, end).await,
66        }
67    }
68
69    /// Signal that the request body has ended
70    pub async fn finish_request_body(&mut self) -> Result<()> {
71        match self {
72            HttpSession::H1(h1) => {
73                h1.finish_body().await?;
74                Ok(())
75            }
76            HttpSession::H2(h2) => h2.finish_request_body(),
77        }
78    }
79
80    /// Set the read timeout for reading header and body.
81    ///
82    /// The timeout is per read operation, not on the overall time reading the entire response
83    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
84        match self {
85            HttpSession::H1(h1) => h1.read_timeout = timeout,
86            HttpSession::H2(h2) => h2.read_timeout = timeout,
87        }
88    }
89
90    /// Set the write timeout for writing header and body.
91    ///
92    /// The timeout is per write operation, not on the overall time writing the entire request.
93    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
94        match self {
95            HttpSession::H1(h1) => h1.write_timeout = timeout,
96            HttpSession::H2(h2) => h2.write_timeout = timeout,
97        }
98    }
99
100    /// Read the response header from the server
101    /// For http1, this function can be called multiple times, if the headers received are just
102    /// informational headers.
103    pub async fn read_response_header(&mut self) -> Result<()> {
104        match self {
105            HttpSession::H1(h1) => {
106                h1.read_response().await?;
107                Ok(())
108            }
109            HttpSession::H2(h2) => h2.read_response_header().await,
110        }
111    }
112
113    /// Read response body
114    ///
115    /// `None` when no more body to read.
116    pub async fn read_response_body(&mut self) -> Result<Option<Bytes>> {
117        match self {
118            HttpSession::H1(h1) => h1.read_body_bytes().await,
119            HttpSession::H2(h2) => h2.read_response_body().await,
120        }
121    }
122
123    /// No (more) body to read
124    pub fn response_done(&mut self) -> bool {
125        match self {
126            HttpSession::H1(h1) => h1.is_body_done(),
127            HttpSession::H2(h2) => h2.response_finished(),
128        }
129    }
130
131    /// Give up the http session abruptly.
132    /// For H1 this will close the underlying connection
133    /// For H2 this will send RST_STREAM frame to end this stream if the stream has not ended at all
134    pub async fn shutdown(&mut self) {
135        match self {
136            Self::H1(s) => s.shutdown().await,
137            Self::H2(s) => s.shutdown(),
138        }
139    }
140
141    /// Get the response header of the server
142    ///
143    /// `None` if the response header is not read yet.
144    pub fn response_header(&self) -> Option<&ResponseHeader> {
145        match self {
146            Self::H1(s) => s.resp_header(),
147            Self::H2(s) => s.response_header(),
148        }
149    }
150
151    /// Return the [Digest] of the connection
152    ///
153    /// For reused connection, the timing in the digest will reflect its initial handshakes
154    /// The caller should check if the connection is reused to avoid misuse of the timing field.
155    pub fn digest(&self) -> Option<&Digest> {
156        match self {
157            Self::H1(s) => Some(s.digest()),
158            Self::H2(s) => s.digest(),
159        }
160    }
161
162    /// Return a mutable [Digest] reference for the connection.
163    ///
164    /// Will return `None` if this is an H2 session and multiple streams are open.
165    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
166        match self {
167            Self::H1(s) => Some(s.digest_mut()),
168            Self::H2(s) => s.digest_mut(),
169        }
170    }
171
172    /// Return the server (peer) address of the connection.
173    pub fn server_addr(&self) -> Option<&SocketAddr> {
174        match self {
175            Self::H1(s) => s.server_addr(),
176            Self::H2(s) => s.server_addr(),
177        }
178    }
179
180    /// Return the client (local) address of the connection.
181    pub fn client_addr(&self) -> Option<&SocketAddr> {
182        match self {
183            Self::H1(s) => s.client_addr(),
184            Self::H2(s) => s.client_addr(),
185        }
186    }
187
188    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
189    /// None if the HTTP session is over H2
190    pub fn stream(&self) -> Option<&Stream> {
191        match self {
192            Self::H1(s) => Some(s.stream()),
193            Self::H2(_) => None,
194        }
195    }
196}