pingora_core/protocols/http/client.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use pingora_error::Result;
17use pingora_http::{RequestHeader, ResponseHeader};
18use std::time::Duration;
19
20use super::v1::client::HttpSession as Http1Session;
21use super::v2::client::Http2Session;
22use crate::protocols::{Digest, SocketAddr, Stream};
23
24/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
25pub enum HttpSession {
26 H1(Http1Session),
27 H2(Http2Session),
28}
29
30impl HttpSession {
31 pub fn as_http1(&self) -> Option<&Http1Session> {
32 match self {
33 Self::H1(s) => Some(s),
34 Self::H2(_) => None,
35 }
36 }
37
38 pub fn as_http2(&self) -> Option<&Http2Session> {
39 match self {
40 Self::H1(_) => None,
41 Self::H2(s) => Some(s),
42 }
43 }
44 /// Write the request header to the server
45 /// After the request header is sent. The caller can either start reading the response or
46 /// sending request body if any.
47 pub async fn write_request_header(&mut self, req: Box<RequestHeader>) -> Result<()> {
48 match self {
49 HttpSession::H1(h1) => {
50 h1.write_request_header(req).await?;
51 Ok(())
52 }
53 HttpSession::H2(h2) => h2.write_request_header(req, false),
54 }
55 }
56
57 /// Write a chunk of the request body.
58 pub async fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
59 match self {
60 HttpSession::H1(h1) => {
61 // TODO: maybe h1 should also have the concept of `end`
62 h1.write_body(&data).await?;
63 Ok(())
64 }
65 HttpSession::H2(h2) => h2.write_request_body(data, end).await,
66 }
67 }
68
69 /// Signal that the request body has ended
70 pub async fn finish_request_body(&mut self) -> Result<()> {
71 match self {
72 HttpSession::H1(h1) => {
73 h1.finish_body().await?;
74 Ok(())
75 }
76 HttpSession::H2(h2) => h2.finish_request_body(),
77 }
78 }
79
80 /// Set the read timeout for reading header and body.
81 ///
82 /// The timeout is per read operation, not on the overall time reading the entire response
83 pub fn set_read_timeout(&mut self, timeout: Duration) {
84 match self {
85 HttpSession::H1(h1) => h1.read_timeout = Some(timeout),
86 HttpSession::H2(h2) => h2.read_timeout = Some(timeout),
87 }
88 }
89
90 /// Set the write timeout for writing header and body.
91 ///
92 /// The timeout is per write operation, not on the overall time writing the entire request.
93 ///
94 /// This is a noop for h2.
95 pub fn set_write_timeout(&mut self, timeout: Duration) {
96 match self {
97 HttpSession::H1(h1) => h1.write_timeout = Some(timeout),
98 HttpSession::H2(_) => { /* no write timeout because the actual write happens async*/ }
99 }
100 }
101
102 /// Read the response header from the server
103 /// For http1, this function can be called multiple times, if the headers received are just
104 /// informational headers.
105 pub async fn read_response_header(&mut self) -> Result<()> {
106 match self {
107 HttpSession::H1(h1) => {
108 h1.read_response().await?;
109 Ok(())
110 }
111 HttpSession::H2(h2) => h2.read_response_header().await,
112 }
113 }
114
115 /// Read response body
116 ///
117 /// `None` when no more body to read.
118 pub async fn read_response_body(&mut self) -> Result<Option<Bytes>> {
119 match self {
120 HttpSession::H1(h1) => h1.read_body_bytes().await,
121 HttpSession::H2(h2) => h2.read_response_body().await,
122 }
123 }
124
125 /// No (more) body to read
126 pub fn response_done(&mut self) -> bool {
127 match self {
128 HttpSession::H1(h1) => h1.is_body_done(),
129 HttpSession::H2(h2) => h2.response_finished(),
130 }
131 }
132
133 /// Give up the http session abruptly.
134 /// For H1 this will close the underlying connection
135 /// For H2 this will send RST_STREAM frame to end this stream if the stream has not ended at all
136 pub async fn shutdown(&mut self) {
137 match self {
138 Self::H1(s) => s.shutdown().await,
139 Self::H2(s) => s.shutdown(),
140 }
141 }
142
143 /// Get the response header of the server
144 ///
145 /// `None` if the response header is not read yet.
146 pub fn response_header(&self) -> Option<&ResponseHeader> {
147 match self {
148 Self::H1(s) => s.resp_header(),
149 Self::H2(s) => s.response_header(),
150 }
151 }
152
153 /// Return the [Digest] of the connection
154 ///
155 /// For reused connection, the timing in the digest will reflect its initial handshakes
156 /// The caller should check if the connection is reused to avoid misuse of the timing field.
157 pub fn digest(&self) -> Option<&Digest> {
158 match self {
159 Self::H1(s) => Some(s.digest()),
160 Self::H2(s) => s.digest(),
161 }
162 }
163
164 /// Return a mutable [Digest] reference for the connection.
165 ///
166 /// Will return `None` if this is an H2 session and multiple streams are open.
167 pub fn digest_mut(&mut self) -> Option<&mut Digest> {
168 match self {
169 Self::H1(s) => Some(s.digest_mut()),
170 Self::H2(s) => s.digest_mut(),
171 }
172 }
173
174 /// Return the server (peer) address of the connection.
175 pub fn server_addr(&self) -> Option<&SocketAddr> {
176 match self {
177 Self::H1(s) => s.server_addr(),
178 Self::H2(s) => s.server_addr(),
179 }
180 }
181
182 /// Return the client (local) address of the connection.
183 pub fn client_addr(&self) -> Option<&SocketAddr> {
184 match self {
185 Self::H1(s) => s.client_addr(),
186 Self::H2(s) => s.client_addr(),
187 }
188 }
189
190 /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
191 /// None if the HTTP session is over H2
192 pub fn stream(&self) -> Option<&Stream> {
193 match self {
194 Self::H1(s) => Some(s.stream()),
195 Self::H2(_) => None,
196 }
197 }
198}