Skip to main content

pingora_core/apps/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The abstraction and implementation interface for service application logic
16
17pub mod http_app;
18pub mod prometheus_http_app;
19
20use crate::server::ShutdownWatch;
21use async_trait::async_trait;
22use log::{debug, error};
23use std::future::poll_fn;
24use std::sync::Arc;
25
26use crate::protocols::http::v2::server;
27use crate::protocols::http::ServerSession;
28use crate::protocols::Digest;
29use crate::protocols::Stream;
30use crate::protocols::ALPN;
31
/// The HTTP/2 client connection preface, compared against the first bytes peeked from a
/// plaintext stream to detect h2c.
///
/// See <https://datatracker.ietf.org/doc/html/rfc9113#section-3.4>.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
34
35#[async_trait]
36/// This trait defines the interface of a transport layer (TCP or TLS) application.
37pub trait ServerApp {
38    /// Whenever a new connection is established, this function will be called with the established
39    /// [`Stream`] object provided.
40    ///
41    /// The application can do whatever it wants with the `session`.
42    ///
43    /// After processing the `session`, if the `session`'s connection is reusable, This function
44    /// can return it to the service by returning `Some(session)`. The returned `session` will be
45    /// fed to another [`Self::process_new()`] for another round of processing.
46    /// If not reusable, `None` should be returned.
47    ///
48    /// The `shutdown` argument will change from `false` to `true` when the server receives a
49    /// signal to shutdown. This argument allows the application to react accordingly.
50    async fn process_new(
51        self: &Arc<Self>,
52        mut session: Stream,
53        // TODO: make this ShutdownWatch so that all task can await on this event
54        shutdown: &ShutdownWatch,
55    ) -> Option<Stream>;
56
57    /// This callback will be called once after the service stops listening to its endpoints.
58    async fn cleanup(&self) {}
59}
/// HTTP Server options that control how the server handles some transport types.
///
/// Every field defaults to "off" / `None` via the derived [`Default`].
#[derive(Default)]
#[non_exhaustive]
pub struct HttpServerOptions {
    /// Allow HTTP/2 for plaintext.
    pub h2c: bool,

    /// Allow proxying CONNECT requests when handling HTTP traffic.
    ///
    /// When disabled, CONNECT requests are rejected with 405 by proxy services.
    pub allow_connect_method_proxying: bool,

    // When set, the h2c preface check is skipped and connections that are not handled as
    // HTTP/2 are routed to `HttpServerApp::process_custom_session`.
    #[doc(hidden)]
    pub force_custom: bool,

