1use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::BoxFuture;
41use futures::future::FutureExt;
42use http::{header, version::Version, Method};
43use log::{debug, error, trace, warn};
44use once_cell::sync::Lazy;
45use pingora_http::{RequestHeader, ResponseHeader};
46use std::fmt::Debug;
47use std::str;
48use std::sync::{
49 atomic::{AtomicBool, Ordering},
50 Arc,
51};
52use std::time::Duration;
53use tokio::sync::{mpsc, Notify};
54use tokio::time;
55
56use pingora_cache::NoCacheReason;
57use pingora_core::apps::{
58 HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
59};
60use pingora_core::connectors::http::custom;
61use pingora_core::connectors::{http::Connector, ConnectorOptions};
62use pingora_core::modules::http::compression::ResponseCompressionBuilder;
63use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
64use pingora_core::protocols::http::client::HttpSession as ClientSession;
65use pingora_core::protocols::http::custom::CustomMessageWrite;
66use pingora_core::protocols::http::subrequest::server::SubrequestHandle;
67use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
68use pingora_core::protocols::http::v2::server::H2Options;
69use pingora_core::protocols::http::HttpTask;
70use pingora_core::protocols::http::ServerSession as HttpSession;
71use pingora_core::protocols::http::SERVER_NAME;
72use pingora_core::protocols::Stream;
73use pingora_core::protocols::{Digest, UniqueID};
74use pingora_core::server::configuration::ServerConf;
75use pingora_core::server::ShutdownWatch;
76use pingora_core::upstreams::peer::{HttpPeer, Peer};
77use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
78
// NOTE(review): not referenced in the visible part of this file; presumably the capacity of the
// `HttpTask` channels created by the `proxy_*` submodules — confirm there.
const TASK_BUFFER_SIZE: usize = 4;
80
81mod proxy_cache;
82mod proxy_common;
83mod proxy_custom;
84mod proxy_h1;
85mod proxy_h2;
86mod proxy_purge;
87mod proxy_trait;
88pub mod subrequest;
89
90use subrequest::{BodyMode, Ctx as SubrequestCtx};
91
92pub use proxy_cache::range_filter::{range_header_filter, MultiRangeInfo, RangeType};
93pub use proxy_purge::PurgeStatus;
94pub use proxy_trait::{FailToProxy, ProxyHttp};
95
/// Convenience re-exports: the proxy constructors ([`http_proxy`], [`http_proxy_service`]),
/// the [`ProxyHttp`] trait and the [`Session`] type.
pub mod prelude {
    pub use crate::{http_proxy, http_proxy_service, ProxyHttp, Session};
}
99
/// Shared, thread-safe callback invoked by [`HttpProxy`] to process a custom session.
///
/// The callback receives the proxy app itself, the raw [`Stream`] and the shutdown watch, and
/// returns a boxed `'static` future that resolves to an optional [`Stream`].
// NOTE(review): a returned `Some(stream)` presumably hands the stream back for reuse — confirm
// against the caller in `pingora_core`.
pub type ProcessCustomSession<SV, C> = Arc<
    dyn Fn(Arc<HttpProxy<SV, C>>, Stream, &ShutdownWatch) -> BoxFuture<'static, Option<Stream>>
        + Send
        + Sync
        + Unpin
        + 'static,
