1use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::BoxFuture;
41use futures::future::FutureExt;
42use http::{header, version::Version};
43use log::{debug, error, trace, warn};
44use once_cell::sync::Lazy;
45use pingora_http::{RequestHeader, ResponseHeader};
46use std::fmt::Debug;
47use std::str;
48use std::sync::{
49 atomic::{AtomicBool, Ordering},
50 Arc,
51};
52use std::time::Duration;
53use tokio::sync::{mpsc, Notify};
54use tokio::time;
55
56use pingora_cache::NoCacheReason;
57use pingora_core::apps::{
58 HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
59};
60use pingora_core::connectors::http::custom;
61use pingora_core::connectors::{http::Connector, ConnectorOptions};
62use pingora_core::modules::http::compression::ResponseCompressionBuilder;
63use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
64use pingora_core::protocols::http::client::HttpSession as ClientSession;
65use pingora_core::protocols::http::custom::CustomMessageWrite;
66use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
67use pingora_core::protocols::http::v2::server::H2Options;
68use pingora_core::protocols::http::HttpTask;
69use pingora_core::protocols::http::ServerSession as HttpSession;
70use pingora_core::protocols::http::SERVER_NAME;
71use pingora_core::protocols::Stream;
72use pingora_core::protocols::{Digest, UniqueID};
73use pingora_core::server::configuration::ServerConf;
74use pingora_core::server::ShutdownWatch;
75use pingora_core::upstreams::peer::{HttpPeer, Peer};
76use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
77
/// Size used for task buffers. It is not referenced in this part of the file;
/// presumably the `proxy_*` submodules use it as a channel capacity — TODO confirm.
const TASK_BUFFER_SIZE: usize = 4;
79
80mod proxy_cache;
81mod proxy_common;
82mod proxy_custom;
83mod proxy_h1;
84mod proxy_h2;
85mod proxy_purge;
86mod proxy_trait;
87pub mod subrequest;
88
89use subrequest::{BodyMode, Ctx as SubrequestCtx};
90
91pub use proxy_cache::range_filter::{range_header_filter, MultiRangeInfo, RangeType};
92pub use proxy_purge::PurgeStatus;
93pub use proxy_trait::{FailToProxy, ProxyHttp};
94
/// Convenience module re-exporting [`http_proxy`], [`http_proxy_service`],
/// [`ProxyHttp`] and [`Session`] so they can be pulled in with a single glob import.
pub mod prelude {
    pub use crate::{http_proxy, http_proxy_service, ProxyHttp, Session};
}
98
/// Shared, thread-safe callback used to handle a custom session.
///
/// The callback receives the proxy itself, the raw [`Stream`] and the shutdown
/// watch, and returns a boxed `'static` future resolving to an optional
/// [`Stream`]. Presumably `Some(stream)` hands the connection back to the caller
/// for reuse — TODO confirm against the caller of `process_custom_session`.
pub type ProcessCustomSession<SV, C> = Arc<
    dyn Fn(Arc<HttpProxy<SV, C>>, Stream, &ShutdownWatch) -> BoxFuture<'static, Option<Stream>>
        + Send
        + Sync
        + Unpin
        + 'static,