    /// Maximum number of requests that this connection will handle. This is
    /// equivalent to [Nginx's keepalive requests](https://nginx.org/en/docs/http/ngx_http_upstream_module.html#keepalive_requests)
    /// which says:
    ///
    /// > Closing connections periodically is necessary to free per-connection
    /// > memory allocations. Therefore, using too high maximum number of
    /// > requests could result in excessive memory usage and not recommended.
    ///
    /// Unlike nginx, the default behavior here is _no limit_ (`None`).
    pub keepalive_request_limit: Option<u32>,
}
86
87#[derive(Debug, Clone)]
88pub struct HttpPersistentSettings {
89    keepalive_timeout: Option<u64>,
90    keepalive_reuses_remaining: Option<u32>,
91}
92
93impl HttpPersistentSettings {
94    pub fn for_session(session: &ServerSession) -> Self {
95        HttpPersistentSettings {
96            keepalive_timeout: session.get_keepalive(),
97            keepalive_reuses_remaining: session.get_keepalive_reuses_remaining(),
98        }
99    }
100
101    pub fn apply_to_session(self, session: &mut ServerSession) {
102        let Self {
103            keepalive_timeout,
104            mut keepalive_reuses_remaining,
105        } = self;
106
107        // Reduce the number of times the connection for this session can be
108        // reused by one. A session with reuse count of zero won't be reused
109        if let Some(reuses) = keepalive_reuses_remaining.as_mut() {
110            *reuses = reuses.saturating_sub(1);
111        }
112
113        session.set_keepalive(keepalive_timeout);
114        session.set_keepalive_reuses_remaining(keepalive_reuses_remaining);
115    }
116}
117
118#[derive(Debug)]
119pub struct ReusedHttpStream {
120    stream: Stream,
121    persistent_settings: Option<HttpPersistentSettings>,
122}
123
124impl ReusedHttpStream {
125    pub fn new(stream: Stream, persistent_settings: Option<HttpPersistentSettings>) -> Self {
126        ReusedHttpStream {
127            stream,
128            persistent_settings,
129        }
130    }
131
132    pub fn consume(self) -> (Stream, Option<HttpPersistentSettings>) {
133        (self.stream, self.persistent_settings)
134    }
135}
136
137/// This trait defines the interface of an HTTP application.
138#[async_trait]
139pub trait HttpServerApp {
140    /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
141    ///
142    /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
143    /// connection back to the service. The caller needs to make sure that the connection is in a reusable state
144    /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
145    async fn process_new_http(
146        self: &Arc<Self>,
147        mut session: ServerSession,
148        // TODO: make this ShutdownWatch so that all task can await on this event
149        shutdown: &ShutdownWatch,
150    ) -> Option<ReusedHttpStream>;
151
152    /// Provide options on how HTTP/2 connection should be established. This function will be called
153    /// every time a new HTTP/2 **connection** needs to be established.
154    ///
155    /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
156    fn h2_options(&self) -> Option<server::H2Options> {
157        None
158    }
159
160    /// Provide HTTP server options used to override default behavior. This function will be called
161    /// every time a new connection is processed.
162    ///
163    /// A `None` means no server options will be applied.
164    fn server_options(&self) -> Option<&HttpServerOptions> {
165        None
166    }
167
168    async fn http_cleanup(&self) {}
169
170    #[doc(hidden)]
171    async fn process_custom_session(
172        self: Arc<Self>,
173        _stream: Stream,
174        _shutdown: &ShutdownWatch,
175    ) -> Option<Stream> {
176        None
177    }
178}
179
180#[async_trait]
181impl<T> ServerApp for T
182where
183    T: HttpServerApp + Send + Sync + 'static,
184{
185    async fn process_new(
186        self: &Arc<Self>,
187        mut stream: Stream,
188        shutdown: &ShutdownWatch,
189    ) -> Option<Stream> {
190        let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
191        let custom = self
192            .server_options()
193            .as_ref()
194            .map_or(false, |o| o.force_custom);
195
196        // try to read h2 preface
197        if h2c && !custom {
198            let mut buf = [0u8; H2_PREFACE.len()];
199            let peeked = stream
200                .try_peek(&mut buf)
201                .await
202                .map_err(|e| {
203                    // this error is normal when h1 reuse and close the connection
204                    debug!("Read error while peeking h2c preface {e}");
205                    e
206                })
207                .ok()?;
208            // not all streams support peeking
209            if peeked {
210                // turn off h2c (use h1) if h2 preface doesn't exist
211                h2c = buf == H2_PREFACE;
212            }
213        }
214        if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
215            // create a shared connection digest
216            let digest = Arc::new(Digest {
217                ssl_digest: stream.get_ssl_digest(),
218                // TODO: log h2 handshake time
219                timing_digest: stream.get_timing_digest(),
220                proxy_digest: stream.get_proxy_digest(),
221                socket_digest: stream.get_socket_digest(),
222            });
223
224            let h2_options = self.h2_options();
225            let h2_conn = server::handshake(stream, h2_options).await;
226            let mut h2_conn = match h2_conn {
227                Err(e) => {
228                    error!("H2 handshake error {e}");
229                    return None;
230                }
231                Ok(c) => c,
232            };
233
234            let mut shutdown = shutdown.clone();
235            loop {
236                // this loop ends when the client decides to close the h2 conn
237                // TODO: add a timeout?
238                let h2_stream = tokio::select! {
239                    _ = shutdown.changed() => {
240                        h2_conn.graceful_shutdown();
241                        let _ = poll_fn(|cx| h2_conn.poll_closed(cx))
242                            .await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
243                        return None;
244                    }
245                    h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
246                };
247                let h2_stream = match h2_stream {
248                    Err(e) => {
249                        // It is common for the client to just disconnect TCP without properly
250                        // closing H2. So we don't log the errors here
251                        debug!("H2 error when accepting new stream {e}");
252                        return None;
253                    }
254                    Ok(s) => s?, // None means the connection is ready to be closed
255                };
256                let app = self.clone();
257                let shutdown = shutdown.clone();
258                pingora_runtime::current_handle().spawn(async move {
259                    // Note, `PersistentSettings` not currently relevant for h2
260                    app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
261                        .await;
262                });
263            }
264        } else if custom || matches!(stream.selected_alpn_proto(), Some(ALPN::Custom(_))) {
265            return self.clone().process_custom_session(stream, shutdown).await;
266        } else {
267            // No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
268            let mut session = ServerSession::new_http1(stream);
269            if *shutdown.borrow() {
270                // stop downstream from reusing if this service is shutting down soon
271                session.set_keepalive(None);
272            } else {
273                // default 60s
274                session.set_keepalive(Some(60));
275            }
276            session.set_keepalive_reuses_remaining(
277                self.server_options()
278                    .and_then(|opts| opts.keepalive_request_limit),
279            );
280
281            let mut result = self.process_new_http(session, shutdown).await;
282            while let Some((stream, persistent_settings)) = result.map(|r| r.consume()) {
283                let mut session = ServerSession::new_http1(stream);
284                if let Some(persistent_settings) = persistent_settings {
285                    persistent_settings.apply_to_session(&mut session);
286                }
287
288                result = self.process_new_http(session, shutdown).await;
289            }
290        }
291        None
292    }
293
294    async fn cleanup(&self) {
295        self.http_cleanup().await;
296    }
297}