// Copyright 2026 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/Response scanning, modification or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of an HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains
//! the callbacks to be invoked at each stage of an HTTP request.
//!
//! The service can then be passed into [`http_proxy_service()`] for a
//! [pingora_core::server::Server] to run it.
//!
//! See `examples/load_balancer.rs` for a detailed example.
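//!
//! A minimal sketch of that wiring (the `MyProxy` type and its fixed upstream are
//! illustrative, not part of this crate):
//!
//! ```ignore
//! use async_trait::async_trait;
//! use pingora_core::upstreams::peer::HttpPeer;
//! use pingora_error::Result;
//! use pingora_proxy::{ProxyHttp, Session};
//!
//! struct MyProxy;
//!
//! #[async_trait]
//! impl ProxyHttp for MyProxy {
//!     type CTX = ();
//!     fn new_ctx(&self) -> Self::CTX {}
//!
//!     // Pick the upstream for each request; here, a fixed TLS peer.
//!     async fn upstream_peer(
//!         &self,
//!         _session: &mut Session,
//!         _ctx: &mut Self::CTX,
//!     ) -> Result<Box<HttpPeer>> {
//!         Ok(Box::new(HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into())))
//!     }
//! }
//! ```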

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
use tokio::time;

use pingora_cache::NoCacheReason;
use pingora_core::apps::{
    HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
};
use pingora_core::connectors::http::custom;
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::custom::CustomMessageWrite;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::v2::server::H2Options;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};

const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
mod proxy_common;
mod proxy_custom;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
pub mod subrequest;

use subrequest::{BodyMode, Ctx as SubrequestCtx};

pub use proxy_cache::range_filter::{range_header_filter, MultiRangeInfo, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::{FailToProxy, ProxyHttp};

pub mod prelude {
    pub use crate::{http_proxy, http_proxy_service, ProxyHttp, Session};
}

/// The type of the user-provided callback that handles a custom (non-HTTP) session.
///
/// The callback receives the proxy app, the accepted [Stream] and the [ShutdownWatch], and may
/// return the stream to hand it back to the caller.
pub type ProcessCustomSession<SV, C> = Arc<
    dyn Fn(Arc<HttpProxy<SV, C>>, Stream, &ShutdownWatch) -> BoxFuture<'static, Option<Stream>>
        + Send
        + Sync
        + Unpin
        + 'static,
>;

/// The concrete type that holds the user defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV, C = ()>
where
    C: custom::Connector, // Upstream custom connector
{
    inner: SV, // TODO: name it better than inner
    client_upstream: Connector<C>,
    shutdown: Notify,
    shutdown_flag: Arc<AtomicBool>,
    pub server_options: Option<HttpServerOptions>,
    pub h2_options: Option<H2Options>,
    pub downstream_modules: HttpModules,
    max_retries: usize,
    process_custom_session: Option<ProcessCustomSession<SV, C>>,
}

impl<SV> HttpProxy<SV, ()> {
    /// Create a new [`HttpProxy`] with the given [`ProxyHttp`] implementation and [`ServerConf`].
    ///
    /// After creating an `HttpProxy`, you should call [`HttpProxy::handle_init_modules()`] to
    /// initialize the downstream modules before processing requests.
    ///
    /// For most use cases, prefer using [`http_proxy_service()`] which wraps the `HttpProxy` in a
    /// [`Service`]. This constructor is useful when you need to integrate `HttpProxy` into a custom
    /// accept loop (e.g., for SNI-based routing decisions before TLS termination).
    ///
    /// # Example
    ///
    /// ```ignore
    /// use pingora_proxy::HttpProxy;
    /// use std::sync::Arc;
    ///
    /// let mut proxy = HttpProxy::new(my_proxy_app, server_conf);
    /// proxy.handle_init_modules();
    /// let proxy = Arc::new(proxy);
    /// // Use proxy.process_new_http() in your custom accept loop
    /// ```
    pub fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
        HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            server_options: None,
            h2_options: None,
            downstream_modules: HttpModules::new(),
            max_retries: conf.max_retries,
            process_custom_session: None,
        }
    }
}

impl<SV, C> HttpProxy<SV, C>
where
    C: custom::Connector,
{
    fn new_custom(
        inner: SV,
        conf: Arc<ServerConf>,
        connector: C,
        on_custom: ProcessCustomSession<SV, C>,
    ) -> Self
    where
        SV: ProxyHttp + Send + Sync + 'static,
        SV::CTX: Send + Sync,
    {
        let client_upstream =
            Connector::new_custom(Some(ConnectorOptions::from_server_conf(&conf)), connector);

        HttpProxy {
            inner,
            client_upstream,
            shutdown: Notify::new(),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            server_options: None,
            downstream_modules: HttpModules::new(),
            max_retries: conf.max_retries,
            process_custom_session: Some(on_custom),
            h2_options: None,
        }
    }

    /// Initialize the downstream modules for this proxy.
    ///
    /// This method must be called after creating an [`HttpProxy`] with [`HttpProxy::new()`]
    /// and before processing any requests. It invokes [`ProxyHttp::init_downstream_modules()`]
    /// to set up any HTTP modules configured by the user's proxy implementation.
    ///
    /// Note: When using [`http_proxy_service()`] or [`http_proxy_service_with_name()`],
    /// this method is called automatically.
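    ///
    /// A hedged sketch of what a [`ProxyHttp`] implementation might do to register a
    /// built-in module (shown with the response compression module; `MyProxy` is
    /// illustrative):
    ///
    /// ```ignore
    /// use pingora_core::modules::http::compression::ResponseCompressionBuilder;
    /// use pingora_core::modules::http::HttpModules;
    ///
    /// // inside `impl ProxyHttp for MyProxy`
    /// fn init_downstream_modules(&self, modules: &mut HttpModules) {
    ///     // compress downstream responses at gzip level 6
    ///     modules.add_module(ResponseCompressionBuilder::enable(6));
    /// }
    /// ```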
    pub fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1: read the request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, drop the connection to stop more requests from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully got a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                e.as_down();
                error!("Fail to proxy: {e}");
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session
                        .respond_error(400)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        Some(downstream_session)
    }

    // Returns whether the server (downstream) session can be reused, and the error if any
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. origin says so or B. origin sends an invalid
                            // response, which usually means origin h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .is_none_or(|alpn| alpn.get_min_http_version() == 1)
                                {
                                    // Add the peer to prefer h1 so that all following requests
                                    // will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                    ClientSession::Custom(mut c) => {
                        let (server_reused, error) = self
                            .proxy_to_custom_upstream(session, &mut c, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::Custom(c);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;
                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

    async fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<Option<Duration>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let duration = match task {
            HttpTask::Header(header, _eos) => {
                self.inner
                    .upstream_response_filter(session, header, ctx)
                    .await?;
                None
            }
            HttpTask::Body(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx)?,
            HttpTask::Trailer(Some(trailers)) => {
                self.inner
                    .upstream_response_trailer_filter(session, trailers, ctx)?;
                None
            }
            _ => {
                // task does not support a filter
                None
            }
        };

        Ok(duration)
    }

    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<&Error>,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner.logging(&mut session, error, ctx).await;

        if reuse {
            // TODO: log error
            let persistent_settings = HttpPersistentSettings::for_session(&session);
            session
                .downstream_session
                .finish()
                .await
                .ok()
                .flatten()
                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
        } else {
            None
        }
    }

    fn cleanup_sub_req(&self, session: &mut Session) {
        if let Some(ctx) = session.subrequest_ctx.as_mut() {
            ctx.release_write_lock();
        }
    }
}

use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;

/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the
/// proxy behavior.
pub struct Session {
    /// the HTTP session to downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    /// Were the upstream request headers modified?
    pub upstream_headers_mutated_for_cache: bool,
    /// The context from the parent request, if this is a subrequest.
    pub subrequest_ctx: Option<Box<SubrequestCtx>>,
    /// Handle to allow spawning subrequests, assigned by the `Subrequest` app logic.
    pub subrequest_spawner: Option<SubrequestSpawner>,
    /// Downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
    /// Upstream response body bytes received (payload only). Set by the proxy layer.
    /// TODO: move this into an upstream session digest for future fields.
    upstream_body_bytes_received: usize,
    /// Flag that is set when the shutdown process has begun.
    shutdown_flag: Arc<AtomicBool>,
}

impl Session {
    fn new(
        downstream_session: impl Into<Box<HttpSession>>,
        downstream_modules: &HttpModules,
        shutdown_flag: Arc<AtomicBool>,
    ) -> Self {
        Session {
            downstream_session: downstream_session.into(),
            cache: HttpCache::new(),
            // disable both upstream and downstream compression
            upstream_compression: ResponseCompressionCtx::new(0, false, false),
            ignore_downstream_range: false,
            upstream_headers_mutated_for_cache: false,
            subrequest_ctx: None,
            subrequest_spawner: None, // optionally set later on
            downstream_modules_ctx: downstream_modules.build_ctx(),
            upstream_body_bytes_received: 0,
            shutdown_flag,
        }
    }

    /// Create a new [Session] from the given [Stream]
    ///
    /// This function is mostly used for testing and mocking, as the downstream modules and the
    /// shutdown flag will never be set.
    pub fn new_h1(stream: Stream) -> Self {
        let modules = HttpModules::new();
        Self::new(
            Box::new(HttpSession::new_http1(stream)),
            &modules,
            Arc::new(AtomicBool::new(false)),
        )
    }

    /// Create a new [Session] from the given [Stream] with modules
    ///
    /// This function is mostly used for testing and mocking, as the shutdown flag will never be
    /// set.
    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
        Self::new(
            Box::new(HttpSession::new_http1(stream)),
            downstream_modules,
            Arc::new(AtomicBool::new(false)),
        )
    }

    /// Get a mutable reference to the downstream [HttpSession]
    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }

    /// Get a reference to the downstream [HttpSession]
    pub fn as_downstream(&self) -> &HttpSession {
        &self.downstream_session
    }

    /// Write HTTP response with the given error code to the downstream.
    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
        self.as_downstream_mut().respond_error(error).await
    }

    /// Write HTTP response with the given error code to the downstream with a body.
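    ///
    /// A minimal usage sketch (the status code and body here are illustrative):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// session
    ///     .respond_error_with_body(429, Bytes::from_static(b"slow down"))
    ///     .await?;
    /// ```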
    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
        self.as_downstream_mut()
            .respond_error_with_body(error, body)
            .await
    }

    /// Write the given HTTP response header to the downstream
    ///
    /// Different from directly calling [HttpSession::write_response_header], this function also
    /// invokes the filter modules.
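    ///
    /// A minimal usage sketch (the 200 response built here is illustrative):
    ///
    /// ```ignore
    /// use pingora_http::ResponseHeader;
    ///
    /// let mut resp = ResponseHeader::build(200, None)?;
    /// resp.insert_header("Content-Length", "0")?;
    /// session.write_response_header(Box::new(resp), true).await?;
    /// ```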
    pub async fn write_response_header(
        &mut self,
        mut resp: Box<ResponseHeader>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_header_filter(&mut resp, end_of_stream)
            .await?;
        self.downstream_session.write_response_header(resp).await
    }

    /// Similar to [`Self::write_response_header()`], but this function clones `resp` internally
    pub async fn write_response_header_ref(
        &mut self,
        resp: &ResponseHeader,
        end_of_stream: bool,
    ) -> Result<()> {
        self.write_response_header(Box::new(resp.clone()), end_of_stream)
            .await
    }

    /// Write the given HTTP response body chunk to the downstream
    ///
    /// Different from directly calling [HttpSession::write_response_body], this function also
    /// invokes the filter modules.
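    ///
    /// A minimal streaming sketch (the chunks are illustrative):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// session.write_response_body(Some(Bytes::from_static(b"hello")), false).await?;
    /// // signal end of stream without more data
    /// session.write_response_body(None, true).await?;
    /// ```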
    pub async fn write_response_body(
        &mut self,
        mut body: Option<Bytes>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_body_filter(&mut body, end_of_stream)?;

        if body.is_none() && !end_of_stream {
            return Ok(());
        }

        let data = body.unwrap_or_default();
        self.downstream_session
            .write_response_body(data, end_of_stream)
            .await
    }

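    /// Write a series of response [HttpTask]s to the downstream, after running each of them
    /// through the downstream filter modules.
    ///
    /// A minimal usage sketch (`resp_header` and `body_bytes` are illustrative):
    ///
    /// ```ignore
    /// use pingora_core::protocols::http::HttpTask;
    ///
    /// let tasks = vec![
    ///     HttpTask::Header(resp_header, false),
    ///     HttpTask::Body(Some(body_bytes), true),
    /// ];
    /// session.write_response_tasks(tasks).await?;
    /// ```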
    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            match task {
                HttpTask::Header(resp, end) => {
                    self.downstream_modules_ctx
                        .response_header_filter(resp, *end)
                        .await?;
                }
                HttpTask::Body(data, end) => {
                    self.downstream_modules_ctx
                        .response_body_filter(data, *end)?;
                }
                HttpTask::Trailer(trailers) => {
                    if let Some(buf) = self
                        .downstream_modules_ctx
                        .response_trailer_filter(trailers)?
                    {
                        // Write the trailers into the body if the filter
                        // returns a buffer.
                        //
                        // Note, this will not work if end of stream has already
                        // been seen or we've written content-length bytes.
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                HttpTask::Done => {
                    // `Done` can be sent in certain response paths to mark end
                    // of response if not already done via trailers or body with
                    // end flag set.
                    // If the filter returns body bytes on Done,
                    // write them into the response.
                    //
                    // Note, this will not work if end of stream has already
                    // been seen or we've written content-length bytes.
                    if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                _ => { /* Failed */ }
            }
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }

    /// Mark the upstream headers as modified by caching. This should lead to range filters being
    /// skipped when responding to the downstream.
    pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
        self.upstream_headers_mutated_for_cache = true;
    }

    /// Check whether the upstream headers were marked as mutated during the request.
    pub fn upstream_headers_mutated_for_cache(&self) -> bool {
        self.upstream_headers_mutated_for_cache
    }

    /// Get the total upstream response body bytes received (payload only) recorded by the proxy layer.
    pub fn upstream_body_bytes_received(&self) -> usize {
        self.upstream_body_bytes_received
    }

    /// Set the total upstream response body bytes received (payload only). Intended for internal
    /// use by the proxy layer.
    pub(crate) fn set_upstream_body_bytes_received(&mut self, n: usize) {
        self.upstream_body_bytes_received = n;
    }

    /// Whether the server process is currently shutting down (e.g. due to a graceful upgrade)
    pub fn is_process_shutting_down(&self) -> bool {
        self.shutdown_flag.load(Ordering::Acquire)
    }

    /// Take the custom message reader from the downstream session, if the downstream session is
    /// a custom one. Returns `Ok(None)` for regular HTTP downstream sessions.
    pub fn downstream_custom_message(
        &mut self,
    ) -> Result<
        Option<Box<dyn futures::Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
    > {
        if let Some(custom_session) = self.downstream_session.as_custom_mut() {
            custom_session
                .take_custom_message_reader()
                .map(Some)
                .ok_or(Error::explain(
                    ReadError,
                    "can't extract custom reader from downstream",
                ))
        } else {
            Ok(None)
        }
    }
}

impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &self.downstream_session
    }
}

impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }
}

use std::ops::{Deref, DerefMut};

impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &self.downstream_session
    }
}

impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.downstream_session
    }
}

// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});

impl<SV, C> HttpProxy<SV, C>
where
    C: custom::Connector,
{
    async fn process_request(
        self: &Arc<Self>,
        mut session: Session,
        mut ctx: <SV as ProxyHttp>::CTX,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if let Err(e) = self
            .inner
            .early_request_filter(&mut session, &mut ctx)
            .await
        {
            return self
                .handle_error(session, &mut ctx, e, "Fail to early filter request:")
                .await;
        }

        if self.inner.allow_spawning_subrequest(&session, &ctx) {
            session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone()));
        }

        let req = session.downstream_session.req_header_mut();

        // Built-in downstream request filters go first
        if let Err(e) = session
            .downstream_modules_ctx
            .request_header_filter(req)
            .await
        {
            return self
                .handle_error(
                    session,
                    &mut ctx,
                    e,
                    "Failed in downstream modules request filter:",
                )
                .await;
        }

        match self.inner.request_filter(&mut session, &mut ctx).await {
            Ok(response_sent) => {
                if response_sent {
                    // TODO: log error
                    self.inner.logging(&mut session, None, &mut ctx).await;
                    self.cleanup_sub_req(&mut session);
                    let persistent_settings = HttpPersistentSettings::for_session(&session);
                    return session
                        .downstream_session
                        .finish()
                        .await
                        .ok()
                        .flatten()
                        .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
                }
                /* else continue */
            }
            Err(e) => {
                return self
                    .handle_error(session, &mut ctx, e, "Fail to filter request:")
                    .await;
            }
        }

        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
            // cache hit
            return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
        }
        // either uncacheable, or cache miss

        // there should not be a write lock in the sub req ctx after this point
        self.cleanup_sub_req(&mut session);

        // decide if the request is allowed to go to upstream
        match self
            .inner
            .proxy_upstream_filter(&mut session, &mut ctx)
            .await
        {
            Ok(proxy_to_upstream) => {
                if !proxy_to_upstream {
                    // The hook can choose to write its own response, but if it doesn't, we respond
                    // with a generic 502
                    if session.cache.enabled() {
                        // drop the cache lock that this request may be holding onto
                        session.cache.disable(NoCacheReason::DeclinedToUpstream);
                    }
                    if session.response_written().is_none() {
                        match session.write_response_header_ref(&BAD_GATEWAY, true).await {
                            Ok(()) => {}
                            Err(e) => {
                                return self
                                    .handle_error(
                                        session,
                                        &mut ctx,
                                        e,
                                        "Error responding with Bad Gateway:",
                                    )
                                    .await;
                            }
                        }
                    }

                    return self.finish(session, &mut ctx, true, None).await;
                }
                /* else continue */
            }
            Err(e) => {
                if session.cache.enabled() {
                    session.cache.disable(NoCacheReason::InternalError);
                }

                return self
                    .handle_error(
                        session,
                        &mut ctx,
                        e,
                        "Error deciding if we should proxy to upstream:",
                    )
                    .await;
            }
        }

        let mut retries: usize = 0;

        let mut server_reuse = false;
        let mut proxy_error: Option<Box<Error>> = None;

        while retries < self.max_retries {
            retries += 1;

            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
            server_reuse = reuse;

            match e {
                Some(error) => {
                    let retry = error.retry();
                    proxy_error = Some(error);
                    if !retry {
                        break;
                    }
                    // only log errors that will be retried here; the final error is logged below
                    warn!(
                        "Fail to proxy: {}, tries: {}, retry: {}, {}",
                        proxy_error.as_ref().unwrap(),
                        retries,
                        retry,
                        self.inner.request_summary(&session, &ctx)
                    );
                }
                None => {
                    proxy_error = None;
                    break;
                }
            };
        }

        // serve stale if error
        // Check both error and cache before calling the function because await is not cheap
        // allow unwrap until if-let chains are available
        #[allow(clippy::unnecessary_unwrap)]
        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
                .await
        } else {
            None
        };

        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
            // don't reuse the server conn if serving stale polluted it
            server_reuse = server_reuse && reuse;
            stale_cache_error
        } else {
            proxy_error
        };

        if let Some(e) = final_error.as_ref() {
            // If we have errored and are still holding a cache lock, release it.
            if session.cache.enabled() {
                let reason = if *e.esource() == ErrorSource::Upstream {
                    NoCacheReason::UpstreamError
                } else {
                    NoCacheReason::InternalError
                };
                session.cache.disable(reason);
            }
            let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;

            // the final error will have status > 0 unless the downstream connection is dead
            if !self.inner.suppress_error_log(&session, &ctx, e) {
                error!(
                    "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
                    final_error.as_ref().unwrap(),
                    res.error_code,
                    retries,
                    false, // we never retry here
                    self.inner.request_summary(&session, &ctx),
                );
            }
        }

        // logging() will be called in finish()
        self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
            .await
    }

    async fn handle_error(
        &self,
        mut session: Session,
        ctx: &mut <SV as ProxyHttp>::CTX,
        e: Box<Error>,
        context: &str,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
        if !self.inner.suppress_error_log(&session, ctx, &e) {
            error!(
                "{context} {}, status: {}, {}",
                e,
                res.error_code,
                self.inner.request_summary(&session, ctx)
            );
        }
        self.inner.logging(&mut session, Some(&e), ctx).await;
        self.cleanup_sub_req(&mut session);

        if res.can_reuse_downstream {
            let persistent_settings = HttpPersistentSettings::for_session(&session);
            session
                .downstream_session
                .finish()
                .await
                .ok()
                .flatten()
                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
        } else {
            None
        }
    }
}

/* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649
   if process_subrequest() is implemented as a member of HttpProxy, rust complains

error[E0391]: cycle detected when computing type of `proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}`
   --> pingora-proxy/src/proxy_cache.rs:13:10
    |
13  |     ) -> Option<(bool, Option<Box<Error>>)>

*/
#[async_trait]
pub trait Subrequest {
    async fn process_subrequest(
        self: Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubrequestCtx>,
    );
}

#[async_trait]
impl<SV, C> Subrequest for HttpProxy<SV, C>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
    C: custom::Connector,
{
    async fn process_subrequest(
        self: Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubrequestCtx>,
    ) {
        debug!("starting subrequest");

        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(
                downstream_session,
                &self.downstream_modules,
                self.shutdown_flag.clone(),
            ),
            None => return, // bad request
        };

        // There is no real downstream to keep alive, but it doesn't matter what is set here
        // because the dummy connection will be dropped at the end of this function anyway.
        session.set_keepalive(None);

        session.subrequest_ctx.replace(sub_req_ctx);
        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}

/// A handle to the underlying HTTP proxy app that allows spawning subrequests.
pub struct SubrequestSpawner {
    app: Arc<dyn Subrequest + Send + Sync>,
}

impl SubrequestSpawner {
    /// Create a new [`SubrequestSpawner`].
    pub fn new(app: Arc<dyn Subrequest + Send + Sync>) -> SubrequestSpawner {
        SubrequestSpawner { app }
    }

    /// Spawn a background subrequest and return a join handle.
    // TODO: allow configuring the subrequest session before use
    pub fn spawn_background_subrequest(
        &self,
        session: &HttpSession,
        ctx: SubrequestCtx,
    ) -> tokio::task::JoinHandle<()> {
        let new_app = self.app.clone(); // Clone the Arc
        let (mut session, handle) = subrequest::create_session(session);
        if ctx.body_mode() == BodyMode::NoBody {
            session
                .as_subrequest_mut()
                .expect("created subrequest session")
                .clear_request_body_headers();
        }
        let sub_req_ctx = Box::new(ctx);
        handle.drain_tasks();
        tokio::spawn(async move {
            new_app
                .process_subrequest(Box::new(session), sub_req_ctx)
                .await;
        })
    }
}

#[async_trait]
impl<SV, C> HttpServerApp for HttpProxy<SV, C>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
    C: custom::Connector,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        shutdown: &ShutdownWatch,
    ) -> Option<ReusedHttpStream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(
                downstream_session,
                &self.downstream_modules,
                self.shutdown_flag.clone(),
            ),
            None => return None, // bad request
        };

        if *shutdown.borrow() {
            // stop downstream from reusing if this service is shutting down soon
            session.set_keepalive(None);
        }

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    async fn http_cleanup(&self) {
        self.shutdown_flag.store(true, Ordering::Release);
        // Notify all keepalived requests blocking on read_request() to abort
        self.shutdown.notify_waiters();
    }

    fn server_options(&self) -> Option<&HttpServerOptions> {
        self.server_options.as_ref()
    }

    fn h2_options(&self) -> Option<H2Options> {
        self.h2_options.clone()
    }

    async fn process_custom_session(
        self: Arc<Self>,
        stream: Stream,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let app = self.clone();

        let Some(process_custom_session) = app.process_custom_session.as_ref() else {
            warn!("process_custom_session was called but no custom session handler is set");
            return None;
        };

        process_custom_session(self.clone(), stream, shutdown).await
    }
}

use pingora_core::services::listening::Service;

/// Create an [`HttpProxy`] without wrapping it in a [`Service`].
///
/// This is useful when you need to integrate `HttpProxy` into a custom accept loop,
/// for example when implementing SNI-based routing that decides between TLS passthrough
/// and TLS termination on a single port.
///
/// The returned `HttpProxy` is fully initialized and ready to process requests via
/// [`HttpServerApp::process_new_http()`].
///
/// # Example
///
/// ```ignore
/// use pingora_proxy::http_proxy;
/// use std::sync::Arc;
///
/// // Create the proxy
/// let proxy = Arc::new(http_proxy(&server_conf, my_proxy_app));
///
/// // In your custom accept loop:
/// loop {
///     let (stream, addr) = listener.accept().await?;
///
///     // Peek SNI, decide routing...
///     if should_terminate_tls {
///         let tls_stream = my_acceptor.accept(stream).await?;
///         let session = HttpSession::new_http1(Box::new(tls_stream));
///         proxy.process_new_http(session, &shutdown).await;
///     }
/// }
/// ```
pub fn http_proxy<SV>(conf: &Arc<ServerConf>, inner: SV) -> HttpProxy<SV>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    proxy
}

/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
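///
/// A minimal wiring sketch (assuming `MyProxy` implements [ProxyHttp]):
///
/// ```ignore
/// use pingora_core::server::Server;
/// use pingora_proxy::http_proxy_service;
///
/// let mut server = Server::new(None).unwrap();
/// server.bootstrap();
/// let mut service = http_proxy_service(&server.configuration, MyProxy);
/// service.add_tcp("0.0.0.0:8080");
/// server.add_service(service);
/// server.run_forever();
/// ```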
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV, ()>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}

/// Create a [Service] from the user implemented [ProxyHttp], with the given service name.
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV, ()>>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    Service::new(name.to_string(), proxy)
}

/// Create a [Service] from the user implemented [ProxyHttp], with a custom upstream connector
/// and a custom session handler.
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name_custom<SV, C>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
    connector: C,
    on_custom: ProcessCustomSession<SV, C>,
) -> Service<HttpProxy<SV, C>>
where
    SV: ProxyHttp + Send + Sync + 'static,
    SV::CTX: Send + Sync + 'static,
    C: custom::Connector,
{
    let mut proxy = HttpProxy::new_custom(inner, conf.clone(), connector, on_custom);
    proxy.handle_init_modules();

    Service::new(name.to_string(), proxy)
}