Skip to main content

pingora_core/protocols/http/
client.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use pingora_error::Result;
17use pingora_http::{RequestHeader, ResponseHeader};
18use std::time::Duration;
19
20use super::v2::client::Http2Session;
21use super::{custom::client::Session, v1::client::HttpSession as Http1Session};
22use crate::protocols::{Digest, SocketAddr, Stream};
23
24/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
25pub enum HttpSession<S = ()> {
26    H1(Http1Session),
27    H2(Http2Session),
28    Custom(S),
29}
30
31impl<S: Session> HttpSession<S> {
32    pub fn as_http1(&self) -> Option<&Http1Session> {
33        match self {
34            Self::H1(s) => Some(s),
35            Self::H2(_) => None,
36            Self::Custom(_) => None,
37        }
38    }
39
40    pub fn as_http2(&self) -> Option<&Http2Session> {
41        match self {
42            Self::H1(_) => None,
43            Self::H2(s) => Some(s),
44            Self::Custom(_) => None,
45        }
46    }
47
48    pub fn as_custom(&self) -> Option<&S> {
49        match self {
50            Self::H1(_) => None,
51            Self::H2(_) => None,
52            Self::Custom(c) => Some(c),
53        }
54    }
55
56    pub fn as_custom_mut(&mut self) -> Option<&mut S> {
57        match self {
58            Self::H1(_) => None,
59            Self::H2(_) => None,
60            Self::Custom(c) => Some(c),
61        }
62    }
63
64    /// Write the request header to the server
65    /// After the request header is sent. The caller can either start reading the response or
66    /// sending request body if any.
67    pub async fn write_request_header(&mut self, req: Box<RequestHeader>) -> Result<()> {
68        match self {
69            HttpSession::H1(h1) => {
70                h1.write_request_header(req).await?;
71                Ok(())
72            }
73            HttpSession::H2(h2) => h2.write_request_header(req, false),
74            HttpSession::Custom(c) => c.write_request_header(req, false).await,
75        }
76    }
77
78    /// Write a chunk of the request body.
79    pub async fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
80        match self {
81            HttpSession::H1(h1) => {
82                // TODO: maybe h1 should also have the concept of `end`
83                h1.write_body(&data).await?;
84                Ok(())
85            }
86            HttpSession::H2(h2) => h2.write_request_body(data, end).await,
87            HttpSession::Custom(c) => c.write_request_body(data, end).await,
88        }
89    }
90
91    /// Signal that the request body has ended
92    pub async fn finish_request_body(&mut self) -> Result<()> {
93        match self {
94            HttpSession::H1(h1) => {
95                h1.finish_body().await?;
96                Ok(())
97            }
98            HttpSession::H2(h2) => h2.finish_request_body(),
99            HttpSession::Custom(c) => c.finish_request_body().await,
100        }
101    }
102
103    /// Set the read timeout for reading header and body.
104    ///
105    /// The timeout is per read operation, not on the overall time reading the entire response
106    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
107        match self {
108            HttpSession::H1(h1) => h1.read_timeout = timeout,
109            HttpSession::H2(h2) => h2.read_timeout = timeout,
110            HttpSession::Custom(c) => c.set_read_timeout(timeout),
111        }
112    }
113
114    /// Set the write timeout for writing header and body.
115    ///
116    /// The timeout is per write operation, not on the overall time writing the entire request.
117    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
118        match self {
119            HttpSession::H1(h1) => h1.write_timeout = timeout,
120            HttpSession::H2(h2) => h2.write_timeout = timeout,
121            HttpSession::Custom(c) => c.set_write_timeout(timeout),
122        }
123    }
124
125    /// Read the response header from the server
126    /// For http1, this function can be called multiple times, if the headers received are just
127    /// informational headers.
128    pub async fn read_response_header(&mut self) -> Result<()> {
129        match self {
130            HttpSession::H1(h1) => {
131                h1.read_response().await?;
132                Ok(())
133            }
134            HttpSession::H2(h2) => h2.read_response_header().await,
135            HttpSession::Custom(c) => c.read_response_header().await,
136        }
137    }
138
139    /// Read response body
140    ///
141    /// `None` when no more body to read.
142    pub async fn read_response_body(&mut self) -> Result<Option<Bytes>> {
143        match self {
144            HttpSession::H1(h1) => h1.read_body_bytes().await,
145            HttpSession::H2(h2) => h2.read_response_body().await,
146            HttpSession::Custom(c) => c.read_response_body().await,
147        }
148    }
149
150    /// No (more) body to read
151    pub fn response_done(&mut self) -> bool {
152        match self {
153            HttpSession::H1(h1) => h1.is_body_done(),
154            HttpSession::H2(h2) => h2.response_finished(),
155            HttpSession::Custom(c) => c.response_finished(),
156        }
157    }
158
159    /// Give up the http session abruptly.
160    /// For H1 this will close the underlying connection
161    /// For H2 this will send RST_STREAM frame to end this stream if the stream has not ended at all
162    pub async fn shutdown(&mut self) {
163        match self {
164            Self::H1(s) => s.shutdown().await,
165            Self::H2(s) => s.shutdown(),
166            Self::Custom(c) => c.shutdown(1, "shutdown").await,
167        }
168    }
169
170    /// Get the response header of the server
171    ///
172    /// `None` if the response header is not read yet.
173    pub fn response_header(&self) -> Option<&ResponseHeader> {
174        match self {
175            Self::H1(s) => s.resp_header(),
176            Self::H2(s) => s.response_header(),
177            Self::Custom(c) => c.response_header(),
178        }
179    }
180
181    /// Return the [Digest] of the connection
182    ///
183    /// For reused connection, the timing in the digest will reflect its initial handshakes
184    /// The caller should check if the connection is reused to avoid misuse of the timing field.
185    pub fn digest(&self) -> Option<&Digest> {
186        match self {
187            Self::H1(s) => Some(s.digest()),
188            Self::H2(s) => s.digest(),
189            Self::Custom(c) => c.digest(),
190        }
191    }
192
193    /// Return a mutable [Digest] reference for the connection.
194    ///
195    /// Will return `None` if this is an H2 session and multiple streams are open.
196    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
197        match self {
198            Self::H1(s) => Some(s.digest_mut()),
199            Self::H2(s) => s.digest_mut(),
200            Self::Custom(s) => s.digest_mut(),
201        }
202    }
203
204    /// Return the server (peer) address of the connection.
205    pub fn server_addr(&self) -> Option<&SocketAddr> {
206        match self {
207            Self::H1(s) => s.server_addr(),
208            Self::H2(s) => s.server_addr(),
209            Self::Custom(s) => s.server_addr(),
210        }
211    }
212
213    /// Return the client (local) address of the connection.
214    pub fn client_addr(&self) -> Option<&SocketAddr> {
215        match self {
216            Self::H1(s) => s.client_addr(),
217            Self::H2(s) => s.client_addr(),
218            Self::Custom(s) => s.client_addr(),
219        }
220    }
221
222    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
223    /// None if the HTTP session is over H2
224    pub fn stream(&self) -> Option<&Stream> {
225        match self {
226            Self::H1(s) => Some(s.stream()),
227            Self::H2(_) => None,
228            Self::Custom(_) => None,
229        }
230    }
231}