pingora_core/apps/
http_app.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A simple HTTP application trait that maps a request to a response
16
17use async_trait::async_trait;
18use http::Response;
19use log::{debug, error, trace};
20use pingora_http::ResponseHeader;
21use std::sync::Arc;
22
23use crate::apps::{HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream};
24use crate::modules::http::{HttpModules, ModuleBuilder};
25use crate::protocols::http::v2::server::H2Options;
26use crate::protocols::http::HttpTask;
27use crate::protocols::http::ServerSession;
28use crate::server::ShutdownWatch;
29
30/// This trait defines how to map a request to a response
31#[async_trait]
32pub trait ServeHttp {
33    /// Define the mapping from a request to a response.
34    /// Note that the request header is already read, but the implementation needs to read the
35    /// request body if any.
36    ///
37    /// # Limitation
38    /// In this API, the entire response has to be generated before the end of this call.
39    /// So it is not suitable for streaming response or interactive communications.
40    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
41    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
42}
43
44// TODO: remove this in favor of HttpServer?
45#[async_trait]
46impl<SV> HttpServerApp for SV
47where
48    SV: ServeHttp + Send + Sync,
49{
50    async fn process_new_http(
51        self: &Arc<Self>,
52        mut http: ServerSession,
53        shutdown: &ShutdownWatch,
54    ) -> Option<ReusedHttpStream> {
55        match http.read_request().await {
56            Ok(res) => match res {
57                false => {
58                    debug!("Failed to read request header");
59                    return None;
60                }
61                true => {
62                    debug!("Successfully get a new request");
63                }
64            },
65            Err(e) => {
66                error!("HTTP server fails to read from downstream: {e}");
67                return None;
68            }
69        }
70        trace!("{:?}", http.req_header());
71        if *shutdown.borrow() {
72            http.set_keepalive(None);
73        } else {
74            http.set_keepalive(Some(60));
75        }
76        let new_response = self.response(&mut http).await;
77        let (parts, body) = new_response.into_parts();
78        let resp_header: ResponseHeader = parts.into();
79        match http.write_response_header(Box::new(resp_header)).await {
80            Ok(()) => {
81                debug!("HTTP response header done.");
82            }
83            Err(e) => {
84                error!(
85                    "HTTP server fails to write to downstream: {e}, {}",
86                    http.request_summary()
87                );
88            }
89        }
90        if !body.is_empty() {
91            // TODO: check if chunked encoding is needed
92            match http.write_response_body(body.into(), true).await {
93                Ok(_) => debug!("HTTP response written."),
94                Err(e) => error!(
95                    "HTTP server fails to write to downstream: {e}, {}",
96                    http.request_summary()
97                ),
98            }
99        }
100        let persistent_settings = HttpPersistentSettings::for_session(&http);
101        match http.finish().await {
102            Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
103            Err(e) => {
104                error!("HTTP server fails to finish the request: {e}");
105                None
106            }
107        }
108    }
109}
110
111/// A helper struct for HTTP server with http modules embedded
112pub struct HttpServer<SV> {
113    app: SV,
114    modules: HttpModules,
115    pub server_options: Option<HttpServerOptions>,
116    pub h2_options: Option<H2Options>,
117}
118
119impl<SV> HttpServer<SV> {
120    /// Create a new [HttpServer] with the given app which implements [ServeHttp]
121    pub fn new_app(app: SV) -> Self {
122        HttpServer {
123            app,
124            modules: HttpModules::new(),
125            server_options: None,
126            h2_options: None,
127        }
128    }
129
130    /// Add [ModuleBuilder] to this [HttpServer]
131    pub fn add_module(&mut self, module: ModuleBuilder) {
132        self.modules.add_module(module)
133    }
134}
135
136#[async_trait]
137impl<SV> HttpServerApp for HttpServer<SV>
138where
139    SV: ServeHttp + Send + Sync,
140{
141    async fn process_new_http(
142        self: &Arc<Self>,
143        mut http: ServerSession,
144        shutdown: &ShutdownWatch,
145    ) -> Option<ReusedHttpStream> {
146        match http.read_request().await {
147            Ok(res) => match res {
148                false => {
149                    debug!("Failed to read request header");
150                    return None;
151                }
152                true => {
153                    debug!("Successfully get a new request");
154                }
155            },
156            Err(e) => {
157                error!("HTTP server fails to read from downstream: {e}");
158                return None;
159            }
160        }
161        trace!("{:?}", http.req_header());
162        if *shutdown.borrow() {
163            http.set_keepalive(None);
164        } else {
165            http.set_keepalive(Some(60));
166        }
167        let mut module_ctx = self.modules.build_ctx();
168        let req = http.req_header_mut();
169        module_ctx.request_header_filter(req).await.ok()?;
170        let new_response = self.app.response(&mut http).await;
171        let (parts, body) = new_response.into_parts();
172        let mut resp_header: ResponseHeader = parts.into();
173        module_ctx
174            .response_header_filter(&mut resp_header, body.is_empty())
175            .await
176            .ok()?;
177
178        let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
179        trace!("{task:?}");
180
181        match http.response_duplex_vec(vec![task]).await {
182            Ok(_) => {
183                debug!("HTTP response header done.");
184            }
185            Err(e) => {
186                error!(
187                    "HTTP server fails to write to downstream: {e}, {}",
188                    http.request_summary()
189                );
190            }
191        }
192
193        let mut body = Some(body.into());
194        module_ctx.response_body_filter(&mut body, true).ok()?;
195
196        let task = HttpTask::Body(body, true);
197
198        trace!("{task:?}");
199
200        // TODO: check if chunked encoding is needed
201        match http.response_duplex_vec(vec![task]).await {
202            Ok(_) => debug!("HTTP response written."),
203            Err(e) => error!(
204                "HTTP server fails to write to downstream: {e}, {}",
205                http.request_summary()
206            ),
207        }
208        let persistent_settings = HttpPersistentSettings::for_session(&http);
209        match http.finish().await {
210            Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
211            Err(e) => {
212                error!("HTTP server fails to finish the request: {e}");
213                None
214            }
215        }
216    }
217
218    fn h2_options(&self) -> Option<H2Options> {
219        self.h2_options.clone()
220    }
221
222    fn server_options(&self) -> Option<&HttpServerOptions> {
223        self.server_options.as_ref()
224    }
225}