1use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::FutureExt;
41use http::{header, version::Version};
42use log::{debug, error, trace, warn};
43use once_cell::sync::Lazy;
44use pingora_http::{RequestHeader, ResponseHeader};
45use std::fmt::Debug;
46use std::str;
47use std::sync::Arc;
48use tokio::sync::{mpsc, Notify};
49use tokio::time;
50
51use pingora_cache::NoCacheReason;
52use pingora_core::apps::{HttpServerApp, HttpServerOptions};
53use pingora_core::connectors::{http::Connector, ConnectorOptions};
54use pingora_core::modules::http::compression::ResponseCompressionBuilder;
55use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
56use pingora_core::protocols::http::client::HttpSession as ClientSession;
57use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
58use pingora_core::protocols::http::HttpTask;
59use pingora_core::protocols::http::ServerSession as HttpSession;
60use pingora_core::protocols::http::SERVER_NAME;
61use pingora_core::protocols::Stream;
62use pingora_core::protocols::{Digest, UniqueID};
63use pingora_core::server::configuration::ServerConf;
64use pingora_core::server::ShutdownWatch;
65use pingora_core::upstreams::peer::{HttpPeer, Peer};
66use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
67
68const TASK_BUFFER_SIZE: usize = 4;
69
70mod proxy_cache;
71mod proxy_common;
72mod proxy_h1;
73mod proxy_h2;
74mod proxy_purge;
75mod proxy_trait;
76mod subrequest;
77
78use subrequest::Ctx as SubReqCtx;
79
80pub use proxy_cache::range_filter::{range_header_filter, RangeType};
81pub use proxy_purge::PurgeStatus;
82pub use proxy_trait::{FailToProxy, ProxyHttp};
83
84pub mod prelude {
85 pub use crate::{http_proxy_service, ProxyHttp, Session};
86}
87
88pub struct HttpProxy<SV> {
92 inner: SV, client_upstream: Connector,
94 shutdown: Notify,
95 pub server_options: Option<HttpServerOptions>,
96 pub downstream_modules: HttpModules,
97 max_retries: usize,
98}
99
100impl<SV> HttpProxy<SV> {
101 fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
102 HttpProxy {
103 inner,
104 client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
105 shutdown: Notify::new(),
106 server_options: None,
107 downstream_modules: HttpModules::new(),
108 max_retries: conf.max_retries,
109 }
110 }
111
112 fn handle_init_modules(&mut self)
113 where
114 SV: ProxyHttp,
115 {
116 self.inner
117 .init_downstream_modules(&mut self.downstream_modules);
118 }
119
120 async fn handle_new_request(
121 &self,
122 mut downstream_session: Box<HttpSession>,
123 ) -> Option<Box<HttpSession>>
124 where
125 SV: ProxyHttp + Send + Sync,
126 SV::CTX: Send + Sync,
127 {
128 let res = tokio::select! {
131 biased; res = downstream_session.read_request() => { res }
133 _ = self.shutdown.notified() => {
134 return None;
136 }
137 };
138 match res {
139 Ok(true) => {
140 debug!("Successfully get a new request");
142 }
143 Ok(false) => {
144 return None; }
146 Err(mut e) => {
147 e.as_down();
148 error!("Fail to proxy: {e}");
149 if matches!(e.etype, InvalidHTTPHeader) {
150 downstream_session
151 .respond_error(400)
152 .await
153 .unwrap_or_else(|e| {
154 error!("failed to send error response to downstream: {e}");
155 });
156 } downstream_session.shutdown().await;
158 return None;
159 }
160 }
161 trace!(
162 "Request header: {:?}",
163 downstream_session.req_header().as_ref()
164 );
165 Some(downstream_session)
166 }
167
168 async fn proxy_to_upstream(
170 &self,
171 session: &mut Session,
172 ctx: &mut SV::CTX,
173 ) -> (bool, Option<Box<Error>>)
174 where
175 SV: ProxyHttp + Send + Sync,
176 SV::CTX: Send + Sync,
177 {
178 let peer = match self.inner.upstream_peer(session, ctx).await {
179 Ok(p) => p,
180 Err(e) => return (false, Some(e)),
181 };
182
183 let client_session = self.client_upstream.get_http_session(&*peer).await;
184 match client_session {
185 Ok((client_session, client_reused)) => {
186 let (server_reused, error) = match client_session {
187 ClientSession::H1(mut h1) => {
188 let (server_reused, client_reuse, error) = self
189 .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
190 .await;
191 if client_reuse {
192 let session = ClientSession::H1(h1);
193 self.client_upstream
194 .release_http_session(session, &*peer, peer.idle_timeout())
195 .await;
196 }
197 (server_reused, error)
198 }
199 ClientSession::H2(mut h2) => {
200 let (server_reused, mut error) = self
201 .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
202 .await;
203 let session = ClientSession::H2(h2);
204 self.client_upstream
205 .release_http_session(session, &*peer, peer.idle_timeout())
206 .await;
207
208 if let Some(e) = error.as_mut() {
209 if matches!(e.etype, H2Downgrade | InvalidH2) {
212 if peer
213 .get_alpn()
214 .map_or(true, |alpn| alpn.get_min_http_version() == 1)
215 {
216 self.client_upstream.prefer_h1(&*peer);
219 } else {
220 e.retry = false.into();
222 }
223 }
224 }
225
226 (server_reused, error)
227 }
228 };
229 (
230 server_reused,
231 error.map(|e| {
232 self.inner
233 .error_while_proxy(&peer, session, e, ctx, client_reused)
234 }),
235 )
236 }
237 Err(mut e) => {
238 e.as_up();
239 let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
240 (false, Some(new_err.into_up()))
241 }
242 }
243 }
244
245 fn upstream_filter(
246 &self,
247 session: &mut Session,
248 task: &mut HttpTask,
249 ctx: &mut SV::CTX,
250 ) -> Result<()>
251 where
252 SV: ProxyHttp,
253 {
254 match task {
255 HttpTask::Header(header, _eos) => {
256 self.inner.upstream_response_filter(session, header, ctx)?
257 }
258 HttpTask::Body(data, eos) => self
259 .inner
260 .upstream_response_body_filter(session, data, *eos, ctx)?,
261 HttpTask::Trailer(Some(trailers)) => self
262 .inner
263 .upstream_response_trailer_filter(session, trailers, ctx)?,
264 _ => {
265 }
267 }
268 Ok(())
269 }
270
271 async fn finish(
272 &self,
273 mut session: Session,
274 ctx: &mut SV::CTX,
275 reuse: bool,
276 error: Option<&Error>,
277 ) -> Option<Stream>
278 where
279 SV: ProxyHttp + Send + Sync,
280 SV::CTX: Send + Sync,
281 {
282 self.inner.logging(&mut session, error, ctx).await;
283
284 if reuse {
285 session.downstream_session.finish().await.ok().flatten()
287 } else {
288 None
289 }
290 }
291
292 fn cleanup_sub_req(&self, session: &mut Session) {
293 if let Some(ctx) = session.subrequest_ctx.as_mut() {
294 ctx.release_write_lock();
295 }
296 }
297}
298
299use pingora_cache::HttpCache;
300use pingora_core::protocols::http::compression::ResponseCompressionCtx;
301
302pub struct Session {
307 pub downstream_session: Box<HttpSession>,
309 pub cache: HttpCache,
311 pub upstream_compression: ResponseCompressionCtx,
313 pub ignore_downstream_range: bool,
315 pub subrequest_ctx: Option<Box<SubReqCtx>>,
317 pub downstream_modules_ctx: HttpModuleCtx,
319}
320
321impl Session {
322 fn new(
323 downstream_session: impl Into<Box<HttpSession>>,
324 downstream_modules: &HttpModules,
325 ) -> Self {
326 Session {
327 downstream_session: downstream_session.into(),
328 cache: HttpCache::new(),
329 upstream_compression: ResponseCompressionCtx::new(0, false, false),
331 ignore_downstream_range: false,
332 subrequest_ctx: None,
333 downstream_modules_ctx: downstream_modules.build_ctx(),
334 }
335 }
336
337 pub fn new_h1(stream: Stream) -> Self {
341 let modules = HttpModules::new();
342 Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
343 }
344
345 pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
349 Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
350 }
351
352 pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
353 &mut self.downstream_session
354 }
355
356 pub fn as_downstream(&self) -> &HttpSession {
357 &self.downstream_session
358 }
359
360 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
362 self.as_downstream_mut().respond_error(error).await
363 }
364
365 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
367 self.as_downstream_mut()
368 .respond_error_with_body(error, body)
369 .await
370 }
371
372 pub async fn write_response_header(
377 &mut self,
378 mut resp: Box<ResponseHeader>,
379 end_of_stream: bool,
380 ) -> Result<()> {
381 self.downstream_modules_ctx
382 .response_header_filter(&mut resp, end_of_stream)
383 .await?;
384 self.downstream_session.write_response_header(resp).await
385 }
386
387 pub async fn write_response_body(
392 &mut self,
393 mut body: Option<Bytes>,
394 end_of_stream: bool,
395 ) -> Result<()> {
396 self.downstream_modules_ctx
397 .response_body_filter(&mut body, end_of_stream)?;
398
399 if body.is_none() && !end_of_stream {
400 return Ok(());
401 }
402
403 let data = body.unwrap_or_default();
404 self.downstream_session
405 .write_response_body(data, end_of_stream)
406 .await
407 }
408
409 pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
410 for task in tasks.iter_mut() {
411 match task {
412 HttpTask::Header(resp, end) => {
413 self.downstream_modules_ctx
414 .response_header_filter(resp, *end)
415 .await?;
416 }
417 HttpTask::Body(data, end) => {
418 self.downstream_modules_ctx
419 .response_body_filter(data, *end)?;
420 }
421 HttpTask::Trailer(trailers) => {
422 if let Some(buf) = self
423 .downstream_modules_ctx
424 .response_trailer_filter(trailers)?
425 {
426 *task = HttpTask::Body(Some(buf), true);
432 }
433 }
434 HttpTask::Done => {
435 if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
444 *task = HttpTask::Body(Some(buf), true);
445 }
446 }
447 _ => { }
448 }
449 }
450 self.downstream_session.response_duplex_vec(tasks).await
451 }
452}
453
454impl AsRef<HttpSession> for Session {
455 fn as_ref(&self) -> &HttpSession {
456 &self.downstream_session
457 }
458}
459
460impl AsMut<HttpSession> for Session {
461 fn as_mut(&mut self) -> &mut HttpSession {
462 &mut self.downstream_session
463 }
464}
465
466use std::ops::{Deref, DerefMut};
467
468impl Deref for Session {
469 type Target = HttpSession;
470
471 fn deref(&self) -> &Self::Target {
472 &self.downstream_session
473 }
474}
475
476impl DerefMut for Session {
477 fn deref_mut(&mut self) -> &mut Self::Target {
478 &mut self.downstream_session
479 }
480}
481
482static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
484 let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
485 resp.insert_header(header::SERVER, &SERVER_NAME[..])
486 .unwrap();
487 resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
488 resp.insert_header(header::CACHE_CONTROL, "private, no-store")
489 .unwrap();
490
491 resp
492});
493
494impl<SV> HttpProxy<SV> {
495 async fn process_request(
496 self: &Arc<Self>,
497 mut session: Session,
498 mut ctx: <SV as ProxyHttp>::CTX,
499 ) -> Option<Stream>
500 where
501 SV: ProxyHttp + Send + Sync + 'static,
502 <SV as ProxyHttp>::CTX: Send + Sync,
503 {
504 if let Err(e) = self
505 .inner
506 .early_request_filter(&mut session, &mut ctx)
507 .await
508 {
509 return self
510 .handle_error(session, &mut ctx, e, "Fail to early filter request:")
511 .await;
512 }
513
514 let req = session.downstream_session.req_header_mut();
515
516 if let Err(e) = session
518 .downstream_modules_ctx
519 .request_header_filter(req)
520 .await
521 {
522 return self
523 .handle_error(
524 session,
525 &mut ctx,
526 e,
527 "Failed in downstream modules request filter:",
528 )
529 .await;
530 }
531
532 match self.inner.request_filter(&mut session, &mut ctx).await {
533 Ok(response_sent) => {
534 if response_sent {
535 self.inner.logging(&mut session, None, &mut ctx).await;
537 self.cleanup_sub_req(&mut session);
538 return session.downstream_session.finish().await.ok().flatten();
539 }
540 }
542 Err(e) => {
543 return self
544 .handle_error(session, &mut ctx, e, "Fail to filter request:")
545 .await;
546 }
547 }
548
549 if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
550 return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
552 }
553 self.cleanup_sub_req(&mut session);
557
558 match self
560 .inner
561 .proxy_upstream_filter(&mut session, &mut ctx)
562 .await
563 {
564 Ok(proxy_to_upstream) => {
565 if !proxy_to_upstream {
566 if session.cache.enabled() {
569 session.cache.disable(NoCacheReason::DeclinedToUpstream);
571 }
572 if session.response_written().is_none() {
573 match session.write_response_header_ref(&BAD_GATEWAY).await {
574 Ok(()) => {}
575 Err(e) => {
576 return self
577 .handle_error(
578 session,
579 &mut ctx,
580 e,
581 "Error responding with Bad Gateway:",
582 )
583 .await;
584 }
585 }
586 }
587
588 return self.finish(session, &mut ctx, true, None).await;
589 }
590 }
592 Err(e) => {
593 if session.cache.enabled() {
594 session.cache.disable(NoCacheReason::InternalError);
595 }
596
597 return self
598 .handle_error(
599 session,
600 &mut ctx,
601 e,
602 "Error deciding if we should proxy to upstream:",
603 )
604 .await;
605 }
606 }
607
608 let mut retries: usize = 0;
609
610 let mut server_reuse = false;
611 let mut proxy_error: Option<Box<Error>> = None;
612
613 while retries < self.max_retries {
614 retries += 1;
615
616 let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
617 server_reuse = reuse;
618
619 match e {
620 Some(error) => {
621 let retry = error.retry();
622 proxy_error = Some(error);
623 if !retry {
624 break;
625 }
626 warn!(
628 "Fail to proxy: {}, tries: {}, retry: {}, {}",
629 proxy_error.as_ref().unwrap(),
630 retries,
631 retry,
632 self.inner.request_summary(&session, &ctx)
633 );
634 }
635 None => {
636 proxy_error = None;
637 break;
638 }
639 };
640 }
641
642 let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
645 self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
646 .await
647 } else {
648 None
649 };
650
651 let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
652 server_reuse = server_reuse && reuse;
654 stale_cache_error
655 } else {
656 proxy_error
657 };
658
659 if let Some(e) = final_error.as_ref() {
660 if session.cache.enabled() {
662 let reason = if *e.esource() == ErrorSource::Upstream {
663 NoCacheReason::UpstreamError
664 } else {
665 NoCacheReason::InternalError
666 };
667 session.cache.disable(reason);
668 }
669 let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
670
671 if !self.inner.suppress_error_log(&session, &ctx, e) {
673 error!(
674 "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
675 final_error.as_ref().unwrap(),
676 res.error_code,
677 retries,
678 false, self.inner.request_summary(&session, &ctx)
680 );
681 }
682 }
683
684 self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
686 .await
687 }
688
689 async fn handle_error(
690 &self,
691 mut session: Session,
692 ctx: &mut <SV as ProxyHttp>::CTX,
693 e: Box<Error>,
694 context: &str,
695 ) -> Option<Stream>
696 where
697 SV: ProxyHttp + Send + Sync + 'static,
698 <SV as ProxyHttp>::CTX: Send + Sync,
699 {
700 let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
701 if !self.inner.suppress_error_log(&session, ctx, &e) {
702 error!(
703 "{context} {}, status: {}, {}",
704 e,
705 res.error_code,
706 self.inner.request_summary(&session, ctx)
707 );
708 }
709 self.inner.logging(&mut session, Some(&e), ctx).await;
710 self.cleanup_sub_req(&mut session);
711
712 if res.can_reuse_downstream {
713 session.downstream_session.finish().await.ok().flatten()
714 } else {
715 None
716 }
717 }
718}
719
720#[async_trait]
730trait Subrequest {
731 async fn process_subrequest(
732 self: &Arc<Self>,
733 session: Box<HttpSession>,
734 sub_req_ctx: Box<SubReqCtx>,
735 );
736}
737
738#[async_trait]
739impl<SV> Subrequest for HttpProxy<SV>
740where
741 SV: ProxyHttp + Send + Sync + 'static,
742 <SV as ProxyHttp>::CTX: Send + Sync,
743{
744 async fn process_subrequest(
745 self: &Arc<Self>,
746 session: Box<HttpSession>,
747 sub_req_ctx: Box<SubReqCtx>,
748 ) {
749 debug!("starting subrequest");
750 let mut session = match self.handle_new_request(session).await {
751 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
752 None => return, };
754
755 session.set_keepalive(None);
758
759 session.subrequest_ctx.replace(sub_req_ctx);
760 trace!("processing subrequest");
761 let ctx = self.inner.new_ctx();
762 self.process_request(session, ctx).await;
763 trace!("subrequest done");
764 }
765}
766
767#[async_trait]
768impl<SV> HttpServerApp for HttpProxy<SV>
769where
770 SV: ProxyHttp + Send + Sync + 'static,
771 <SV as ProxyHttp>::CTX: Send + Sync,
772{
773 async fn process_new_http(
774 self: &Arc<Self>,
775 session: HttpSession,
776 shutdown: &ShutdownWatch,
777 ) -> Option<Stream> {
778 let session = Box::new(session);
779
780 let mut session = match self.handle_new_request(session).await {
782 Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
783 None => return None, };
785
786 if *shutdown.borrow() {
787 session.set_keepalive(None);
789 } else {
790 session.set_keepalive(Some(60));
792 }
793
794 let ctx = self.inner.new_ctx();
795 self.process_request(session, ctx).await
796 }
797
798 async fn http_cleanup(&self) {
799 self.shutdown.notify_waiters();
801
802 }
804
805 fn server_options(&self) -> Option<&HttpServerOptions> {
806 self.server_options.as_ref()
807 }
808
809 }
811
812use pingora_core::services::listening::Service;
813
814pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
818where
819 SV: ProxyHttp,
820{
821 http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
822}
823
824pub fn http_proxy_service_with_name<SV>(
828 conf: &Arc<ServerConf>,
829 inner: SV,
830 name: &str,
831) -> Service<HttpProxy<SV>>
832where
833 SV: ProxyHttp,
834{
835 let mut proxy = HttpProxy::new(inner, conf.clone());
836 proxy.handle_init_modules();
837 Service::new(name.to_string(), proxy)
838}