pingora_core/apps/mod.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The abstraction and implementation interface for service application logic
16
17pub mod http_app;
18pub mod prometheus_http_app;
19
20use crate::server::ShutdownWatch;
21use async_trait::async_trait;
22use log::{debug, error};
23use std::future::poll_fn;
24use std::sync::Arc;
25
26use crate::protocols::http::v2::server;
27use crate::protocols::http::ServerSession;
28use crate::protocols::Digest;
29use crate::protocols::Stream;
30use crate::protocols::ALPN;
31
32// https://datatracker.ietf.org/doc/html/rfc9113#section-3.4
33const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
34
35#[async_trait]
36/// This trait defines the interface of a transport layer (TCP or TLS) application.
37pub trait ServerApp {
38 /// Whenever a new connection is established, this function will be called with the established
39 /// [`Stream`] object provided.
40 ///
41 /// The application can do whatever it wants with the `session`.
42 ///
43 /// After processing the `session`, if the `session`'s connection is reusable, This function
44 /// can return it to the service by returning `Some(session)`. The returned `session` will be
45 /// fed to another [`Self::process_new()`] for another round of processing.
46 /// If not reusable, `None` should be returned.
47 ///
48 /// The `shutdown` argument will change from `false` to `true` when the server receives a
49 /// signal to shutdown. This argument allows the application to react accordingly.
50 async fn process_new(
51 self: &Arc<Self>,
52 mut session: Stream,
53 // TODO: make this ShutdownWatch so that all task can await on this event
54 shutdown: &ShutdownWatch,
55 ) -> Option<Stream>;
56
57 /// This callback will be called once after the service stops listening to its endpoints.
58 async fn cleanup(&self) {}
59}
60#[non_exhaustive]
61#[derive(Default)]
62/// HTTP Server options that control how the server handles some transport types.
63pub struct HttpServerOptions {
64 /// Allow HTTP/2 for plaintext.
65 pub h2c: bool,
66
67 #[doc(hidden)]
68 pub force_custom: bool,
69}
70
71#[derive(Debug, Clone)]
72pub struct HttpPersistentSettings {
73 keepalive_timeout: Option<u64>,
74}
75
76impl HttpPersistentSettings {
77 pub fn for_session(session: &ServerSession) -> Self {
78 HttpPersistentSettings {
79 keepalive_timeout: session.get_keepalive(),
80 }
81 }
82
83 pub fn apply_to_session(&self, session: &mut ServerSession) {
84 session.set_keepalive(self.keepalive_timeout);
85 }
86}
87
88#[derive(Debug)]
89pub struct ReusedHttpStream {
90 stream: Stream,
91 persistent_settings: Option<HttpPersistentSettings>,
92}
93
94impl ReusedHttpStream {
95 pub fn new(stream: Stream, persistent_settings: Option<HttpPersistentSettings>) -> Self {
96 ReusedHttpStream {
97 stream,
98 persistent_settings,
99 }
100 }
101
102 pub fn consume(self) -> (Stream, Option<HttpPersistentSettings>) {
103 (self.stream, self.persistent_settings)
104 }
105}
106
107/// This trait defines the interface of an HTTP application.
108#[async_trait]
109pub trait HttpServerApp {
110 /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
111 ///
112 /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
113 /// connection back to the service. The caller needs to make sure that the connection is in a reusable state
114 /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
115 async fn process_new_http(
116 self: &Arc<Self>,
117 mut session: ServerSession,
118 // TODO: make this ShutdownWatch so that all task can await on this event
119 shutdown: &ShutdownWatch,
120 ) -> Option<ReusedHttpStream>;
121
122 /// Provide options on how HTTP/2 connection should be established. This function will be called
123 /// every time a new HTTP/2 **connection** needs to be established.
124 ///
125 /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
126 fn h2_options(&self) -> Option<server::H2Options> {
127 None
128 }
129
130 /// Provide HTTP server options used to override default behavior. This function will be called
131 /// every time a new connection is processed.
132 ///
133 /// A `None` means no server options will be applied.
134 fn server_options(&self) -> Option<&HttpServerOptions> {
135 None
136 }
137
138 async fn http_cleanup(&self) {}
139
140 #[doc(hidden)]
141 async fn process_custom_session(
142 self: Arc<Self>,
143 _stream: Stream,
144 _shutdown: &ShutdownWatch,
145 ) -> Option<Stream> {
146 None
147 }
148}
149
150#[async_trait]
151impl<T> ServerApp for T
152where
153 T: HttpServerApp + Send + Sync + 'static,
154{
155 async fn process_new(
156 self: &Arc<Self>,
157 mut stream: Stream,
158 shutdown: &ShutdownWatch,
159 ) -> Option<Stream> {
160 let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
161 let custom = self
162 .server_options()
163 .as_ref()
164 .map_or(false, |o| o.force_custom);
165
166 // try to read h2 preface
167 if h2c && !custom {
168 let mut buf = [0u8; H2_PREFACE.len()];
169 let peeked = stream
170 .try_peek(&mut buf)
171 .await
172 .map_err(|e| {
173 // this error is normal when h1 reuse and close the connection
174 debug!("Read error while peeking h2c preface {e}");
175 e
176 })
177 .ok()?;
178 // not all streams support peeking
179 if peeked {
180 // turn off h2c (use h1) if h2 preface doesn't exist
181 h2c = buf == H2_PREFACE;
182 }
183 }
184 if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
185 // create a shared connection digest
186 let digest = Arc::new(Digest {
187 ssl_digest: stream.get_ssl_digest(),
188 // TODO: log h2 handshake time
189 timing_digest: stream.get_timing_digest(),
190 proxy_digest: stream.get_proxy_digest(),
191 socket_digest: stream.get_socket_digest(),
192 });
193
194 let h2_options = self.h2_options();
195 let h2_conn = server::handshake(stream, h2_options).await;
196 let mut h2_conn = match h2_conn {
197 Err(e) => {
198 error!("H2 handshake error {e}");
199 return None;
200 }
201 Ok(c) => c,
202 };
203
204 let mut shutdown = shutdown.clone();
205 loop {
206 // this loop ends when the client decides to close the h2 conn
207 // TODO: add a timeout?
208 let h2_stream = tokio::select! {
209 _ = shutdown.changed() => {
210 h2_conn.graceful_shutdown();
211 let _ = poll_fn(|cx| h2_conn.poll_closed(cx))
212 .await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
213 return None;
214 }
215 h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
216 };
217 let h2_stream = match h2_stream {
218 Err(e) => {
219 // It is common for the client to just disconnect TCP without properly
220 // closing H2. So we don't log the errors here
221 debug!("H2 error when accepting new stream {e}");
222 return None;
223 }
224 Ok(s) => s?, // None means the connection is ready to be closed
225 };
226 let app = self.clone();
227 let shutdown = shutdown.clone();
228 pingora_runtime::current_handle().spawn(async move {
229 // Note, `PersistentSettings` not currently relevant for h2
230 app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
231 .await;
232 });
233 }
234 } else if custom || matches!(stream.selected_alpn_proto(), Some(ALPN::Custom(_))) {
235 return self.clone().process_custom_session(stream, shutdown).await;
236 } else {
237 // No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
238 let mut session = ServerSession::new_http1(stream);
239 if *shutdown.borrow() {
240 // stop downstream from reusing if this service is shutting down soon
241 session.set_keepalive(None);
242 } else {
243 // default 60s
244 session.set_keepalive(Some(60));
245 }
246
247 let mut result = self.process_new_http(session, shutdown).await;
248 while let Some((stream, persistent_settings)) = result.map(|r| r.consume()) {
249 let mut session = ServerSession::new_http1(stream);
250 if let Some(persistent_settings) = persistent_settings {
251 persistent_settings.apply_to_session(&mut session);
252 }
253
254 result = self.process_new_http(session, shutdown).await;
255 }
256 }
257 None
258 }
259
260 async fn cleanup(&self) {
261 self.http_cleanup().await;
262 }
263}