>;
107
/// The HTTP proxy application.
///
/// `SV` is the user-supplied proxy logic (the methods in this file require it to implement
/// [`ProxyHttp`]); `C` is the custom upstream connector type, defaulting to `()`.
pub struct HttpProxy<SV, C = ()>
where
    C: custom::Connector,
{
    /// The user-supplied proxy logic whose hooks are called while processing a request.
    inner: SV,
    /// Connector used to obtain and release upstream HTTP sessions.
    client_upstream: Connector<C>,
    /// Notified in `http_cleanup()`; `handle_new_request()` races it against reading a request.
    shutdown: Notify,
    /// Set to `true` in `http_cleanup()`; a clone is handed to every [`Session`].
    shutdown_flag: Arc<AtomicBool>,
    /// Optional server options; consulted to decide whether `CONNECT` requests are accepted.
    pub server_options: Option<HttpServerOptions>,
    /// Optional HTTP/2 options returned from `HttpServerApp::h2_options()`.
    pub h2_options: Option<H2Options>,
    /// Downstream modules; each [`Session`] builds its module context from these.
    pub downstream_modules: HttpModules,
    /// Limit on upstream proxy attempts per request, copied from `ServerConf::max_retries`.
    max_retries: usize,
    /// Callback used by `process_custom_session()`, if one was configured.
    process_custom_session: Option<ProcessCustomSession<SV, C>>,
}
125
126impl<SV> HttpProxy<SV, ()> {
127 pub fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
148 HttpProxy {
149 inner,
150 client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
151 shutdown: Notify::new(),
152 shutdown_flag: Arc::new(AtomicBool::new(false)),
153 server_options: None,
154 h2_options: None,
155 downstream_modules: HttpModules::new(),
156 max_retries: conf.max_retries,
157 process_custom_session: None,
158 }
159 }
160}
161
162impl<SV, C> HttpProxy<SV, C>
163where
164 C: custom::Connector,
165{
166 fn new_custom(
167 inner: SV,
168 conf: Arc<ServerConf>,
169 connector: C,
170 on_custom: Option<ProcessCustomSession<SV, C>>,
171 server_options: Option<HttpServerOptions>,
172 ) -> Self
173 where
174 SV: ProxyHttp + Send + Sync + 'static,
175 SV::CTX: Send + Sync,
176 {
177 let client_upstream =
178 Connector::new_custom(Some(ConnectorOptions::from_server_conf(&conf)), connector);
179
180 HttpProxy {
181 inner,
182 client_upstream,
183 shutdown: Notify::new(),
184 shutdown_flag: Arc::new(AtomicBool::new(false)),
185 server_options,
186 downstream_modules: HttpModules::new(),
187 max_retries: conf.max_retries,
188 process_custom_session: on_custom,
189 h2_options: None,
190 }
191 }
192
193 pub fn handle_init_modules(&mut self)
202 where
203 SV: ProxyHttp,
204 {
205 self.inner
206 .init_downstream_modules(&mut self.downstream_modules);
207 }
208
209 async fn handle_new_request(
210 &self,
211 mut downstream_session: Box<HttpSession>,
212 ) -> Option<Box<HttpSession>>
213 where
214 SV: ProxyHttp + Send + Sync,
215 SV::CTX: Send + Sync,
216 {
217 let res = tokio::select! {
220 biased; res = downstream_session.read_request() => { res }
222 _ = self.shutdown.notified() => {
223 return None;
225 }
226 };
227 match res {
228 Ok(true) => {
229 debug!("Successfully get a new request");
231 }
232 Ok(false) => {
233 return None; }
235 Err(mut e) => {
236 e.as_down();
237 error!("Fail to proxy: {e}");
238 if matches!(e.etype, InvalidHTTPHeader) {
239 downstream_session
240 .respond_error(400)
241 .await
242 .unwrap_or_else(|e| {
243 error!("failed to send error response to downstream: {e}");
244 });
245 } downstream_session.shutdown().await;
247 return None;
248 }
249 }
250 trace!(
251 "Request header: {:?}",
252 downstream_session.req_header().as_ref()
253 );
254 if !self
261 .server_options
262 .as_ref()
263 .is_some_and(|opts| opts.allow_connect_method_proxying)
264 && downstream_session.req_header().method == Method::CONNECT
265 {
266 downstream_session
267 .respond_error(405)
268 .await
269 .unwrap_or_else(|e| {
270 error!("failed to send error response to downstream: {e}");
271 });
272 downstream_session.shutdown().await;
273 return None;
274 }
275 Some(downstream_session)
276 }
277
278 async fn proxy_to_upstream(
280 &self,
281 session: &mut Session,
282 ctx: &mut SV::CTX,
283 ) -> (bool, Option<Box<Error>>)
284 where
285 SV: ProxyHttp + Send + Sync,
286 SV::CTX: Send + Sync,
287 {
288 let peer = match self.inner.upstream_peer(session, ctx).await {
289 Ok(p) => p,
290 Err(e) => return (false, Some(e)),
291 };
292
293 let client_session = self.client_upstream.get_http_session(&*peer).await;
294 match client_session {
295 Ok((client_session, client_reused)) => {
296 let (server_reused, error) = match client_session {
297 ClientSession::H1(mut h1) => {
298 let (server_reused, client_reuse, error) = self
299 .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
300 .await;
301 if client_reuse {
302 let session = ClientSession::H1(h1);
303 self.client_upstream
304 .release_http_session(session, &*peer, peer.idle_timeout())
305 .await;
306 }
307 (server_reused, error)
308 }
309 ClientSession::H2(mut h2) => {
310 let (server_reused, mut error) = self
311 .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
312 .await;
313 let session = ClientSession::H2(h2);
314 self.client_upstream
315 .release_http_session(session, &*peer, peer.idle_timeout())
316 .await;
317
318 if let Some(e) = error.as_mut() {
319 if matches!(e.etype, H2Downgrade | InvalidH2) {
322 if peer
323 .get_alpn()
324 .is_none_or(|alpn| alpn.get_min_http_version() == 1)
325 {
326 self.client_upstream.prefer_h1(&*peer);
329 } else {
330 e.retry = false.into();
332 }
333 }
334 }
335
336 (server_reused, error)
337 }
338 ClientSession::Custom(mut c) => {
339 let (server_reused, error) = self
340 .proxy_to_custom_upstream(session, &mut c, client_reused, &peer, ctx)
341 .await;
342 let session = ClientSession::Custom(c);
343 self.client_upstream
344 .release_http_session(session, &*peer, peer.idle_timeout())
345 .await;
346 (server_reused, error)
347 }
348 };
349 (
350 server_reused,
351 error.map(|e| {
352 self.inner
353 .error_while_proxy(&peer, session, e, ctx, client_reused)
354 }),
355 )
356 }
357 Err(mut e) => {
358 e.as_up();
359 let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
360 (false, Some(new_err.into_up()))
361 }
362 }
363 }
364
365 async fn upstream_filter(
366 &self,
367 session: &mut Session,
368 task: &mut HttpTask,
369 ctx: &mut SV::CTX,
370 ) -> Result<Option<Duration>>
371 where
372 SV: ProxyHttp + Send + Sync,
373 SV::CTX: Send + Sync,
374 {
375 let duration = match task {
376 HttpTask::Header(header, _eos) => {
377 self.inner
378 .upstream_response_filter(session, header, ctx)
379 .await?;
380 None
381 }
382 HttpTask::Body(data, eos) | HttpTask::UpgradedBody(data, eos) => self
383 .inner
384 .upstream_response_body_filter(session, data, *eos, ctx)?,
385 HttpTask::Trailer(Some(trailers)) => {
386 self.inner
387 .upstream_response_trailer_filter(session, trailers, ctx)?;
388 None
389 }
390 _ => {
391 None
393 }
394 };
395
396 Ok(duration)
397 }
398
399 async fn finish(
400 &self,
401 mut session: Session,
402 ctx: &mut SV::CTX,
403 reuse: bool,
404 error: Option<Box<Error>>,
405 ) -> Option<ReusedHttpStream>
406 where
407 SV: ProxyHttp + Send + Sync,
408 SV::CTX: Send + Sync,
409 {
410 self.inner
411 .logging(&mut session, error.as_deref(), ctx)
412 .await;
413
414 if let Some(e) = error {
415 session.downstream_session.on_proxy_failure(e);
416 }
417
418 if reuse {
419 let persistent_settings = HttpPersistentSettings::for_session(&session);
421 session
422 .downstream_session
423 .finish()
424 .await
425 .ok()
426 .flatten()
427 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
428 } else {
429 None
430 }
431 }
432
433 fn cleanup_sub_req(&self, session: &mut Session) {
434 if let Some(ctx) = session.subrequest_ctx.as_mut() {
435 ctx.release_write_lock();
436 }
437 }
438}
439
440use pingora_cache::HttpCache;
441use pingora_core::protocols::http::compression::ResponseCompressionCtx;
442
/// The per-request state passed to the proxy hooks.
///
/// It wraps the downstream HTTP session and dereferences to it (see the `Deref`/`DerefMut`
/// impls in this file).
pub struct Session {
    /// The HTTP session with the downstream peer.
    pub downstream_session: Box<HttpSession>,
    /// The HTTP cache state for this request.
    pub cache: HttpCache,
    /// Compression context for upstream responses; created with
    /// `ResponseCompressionCtx::new(0, false, false)`.
    pub upstream_compression: ResponseCompressionCtx,
    // NOTE(review): not read in the visible part of this file; presumably tells the range
    // filter to ignore the downstream `Range` header — confirm in `proxy_cache`.
    pub ignore_downstream_range: bool,
    /// Flag set via `mark_upstream_headers_mutated_for_cache()`.
    pub upstream_headers_mutated_for_cache: bool,
    /// Subrequest context; set in `process_subrequest()` when this session is a subrequest.
    pub subrequest_ctx: Option<Box<SubrequestCtx>>,
    /// Handle for spawning subrequests; set in `process_request()` when the user logic's
    /// `allow_spawning_subrequest()` returns `true`.
    pub subrequest_spawner: Option<SubrequestSpawner>,
    /// Context of the downstream modules, built from the proxy's `HttpModules`.
    pub downstream_modules_ctx: HttpModuleCtx,
    /// Counter exposed via `upstream_body_bytes_received()`; starts at 0.
    upstream_body_bytes_received: usize,
    /// Duration exposed via `upstream_write_pending_time()`; starts at zero.
    upstream_write_pending_time: Duration,
    /// Shared flag read by `is_process_shutting_down()`.
    shutdown_flag: Arc<AtomicBool>,
}
472
473impl Session {
474 fn new(
475 downstream_session: impl Into<Box<HttpSession>>,
476 downstream_modules: &HttpModules,
477 shutdown_flag: Arc<AtomicBool>,
478 ) -> Self {
479 Session {
480 downstream_session: downstream_session.into(),
481 cache: HttpCache::new(),
482 upstream_compression: ResponseCompressionCtx::new(0, false, false),
484 ignore_downstream_range: false,
485 upstream_headers_mutated_for_cache: false,
486 subrequest_ctx: None,
487 subrequest_spawner: None, downstream_modules_ctx: downstream_modules.build_ctx(),
489 upstream_body_bytes_received: 0,
490 upstream_write_pending_time: Duration::ZERO,
491 shutdown_flag,
492 }
493 }
494
495 pub fn new_h1(stream: Stream) -> Self {
500 let modules = HttpModules::new();
501 Self::new(
502 Box::new(HttpSession::new_http1(stream)),
503 &modules,
504 Arc::new(AtomicBool::new(false)),
505 )
506 }
507
508 pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
513 Self::new(
514 Box::new(HttpSession::new_http1(stream)),
515 downstream_modules,
516 Arc::new(AtomicBool::new(false)),
517 )
518 }
519
520 pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
521 &mut self.downstream_session
522 }
523
524 pub fn as_downstream(&self) -> &HttpSession {
525 &self.downstream_session
526 }
527
528 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
530 self.as_downstream_mut().respond_error(error).await
531 }
532
533 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
535 self.as_downstream_mut()
536 .respond_error_with_body(error, body)
537 .await
538 }
539
540 pub async fn write_response_header(
545 &mut self,
546 mut resp: Box<ResponseHeader>,
547 end_of_stream: bool,
548 ) -> Result<()> {
549 self.downstream_modules_ctx
550 .response_header_filter(&mut resp, end_of_stream)
551 .await?;
552 self.downstream_session.write_response_header(resp).await
553 }
554
555 pub async fn write_response_header_ref(
557 &mut self,
558 resp: &ResponseHeader,
559 end_of_stream: bool,
560 ) -> Result<(), Box<Error>> {
561 self.write_response_header(Box::new(resp.clone()), end_of_stream)
562 .await
563 }
564
565 pub async fn write_response_body(
570 &mut self,
571 mut body: Option<Bytes>,
572 end_of_stream: bool,
573 ) -> Result<()> {
574 self.downstream_modules_ctx
575 .response_body_filter(&mut body, end_of_stream)?;
576
577 if body.is_none() && !end_of_stream {
578 return Ok(());
579 }
580
581 let data = body.unwrap_or_default();
582 self.downstream_session
583 .write_response_body(data, end_of_stream)
584 .await
585 }
586
587 pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
588 let mut seen_upgraded = self.was_upgraded();
589 for task in tasks.iter_mut() {
590 match task {
591 HttpTask::Header(resp, end) => {
592 self.downstream_modules_ctx
593 .response_header_filter(resp, *end)
594 .await?;
595 }
596 HttpTask::Body(data, end) => {
597 self.downstream_modules_ctx
598 .response_body_filter(data, *end)?;
599 }
600 HttpTask::UpgradedBody(data, end) => {
601 seen_upgraded = true;
602 self.downstream_modules_ctx
603 .response_body_filter(data, *end)?;
604 }
605 HttpTask::Trailer(trailers) => {
606 if let Some(buf) = self
607 .downstream_modules_ctx
608 .response_trailer_filter(trailers)?
609 {
610 *task = HttpTask::Body(Some(buf), true);
617 }
618 }
619 HttpTask::Done => {
620 if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
629 if seen_upgraded {
630 *task = HttpTask::UpgradedBody(Some(buf), true);
631 } else {
632 *task = HttpTask::Body(Some(buf), true);
633 }
634 }
635 }
636 _ => { }
637 }
638 }
639 self.downstream_session.response_duplex_vec(tasks).await
640 }
641
642 pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
645 self.upstream_headers_mutated_for_cache = true;
646 }
647
648 pub fn upstream_headers_mutated_for_cache(&self) -> bool {
650 self.upstream_headers_mutated_for_cache
651 }
652
653 pub fn upstream_body_bytes_received(&self) -> usize {
655 self.upstream_body_bytes_received
656 }
657
658 pub(crate) fn set_upstream_body_bytes_received(&mut self, n: usize) {
660 self.upstream_body_bytes_received = n;
661 }
662
663 pub fn upstream_write_pending_time(&self) -> Duration {
665 self.upstream_write_pending_time
666 }
667
668 pub(crate) fn set_upstream_write_pending_time(&mut self, d: Duration) {
670 self.upstream_write_pending_time = d;
671 }
672
673 pub fn is_process_shutting_down(&self) -> bool {
675 self.shutdown_flag.load(Ordering::Acquire)
676 }
677
678 pub fn downstream_custom_message(
679 &mut self,
680 ) -> Result<
681 Option<Box<dyn futures::Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
682 > {
683 if let Some(custom_session) = self.downstream_session.as_custom_mut() {
684 custom_session
685 .take_custom_message_reader()
686 .map(Some)
687 .ok_or(Error::explain(
688 ReadError,
689 "can't extract custom reader from downstream",
690 ))
691 } else {
692 Ok(None)
693 }
694 }
695}
696
697impl AsRef<HttpSession> for Session {
698 fn as_ref(&self) -> &HttpSession {
699 &self.downstream_session
700 }
701}
702
703impl AsMut<HttpSession> for Session {
704 fn as_mut(&mut self) -> &mut HttpSession {
705 &mut self.downstream_session
706 }
707}
708
709use std::ops::{Deref, DerefMut};
710
711impl Deref for Session {
712 type Target = HttpSession;
713
714 fn deref(&self) -> &Self::Target {
715 &self.downstream_session
716 }
717}
718
719impl DerefMut for Session {
720 fn deref_mut(&mut self) -> &mut Self::Target {
721 &mut self.downstream_session
722 }
723}
724
/// Lazily built, body-less 502 response header.
///
/// `process_request()` sends it when the user's `proxy_upstream_filter()` declines to proxy
/// and no response has been written yet.
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    // `Some(3)` is passed as the second argument of `build` — presumably a capacity hint for
    // the three headers inserted below; confirm against `ResponseHeader::build`.
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});
736
737impl<SV, C> HttpProxy<SV, C>
738where
739 C: custom::Connector,
740{
741 async fn process_request(
742 self: &Arc<Self>,
743 mut session: Session,
744 mut ctx: <SV as ProxyHttp>::CTX,
745 ) -> Option<ReusedHttpStream>
746 where
747 SV: ProxyHttp + Send + Sync + 'static,
748 <SV as ProxyHttp>::CTX: Send + Sync,
749 {
750 if let Err(e) = self
751 .inner
752 .early_request_filter(&mut session, &mut ctx)
753 .await
754 {
755 return self
756 .handle_error(session, &mut ctx, e, "Fail to early filter request:")
757 .await;
758 }
759
760 if self.inner.allow_spawning_subrequest(&session, &ctx) {
761 session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone()));
762 }
763
764 let req = session.downstream_session.req_header_mut();
765
766 if let Err(e) = session
768 .downstream_modules_ctx
769 .request_header_filter(req)
770 .await
771 {
772 return self
773 .handle_error(
774 session,
775 &mut ctx,
776 e,
777 "Failed in downstream modules request filter:",
778 )
779 .await;
780 }
781
782 match self.inner.request_filter(&mut session, &mut ctx).await {
783 Ok(response_sent) => {
784 if response_sent {
785 self.inner.logging(&mut session, None, &mut ctx).await;
787 self.cleanup_sub_req(&mut session);
788 let persistent_settings = HttpPersistentSettings::for_session(&session);
789 return session
790 .downstream_session
791 .finish()
792 .await
793 .ok()
794 .flatten()
795 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
796 }
797 }
799 Err(e) => {
800 return self
801 .handle_error(session, &mut ctx, e, "Fail to filter request:")
802 .await;
803 }
804 }
805
806 if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
807 return self.finish(session, &mut ctx, reuse, err).await;
809 }
810 self.cleanup_sub_req(&mut session);
814
815 match self
817 .inner
818 .proxy_upstream_filter(&mut session, &mut ctx)
819 .await
820 {
821 Ok(proxy_to_upstream) => {
822 if !proxy_to_upstream {
823 if session.cache.enabled() {
826 session.cache.disable(NoCacheReason::DeclinedToUpstream);
828 }
829 if session.response_written().is_none() {
830 match session.write_response_header_ref(&BAD_GATEWAY, true).await {
831 Ok(()) => {}
832 Err(e) => {
833 return self
834 .handle_error(
835 session,
836 &mut ctx,
837 e,
838 "Error responding with Bad Gateway:",
839 )
840 .await;
841 }
842 }
843 }
844
845 return self.finish(session, &mut ctx, true, None).await;
846 }
847 }
849 Err(e) => {
850 if session.cache.enabled() {
851 session.cache.disable(NoCacheReason::InternalError);
852 }
853
854 return self
855 .handle_error(
856 session,
857 &mut ctx,
858 e,
859 "Error deciding if we should proxy to upstream:",
860 )
861 .await;
862 }
863 }
864
865 let mut retries: usize = 0;
866
867 let mut server_reuse = false;
868 let mut proxy_error: Option<Box<Error>> = None;
869
870 while retries < self.max_retries {
871 retries += 1;
872
873 let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
874 server_reuse = reuse;
875
876 match e {
877 Some(error) => {
878 let retry = error.retry();
879 proxy_error = Some(error);
880 if !retry {
881 break;
882 }
883 warn!(
885 "Fail to proxy: {}, tries: {}, retry: {}, {}",
886 proxy_error.as_ref().unwrap(),
887 retries,
888 retry,
889 self.inner.request_summary(&session, &ctx)
890 );
891 }
892 None => {
893 proxy_error = None;
894 break;
895 }
896 };
897 }
898
899 #[allow(clippy::unnecessary_unwrap)]
903 let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
904 self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
905 .await
906 } else {
907 None
908 };
909
910 let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
911 server_reuse = server_reuse && reuse;
913 stale_cache_error
914 } else {
915 proxy_error
916 };
917
918 if let Some(e) = final_error.as_ref() {
919 if session.cache.enabled() {
921 let reason = if *e.esource() == ErrorSource::Upstream {
922 NoCacheReason::UpstreamError
923 } else {
924 NoCacheReason::InternalError
925 };
926 session.cache.disable(reason);
927 }
928 let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
929
930 if !self.inner.suppress_error_log(&session, &ctx, e) {
932 error!(
933 "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
934 final_error.as_ref().unwrap(),
935 res.error_code,
936 retries,
937 false, self.inner.request_summary(&session, &ctx),
939 );
940 }
941 }
942
943 self.finish(session, &mut ctx, server_reuse, final_error)
945 .await
946 }
947
948 async fn handle_error(
949 &self,
950 mut session: Session,
951 ctx: &mut <SV as ProxyHttp>::CTX,
952 e: Box<Error>,
953 context: &str,
954 ) -> Option<ReusedHttpStream>
955 where
956 SV: ProxyHttp + Send + Sync + 'static,
957 <SV as ProxyHttp>::CTX: Send + Sync,
958 {
959 let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
960 if !self.inner.suppress_error_log(&session, ctx, &e) {
961 error!(
962 "{context} {}, status: {}, {}",
963 e,
964 res.error_code,
965 self.inner.request_summary(&session, ctx)
966 );
967 }
968 self.inner.logging(&mut session, Some(&e), ctx).await;
969 self.cleanup_sub_req(&mut session);
970
971 session.downstream_session.on_proxy_failure(e);
972
973 if res.can_reuse_downstream {
974 let persistent_settings = HttpPersistentSettings::for_session(&session);
975 session
976 .downstream_session
977 .finish()
978 .await
979 .ok()
980 .flatten()
981 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
982 } else {
983 None
984 }
985 }
986}
987
/// An app that can process a subrequest.
///
/// Implemented for [`HttpProxy`] in this file; [`SubrequestSpawner`] and
/// [`PreparedSubrequest`] hold the app as a trait object of this trait.
#[async_trait]
pub trait Subrequest {
    /// Process `session` as a subrequest, using `sub_req_ctx` as its subrequest context.
    async fn process_subrequest(
        self: Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubrequestCtx>,
    );
}
1005
1006#[async_trait]
1007impl<SV, C> Subrequest for HttpProxy<SV, C>
1008where
1009 SV: ProxyHttp + Send + Sync + 'static,
1010 <SV as ProxyHttp>::CTX: Send + Sync,
1011 C: custom::Connector,
1012{
1013 async fn process_subrequest(
1014 self: Arc<Self>,
1015 session: Box<HttpSession>,
1016 sub_req_ctx: Box<SubrequestCtx>,
1017 ) {
1018 debug!("starting subrequest");
1019
1020 let mut session = match self.handle_new_request(session).await {
1021 Some(downstream_session) => Session::new(
1022 downstream_session,
1023 &self.downstream_modules,
1024 self.shutdown_flag.clone(),
1025 ),
1026 None => return, };
1028
1029 session.set_keepalive(None);
1032
1033 session.subrequest_ctx.replace(sub_req_ctx);
1034 trace!("processing subrequest");
1035 let ctx = self.inner.new_ctx();
1036 self.process_request(session, ctx).await;
1037 trace!("subrequest done");
1038 }
1039}
1040
/// A handle for creating and spawning subrequests on a [`Subrequest`] app.
pub struct SubrequestSpawner {
    /// The app that will process the subrequests.
    app: Arc<dyn Subrequest + Send + Sync>,
}
1045
/// A subrequest that has been set up but not started; call `run()` to process it.
pub struct PreparedSubrequest {
    /// The app that will process the subrequest.
    app: Arc<dyn Subrequest + Send + Sync>,
    /// The subrequest's own HTTP session.
    session: Box<HttpSession>,
    /// The subrequest context handed to the app together with the session.
    sub_req_ctx: Box<SubrequestCtx>,
}
1052
1053impl PreparedSubrequest {
1054 pub async fn run(self) {
1055 self.app
1056 .process_subrequest(self.session, self.sub_req_ctx)
1057 .await
1058 }
1059
1060 pub fn session(&self) -> &HttpSession {
1061 self.session.as_ref()
1062 }
1063
1064 pub fn session_mut(&mut self) -> &mut HttpSession {
1065 self.session.deref_mut()
1066 }
1067}
1068
1069impl SubrequestSpawner {
1070 pub fn new(app: Arc<dyn Subrequest + Send + Sync>) -> SubrequestSpawner {
1072 SubrequestSpawner { app }
1073 }
1074
1075 pub fn spawn_background_subrequest(
1078 &self,
1079 session: &HttpSession,
1080 ctx: SubrequestCtx,
1081 ) -> tokio::task::JoinHandle<()> {
1082 let new_app = self.app.clone(); let (mut session, handle) = subrequest::create_session(session);
1084 if ctx.body_mode() == BodyMode::NoBody {
1085 session
1086 .as_subrequest_mut()
1087 .expect("created subrequest session")
1088 .clear_request_body_headers();
1089 }
1090 let sub_req_ctx = Box::new(ctx);
1091 handle.drain_tasks();
1092 tokio::spawn(async move {
1093 new_app
1094 .process_subrequest(Box::new(session), sub_req_ctx)
1095 .await;
1096 })
1097 }
1098
1099 pub fn create_subrequest(
1105 &self,
1106 session: &HttpSession,
1107 ctx: SubrequestCtx,
1108 ) -> (PreparedSubrequest, SubrequestHandle) {
1109 let new_app = self.app.clone(); let (mut session, handle) = subrequest::create_session(session);
1111 if ctx.body_mode() == BodyMode::NoBody {
1112 session
1113 .as_subrequest_mut()
1114 .expect("created subrequest session")
1115 .clear_request_body_headers();
1116 }
1117 let sub_req_ctx = Box::new(ctx);
1118 (
1119 PreparedSubrequest {
1120 app: new_app,
1121 session: Box::new(session),
1122 sub_req_ctx,
1123 },
1124 handle,
1125 )
1126 }
1127}
1128
1129#[async_trait]
1130impl<SV, C> HttpServerApp for HttpProxy<SV, C>
1131where
1132 SV: ProxyHttp + Send + Sync + 'static,
1133 <SV as ProxyHttp>::CTX: Send + Sync,
1134 C: custom::Connector,
1135{
1136 async fn process_new_http(
1137 self: &Arc<Self>,
1138 session: HttpSession,
1139 shutdown: &ShutdownWatch,
1140 ) -> Option<ReusedHttpStream> {
1141 let session = Box::new(session);
1142
1143 let mut session = match self.handle_new_request(session).await {
1145 Some(downstream_session) => Session::new(
1146 downstream_session,
1147 &self.downstream_modules,
1148 self.shutdown_flag.clone(),
1149 ),
1150 None => return None, };
1152
1153 if *shutdown.borrow() {
1154 session.set_keepalive(None);
1156 }
1157
1158 let ctx = self.inner.new_ctx();
1159 self.process_request(session, ctx).await
1160 }
1161
1162 async fn http_cleanup(&self) {
1163 self.shutdown_flag.store(true, Ordering::Release);
1164 self.shutdown.notify_waiters();
1166 }
1167
1168 fn server_options(&self) -> Option<&HttpServerOptions> {
1169 self.server_options.as_ref()
1170 }
1171
1172 fn h2_options(&self) -> Option<H2Options> {
1173 self.h2_options.clone()
1174 }
1175 async fn process_custom_session(
1176 self: Arc<Self>,
1177 stream: Stream,
1178 shutdown: &ShutdownWatch,
1179 ) -> Option<Stream> {
1180 let app = self.clone();
1181
1182 let Some(process_custom_session) = app.process_custom_session.as_ref() else {
1183 warn!("custom was called on an empty on_custom");
1184 return None;
1185 };
1186
1187 process_custom_session(self.clone(), stream, shutdown).await
1188 }
1189
1190 }
1192
1193use pingora_core::services::listening::Service;
1194
1195pub fn http_proxy<SV>(conf: &Arc<ServerConf>, inner: SV) -> HttpProxy<SV>
1226where
1227 SV: ProxyHttp,
1228{
1229 let mut proxy = HttpProxy::new(inner, conf.clone());
1230 proxy.handle_init_modules();
1231 proxy
1232}
1233
1234pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV, ()>>
1238where
1239 SV: ProxyHttp,
1240{
1241 http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
1242}
1243
1244pub fn http_proxy_service_with_name<SV>(
1248 conf: &Arc<ServerConf>,
1249 inner: SV,
1250 name: &str,
1251) -> Service<HttpProxy<SV, ()>>
1252where
1253 SV: ProxyHttp,
1254{
1255 let mut proxy = HttpProxy::new(inner, conf.clone());
1256 proxy.handle_init_modules();
1257 Service::new(name.to_string(), proxy)
1258}
1259
1260pub fn http_proxy_service_with_name_custom<SV, C>(
1264 conf: &Arc<ServerConf>,
1265 inner: SV,
1266 name: &str,
1267 connector: C,
1268 on_custom: ProcessCustomSession<SV, C>,
1269) -> Service<HttpProxy<SV, C>>
1270where
1271 SV: ProxyHttp + Send + Sync + 'static,
1272 SV::CTX: Send + Sync + 'static,
1273 C: custom::Connector,
1274{
1275 let mut proxy = HttpProxy::new_custom(inner, conf.clone(), connector, Some(on_custom), None);
1276 proxy.handle_init_modules();
1277
1278 Service::new(name.to_string(), proxy)
1279}
1280
/// Builder for a `Service` running an [`HttpProxy`].
///
/// Start with `new()`, optionally set a name, a custom connector with its session callback
/// and server options, then call `build()`.
pub struct ProxyServiceBuilder<SV, C>
where
    SV: ProxyHttp + Send + Sync + 'static,
    SV::CTX: Send + Sync + 'static,
    C: custom::Connector,
{
    /// Server configuration the proxy is built from.
    conf: Arc<ServerConf>,
    /// The user-supplied proxy logic.
    inner: SV,
    /// Name given to the resulting service.
    name: String,
    /// Upstream connector; `()` until `custom()` replaces it.
    connector: C,
    /// Custom-session callback; `None` until `custom()` sets it.
    custom: Option<ProcessCustomSession<SV, C>>,
    /// Server options passed on to the proxy; `None` unless set via `server_options()`.
    server_options: Option<HttpServerOptions>,
}
1299
1300impl<SV> ProxyServiceBuilder<SV, ()>
1301where
1302 SV: ProxyHttp + Send + Sync + 'static,
1303 SV::CTX: Send + Sync + 'static,
1304{
1305 pub fn new(conf: &Arc<ServerConf>, inner: SV) -> Self {
1315 ProxyServiceBuilder {
1316 conf: conf.clone(),
1317 inner,
1318 name: "Pingora HTTP Proxy Service".into(),
1319 connector: (),
1320 custom: None,
1321 server_options: None,
1322 }
1323 }
1324}
1325
1326impl<SV, C> ProxyServiceBuilder<SV, C>
1327where
1328 SV: ProxyHttp + Send + Sync + 'static,
1329 SV::CTX: Send + Sync + 'static,
1330 C: custom::Connector,
1331{
1332 pub fn name(mut self, name: impl AsRef<str>) -> Self {
1334 self.name = name.as_ref().to_owned();
1335 self
1336 }
1337
1338 pub fn custom<C2: custom::Connector>(
1347 self,
1348 connector: C2,
1349 on_custom: ProcessCustomSession<SV, C2>,
1350 ) -> ProxyServiceBuilder<SV, C2> {
1351 let Self {
1352 conf,
1353 inner,
1354 name,
1355 server_options,
1356 ..
1357 } = self;
1358 ProxyServiceBuilder {
1359 conf,
1360 inner,
1361 name,
1362 connector,
1363 custom: Some(on_custom),
1364 server_options,
1365 }
1366 }
1367
1368 pub fn server_options(mut self, options: HttpServerOptions) -> Self {
1372 self.server_options = Some(options);
1373 self
1374 }
1375
1376 pub fn build(self) -> Service<HttpProxy<SV, C>> {
1383 let Self {
1384 conf,
1385 inner,
1386 name,
1387 connector,
1388 custom,
1389 server_options,
1390 } = self;
1391
1392 let mut proxy = HttpProxy::new_custom(inner, conf, connector, custom, server_options);
1393
1394 proxy.handle_init_modules();
1395 Service::new(name, proxy)
1396 }
1397}