pingora_core/apps/
http_app.rs1use async_trait::async_trait;
18use http::Response;
19use log::{debug, error, trace};
20use pingora_http::ResponseHeader;
21use std::sync::Arc;
22
23use crate::apps::{HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream};
24use crate::modules::http::{HttpModules, ModuleBuilder};
25use crate::protocols::http::v2::server::H2Options;
26use crate::protocols::http::HttpTask;
27use crate::protocols::http::ServerSession;
28use crate::server::ShutdownWatch;
29
30#[async_trait]
32pub trait ServeHttp {
33 async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
42}
43
44#[async_trait]
46impl<SV> HttpServerApp for SV
47where
48 SV: ServeHttp + Send + Sync,
49{
50 async fn process_new_http(
51 self: &Arc<Self>,
52 mut http: ServerSession,
53 shutdown: &ShutdownWatch,
54 ) -> Option<ReusedHttpStream> {
55 match http.read_request().await {
56 Ok(res) => match res {
57 false => {
58 debug!("Failed to read request header");
59 return None;
60 }
61 true => {
62 debug!("Successfully get a new request");
63 }
64 },
65 Err(e) => {
66 error!("HTTP server fails to read from downstream: {e}");
67 return None;
68 }
69 }
70 trace!("{:?}", http.req_header());
71 if *shutdown.borrow() {
72 http.set_keepalive(None);
73 } else {
74 http.set_keepalive(Some(60));
75 }
76 let new_response = self.response(&mut http).await;
77 let (parts, body) = new_response.into_parts();
78 let resp_header: ResponseHeader = parts.into();
79 match http.write_response_header(Box::new(resp_header)).await {
80 Ok(()) => {
81 debug!("HTTP response header done.");
82 }
83 Err(e) => {
84 error!(
85 "HTTP server fails to write to downstream: {e}, {}",
86 http.request_summary()
87 );
88 }
89 }
90 if !body.is_empty() {
91 match http.write_response_body(body.into(), true).await {
93 Ok(_) => debug!("HTTP response written."),
94 Err(e) => error!(
95 "HTTP server fails to write to downstream: {e}, {}",
96 http.request_summary()
97 ),
98 }
99 }
100 let persistent_settings = HttpPersistentSettings::for_session(&http);
101 match http.finish().await {
102 Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
103 Err(e) => {
104 error!("HTTP server fails to finish the request: {e}");
105 None
106 }
107 }
108 }
109}
110
111pub struct HttpServer<SV> {
113 app: SV,
114 modules: HttpModules,
115 pub server_options: Option<HttpServerOptions>,
116 pub h2_options: Option<H2Options>,
117}
118
119impl<SV> HttpServer<SV> {
120 pub fn new_app(app: SV) -> Self {
122 HttpServer {
123 app,
124 modules: HttpModules::new(),
125 server_options: None,
126 h2_options: None,
127 }
128 }
129
130 pub fn add_module(&mut self, module: ModuleBuilder) {
132 self.modules.add_module(module)
133 }
134}
135
136#[async_trait]
137impl<SV> HttpServerApp for HttpServer<SV>
138where
139 SV: ServeHttp + Send + Sync,
140{
141 async fn process_new_http(
142 self: &Arc<Self>,
143 mut http: ServerSession,
144 shutdown: &ShutdownWatch,
145 ) -> Option<ReusedHttpStream> {
146 match http.read_request().await {
147 Ok(res) => match res {
148 false => {
149 debug!("Failed to read request header");
150 return None;
151 }
152 true => {
153 debug!("Successfully get a new request");
154 }
155 },
156 Err(e) => {
157 error!("HTTP server fails to read from downstream: {e}");
158 return None;
159 }
160 }
161 trace!("{:?}", http.req_header());
162 if *shutdown.borrow() {
163 http.set_keepalive(None);
164 } else {
165 http.set_keepalive(Some(60));
166 }
167 let mut module_ctx = self.modules.build_ctx();
168 let req = http.req_header_mut();
169 module_ctx.request_header_filter(req).await.ok()?;
170 let new_response = self.app.response(&mut http).await;
171 let (parts, body) = new_response.into_parts();
172 let mut resp_header: ResponseHeader = parts.into();
173 module_ctx
174 .response_header_filter(&mut resp_header, body.is_empty())
175 .await
176 .ok()?;
177
178 let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
179 trace!("{task:?}");
180
181 match http.response_duplex_vec(vec![task]).await {
182 Ok(_) => {
183 debug!("HTTP response header done.");
184 }
185 Err(e) => {
186 error!(
187 "HTTP server fails to write to downstream: {e}, {}",
188 http.request_summary()
189 );
190 }
191 }
192
193 let mut body = Some(body.into());
194 module_ctx.response_body_filter(&mut body, true).ok()?;
195
196 let task = HttpTask::Body(body, true);
197
198 trace!("{task:?}");
199
200 match http.response_duplex_vec(vec![task]).await {
202 Ok(_) => debug!("HTTP response written."),
203 Err(e) => error!(
204 "HTTP server fails to write to downstream: {e}, {}",
205 http.request_summary()
206 ),
207 }
208 let persistent_settings = HttpPersistentSettings::for_session(&http);
209 match http.finish().await {
210 Ok(c) => c.map(|s| ReusedHttpStream::new(s, Some(persistent_settings))),
211 Err(e) => {
212 error!("HTTP server fails to finish the request: {e}");
213 None
214 }
215 }
216 }
217
218 fn h2_options(&self) -> Option<H2Options> {
219 self.h2_options.clone()
220 }
221
222 fn server_options(&self) -> Option<&HttpServerOptions> {
223 self.server_options.as_ref()
224 }
225}