>;
106
/// The HTTP proxy application.
///
/// `SV` is the user supplied proxy logic (the methods in this file require it to
/// implement [`ProxyHttp`]); `C` is the custom upstream connector type, `()` by default.
pub struct HttpProxy<SV, C = ()>
where
    C: custom::Connector,
{
    /// User supplied proxy logic; every filter/callback in the pipeline is invoked on it.
    inner: SV,
    /// Connector used to obtain and release upstream HTTP sessions.
    client_upstream: Connector<C>,
    /// Notified by `http_cleanup`; awaited while waiting for a new request so
    /// that idle connections stop accepting requests on shutdown.
    shutdown: Notify,
    /// Set to `true` by `http_cleanup`; a clone is handed to every [`Session`].
    shutdown_flag: Arc<AtomicBool>,
    /// Options returned from `HttpServerApp::server_options`.
    pub server_options: Option<HttpServerOptions>,
    /// Options returned from `HttpServerApp::h2_options`.
    pub h2_options: Option<H2Options>,
    /// Downstream modules; populated by `handle_init_modules` and used to build
    /// the per-session module context.
    pub downstream_modules: HttpModules,
    /// Upper bound on the number of upstream attempts per request (taken from `ServerConf`).
    max_retries: usize,
    /// Callback for custom sessions; `None` unless built through `new_custom`.
    process_custom_session: Option<ProcessCustomSession<SV, C>>,
}
124
125impl<SV> HttpProxy<SV, ()> {
126 pub fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
147 HttpProxy {
148 inner,
149 client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
150 shutdown: Notify::new(),
151 shutdown_flag: Arc::new(AtomicBool::new(false)),
152 server_options: None,
153 h2_options: None,
154 downstream_modules: HttpModules::new(),
155 max_retries: conf.max_retries,
156 process_custom_session: None,
157 }
158 }
159}
160
161impl<SV, C> HttpProxy<SV, C>
162where
163 C: custom::Connector,
164{
165 fn new_custom(
166 inner: SV,
167 conf: Arc<ServerConf>,
168 connector: C,
169 on_custom: ProcessCustomSession<SV, C>,
170 ) -> Self
171 where
172 SV: ProxyHttp + Send + Sync + 'static,
173 SV::CTX: Send + Sync,
174 {
175 let client_upstream =
176 Connector::new_custom(Some(ConnectorOptions::from_server_conf(&conf)), connector);
177
178 HttpProxy {
179 inner,
180 client_upstream,
181 shutdown: Notify::new(),
182 shutdown_flag: Arc::new(AtomicBool::new(false)),
183 server_options: None,
184 downstream_modules: HttpModules::new(),
185 max_retries: conf.max_retries,
186 process_custom_session: Some(on_custom),
187 h2_options: None,
188 }
189 }
190
191 pub fn handle_init_modules(&mut self)
200 where
201 SV: ProxyHttp,
202 {
203 self.inner
204 .init_downstream_modules(&mut self.downstream_modules);
205 }
206
207 async fn handle_new_request(
208 &self,
209 mut downstream_session: Box<HttpSession>,
210 ) -> Option<Box<HttpSession>>
211 where
212 SV: ProxyHttp + Send + Sync,
213 SV::CTX: Send + Sync,
214 {
215 let res = tokio::select! {
218 biased; res = downstream_session.read_request() => { res }
220 _ = self.shutdown.notified() => {
221 return None;
223 }
224 };
225 match res {
226 Ok(true) => {
227 debug!("Successfully get a new request");
229 }
230 Ok(false) => {
231 return None; }
233 Err(mut e) => {
234 e.as_down();
235 error!("Fail to proxy: {e}");
236 if matches!(e.etype, InvalidHTTPHeader) {
237 downstream_session
238 .respond_error(400)
239 .await
240 .unwrap_or_else(|e| {
241 error!("failed to send error response to downstream: {e}");
242 });
243 } downstream_session.shutdown().await;
245 return None;
246 }
247 }
248 trace!(
249 "Request header: {:?}",
250 downstream_session.req_header().as_ref()
251 );
252 Some(downstream_session)
253 }
254
255 async fn proxy_to_upstream(
257 &self,
258 session: &mut Session,
259 ctx: &mut SV::CTX,
260 ) -> (bool, Option<Box<Error>>)
261 where
262 SV: ProxyHttp + Send + Sync,
263 SV::CTX: Send + Sync,
264 {
265 let peer = match self.inner.upstream_peer(session, ctx).await {
266 Ok(p) => p,
267 Err(e) => return (false, Some(e)),
268 };
269
270 let client_session = self.client_upstream.get_http_session(&*peer).await;
271 match client_session {
272 Ok((client_session, client_reused)) => {
273 let (server_reused, error) = match client_session {
274 ClientSession::H1(mut h1) => {
275 let (server_reused, client_reuse, error) = self
276 .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
277 .await;
278 if client_reuse {
279 let session = ClientSession::H1(h1);
280 self.client_upstream
281 .release_http_session(session, &*peer, peer.idle_timeout())
282 .await;
283 }
284 (server_reused, error)
285 }
286 ClientSession::H2(mut h2) => {
287 let (server_reused, mut error) = self
288 .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
289 .await;
290 let session = ClientSession::H2(h2);
291 self.client_upstream
292 .release_http_session(session, &*peer, peer.idle_timeout())
293 .await;
294
295 if let Some(e) = error.as_mut() {
296 if matches!(e.etype, H2Downgrade | InvalidH2) {
299 if peer
300 .get_alpn()
301 .is_none_or(|alpn| alpn.get_min_http_version() == 1)
302 {
303 self.client_upstream.prefer_h1(&*peer);
306 } else {
307 e.retry = false.into();
309 }
310 }
311 }
312
313 (server_reused, error)
314 }
315 ClientSession::Custom(mut c) => {
316 let (server_reused, error) = self
317 .proxy_to_custom_upstream(session, &mut c, client_reused, &peer, ctx)
318 .await;
319 let session = ClientSession::Custom(c);
320 self.client_upstream
321 .release_http_session(session, &*peer, peer.idle_timeout())
322 .await;
323 (server_reused, error)
324 }
325 };
326 (
327 server_reused,
328 error.map(|e| {
329 self.inner
330 .error_while_proxy(&peer, session, e, ctx, client_reused)
331 }),
332 )
333 }
334 Err(mut e) => {
335 e.as_up();
336 let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
337 (false, Some(new_err.into_up()))
338 }
339 }
340 }
341
342 async fn upstream_filter(
343 &self,
344 session: &mut Session,
345 task: &mut HttpTask,
346 ctx: &mut SV::CTX,
347 ) -> Result<Option<Duration>>
348 where
349 SV: ProxyHttp + Send + Sync,
350 SV::CTX: Send + Sync,
351 {
352 let duration = match task {
353 HttpTask::Header(header, _eos) => {
354 self.inner
355 .upstream_response_filter(session, header, ctx)
356 .await?;
357 None
358 }
359 HttpTask::Body(data, eos) => self
360 .inner
361 .upstream_response_body_filter(session, data, *eos, ctx)?,
362 HttpTask::Trailer(Some(trailers)) => {
363 self.inner
364 .upstream_response_trailer_filter(session, trailers, ctx)?;
365 None
366 }
367 _ => {
368 None
370 }
371 };
372
373 Ok(duration)
374 }
375
376 async fn finish(
377 &self,
378 mut session: Session,
379 ctx: &mut SV::CTX,
380 reuse: bool,
381 error: Option<&Error>,
382 ) -> Option<ReusedHttpStream>
383 where
384 SV: ProxyHttp + Send + Sync,
385 SV::CTX: Send + Sync,
386 {
387 self.inner.logging(&mut session, error, ctx).await;
388
389 if reuse {
390 let persistent_settings = HttpPersistentSettings::for_session(&session);
392 session
393 .downstream_session
394 .finish()
395 .await
396 .ok()
397 .flatten()
398 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
399 } else {
400 None
401 }
402 }
403
404 fn cleanup_sub_req(&self, session: &mut Session) {
405 if let Some(ctx) = session.subrequest_ctx.as_mut() {
406 ctx.release_write_lock();
407 }
408 }
409}
410
411use pingora_cache::HttpCache;
412use pingora_core::protocols::http::compression::ResponseCompressionCtx;
413
/// Per-request state of the proxy: the downstream HTTP session plus the
/// bookkeeping the proxy pipeline keeps alongside it.
///
/// `Session` dereferences to the downstream [`HttpSession`].
pub struct Session {
    /// The downstream (client facing) HTTP session.
    pub downstream_session: Box<HttpSession>,
    /// Cache state for this request.
    pub cache: HttpCache,
    /// Compression context for the upstream response. `Session::new` builds it
    /// with `ResponseCompressionCtx::new(0, false, false)` — presumably disabled
    /// by default; confirm against `ResponseCompressionCtx`.
    pub upstream_compression: ResponseCompressionCtx,
    /// Starts as `false`. Presumably tells the range handling to ignore the
    /// downstream range request — TODO confirm against `proxy_cache`.
    pub ignore_downstream_range: bool,
    /// Flag set by `mark_upstream_headers_mutated_for_cache`.
    pub upstream_headers_mutated_for_cache: bool,
    /// Subrequest context; stored by `process_subrequest` when this session is a subrequest.
    pub subrequest_ctx: Option<Box<SubrequestCtx>>,
    /// Handle for spawning subrequests; set in `process_request` when the user
    /// logic allows spawning subrequests.
    pub subrequest_spawner: Option<SubrequestSpawner>,
    /// Per-session context of the downstream modules.
    pub downstream_modules_ctx: HttpModuleCtx,
    /// Value reported by `upstream_body_bytes_received`; starts at 0.
    upstream_body_bytes_received: usize,
    /// Shared flag read by `is_process_shutting_down`.
    shutdown_flag: Arc<AtomicBool>,
}
441
442impl Session {
443 fn new(
444 downstream_session: impl Into<Box<HttpSession>>,
445 downstream_modules: &HttpModules,
446 shutdown_flag: Arc<AtomicBool>,
447 ) -> Self {
448 Session {
449 downstream_session: downstream_session.into(),
450 cache: HttpCache::new(),
451 upstream_compression: ResponseCompressionCtx::new(0, false, false),
453 ignore_downstream_range: false,
454 upstream_headers_mutated_for_cache: false,
455 subrequest_ctx: None,
456 subrequest_spawner: None, downstream_modules_ctx: downstream_modules.build_ctx(),
458 upstream_body_bytes_received: 0,
459 shutdown_flag,
460 }
461 }
462
463 pub fn new_h1(stream: Stream) -> Self {
468 let modules = HttpModules::new();
469 Self::new(
470 Box::new(HttpSession::new_http1(stream)),
471 &modules,
472 Arc::new(AtomicBool::new(false)),
473 )
474 }
475
476 pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
481 Self::new(
482 Box::new(HttpSession::new_http1(stream)),
483 downstream_modules,
484 Arc::new(AtomicBool::new(false)),
485 )
486 }
487
488 pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
489 &mut self.downstream_session
490 }
491
492 pub fn as_downstream(&self) -> &HttpSession {
493 &self.downstream_session
494 }
495
496 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
498 self.as_downstream_mut().respond_error(error).await
499 }
500
501 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
503 self.as_downstream_mut()
504 .respond_error_with_body(error, body)
505 .await
506 }
507
508 pub async fn write_response_header(
513 &mut self,
514 mut resp: Box<ResponseHeader>,
515 end_of_stream: bool,
516 ) -> Result<()> {
517 self.downstream_modules_ctx
518 .response_header_filter(&mut resp, end_of_stream)
519 .await?;
520 self.downstream_session.write_response_header(resp).await
521 }
522
523 pub async fn write_response_header_ref(
525 &mut self,
526 resp: &ResponseHeader,
527 end_of_stream: bool,
528 ) -> Result<(), Box<Error>> {
529 self.write_response_header(Box::new(resp.clone()), end_of_stream)
530 .await
531 }
532
533 pub async fn write_response_body(
538 &mut self,
539 mut body: Option<Bytes>,
540 end_of_stream: bool,
541 ) -> Result<()> {
542 self.downstream_modules_ctx
543 .response_body_filter(&mut body, end_of_stream)?;
544
545 if body.is_none() && !end_of_stream {
546 return Ok(());
547 }
548
549 let data = body.unwrap_or_default();
550 self.downstream_session
551 .write_response_body(data, end_of_stream)
552 .await
553 }
554
555 pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
556 for task in tasks.iter_mut() {
557 match task {
558 HttpTask::Header(resp, end) => {
559 self.downstream_modules_ctx
560 .response_header_filter(resp, *end)
561 .await?;
562 }
563 HttpTask::Body(data, end) => {
564 self.downstream_modules_ctx
565 .response_body_filter(data, *end)?;
566 }
567 HttpTask::Trailer(trailers) => {
568 if let Some(buf) = self
569 .downstream_modules_ctx
570 .response_trailer_filter(trailers)?
571 {
572 *task = HttpTask::Body(Some(buf), true);
578 }
579 }
580 HttpTask::Done => {
581 if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
590 *task = HttpTask::Body(Some(buf), true);
591 }
592 }
593 _ => { }
594 }
595 }
596 self.downstream_session.response_duplex_vec(tasks).await
597 }
598
/// Sets the `upstream_headers_mutated_for_cache` flag to `true`.
pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
    self.upstream_headers_mutated_for_cache = true;
}
604
/// Returns the current value of the `upstream_headers_mutated_for_cache` flag.
pub fn upstream_headers_mutated_for_cache(&self) -> bool {
    self.upstream_headers_mutated_for_cache
}
609
/// Returns the recorded number of upstream body bytes received (0 for a new session).
pub fn upstream_body_bytes_received(&self) -> usize {
    self.upstream_body_bytes_received
}
614
/// Overwrites the recorded number of upstream body bytes received with `n`.
pub(crate) fn set_upstream_body_bytes_received(&mut self, n: usize) {
    self.upstream_body_bytes_received = n;
}
619
/// Returns the current value of the shared shutdown flag.
///
/// For sessions created by the proxy the flag is shared with [`HttpProxy`],
/// which sets it in `http_cleanup`. The load uses `Acquire` ordering, pairing
/// with the `Release` store there.
pub fn is_process_shutting_down(&self) -> bool {
    self.shutdown_flag.load(Ordering::Acquire)
}
624
625 pub fn downstream_custom_message(
626 &mut self,
627 ) -> Result<
628 Option<Box<dyn futures::Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
629 > {
630 if let Some(custom_session) = self.downstream_session.as_custom_mut() {
631 custom_session
632 .take_custom_message_reader()
633 .map(Some)
634 .ok_or(Error::explain(
635 ReadError,
636 "can't extract custom reader from downstream",
637 ))
638 } else {
639 Ok(None)
640 }
641 }
642}
643
644impl AsRef<HttpSession> for Session {
645 fn as_ref(&self) -> &HttpSession {
646 &self.downstream_session
647 }
648}
649
650impl AsMut<HttpSession> for Session {
651 fn as_mut(&mut self) -> &mut HttpSession {
652 &mut self.downstream_session
653 }
654}
655
656use std::ops::{Deref, DerefMut};
657
658impl Deref for Session {
659 type Target = HttpSession;
660
661 fn deref(&self) -> &Self::Target {
662 &self.downstream_session
663 }
664}
665
666impl DerefMut for Session {
667 fn deref_mut(&mut self) -> &mut Self::Target {
668 &mut self.downstream_session
669 }
670}
671
672static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
674 let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
675 resp.insert_header(header::SERVER, &SERVER_NAME[..])
676 .unwrap();
677 resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
678 resp.insert_header(header::CACHE_CONTROL, "private, no-store")
679 .unwrap();
680
681 resp
682});
683
684impl<SV, C> HttpProxy<SV, C>
685where
686 C: custom::Connector,
687{
688 async fn process_request(
689 self: &Arc<Self>,
690 mut session: Session,
691 mut ctx: <SV as ProxyHttp>::CTX,
692 ) -> Option<ReusedHttpStream>
693 where
694 SV: ProxyHttp + Send + Sync + 'static,
695 <SV as ProxyHttp>::CTX: Send + Sync,
696 {
697 if let Err(e) = self
698 .inner
699 .early_request_filter(&mut session, &mut ctx)
700 .await
701 {
702 return self
703 .handle_error(session, &mut ctx, e, "Fail to early filter request:")
704 .await;
705 }
706
707 if self.inner.allow_spawning_subrequest(&session, &ctx) {
708 session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone()));
709 }
710
711 let req = session.downstream_session.req_header_mut();
712
713 if let Err(e) = session
715 .downstream_modules_ctx
716 .request_header_filter(req)
717 .await
718 {
719 return self
720 .handle_error(
721 session,
722 &mut ctx,
723 e,
724 "Failed in downstream modules request filter:",
725 )
726 .await;
727 }
728
729 match self.inner.request_filter(&mut session, &mut ctx).await {
730 Ok(response_sent) => {
731 if response_sent {
732 self.inner.logging(&mut session, None, &mut ctx).await;
734 self.cleanup_sub_req(&mut session);
735 let persistent_settings = HttpPersistentSettings::for_session(&session);
736 return session
737 .downstream_session
738 .finish()
739 .await
740 .ok()
741 .flatten()
742 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
743 }
744 }
746 Err(e) => {
747 return self
748 .handle_error(session, &mut ctx, e, "Fail to filter request:")
749 .await;
750 }
751 }
752
753 if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
754 return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
756 }
757 self.cleanup_sub_req(&mut session);
761
762 match self
764 .inner
765 .proxy_upstream_filter(&mut session, &mut ctx)
766 .await
767 {
768 Ok(proxy_to_upstream) => {
769 if !proxy_to_upstream {
770 if session.cache.enabled() {
773 session.cache.disable(NoCacheReason::DeclinedToUpstream);
775 }
776 if session.response_written().is_none() {
777 match session.write_response_header_ref(&BAD_GATEWAY, true).await {
778 Ok(()) => {}
779 Err(e) => {
780 return self
781 .handle_error(
782 session,
783 &mut ctx,
784 e,
785 "Error responding with Bad Gateway:",
786 )
787 .await;
788 }
789 }
790 }
791
792 return self.finish(session, &mut ctx, true, None).await;
793 }
794 }
796 Err(e) => {
797 if session.cache.enabled() {
798 session.cache.disable(NoCacheReason::InternalError);
799 }
800
801 return self
802 .handle_error(
803 session,
804 &mut ctx,
805 e,
806 "Error deciding if we should proxy to upstream:",
807 )
808 .await;
809 }
810 }
811
812 let mut retries: usize = 0;
813
814 let mut server_reuse = false;
815 let mut proxy_error: Option<Box<Error>> = None;
816
817 while retries < self.max_retries {
818 retries += 1;
819
820 let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
821 server_reuse = reuse;
822
823 match e {
824 Some(error) => {
825 let retry = error.retry();
826 proxy_error = Some(error);
827 if !retry {
828 break;
829 }
830 warn!(
832 "Fail to proxy: {}, tries: {}, retry: {}, {}",
833 proxy_error.as_ref().unwrap(),
834 retries,
835 retry,
836 self.inner.request_summary(&session, &ctx)
837 );
838 }
839 None => {
840 proxy_error = None;
841 break;
842 }
843 };
844 }
845
846 #[allow(clippy::unnecessary_unwrap)]
850 let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
851 self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
852 .await
853 } else {
854 None
855 };
856
857 let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
858 server_reuse = server_reuse && reuse;
860 stale_cache_error
861 } else {
862 proxy_error
863 };
864
865 if let Some(e) = final_error.as_ref() {
866 if session.cache.enabled() {
868 let reason = if *e.esource() == ErrorSource::Upstream {
869 NoCacheReason::UpstreamError
870 } else {
871 NoCacheReason::InternalError
872 };
873 session.cache.disable(reason);
874 }
875 let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
876
877 if !self.inner.suppress_error_log(&session, &ctx, e) {
879 error!(
880 "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
881 final_error.as_ref().unwrap(),
882 res.error_code,
883 retries,
884 false, self.inner.request_summary(&session, &ctx),
886 );
887 }
888 }
889
890 self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
892 .await
893 }
894
895 async fn handle_error(
896 &self,
897 mut session: Session,
898 ctx: &mut <SV as ProxyHttp>::CTX,
899 e: Box<Error>,
900 context: &str,
901 ) -> Option<ReusedHttpStream>
902 where
903 SV: ProxyHttp + Send + Sync + 'static,
904 <SV as ProxyHttp>::CTX: Send + Sync,
905 {
906 let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
907 if !self.inner.suppress_error_log(&session, ctx, &e) {
908 error!(
909 "{context} {}, status: {}, {}",
910 e,
911 res.error_code,
912 self.inner.request_summary(&session, ctx)
913 );
914 }
915 self.inner.logging(&mut session, Some(&e), ctx).await;
916 self.cleanup_sub_req(&mut session);
917
918 if res.can_reuse_downstream {
919 let persistent_settings = HttpPersistentSettings::for_session(&session);
920 session
921 .downstream_session
922 .finish()
923 .await
924 .ok()
925 .flatten()
926 .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
927 } else {
928 None
929 }
930 }
931}
932
/// Something that can process a subrequest.
///
/// [`HttpProxy`] implements this trait; [`SubrequestSpawner`] holds an
/// implementor as a trait object.
#[async_trait]
pub trait Subrequest {
    /// Processes the subrequest carried by `session`, using `sub_req_ctx` as
    /// its subrequest context. Takes the implementor by `Arc`.
    async fn process_subrequest(
        self: Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubrequestCtx>,
    );
}
950
951#[async_trait]
952impl<SV, C> Subrequest for HttpProxy<SV, C>
953where
954 SV: ProxyHttp + Send + Sync + 'static,
955 <SV as ProxyHttp>::CTX: Send + Sync,
956 C: custom::Connector,
957{
958 async fn process_subrequest(
959 self: Arc<Self>,
960 session: Box<HttpSession>,
961 sub_req_ctx: Box<SubrequestCtx>,
962 ) {
963 debug!("starting subrequest");
964
965 let mut session = match self.handle_new_request(session).await {
966 Some(downstream_session) => Session::new(
967 downstream_session,
968 &self.downstream_modules,
969 self.shutdown_flag.clone(),
970 ),
971 None => return, };
973
974 session.set_keepalive(None);
977
978 session.subrequest_ctx.replace(sub_req_ctx);
979 trace!("processing subrequest");
980 let ctx = self.inner.new_ctx();
981 self.process_request(session, ctx).await;
982 trace!("subrequest done");
983 }
984}
985
/// Handle used to spawn subrequests on a [`Subrequest`] implementor.
pub struct SubrequestSpawner {
    /// The application that processes the spawned subrequests.
    app: Arc<dyn Subrequest + Send + Sync>,
}
990
991impl SubrequestSpawner {
992 pub fn new(app: Arc<dyn Subrequest + Send + Sync>) -> SubrequestSpawner {
994 SubrequestSpawner { app }
995 }
996
997 pub fn spawn_background_subrequest(
1000 &self,
1001 session: &HttpSession,
1002 ctx: SubrequestCtx,
1003 ) -> tokio::task::JoinHandle<()> {
1004 let new_app = self.app.clone(); let (mut session, handle) = subrequest::create_session(session);
1006 if ctx.body_mode() == BodyMode::NoBody {
1007 session
1008 .as_subrequest_mut()
1009 .expect("created subrequest session")
1010 .clear_request_body_headers();
1011 }
1012 let sub_req_ctx = Box::new(ctx);
1013 handle.drain_tasks();
1014 tokio::spawn(async move {
1015 new_app
1016 .process_subrequest(Box::new(session), sub_req_ctx)
1017 .await;
1018 })
1019 }
1020}
1021
1022#[async_trait]
1023impl<SV, C> HttpServerApp for HttpProxy<SV, C>
1024where
1025 SV: ProxyHttp + Send + Sync + 'static,
1026 <SV as ProxyHttp>::CTX: Send + Sync,
1027 C: custom::Connector,
1028{
1029 async fn process_new_http(
1030 self: &Arc<Self>,
1031 session: HttpSession,
1032 shutdown: &ShutdownWatch,
1033 ) -> Option<ReusedHttpStream> {
1034 let session = Box::new(session);
1035
1036 let mut session = match self.handle_new_request(session).await {
1038 Some(downstream_session) => Session::new(
1039 downstream_session,
1040 &self.downstream_modules,
1041 self.shutdown_flag.clone(),
1042 ),
1043 None => return None, };
1045
1046 if *shutdown.borrow() {
1047 session.set_keepalive(None);
1049 }
1050
1051 let ctx = self.inner.new_ctx();
1052 self.process_request(session, ctx).await
1053 }
1054
/// Signals shutdown to this proxy's sessions.
///
/// The shared flag is set first (with `Release` ordering, pairing with the
/// `Acquire` load in `Session::is_process_shutting_down`), then all tasks
/// currently waiting on the shutdown `Notify` are woken up.
async fn http_cleanup(&self) {
    self.shutdown_flag.store(true, Ordering::Release);
    self.shutdown.notify_waiters();
}
1060
/// Returns a reference to the configured server options, if any.
fn server_options(&self) -> Option<&HttpServerOptions> {
    self.server_options.as_ref()
}
1064
/// Returns a clone of the configured HTTP/2 options, if any.
fn h2_options(&self) -> Option<H2Options> {
    self.h2_options.clone()
}
1068 async fn process_custom_session(
1069 self: Arc<Self>,
1070 stream: Stream,
1071 shutdown: &ShutdownWatch,
1072 ) -> Option<Stream> {
1073 let app = self.clone();
1074
1075 let Some(process_custom_session) = app.process_custom_session.as_ref() else {
1076 warn!("custom was called on an empty on_custom");
1077 return None;
1078 };
1079
1080 process_custom_session(self.clone(), stream, shutdown).await
1081 }
1082
1083 }
1085
1086use pingora_core::services::listening::Service;
1087
1088pub fn http_proxy<SV>(conf: &Arc<ServerConf>, inner: SV) -> HttpProxy<SV>
1119where
1120 SV: ProxyHttp,
1121{
1122 let mut proxy = HttpProxy::new(inner, conf.clone());
1123 proxy.handle_init_modules();
1124 proxy
1125}
1126
1127pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV, ()>>
1131where
1132 SV: ProxyHttp,
1133{
1134 http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
1135}
1136
1137pub fn http_proxy_service_with_name<SV>(
1141 conf: &Arc<ServerConf>,
1142 inner: SV,
1143 name: &str,
1144) -> Service<HttpProxy<SV, ()>>
1145where
1146 SV: ProxyHttp,
1147{
1148 let mut proxy = HttpProxy::new(inner, conf.clone());
1149 proxy.handle_init_modules();
1150 Service::new(name.to_string(), proxy)
1151}
1152
1153pub fn http_proxy_service_with_name_custom<SV, C>(
1157 conf: &Arc<ServerConf>,
1158 inner: SV,
1159 name: &str,
1160 connector: C,
1161 on_custom: ProcessCustomSession<SV, C>,
1162) -> Service<HttpProxy<SV, C>>
1163where
1164 SV: ProxyHttp + Send + Sync + 'static,
1165 SV::CTX: Send + Sync + 'static,
1166 C: custom::Connector,
1167{
1168 let mut proxy = HttpProxy::new_custom(inner, conf.clone(), connector, on_custom);
1169 proxy.handle_init_modules();
1170
1171 Service::new(name.to_string(), proxy)
1172}