1use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::FutureExt;
41use http::{header, version::Version};
42use log::{debug, error, trace, warn};
43use once_cell::sync::Lazy;
44use pingora_http::{RequestHeader, ResponseHeader};
45use std::fmt::Debug;
46use std::str;
47use std::sync::Arc;
48use tokio::sync::{mpsc, Notify};
49use tokio::time;
50
51use pingora_cache::NoCacheReason;
52use pingora_core::apps::{
53 HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
54};
55use pingora_core::connectors::{http::Connector, ConnectorOptions};
56use pingora_core::modules::http::compression::ResponseCompressionBuilder;
57use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
58use pingora_core::protocols::http::client::HttpSession as ClientSession;
59use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
60use pingora_core::protocols::http::v2::server::H2Options;
61use pingora_core::protocols::http::HttpTask;
62use pingora_core::protocols::http::ServerSession as HttpSession;
63use pingora_core::protocols::http::SERVER_NAME;
64use pingora_core::protocols::Stream;
65use pingora_core::protocols::{Digest, UniqueID};
66use pingora_core::server::configuration::ServerConf;
67use pingora_core::server::ShutdownWatch;
68use pingora_core::upstreams::peer::{HttpPeer, Peer};
69use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
70
/// A buffer size of 4.
///
/// NOTE(review): this constant is not referenced anywhere in this file; it is
/// presumably the capacity of the task channels used by the `proxy_*`
/// submodules — confirm against those modules before changing it.
const TASK_BUFFER_SIZE: usize = 4;
72
73mod proxy_cache;
74mod proxy_common;
75mod proxy_h1;
76mod proxy_h2;
77mod proxy_purge;
78mod proxy_trait;
79mod subrequest;
80
81use subrequest::Ctx as SubReqCtx;
82
83pub use proxy_cache::range_filter::{range_header_filter, RangeType};
84pub use proxy_purge::PurgeStatus;
85pub use proxy_trait::{FailToProxy, ProxyHttp};
86
/// Convenience re-exports of [`http_proxy_service`], [`ProxyHttp`] and
/// [`Session`], so that users can bring them into scope with a single `use`.
pub mod prelude {
    pub use crate::{http_proxy_service, ProxyHttp, Session};
}
90
/// The proxy application: bundles the user-supplied value `inner` with the
/// upstream connector and the per-service settings used while proxying.
pub struct HttpProxy<SV> {
    /// User-provided implementation; the methods in this file invoke its
    /// `ProxyHttp` hooks (filters, peer selection, logging, ...).
    inner: SV,
    /// Connector used to obtain upstream HTTP sessions and to release them
    /// after a request.
    client_upstream: Connector,
    /// Notified by `http_cleanup()`; `handle_new_request()` stops waiting for
    /// a request header when this fires.
    shutdown: Notify,
    /// Returned by reference from `HttpServerApp::server_options()`.
    pub server_options: Option<HttpServerOptions>,
    /// Returned (cloned) from `HttpServerApp::h2_options()`.
    pub h2_options: Option<H2Options>,
    /// Downstream HTTP modules; a per-request context is built from them for
    /// every `Session` created by this proxy.
    pub downstream_modules: HttpModules,
    /// Upper bound on the number of upstream attempts made per request in
    /// `process_request()`. Copied from `ServerConf::max_retries`.
    max_retries: usize,
}
103
104impl<SV> HttpProxy<SV> {
105 fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
106 HttpProxy {
107 inner,
108 client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
109 shutdown: Notify::new(),
110 server_options: None,
111 h2_options: None,
112 downstream_modules: HttpModules::new(),
113 max_retries: conf.max_retries,
114 }
115 }
116
117 fn handle_init_modules(&mut self)
118 where
119 SV: ProxyHttp,
120 {
121 self.inner
122 .init_downstream_modules(&mut self.downstream_modules);
123 }
124
125 async fn handle_new_request(
126 &self,
127 mut downstream_session: Box<HttpSession>,
128 ) -> Option<Box<HttpSession>>
129 where
130 SV: ProxyHttp + Send + Sync,
131 SV::CTX: Send + Sync,
132 {
133 let res = tokio::select! {
136 biased; res = downstream_session.read_request() => { res }
138 _ = self.shutdown.notified() => {
139 return None;
141 }
142 };
143 match res {
144 Ok(true) => {
145 debug!("Successfully get a new request");
147 }
148 Ok(false) => {
149 return None; }
151 Err(mut e) => {
152 e.as_down();
153 error!("Fail to proxy: {e}");
154 if matches!(e.etype, InvalidHTTPHeader) {
155 downstream_session
156 .respond_error(400)
157 .await
158 .unwrap_or_else(|e| {
159 error!("failed to send error response to downstream: {e}");
160 });
161 } downstream_session.shutdown().await;
163 return None;
164 }
165 }
166 trace!(
167 "Request header: {:?}",
168 downstream_session.req_header().as_ref()
169 );
170 Some(downstream_session)
171 }
172
173 async fn proxy_to_upstream(
175 &self,
176 session: &mut Session,
177 ctx: &mut SV::CTX,
178 ) -> (bool, Option<Box<Error>>)
179 where
180 SV: ProxyHttp + Send + Sync,
181 SV::CTX: Send + Sync,
182 {
183 let peer = match self.inner.upstream_peer(session, ctx).await {
184 Ok(p) => p,
185 Err(e) => return (false, Some(e)),
186 };
187
188 let client_session = self.client_upstream.get_http_session(&*peer).await;
189 match client_session {
190 Ok((client_session, client_reused)) => {
191 let (server_reused, error) = match client_session {
192 ClientSession::H1(mut h1) => {
193 let (server_reused, client_reuse, error) = self
194 .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
195 .await;
196 if client_reuse {
197 let session = ClientSession::H1(h1);
198 self.client_upstream
199 .release_http_session(session, &*peer, peer.idle_timeout())
200 .await;
201 }
202 (server_reused, error)
203 }
204 ClientSession::H2(mut h2) => {
205 let (server_reused, mut error) = self
206 .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
207 .await;
208 let session = ClientSession::H2(h2);
209 self.client_upstream
210 .release_http_session(session, &*peer, peer.idle_timeout())
211 .await;
212
213 if let Some(e) = error.as_mut() {
214 if matches!(e.etype, H2Downgrade | InvalidH2) {
217 if peer
218 .get_alpn()
219 .map_or(true, |alpn| alpn.get_min_http_version() == 1)
220 {
221 self.client_upstream.prefer_h1(&*peer);
224 } else {
225 e.retry = false.into();
227 }
228 }
229 }
230
231 (server_reused, error)
232 }
233 };
234 (
235 server_reused,
236 error.map(|e| {
237 self.inner
238 .error_while_proxy(&peer, session, e, ctx, client_reused)
239 }),
240 )
241 }
242 Err(mut e) => {
243 e.as_up();
244 let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
245 (false, Some(new_err.into_up()))
246 }
247 }
248 }
249
250 fn upstream_filter(
251 &self,
252 session: &mut Session,
253 task: &mut HttpTask,
254 ctx: &mut SV::CTX,
255 ) -> Result<()>
256 where
257 SV: ProxyHttp,
258 {
259 match task {
260 HttpTask::Header(header, _eos) => {
261 self.inner.upstream_response_filter(session, header, ctx)?
262 }
263 HttpTask::Body(data, eos) => self
264 .inner
265 .upstream_response_body_filter(session, data, *eos, ctx)?,
266 HttpTask::Trailer(Some(trailers)) => self
267 .inner
268 .upstream_response_trailer_filter(session, trailers, ctx)?,
269 _ => {
270 }
272 }
273 Ok(())
274 }
275
276 async fn finish(
277 &self,
278 mut session: Session,
279 ctx: &mut SV::CTX,
280 reuse: bool,
281 error: Option<&Error>,
282 ) -> Option<ReusedHttpStream>
283 where
284 SV: ProxyHttp + Send + Sync,
285 SV::CTX: Send + Sync,
286 {
287 self.inner.logging(&mut session, error, ctx).await;
288
289 if reuse {
290 let persistent_settings = HttpPersistentSettings::for_session(&session);
292 session
293 .downstream_session
294 .finish()
295 .await
296 .ok()
297 .flatten()
298 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
299 } else {
300 None
301 }
302 }
303
304 fn cleanup_sub_req(&self, session: &mut Session) {
305 if let Some(ctx) = session.subrequest_ctx.as_mut() {
306 ctx.release_write_lock();
307 }
308 }
309}
310
311use pingora_cache::HttpCache;
312use pingora_core::protocols::http::compression::ResponseCompressionCtx;
313
/// Per-request state handed to the `ProxyHttp` hooks: the downstream HTTP
/// session plus the cache, compression, subrequest and module contexts that
/// accompany it. Dereferences to the downstream `HttpSession`.
pub struct Session {
    /// The downstream HTTP session this request arrived on.
    pub downstream_session: Box<HttpSession>,
    /// Cache state for this request; starts as `HttpCache::new()`.
    pub cache: HttpCache,
    /// Compression context for the upstream response; constructed with
    /// `(0, false, false)`.
    /// NOTE(review): presumably that means compression/decompression are off
    /// by default — confirm against `ResponseCompressionCtx::new`.
    pub upstream_compression: ResponseCompressionCtx,
    /// Starts as `false`. NOTE(review): not read in this file; presumably
    /// consulted by the range filter — confirm.
    pub ignore_downstream_range: bool,
    /// Flag set by `mark_upstream_headers_mutated_for_cache()` and read by
    /// `upstream_headers_mutated_for_cache()`.
    pub upstream_headers_mutated_for_cache: bool,
    /// `Some` when this session is processing a subrequest (set in
    /// `process_subrequest()`), `None` otherwise.
    pub subrequest_ctx: Option<Box<SubReqCtx>>,
    /// Per-request context of the downstream modules; the request/response
    /// filters of those modules are run through it.
    pub downstream_modules_ctx: HttpModuleCtx,
}
334
335impl Session {
336 fn new(
337 downstream_session: impl Into<Box<HttpSession>>,
338 downstream_modules: &HttpModules,
339 ) -> Self {
340 Session {
341 downstream_session: downstream_session.into(),
342 cache: HttpCache::new(),
343 upstream_compression: ResponseCompressionCtx::new(0, false, false),
345 ignore_downstream_range: false,
346 upstream_headers_mutated_for_cache: false,
347 subrequest_ctx: None,
348 downstream_modules_ctx: downstream_modules.build_ctx(),
349 }
350 }
351
352 pub fn new_h1(stream: Stream) -> Self {
356 let modules = HttpModules::new();
357 Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
358 }
359
360 pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
364 Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
365 }
366
367 pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
368 &mut self.downstream_session
369 }
370
371 pub fn as_downstream(&self) -> &HttpSession {
372 &self.downstream_session
373 }
374
375 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
377 self.as_downstream_mut().respond_error(error).await
378 }
379
380 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
382 self.as_downstream_mut()
383 .respond_error_with_body(error, body)
384 .await
385 }
386
387 pub async fn write_response_header(
392 &mut self,
393 mut resp: Box<ResponseHeader>,
394 end_of_stream: bool,
395 ) -> Result<()> {
396 self.downstream_modules_ctx
397 .response_header_filter(&mut resp, end_of_stream)
398 .await?;
399 self.downstream_session.write_response_header(resp).await
400 }
401
402 pub async fn write_response_body(
407 &mut self,
408 mut body: Option<Bytes>,
409 end_of_stream: bool,
410 ) -> Result<()> {
411 self.downstream_modules_ctx
412 .response_body_filter(&mut body, end_of_stream)?;
413
414 if body.is_none() && !end_of_stream {
415 return Ok(());
416 }
417
418 let data = body.unwrap_or_default();
419 self.downstream_session
420 .write_response_body(data, end_of_stream)
421 .await
422 }
423
424 pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
425 for task in tasks.iter_mut() {
426 match task {
427 HttpTask::Header(resp, end) => {
428 self.downstream_modules_ctx
429 .response_header_filter(resp, *end)
430 .await?;
431 }
432 HttpTask::Body(data, end) => {
433 self.downstream_modules_ctx
434 .response_body_filter(data, *end)?;
435 }
436 HttpTask::Trailer(trailers) => {
437 if let Some(buf) = self
438 .downstream_modules_ctx
439 .response_trailer_filter(trailers)?
440 {
441 *task = HttpTask::Body(Some(buf), true);
447 }
448 }
449 HttpTask::Done => {
450 if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
459 *task = HttpTask::Body(Some(buf), true);
460 }
461 }
462 _ => { }
463 }
464 }
465 self.downstream_session.response_duplex_vec(tasks).await
466 }
467
468 pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
471 self.upstream_headers_mutated_for_cache = true;
472 }
473
474 pub fn upstream_headers_mutated_for_cache(&self) -> bool {
476 self.upstream_headers_mutated_for_cache
477 }
478}
479
480impl AsRef<HttpSession> for Session {
481 fn as_ref(&self) -> &HttpSession {
482 &self.downstream_session
483 }
484}
485
486impl AsMut<HttpSession> for Session {
487 fn as_mut(&mut self) -> &mut HttpSession {
488 &mut self.downstream_session
489 }
490}
491
492use std::ops::{Deref, DerefMut};
493
494impl Deref for Session {
495 type Target = HttpSession;
496
497 fn deref(&self) -> &Self::Target {
498 &self.downstream_session
499 }
500}
501
502impl DerefMut for Session {
503 fn deref_mut(&mut self) -> &mut Self::Target {
504 &mut self.downstream_session
505 }
506}
507
508static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
510 let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
511 resp.insert_header(header::SERVER, &SERVER_NAME[..])
512 .unwrap();
513 resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
514 resp.insert_header(header::CACHE_CONTROL, "private, no-store")
515 .unwrap();
516
517 resp
518});
519
520impl<SV> HttpProxy<SV> {
521 async fn process_request(
522 self: &Arc<Self>,
523 mut session: Session,
524 mut ctx: <SV as ProxyHttp>::CTX,
525 ) -> Option<ReusedHttpStream>
526 where
527 SV: ProxyHttp + Send + Sync + 'static,
528 <SV as ProxyHttp>::CTX: Send + Sync,
529 {
530 if let Err(e) = self
531 .inner
532 .early_request_filter(&mut session, &mut ctx)
533 .await
534 {
535 return self
536 .handle_error(session, &mut ctx, e, "Fail to early filter request:")
537 .await;
538 }
539
540 let req = session.downstream_session.req_header_mut();
541
542 if let Err(e) = session
544 .downstream_modules_ctx
545 .request_header_filter(req)
546 .await
547 {
548 return self
549 .handle_error(
550 session,
551 &mut ctx,
552 e,
553 "Failed in downstream modules request filter:",
554 )
555 .await;
556 }
557
558 match self.inner.request_filter(&mut session, &mut ctx).await {
559 Ok(response_sent) => {
560 if response_sent {
561 self.inner.logging(&mut session, None, &mut ctx).await;
563 self.cleanup_sub_req(&mut session);
564 let persistent_settings = HttpPersistentSettings::for_session(&session);
565 return session
566 .downstream_session
567 .finish()
568 .await
569 .ok()
570 .flatten()
571 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
572 }
573 }
575 Err(e) => {
576 return self
577 .handle_error(session, &mut ctx, e, "Fail to filter request:")
578 .await;
579 }
580 }
581
582 if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
583 return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
585 }
586 self.cleanup_sub_req(&mut session);
590
591 match self
593 .inner
594 .proxy_upstream_filter(&mut session, &mut ctx)
595 .await
596 {
597 Ok(proxy_to_upstream) => {
598 if !proxy_to_upstream {
599 if session.cache.enabled() {
602 session.cache.disable(NoCacheReason::DeclinedToUpstream);
604 }
605 if session.response_written().is_none() {
606 match session.write_response_header_ref(&BAD_GATEWAY).await {
607 Ok(()) => {}
608 Err(e) => {
609 return self
610 .handle_error(
611 session,
612 &mut ctx,
613 e,
614 "Error responding with Bad Gateway:",
615 )
616 .await;
617 }
618 }
619 }
620
621 return self.finish(session, &mut ctx, true, None).await;
622 }
623 }
625 Err(e) => {
626 if session.cache.enabled() {
627 session.cache.disable(NoCacheReason::InternalError);
628 }
629
630 return self
631 .handle_error(
632 session,
633 &mut ctx,
634 e,
635 "Error deciding if we should proxy to upstream:",
636 )
637 .await;
638 }
639 }
640
641 let mut retries: usize = 0;
642
643 let mut server_reuse = false;
644 let mut proxy_error: Option<Box<Error>> = None;
645
646 while retries < self.max_retries {
647 retries += 1;
648
649 let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
650 server_reuse = reuse;
651
652 match e {
653 Some(error) => {
654 let retry = error.retry();
655 proxy_error = Some(error);
656 if !retry {
657 break;
658 }
659 warn!(
661 "Fail to proxy: {}, tries: {}, retry: {}, {}",
662 proxy_error.as_ref().unwrap(),
663 retries,
664 retry,
665 self.inner.request_summary(&session, &ctx)
666 );
667 }
668 None => {
669 proxy_error = None;
670 break;
671 }
672 };
673 }
674
675 let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
678 self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
679 .await
680 } else {
681 None
682 };
683
684 let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
685 server_reuse = server_reuse && reuse;
687 stale_cache_error
688 } else {
689 proxy_error
690 };
691
692 if let Some(e) = final_error.as_ref() {
693 if session.cache.enabled() {
695 let reason = if *e.esource() == ErrorSource::Upstream {
696 NoCacheReason::UpstreamError
697 } else {
698 NoCacheReason::InternalError
699 };
700 session.cache.disable(reason);
701 }
702 let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
703
704 if !self.inner.suppress_error_log(&session, &ctx, e) {
706 error!(
707 "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
708 final_error.as_ref().unwrap(),
709 res.error_code,
710 retries,
711 false, self.inner.request_summary(&session, &ctx)
713 );
714 }
715 }
716
717 self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
719 .await
720 }
721
722 async fn handle_error(
723 &self,
724 mut session: Session,
725 ctx: &mut <SV as ProxyHttp>::CTX,
726 e: Box<Error>,
727 context: &str,
728 ) -> Option<ReusedHttpStream>
729 where
730 SV: ProxyHttp + Send + Sync + 'static,
731 <SV as ProxyHttp>::CTX: Send + Sync,
732 {
733 let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
734 if !self.inner.suppress_error_log(&session, ctx, &e) {
735 error!(
736 "{context} {}, status: {}, {}",
737 e,
738 res.error_code,
739 self.inner.request_summary(&session, ctx)
740 );
741 }
742 self.inner.logging(&mut session, Some(&e), ctx).await;
743 self.cleanup_sub_req(&mut session);
744
745 if res.can_reuse_downstream {
746 let persistent_settings = HttpPersistentSettings::for_session(&session);
747 session
748 .downstream_session
749 .finish()
750 .await
751 .ok()
752 .flatten()
753 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
754 } else {
755 None
756 }
757 }
758}
759
/// Crate-private extension for running a subrequest through the proxy
/// pipeline.
///
/// NOTE(review): no caller is visible in this file; presumably it is invoked
/// from the subrequest/cache code paths — confirm.
#[async_trait]
trait Subrequest {
    /// Process `session` as a subrequest, with `sub_req_ctx` attached to it.
    /// Nothing is returned to the caller.
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}
777
778#[async_trait]
779impl<SV> Subrequest for HttpProxy<SV>
780where
781 SV: ProxyHttp + Send + Sync + 'static,
782 <SV as ProxyHttp>::CTX: Send + Sync,
783{
784 async fn process_subrequest(
785 self: &Arc<Self>,
786 session: Box<HttpSession>,
787 sub_req_ctx: Box<SubReqCtx>,
788 ) {
789 debug!("starting subrequest");
790 let mut session = match self.handle_new_request(session).await {
791 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
792 None => return, };
794
795 session.set_keepalive(None);
798
799 session.subrequest_ctx.replace(sub_req_ctx);
800 trace!("processing subrequest");
801 let ctx = self.inner.new_ctx();
802 self.process_request(session, ctx).await;
803 trace!("subrequest done");
804 }
805}
806
807#[async_trait]
808impl<SV> HttpServerApp for HttpProxy<SV>
809where
810 SV: ProxyHttp + Send + Sync + 'static,
811 <SV as ProxyHttp>::CTX: Send + Sync,
812{
813 async fn process_new_http(
814 self: &Arc<Self>,
815 session: HttpSession,
816 _shutdown: &ShutdownWatch,
817 ) -> Option<ReusedHttpStream> {
818 let session = Box::new(session);
819
820 let session = match self.handle_new_request(session).await {
822 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
823 None => return None, };
825
826 let ctx = self.inner.new_ctx();
827 self.process_request(session, ctx).await
828 }
829
830 async fn http_cleanup(&self) {
831 self.shutdown.notify_waiters();
833
834 }
836
837 fn server_options(&self) -> Option<&HttpServerOptions> {
838 self.server_options.as_ref()
839 }
840
841 fn h2_options(&self) -> Option<H2Options> {
842 self.h2_options.clone()
843 }
844}
845
846use pingora_core::services::listening::Service;
847
848pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
852where
853 SV: ProxyHttp,
854{
855 http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
856}
857
858pub fn http_proxy_service_with_name<SV>(
862 conf: &Arc<ServerConf>,
863 inner: SV,
864 name: &str,
865) -> Service<HttpProxy<SV>>
866where
867 SV: ProxyHttp,
868{
869 let mut proxy = HttpProxy::new(inner, conf.clone());
870 proxy.handle_init_modules();
871 Service::new(name.to_string(), proxy)
872}