1use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::FutureExt;
41use http::{header, version::Version};
42use log::{debug, error, trace, warn};
43use once_cell::sync::Lazy;
44use pingora_http::{RequestHeader, ResponseHeader};
45use std::fmt::Debug;
46use std::str;
47use std::sync::Arc;
48use tokio::sync::{mpsc, Notify};
49use tokio::time;
50
51use pingora_cache::NoCacheReason;
52use pingora_core::apps::{HttpServerApp, HttpServerOptions};
53use pingora_core::connectors::{http::Connector, ConnectorOptions};
54use pingora_core::modules::http::compression::ResponseCompressionBuilder;
55use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
56use pingora_core::protocols::http::client::HttpSession as ClientSession;
57use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
58use pingora_core::protocols::http::HttpTask;
59use pingora_core::protocols::http::ServerSession as HttpSession;
60use pingora_core::protocols::http::SERVER_NAME;
61use pingora_core::protocols::Stream;
62use pingora_core::protocols::{Digest, UniqueID};
63use pingora_core::server::configuration::ServerConf;
64use pingora_core::server::ShutdownWatch;
65use pingora_core::upstreams::peer::{HttpPeer, Peer};
66use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
67
/// Upper bound on the number of upstream proxy attempts made for a single
/// downstream request (the retry loop in `process_request` stops at this count).
const MAX_RETRIES: usize = 16;
/// NOTE(review): not referenced in this part of the file; presumably a buffer
/// capacity used by the `proxy_*` submodules — confirm against its users.
const TASK_BUFFER_SIZE: usize = 4;
70
71mod proxy_cache;
72mod proxy_common;
73mod proxy_h1;
74mod proxy_h2;
75mod proxy_purge;
76mod proxy_trait;
77mod subrequest;
78
79use subrequest::Ctx as SubReqCtx;
80
81pub use proxy_purge::PurgeStatus;
82pub use proxy_trait::ProxyHttp;
83
84pub mod prelude {
85 pub use crate::{http_proxy_service, ProxyHttp, Session};
86}
87
/// HTTP proxy application: bundles a user-supplied service `SV` with the
/// upstream connector and the per-service state used while proxying.
pub struct HttpProxy<SV> {
    // The wrapped service; the methods in this file call its hooks when
    // `SV: ProxyHttp`.
    inner: SV,
    // Connector used to obtain and release upstream HTTP sessions.
    client_upstream: Connector,
    // Notified from `http_cleanup`; `handle_new_request` stops waiting for a
    // new request when this fires.
    shutdown: Notify,
    /// Options handed out by `HttpServerApp::server_options`; `None` right
    /// after construction.
    pub server_options: Option<HttpServerOptions>,
    /// Downstream HTTP modules; the module context of each `Session` created
    /// by this proxy is built from them.
    pub downstream_modules: HttpModules,
}
98
99impl<SV> HttpProxy<SV> {
100 fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
101 HttpProxy {
102 inner,
103 client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
104 shutdown: Notify::new(),
105 server_options: None,
106 downstream_modules: HttpModules::new(),
107 }
108 }
109
110 fn handle_init_modules(&mut self)
111 where
112 SV: ProxyHttp,
113 {
114 self.inner
115 .init_downstream_modules(&mut self.downstream_modules);
116 }
117
118 async fn handle_new_request(
119 &self,
120 mut downstream_session: Box<HttpSession>,
121 ) -> Option<Box<HttpSession>>
122 where
123 SV: ProxyHttp + Send + Sync,
124 SV::CTX: Send + Sync,
125 {
126 let res = tokio::select! {
129 biased; res = downstream_session.read_request() => { res }
131 _ = self.shutdown.notified() => {
132 return None;
134 }
135 };
136 match res {
137 Ok(true) => {
138 debug!("Successfully get a new request");
140 }
141 Ok(false) => {
142 return None; }
144 Err(mut e) => {
145 e.as_down();
146 error!("Fail to proxy: {}", e);
147 if matches!(e.etype, InvalidHTTPHeader) {
148 downstream_session.respond_error(400).await;
149 } downstream_session.shutdown().await;
151 return None;
152 }
153 }
154 trace!(
155 "Request header: {:?}",
156 downstream_session.req_header().as_ref()
157 );
158 Some(downstream_session)
159 }
160
161 async fn proxy_to_upstream(
163 &self,
164 session: &mut Session,
165 ctx: &mut SV::CTX,
166 ) -> (bool, Option<Box<Error>>)
167 where
168 SV: ProxyHttp + Send + Sync,
169 SV::CTX: Send + Sync,
170 {
171 let peer = match self.inner.upstream_peer(session, ctx).await {
172 Ok(p) => p,
173 Err(e) => return (false, Some(e)),
174 };
175
176 let client_session = self.client_upstream.get_http_session(&*peer).await;
177 match client_session {
178 Ok((client_session, client_reused)) => {
179 let (server_reused, error) = match client_session {
180 ClientSession::H1(mut h1) => {
181 let (server_reused, client_reuse, error) = self
182 .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
183 .await;
184 if client_reuse {
185 let session = ClientSession::H1(h1);
186 self.client_upstream
187 .release_http_session(session, &*peer, peer.idle_timeout())
188 .await;
189 }
190 (server_reused, error)
191 }
192 ClientSession::H2(mut h2) => {
193 let (server_reused, mut error) = self
194 .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
195 .await;
196 let session = ClientSession::H2(h2);
197 self.client_upstream
198 .release_http_session(session, &*peer, peer.idle_timeout())
199 .await;
200
201 if let Some(e) = error.as_mut() {
202 if matches!(e.etype, H2Downgrade | InvalidH2) {
205 if peer
206 .get_alpn()
207 .map_or(true, |alpn| alpn.get_min_http_version() == 1)
208 {
209 self.client_upstream.prefer_h1(&*peer);
212 } else {
213 e.retry = false.into();
215 }
216 }
217 }
218
219 (server_reused, error)
220 }
221 };
222 (
223 server_reused,
224 error.map(|e| {
225 self.inner
226 .error_while_proxy(&peer, session, e, ctx, client_reused)
227 }),
228 )
229 }
230 Err(mut e) => {
231 e.as_up();
232 let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
233 (false, Some(new_err.into_up()))
234 }
235 }
236 }
237
238 fn upstream_filter(
239 &self,
240 session: &mut Session,
241 task: &mut HttpTask,
242 ctx: &mut SV::CTX,
243 ) -> Result<()>
244 where
245 SV: ProxyHttp,
246 {
247 match task {
248 HttpTask::Header(header, _eos) => {
249 self.inner.upstream_response_filter(session, header, ctx)
250 }
251 HttpTask::Body(data, eos) => self
252 .inner
253 .upstream_response_body_filter(session, data, *eos, ctx),
254 HttpTask::Trailer(Some(trailers)) => self
255 .inner
256 .upstream_response_trailer_filter(session, trailers, ctx)?,
257 _ => {
258 }
260 }
261 Ok(())
262 }
263
264 async fn finish(
265 &self,
266 mut session: Session,
267 ctx: &mut SV::CTX,
268 reuse: bool,
269 error: Option<&Error>,
270 ) -> Option<Stream>
271 where
272 SV: ProxyHttp + Send + Sync,
273 SV::CTX: Send + Sync,
274 {
275 self.inner.logging(&mut session, error, ctx).await;
276
277 if reuse {
278 session.downstream_session.finish().await.ok().flatten()
280 } else {
281 None
282 }
283 }
284}
285
286use pingora_cache::HttpCache;
287use pingora_core::protocols::http::compression::ResponseCompressionCtx;
288
/// Per-request state shared by the proxy phases: the downstream HTTP session
/// plus cache, compression and module contexts.
pub struct Session {
    /// The downstream HTTP session this request is served on.
    pub downstream_session: Box<HttpSession>,
    /// Cache state for this request.
    pub cache: HttpCache,
    /// Response compression context, created as
    /// `ResponseCompressionCtx::new(0, false, false)` by `Session::new`.
    /// NOTE(review): presumably applies to responses coming from upstream — confirm.
    pub upstream_compression: ResponseCompressionCtx,
    /// NOTE(review): presumably makes range handling ignore the downstream
    /// range request; not read in this part of the file — confirm.
    pub ignore_downstream_range: bool,
    // Set by `process_subrequest` when this session serves a subrequest;
    // `None` for sessions created by `Session::new`.
    subrequest_ctx: Option<Box<SubReqCtx>>,
    /// Context of the downstream modules, built with `HttpModules::build_ctx`.
    pub downstream_modules_ctx: HttpModuleCtx,
}
307
308impl Session {
309 fn new(
310 downstream_session: impl Into<Box<HttpSession>>,
311 downstream_modules: &HttpModules,
312 ) -> Self {
313 Session {
314 downstream_session: downstream_session.into(),
315 cache: HttpCache::new(),
316 upstream_compression: ResponseCompressionCtx::new(0, false, false),
318 ignore_downstream_range: false,
319 subrequest_ctx: None,
320 downstream_modules_ctx: downstream_modules.build_ctx(),
321 }
322 }
323
324 pub fn new_h1(stream: Stream) -> Self {
328 let modules = HttpModules::new();
329 Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
330 }
331
332 pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
336 Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
337 }
338
339 pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
340 &mut self.downstream_session
341 }
342
343 pub fn as_downstream(&self) -> &HttpSession {
344 &self.downstream_session
345 }
346
347 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
349 let resp = HttpSession::generate_error(error);
350 self.write_response_header(Box::new(resp), true)
351 .await
352 .unwrap_or_else(|e| {
353 self.downstream_session.set_keepalive(None);
354 error!("failed to send error response to downstream: {e}");
355 });
356 Ok(())
357 }
358
359 pub async fn write_response_header(
364 &mut self,
365 mut resp: Box<ResponseHeader>,
366 end_of_stream: bool,
367 ) -> Result<()> {
368 self.downstream_modules_ctx
369 .response_header_filter(&mut resp, end_of_stream)
370 .await?;
371 self.downstream_session.write_response_header(resp).await
372 }
373
374 pub async fn write_response_body(
379 &mut self,
380 mut body: Option<Bytes>,
381 end_of_stream: bool,
382 ) -> Result<()> {
383 self.downstream_modules_ctx
384 .response_body_filter(&mut body, end_of_stream)?;
385
386 if body.is_none() && !end_of_stream {
387 return Ok(());
388 }
389
390 let data = body.unwrap_or_default();
391 self.downstream_session
392 .write_response_body(data, end_of_stream)
393 .await
394 }
395
396 pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
397 for task in tasks.iter_mut() {
398 match task {
399 HttpTask::Header(resp, end) => {
400 self.downstream_modules_ctx
401 .response_header_filter(resp, *end)
402 .await?;
403 }
404 HttpTask::Body(data, end) => {
405 self.downstream_modules_ctx
406 .response_body_filter(data, *end)?;
407 }
408 HttpTask::Trailer(trailers) => {
409 if let Some(buf) = self
410 .downstream_modules_ctx
411 .response_trailer_filter(trailers)?
412 {
413 *task = HttpTask::Body(Some(buf), true);
419 }
420 }
421 _ => { }
422 }
423 }
424 self.downstream_session.response_duplex_vec(tasks).await
425 }
426}
427
428impl AsRef<HttpSession> for Session {
429 fn as_ref(&self) -> &HttpSession {
430 &self.downstream_session
431 }
432}
433
434impl AsMut<HttpSession> for Session {
435 fn as_mut(&mut self) -> &mut HttpSession {
436 &mut self.downstream_session
437 }
438}
439
440use std::ops::{Deref, DerefMut};
441
442impl Deref for Session {
443 type Target = HttpSession;
444
445 fn deref(&self) -> &Self::Target {
446 &self.downstream_session
447 }
448}
449
450impl DerefMut for Session {
451 fn deref_mut(&mut self) -> &mut Self::Target {
452 &mut self.downstream_session
453 }
454}
455
456static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
458 let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
459 resp.insert_header(header::SERVER, &SERVER_NAME[..])
460 .unwrap();
461 resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
462 resp.insert_header(header::CACHE_CONTROL, "private, no-store")
463 .unwrap();
464
465 resp
466});
467
468impl<SV> HttpProxy<SV> {
469 async fn process_request(
470 self: &Arc<Self>,
471 mut session: Session,
472 mut ctx: <SV as ProxyHttp>::CTX,
473 ) -> Option<Stream>
474 where
475 SV: ProxyHttp + Send + Sync + 'static,
476 <SV as ProxyHttp>::CTX: Send + Sync,
477 {
478 if let Err(e) = self
479 .inner
480 .early_request_filter(&mut session, &mut ctx)
481 .await
482 {
483 self.handle_error(&mut session, &mut ctx, e, "Fail to early filter request:")
484 .await;
485 return None;
486 }
487
488 let req = session.downstream_session.req_header_mut();
489
490 if let Err(e) = session
492 .downstream_modules_ctx
493 .request_header_filter(req)
494 .await
495 {
496 self.handle_error(
497 &mut session,
498 &mut ctx,
499 e,
500 "Failed in downstream modules request filter:",
501 )
502 .await;
503 return None;
504 }
505
506 match self.inner.request_filter(&mut session, &mut ctx).await {
507 Ok(response_sent) => {
508 if response_sent {
509 self.inner.logging(&mut session, None, &mut ctx).await;
511 return session.downstream_session.finish().await.ok().flatten();
512 }
513 }
515 Err(e) => {
516 self.handle_error(&mut session, &mut ctx, e, "Fail to filter request:")
517 .await;
518 return None;
519 }
520 }
521
522 if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
523 return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
525 }
526 match self
530 .inner
531 .proxy_upstream_filter(&mut session, &mut ctx)
532 .await
533 {
534 Ok(proxy_to_upstream) => {
535 if !proxy_to_upstream {
536 if session.response_written().is_none() {
539 match session.write_response_header_ref(&BAD_GATEWAY).await {
540 Ok(()) => {}
541 Err(e) => {
542 self.handle_error(
543 &mut session,
544 &mut ctx,
545 e,
546 "Error responding with Bad Gateway:",
547 )
548 .await;
549
550 return None;
551 }
552 }
553 }
554
555 return self.finish(session, &mut ctx, false, None).await;
556 }
557 }
559 Err(e) => {
560 self.handle_error(
561 &mut session,
562 &mut ctx,
563 e,
564 "Error deciding if we should proxy to upstream:",
565 )
566 .await;
567 return None;
568 }
569 }
570
571 let mut retries: usize = 0;
572
573 let mut server_reuse = false;
574 let mut proxy_error: Option<Box<Error>> = None;
575
576 while retries < MAX_RETRIES {
577 retries += 1;
578
579 let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
580 server_reuse = reuse;
581
582 match e {
583 Some(error) => {
584 let retry = error.retry();
585 proxy_error = Some(error);
586 if !retry {
587 break;
588 }
589 warn!(
591 "Fail to proxy: {}, tries: {}, retry: {}, {}",
592 proxy_error.as_ref().unwrap(),
593 retries,
594 retry,
595 self.inner.request_summary(&session, &ctx)
596 );
597 }
598 None => {
599 proxy_error = None;
600 break;
601 }
602 };
603 }
604
605 let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
608 self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
609 .await
610 } else {
611 None
612 };
613
614 let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
615 server_reuse = server_reuse && reuse;
617 stale_cache_error
618 } else {
619 proxy_error
620 };
621
622 if let Some(e) = final_error.as_ref() {
623 session.cache.disable(NoCacheReason::InternalError);
625 let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
626
627 if !self.inner.suppress_error_log(&session, &ctx, e) {
629 error!(
630 "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
631 final_error.as_ref().unwrap(),
632 status,
633 retries,
634 false, self.inner.request_summary(&session, &ctx)
636 );
637 }
638 }
639
640 self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
642 .await
643 }
644
645 async fn handle_error(
646 &self,
647 session: &mut Session,
648 ctx: &mut <SV as ProxyHttp>::CTX,
649 e: Box<Error>,
650 context: &str,
651 ) where
652 SV: ProxyHttp + Send + Sync + 'static,
653 <SV as ProxyHttp>::CTX: Send + Sync,
654 {
655 if !self.inner.suppress_error_log(session, ctx, &e) {
656 error!(
657 "{context} {}, {}",
658 e,
659 self.inner.request_summary(session, ctx)
660 );
661 }
662 self.inner.fail_to_proxy(session, &e, ctx).await;
663 self.inner.logging(session, Some(&e), ctx).await;
664 }
665}
666
/// Internal entry point for running a subrequest through the proxy;
/// implemented for `HttpProxy<SV>` in this file.
#[async_trait]
trait Subrequest {
    /// Processes the request arriving on `session` as a subrequest carrying
    /// `sub_req_ctx`.
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}
684
685#[async_trait]
686impl<SV> Subrequest for HttpProxy<SV>
687where
688 SV: ProxyHttp + Send + Sync + 'static,
689 <SV as ProxyHttp>::CTX: Send + Sync,
690{
691 async fn process_subrequest(
692 self: &Arc<Self>,
693 session: Box<HttpSession>,
694 sub_req_ctx: Box<SubReqCtx>,
695 ) {
696 debug!("starting subrequest");
697 let mut session = match self.handle_new_request(session).await {
698 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
699 None => return, };
701
702 session.set_keepalive(None);
705
706 session.subrequest_ctx.replace(sub_req_ctx);
707 trace!("processing subrequest");
708 let ctx = self.inner.new_ctx();
709 self.process_request(session, ctx).await;
710 trace!("subrequest done");
711 }
712}
713
714#[async_trait]
715impl<SV> HttpServerApp for HttpProxy<SV>
716where
717 SV: ProxyHttp + Send + Sync + 'static,
718 <SV as ProxyHttp>::CTX: Send + Sync,
719{
720 async fn process_new_http(
721 self: &Arc<Self>,
722 session: HttpSession,
723 shutdown: &ShutdownWatch,
724 ) -> Option<Stream> {
725 let session = Box::new(session);
726
727 let mut session = match self.handle_new_request(session).await {
729 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
730 None => return None, };
732
733 if *shutdown.borrow() {
734 session.set_keepalive(None);
736 } else {
737 session.set_keepalive(Some(60));
739 }
740
741 let ctx = self.inner.new_ctx();
742 self.process_request(session, ctx).await
743 }
744
745 async fn http_cleanup(&self) {
746 self.shutdown.notify_waiters();
748
749 }
751
752 fn server_options(&self) -> Option<&HttpServerOptions> {
753 self.server_options.as_ref()
754 }
755
756 }
758
759use pingora_core::services::listening::Service;
760
761pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
765where
766 SV: ProxyHttp,
767{
768 http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
769}
770
771pub fn http_proxy_service_with_name<SV>(
775 conf: &Arc<ServerConf>,
776 inner: SV,
777 name: &str,
778) -> Service<HttpProxy<SV>>
779where
780 SV: ProxyHttp,
781{
782 let mut proxy = HttpProxy::new(inner, conf.clone());
783 proxy.handle_init_modules();
784 Service::new(name.to_string(), proxy)
785}