// pingora_proxy/lib.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # pingora-proxy
16//!
17//! Programmable HTTP proxy built on top of [pingora_core].
18//!
19//! # Features
20//! - HTTP/1.x and HTTP/2 for both downstream and upstream
21//! - Connection pooling
22//! - TLSv1.3, mutual TLS, customizable CA
23//! - Request/Response scanning, modification or rejection
24//! - Dynamic upstream selection
25//! - Configurable retry and failover
26//! - Fully programmable and customizable at any stage of a HTTP request
27//!
28//! # How to use
29//!
30//! Users of this crate defines their proxy by implementing [ProxyHttp] trait, which contains the
31//! callbacks to be invoked at each stage of a HTTP request.
32//!
33//! Then the service can be passed into [`http_proxy_service()`] for a [pingora_core::server::Server] to
34//! run it.
35//!
36//! See `examples/load_balancer.rs` for a detailed example.
37
38use async_trait::async_trait;
39use bytes::Bytes;
40use futures::future::BoxFuture;
41use futures::future::FutureExt;
42use http::{header, version::Version, Method};
43use log::{debug, error, trace, warn};
44use once_cell::sync::Lazy;
45use pingora_http::{RequestHeader, ResponseHeader};
46use std::fmt::Debug;
47use std::str;
48use std::sync::{
49    atomic::{AtomicBool, Ordering},
50    Arc,
51};
52use std::time::Duration;
53use tokio::sync::{mpsc, Notify};
54use tokio::time;
55
56use pingora_cache::NoCacheReason;
57use pingora_core::apps::{
58    HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
59};
60use pingora_core::connectors::http::custom;
61use pingora_core::connectors::{http::Connector, ConnectorOptions};
62use pingora_core::modules::http::compression::ResponseCompressionBuilder;
63use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
64use pingora_core::protocols::http::client::HttpSession as ClientSession;
65use pingora_core::protocols::http::custom::CustomMessageWrite;
66use pingora_core::protocols::http::subrequest::server::SubrequestHandle;
67use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
68use pingora_core::protocols::http::v2::server::H2Options;
69use pingora_core::protocols::http::HttpTask;
70use pingora_core::protocols::http::ServerSession as HttpSession;
71use pingora_core::protocols::http::SERVER_NAME;
72use pingora_core::protocols::Stream;
73use pingora_core::protocols::{Digest, UniqueID};
74use pingora_core::server::configuration::ServerConf;
75use pingora_core::server::ShutdownWatch;
76use pingora_core::upstreams::peer::{HttpPeer, Peer};
77use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
78
// Capacity (in tasks) of the internal channels used to pipeline `HttpTask`s
// between the proxy's read and write halves.
// NOTE(review): consumers live in the proxy_h1/proxy_h2 modules, not shown in
// this section of the file.
const TASK_BUFFER_SIZE: usize = 4;

// Internal proxy phases, split by protocol / feature.
mod proxy_cache;
mod proxy_common;
mod proxy_custom;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
// Support for spawning subrequests off a parent request.
pub mod subrequest;

use subrequest::{BodyMode, Ctx as SubrequestCtx};

// Public re-exports so users don't have to reach into private modules.
pub use proxy_cache::range_filter::{range_header_filter, MultiRangeInfo, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::{FailToProxy, ProxyHttp};
95
/// Commonly used items, re-exported for convenient glob import
/// (`use pingora_proxy::prelude::*;`).
pub mod prelude {
    pub use crate::{http_proxy, http_proxy_service, ProxyHttp, Session};
}
99
/// Callback type used to handle a custom (non-HTTP) downstream session.
///
/// The callback receives the proxy itself, the accepted [Stream] and the
/// shutdown watch, and resolves to an optional [Stream].
/// NOTE(review): presumably a returned `Some(stream)` hands the connection
/// back for reuse — confirm against the caller in the `Subrequest`/custom
/// session app logic.
pub type ProcessCustomSession<SV, C> = Arc<
    dyn Fn(Arc<HttpProxy<SV, C>>, Stream, &ShutdownWatch) -> BoxFuture<'static, Option<Stream>>
        + Send
        + Sync
        + Unpin
        + 'static,
>;
107
/// The concrete type that holds the user defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV, C = ()>
where
    C: custom::Connector, // Upstream custom connector
{
    inner: SV, // TODO: name it better than inner; this is the user's `ProxyHttp` impl
    // Connector used to establish / reuse connections to upstream peers.
    client_upstream: Connector<C>,
    // Signaled when the service is shutting down; aborts reading new requests.
    shutdown: Notify,
    // Shared with each `Session` so `is_process_shutting_down()` can observe it.
    shutdown_flag: Arc<AtomicBool>,
    pub server_options: Option<HttpServerOptions>,
    pub h2_options: Option<H2Options>,
    pub downstream_modules: HttpModules,
    // Retry budget, copied from `ServerConf::max_retries`.
    max_retries: usize,
    // Optional handler for custom (non-HTTP) downstream sessions.
    process_custom_session: Option<ProcessCustomSession<SV, C>>,
}
125
126impl<SV> HttpProxy<SV, ()> {
127    /// Create a new [`HttpProxy`] with the given [`ProxyHttp`] implementation and [`ServerConf`].
128    ///
129    /// After creating an `HttpProxy`, you should call [`HttpProxy::handle_init_modules()`] to
130    /// initialize the downstream modules before processing requests.
131    ///
132    /// For most use cases, prefer using [`http_proxy_service()`] which wraps the `HttpProxy` in a
133    /// [`Service`]. This constructor is useful when you need to integrate `HttpProxy` into a custom
134    /// accept loop (e.g., for SNI-based routing decisions before TLS termination).
135    ///
136    /// # Example
137    ///
138    /// ```ignore
139    /// use pingora_proxy::HttpProxy;
140    /// use std::sync::Arc;
141    ///
142    /// let mut proxy = HttpProxy::new(my_proxy_app, server_conf);
143    /// proxy.handle_init_modules();
144    /// let proxy = Arc::new(proxy);
145    /// // Use proxy.process_new_http() in your custom accept loop
146    /// ```
147    pub fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
148        HttpProxy {
149            inner,
150            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
151            shutdown: Notify::new(),
152            shutdown_flag: Arc::new(AtomicBool::new(false)),
153            server_options: None,
154            h2_options: None,
155            downstream_modules: HttpModules::new(),
156            max_retries: conf.max_retries,
157            process_custom_session: None,
158        }
159    }
160}
161
impl<SV, C> HttpProxy<SV, C>
where
    C: custom::Connector,
{
    /// Build an [`HttpProxy`] that talks to upstreams through a custom connector `C`.
    ///
    /// `on_custom` optionally handles custom (non-HTTP) downstream sessions and
    /// `server_options` is stored as-is; `h2_options` starts out unset.
    fn new_custom(
        inner: SV,
        conf: Arc<ServerConf>,
        connector: C,
        on_custom: Option<ProcessCustomSession<SV, C>>,
        server_options: Option<HttpServerOptions>,
    ) -> Self
    where
        SV: ProxyHttp + Send + Sync + 'static,
        SV::CTX: Send + Sync,
    {
        let client_upstream =
            Connector::new_custom(Some(ConnectorOptions::from_server_conf(&conf)), connector);

        HttpProxy {
            inner,
            client_upstream,
            shutdown: Notify::new(),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            server_options,
            downstream_modules: HttpModules::new(),
            max_retries: conf.max_retries,
            process_custom_session: on_custom,
            h2_options: None,
        }
    }

    /// Initialize the downstream modules for this proxy.
    ///
    /// This method must be called after creating an [`HttpProxy`] with [`HttpProxy::new()`]
    /// and before processing any requests. It invokes [`ProxyHttp::init_downstream_modules()`]
    /// to set up any HTTP modules configured by the user's proxy implementation.
    ///
    /// Note: When using [`http_proxy_service()`] or [`http_proxy_service_with_name()`],
    /// this method is called automatically.
    pub fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

    /// Phase 1: read the request header from a newly accepted downstream session.
    ///
    /// Returns the session when a request header was successfully read. Returns
    /// `None` when the connection should be dropped instead: shutdown in
    /// progress, downstream closed, an unparsable request header (answered with
    /// 400), or a CONNECT request while CONNECT proxying is disabled (answered
    /// with 405).
    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1 read request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, dropping the connection to stop more req from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully get a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                // mark the error as downstream-caused before logging it
                e.as_down();
                error!("Fail to proxy: {e}");
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session
                        .respond_error(400)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        // CONNECT method proxying is not default supported by the proxy http logic itself,
        // since the tunneling process changes the request-response flow.
        // https://datatracker.ietf.org/doc/html/rfc9110#name-connect
        // Also because the method impacts message framing in a way is currently unaccounted for
        // (https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.2)
        // it is safest to disallow use of the method by default.
        if !self
            .server_options
            .as_ref()
            .is_some_and(|opts| opts.allow_connect_method_proxying)
            && downstream_session.req_header().method == Method::CONNECT
        {
            downstream_session
                .respond_error(405)
                .await
                .unwrap_or_else(|e| {
                    error!("failed to send error response to downstream: {e}");
                });
            downstream_session.shutdown().await;
            return None;
        }
        Some(downstream_session)
    }

    /// Connect to the peer chosen by [`ProxyHttp::upstream_peer`] and proxy the
    /// request over H1, H2 or the custom protocol, depending on what the
    /// connector hands back.
    ///
    /// Returns whether the downstream (server) session can be reused, and the
    /// error (if any) after it has been run through
    /// [`ProxyHttp::error_while_proxy`] / [`ProxyHttp::fail_to_connect`].
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        // H1 connections are only returned to the pool when the
                        // proxy run says they are reusable.
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        // H2 streams are always released back (the connection is
                        // multiplexed), regardless of the proxy outcome.
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. origin says so or B. origin sends an invalid
                            // response, which usually means origin h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .is_none_or(|alpn| alpn.get_min_http_version() == 1)
                                {
                                    // Add the peer to prefer h1 so that all following requests
                                    // will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                    ClientSession::Custom(mut c) => {
                        let (server_reused, error) = self
                            .proxy_to_custom_upstream(session, &mut c, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::Custom(c);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;
                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    // let the user hook rewrite/annotate the proxy error
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                // connection establishment failed: mark as upstream-caused and
                // give the user hook a chance to decide on retry behavior
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

    /// Run the user's upstream response filters on a single [`HttpTask`]
    /// received from upstream.
    ///
    /// Only header, body (incl. upgraded body) and trailer tasks have filters;
    /// other tasks pass through untouched. The returned [`Duration`], if any,
    /// comes from the body filter.
    /// NOTE(review): the duration's semantics are defined by
    /// `upstream_response_body_filter` (looks like a delay hint) — confirm in
    /// `proxy_trait`.
    async fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<Option<Duration>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let duration = match task {
            HttpTask::Header(header, _eos) => {
                self.inner
                    .upstream_response_filter(session, header, ctx)
                    .await?;
                None
            }
            HttpTask::Body(data, eos) | HttpTask::UpgradedBody(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx)?,
            HttpTask::Trailer(Some(trailers)) => {
                self.inner
                    .upstream_response_trailer_filter(session, trailers, ctx)?;
                None
            }
            _ => {
                // task does not support a filter
                None
            }
        };

        Ok(duration)
    }

    /// Final phase: log the request, report any proxy failure to the
    /// downstream session, and — when `reuse` is true — try to hand the
    /// downstream stream back for keep-alive reuse.
    ///
    /// Returns the reusable stream (with its persistent settings) only if the
    /// downstream session finished cleanly.
    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<Box<Error>>,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner
            .logging(&mut session, error.as_deref(), ctx)
            .await;

        if let Some(e) = error {
            session.downstream_session.on_proxy_failure(e);
        }

        if reuse {
            // TODO: log error
            let persistent_settings = HttpPersistentSettings::for_session(&session);
            session
                .downstream_session
                .finish()
                .await
                .ok()
                .flatten()
                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
        } else {
            None
        }
    }

    /// Release the subrequest write lock, if this session is a subrequest that
    /// is still holding one.
    fn cleanup_sub_req(&self, session: &mut Session) {
        if let Some(ctx) = session.subrequest_ctx.as_mut() {
            ctx.release_write_lock();
        }
    }
}
439
440use pingora_cache::HttpCache;
441use pingora_core::protocols::http::compression::ResponseCompressionCtx;
442
/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the proxy
/// behavior.
pub struct Session {
    /// the HTTP session to downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    /// Were the upstream request headers modified?
    pub upstream_headers_mutated_for_cache: bool,
    /// The context from parent request, if this is a subrequest.
    pub subrequest_ctx: Option<Box<SubrequestCtx>>,
    /// Handle to allow spawning subrequests, assigned by the `Subrequest` app logic.
    pub subrequest_spawner: Option<SubrequestSpawner>,
    /// Per-session context for the downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
    /// Upstream response body bytes received (payload only). Set by proxy layer.
    /// TODO: move this into an upstream session digest for future fields.
    upstream_body_bytes_received: usize,
    /// Upstream write pending time. Set by proxy layer (HTTP/1.x only).
    upstream_write_pending_time: Duration,
    /// Flag that is set when the shutdown process has begun.
    /// Shared with the owning `HttpProxy`; read via `is_process_shutting_down()`.
    shutdown_flag: Arc<AtomicBool>,
}
472
473impl Session {
474    fn new(
475        downstream_session: impl Into<Box<HttpSession>>,
476        downstream_modules: &HttpModules,
477        shutdown_flag: Arc<AtomicBool>,
478    ) -> Self {
479        Session {
480            downstream_session: downstream_session.into(),
481            cache: HttpCache::new(),
482            // disable both upstream and downstream compression
483            upstream_compression: ResponseCompressionCtx::new(0, false, false),
484            ignore_downstream_range: false,
485            upstream_headers_mutated_for_cache: false,
486            subrequest_ctx: None,
487            subrequest_spawner: None, // optionally set later on
488            downstream_modules_ctx: downstream_modules.build_ctx(),
489            upstream_body_bytes_received: 0,
490            upstream_write_pending_time: Duration::ZERO,
491            shutdown_flag,
492        }
493    }
494
495    /// Create a new [Session] from the given [Stream]
496    ///
497    /// This function is mostly used for testing and mocking, given the downstream modules and
498    /// shutdown flags will never be set.
499    pub fn new_h1(stream: Stream) -> Self {
500        let modules = HttpModules::new();
501        Self::new(
502            Box::new(HttpSession::new_http1(stream)),
503            &modules,
504            Arc::new(AtomicBool::new(false)),
505        )
506    }
507
508    /// Create a new [Session] from the given [Stream] with modules
509    ///
510    /// This function is mostly used for testing and mocking, given the shutdown flag will never be
511    /// set.
512    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
513        Self::new(
514            Box::new(HttpSession::new_http1(stream)),
515            downstream_modules,
516            Arc::new(AtomicBool::new(false)),
517        )
518    }
519
520    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
521        &mut self.downstream_session
522    }
523
524    pub fn as_downstream(&self) -> &HttpSession {
525        &self.downstream_session
526    }
527
528    /// Write HTTP response with the given error code to the downstream.
529    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
530        self.as_downstream_mut().respond_error(error).await
531    }
532
533    /// Write HTTP response with the given error code to the downstream with a body.
534    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
535        self.as_downstream_mut()
536            .respond_error_with_body(error, body)
537            .await
538    }
539
540    /// Write the given HTTP response header to the downstream
541    ///
542    /// Different from directly calling [HttpSession::write_response_header], this function also
543    /// invokes the filter modules.
544    pub async fn write_response_header(
545        &mut self,
546        mut resp: Box<ResponseHeader>,
547        end_of_stream: bool,
548    ) -> Result<()> {
549        self.downstream_modules_ctx
550            .response_header_filter(&mut resp, end_of_stream)
551            .await?;
552        self.downstream_session.write_response_header(resp).await
553    }
554
555    /// Similar to `write_response_header()`, this fn will clone the `resp` internally
556    pub async fn write_response_header_ref(
557        &mut self,
558        resp: &ResponseHeader,
559        end_of_stream: bool,
560    ) -> Result<(), Box<Error>> {
561        self.write_response_header(Box::new(resp.clone()), end_of_stream)
562            .await
563    }
564
565    /// Write the given HTTP response body chunk to the downstream
566    ///
567    /// Different from directly calling [HttpSession::write_response_body], this function also
568    /// invokes the filter modules.
569    pub async fn write_response_body(
570        &mut self,
571        mut body: Option<Bytes>,
572        end_of_stream: bool,
573    ) -> Result<()> {
574        self.downstream_modules_ctx
575            .response_body_filter(&mut body, end_of_stream)?;
576
577        if body.is_none() && !end_of_stream {
578            return Ok(());
579        }
580
581        let data = body.unwrap_or_default();
582        self.downstream_session
583            .write_response_body(data, end_of_stream)
584            .await
585    }
586
587    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
588        let mut seen_upgraded = self.was_upgraded();
589        for task in tasks.iter_mut() {
590            match task {
591                HttpTask::Header(resp, end) => {
592                    self.downstream_modules_ctx
593                        .response_header_filter(resp, *end)
594                        .await?;
595                }
596                HttpTask::Body(data, end) => {
597                    self.downstream_modules_ctx
598                        .response_body_filter(data, *end)?;
599                }
600                HttpTask::UpgradedBody(data, end) => {
601                    seen_upgraded = true;
602                    self.downstream_modules_ctx
603                        .response_body_filter(data, *end)?;
604                }
605                HttpTask::Trailer(trailers) => {
606                    if let Some(buf) = self
607                        .downstream_modules_ctx
608                        .response_trailer_filter(trailers)?
609                    {
610                        // Write the trailers into the body if the filter
611                        // returns a buffer.
612                        //
613                        // Note, this will not work if end of stream has already
614                        // been seen or we've written content-length bytes.
615                        // (Trailers should never come after upgraded body)
616                        *task = HttpTask::Body(Some(buf), true);
617                    }
618                }
619                HttpTask::Done => {
620                    // `Done` can be sent in certain response paths to mark end
621                    // of response if not already done via trailers or body with
622                    // end flag set.
623                    // If the filter returns body bytes on Done,
624                    // write them into the response.
625                    //
626                    // Note, this will not work if end of stream has already
627                    // been seen or we've written content-length bytes.
628                    if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
629                        if seen_upgraded {
630                            *task = HttpTask::UpgradedBody(Some(buf), true);
631                        } else {
632                            *task = HttpTask::Body(Some(buf), true);
633                        }
634                    }
635                }
636                _ => { /* Failed */ }
637            }
638        }
639        self.downstream_session.response_duplex_vec(tasks).await
640    }
641
642    /// Mark the upstream headers as modified by caching. This should lead to range filters being
643    /// skipped when responding to the downstream.
644    pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
645        self.upstream_headers_mutated_for_cache = true;
646    }
647
648    /// Check whether the upstream headers were marked as mutated during the request.
649    pub fn upstream_headers_mutated_for_cache(&self) -> bool {
650        self.upstream_headers_mutated_for_cache
651    }
652
653    /// Get the total upstream response body bytes received (payload only) recorded by the proxy layer.
654    pub fn upstream_body_bytes_received(&self) -> usize {
655        self.upstream_body_bytes_received
656    }
657
658    /// Set the total upstream response body bytes received (payload only). Intended for internal use by proxy layer.
659    pub(crate) fn set_upstream_body_bytes_received(&mut self, n: usize) {
660        self.upstream_body_bytes_received = n;
661    }
662
663    /// Get the upstream write pending time recorded by the proxy layer. Returns [`Duration::ZERO`] for HTTP/2.
664    pub fn upstream_write_pending_time(&self) -> Duration {
665        self.upstream_write_pending_time
666    }
667
668    /// Set the upstream write pending time. Intended for internal use by proxy layer.
669    pub(crate) fn set_upstream_write_pending_time(&mut self, d: Duration) {
670        self.upstream_write_pending_time = d;
671    }
672
673    /// Is the proxy process in the process of shutting down (e.g. due to graceful upgrade)?
674    pub fn is_process_shutting_down(&self) -> bool {
675        self.shutdown_flag.load(Ordering::Acquire)
676    }
677
678    pub fn downstream_custom_message(
679        &mut self,
680    ) -> Result<
681        Option<Box<dyn futures::Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
682    > {
683        if let Some(custom_session) = self.downstream_session.as_custom_mut() {
684            custom_session
685                .take_custom_message_reader()
686                .map(Some)
687                .ok_or(Error::explain(
688                    ReadError,
689                    "can't extract custom reader from downstream",
690                ))
691        } else {
692            Ok(None)
693        }
694    }
695}
696
697impl AsRef<HttpSession> for Session {
698    fn as_ref(&self) -> &HttpSession {
699        &self.downstream_session
700    }
701}
702
703impl AsMut<HttpSession> for Session {
704    fn as_mut(&mut self) -> &mut HttpSession {
705        &mut self.downstream_session
706    }
707}
708
709use std::ops::{Deref, DerefMut};
710
711impl Deref for Session {
712    type Target = HttpSession;
713
714    fn deref(&self) -> &Self::Target {
715        &self.downstream_session
716    }
717}
718
719impl DerefMut for Session {
720    fn deref_mut(&mut self) -> &mut Self::Target {
721        &mut self.downstream_session
722    }
723}
724
725// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
726static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
727    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
728    resp.insert_header(header::SERVER, &SERVER_NAME[..])
729        .unwrap();
730    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
731    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
732        .unwrap();
733
734    resp
735});
736
737impl<SV, C> HttpProxy<SV, C>
738where
739    C: custom::Connector,
740{
741    async fn process_request(
742        self: &Arc<Self>,
743        mut session: Session,
744        mut ctx: <SV as ProxyHttp>::CTX,
745    ) -> Option<ReusedHttpStream>
746    where
747        SV: ProxyHttp + Send + Sync + 'static,
748        <SV as ProxyHttp>::CTX: Send + Sync,
749    {
750        if let Err(e) = self
751            .inner
752            .early_request_filter(&mut session, &mut ctx)
753            .await
754        {
755            return self
756                .handle_error(session, &mut ctx, e, "Fail to early filter request:")
757                .await;
758        }
759
760        if self.inner.allow_spawning_subrequest(&session, &ctx) {
761            session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone()));
762        }
763
764        let req = session.downstream_session.req_header_mut();
765
766        // Built-in downstream request filters go first
767        if let Err(e) = session
768            .downstream_modules_ctx
769            .request_header_filter(req)
770            .await
771        {
772            return self
773                .handle_error(
774                    session,
775                    &mut ctx,
776                    e,
777                    "Failed in downstream modules request filter:",
778                )
779                .await;
780        }
781
782        match self.inner.request_filter(&mut session, &mut ctx).await {
783            Ok(response_sent) => {
784                if response_sent {
785                    // TODO: log error
786                    self.inner.logging(&mut session, None, &mut ctx).await;
787                    self.cleanup_sub_req(&mut session);
788                    let persistent_settings = HttpPersistentSettings::for_session(&session);
789                    return session
790                        .downstream_session
791                        .finish()
792                        .await
793                        .ok()
794                        .flatten()
795                        .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
796                }
797                /* else continue */
798            }
799            Err(e) => {
800                return self
801                    .handle_error(session, &mut ctx, e, "Fail to filter request:")
802                    .await;
803            }
804        }
805
806        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
807            // cache hit
808            return self.finish(session, &mut ctx, reuse, err).await;
809        }
810        // either uncacheable, or cache miss
811
812        // there should not be a write lock in the sub req ctx after this point
813        self.cleanup_sub_req(&mut session);
814
815        // decide if the request is allowed to go to upstream
816        match self
817            .inner
818            .proxy_upstream_filter(&mut session, &mut ctx)
819            .await
820        {
821            Ok(proxy_to_upstream) => {
822                if !proxy_to_upstream {
823                    // The hook can choose to write its own response, but if it doesn't, we respond
824                    // with a generic 502
825                    if session.cache.enabled() {
826                        // drop the cache lock that this request may be holding onto
827                        session.cache.disable(NoCacheReason::DeclinedToUpstream);
828                    }
829                    if session.response_written().is_none() {
830                        match session.write_response_header_ref(&BAD_GATEWAY, true).await {
831                            Ok(()) => {}
832                            Err(e) => {
833                                return self
834                                    .handle_error(
835                                        session,
836                                        &mut ctx,
837                                        e,
838                                        "Error responding with Bad Gateway:",
839                                    )
840                                    .await;
841                            }
842                        }
843                    }
844
845                    return self.finish(session, &mut ctx, true, None).await;
846                }
847                /* else continue */
848            }
849            Err(e) => {
850                if session.cache.enabled() {
851                    session.cache.disable(NoCacheReason::InternalError);
852                }
853
854                return self
855                    .handle_error(
856                        session,
857                        &mut ctx,
858                        e,
859                        "Error deciding if we should proxy to upstream:",
860                    )
861                    .await;
862            }
863        }
864
865        let mut retries: usize = 0;
866
867        let mut server_reuse = false;
868        let mut proxy_error: Option<Box<Error>> = None;
869
870        while retries < self.max_retries {
871            retries += 1;
872
873            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
874            server_reuse = reuse;
875
876            match e {
877                Some(error) => {
878                    let retry = error.retry();
879                    proxy_error = Some(error);
880                    if !retry {
881                        break;
882                    }
883                    // only log error that will be retried here, the final error will be logged below
884                    warn!(
885                        "Fail to proxy: {}, tries: {}, retry: {}, {}",
886                        proxy_error.as_ref().unwrap(),
887                        retries,
888                        retry,
889                        self.inner.request_summary(&session, &ctx)
890                    );
891                }
892                None => {
893                    proxy_error = None;
894                    break;
895                }
896            };
897        }
898
899        // serve stale if error
900        // Check both error and cache before calling the function because await is not cheap
901        // allow unwrap until if let chains
902        #[allow(clippy::unnecessary_unwrap)]
903        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
904            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
905                .await
906        } else {
907            None
908        };
909
910        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
911            // don't reuse server conn if serve stale polluted it
912            server_reuse = server_reuse && reuse;
913            stale_cache_error
914        } else {
915            proxy_error
916        };
917
918        if let Some(e) = final_error.as_ref() {
919            // If we have errored and are still holding a cache lock, release it.
920            if session.cache.enabled() {
921                let reason = if *e.esource() == ErrorSource::Upstream {
922                    NoCacheReason::UpstreamError
923                } else {
924                    NoCacheReason::InternalError
925                };
926                session.cache.disable(reason);
927            }
928            let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
929
930            // final error will have > 0 status unless downstream connection is dead
931            if !self.inner.suppress_error_log(&session, &ctx, e) {
932                error!(
933                    "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
934                    final_error.as_ref().unwrap(),
935                    res.error_code,
936                    retries,
937                    false, // we never retry here
938                    self.inner.request_summary(&session, &ctx),
939                );
940            }
941        }
942
943        // logging() will be called in finish()
944        self.finish(session, &mut ctx, server_reuse, final_error)
945            .await
946    }
947
948    async fn handle_error(
949        &self,
950        mut session: Session,
951        ctx: &mut <SV as ProxyHttp>::CTX,
952        e: Box<Error>,
953        context: &str,
954    ) -> Option<ReusedHttpStream>
955    where
956        SV: ProxyHttp + Send + Sync + 'static,
957        <SV as ProxyHttp>::CTX: Send + Sync,
958    {
959        let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
960        if !self.inner.suppress_error_log(&session, ctx, &e) {
961            error!(
962                "{context} {}, status: {}, {}",
963                e,
964                res.error_code,
965                self.inner.request_summary(&session, ctx)
966            );
967        }
968        self.inner.logging(&mut session, Some(&e), ctx).await;
969        self.cleanup_sub_req(&mut session);
970
971        session.downstream_session.on_proxy_failure(e);
972
973        if res.can_reuse_downstream {
974            let persistent_settings = HttpPersistentSettings::for_session(&session);
975            session
976                .downstream_session
977                .finish()
978                .await
979                .ok()
980                .flatten()
981                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
982        } else {
983            None
984        }
985    }
986}
987
/* Make process_subrequest() a trait to work around https://github.com/rust-lang/rust/issues/78649.
   If process_subrequest() is implemented as a member of HttpProxy, rustc complains:
990
991error[E0391]: cycle detected when computing type of `proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}`
992   --> pingora-proxy/src/proxy_cache.rs:13:10
993    |
99413  |     ) -> Option<(bool, Option<Box<Error>>)>
995
996*/
#[async_trait]
pub trait Subrequest {
    /// Run `session` through the full proxy request pipeline as a subrequest.
    ///
    /// `sub_req_ctx` carries the subrequest-specific state that is attached to
    /// the new [Session] before processing begins.
    async fn process_subrequest(
        self: Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubrequestCtx>,
    );
}
1005
1006#[async_trait]
1007impl<SV, C> Subrequest for HttpProxy<SV, C>
1008where
1009    SV: ProxyHttp + Send + Sync + 'static,
1010    <SV as ProxyHttp>::CTX: Send + Sync,
1011    C: custom::Connector,
1012{
1013    async fn process_subrequest(
1014        self: Arc<Self>,
1015        session: Box<HttpSession>,
1016        sub_req_ctx: Box<SubrequestCtx>,
1017    ) {
1018        debug!("starting subrequest");
1019
1020        let mut session = match self.handle_new_request(session).await {
1021            Some(downstream_session) => Session::new(
1022                downstream_session,
1023                &self.downstream_modules,
1024                self.shutdown_flag.clone(),
1025            ),
1026            None => return, // bad request
1027        };
1028
1029        // no real downstream to keepalive, but it doesn't matter what is set here because at the end
1030        // of this fn the dummy connection will be dropped
1031        session.set_keepalive(None);
1032
1033        session.subrequest_ctx.replace(sub_req_ctx);
1034        trace!("processing subrequest");
1035        let ctx = self.inner.new_ctx();
1036        self.process_request(session, ctx).await;
1037        trace!("subrequest done");
1038    }
1039}
1040
/// A handle to the underlying HTTP proxy app that allows spawning subrequests.
pub struct SubrequestSpawner {
    // The (type-erased) proxy app that will process spawned subrequests.
    app: Arc<dyn Subrequest + Send + Sync>,
}
1045
/// A [`PreparedSubrequest`] that is ready to run.
pub struct PreparedSubrequest {
    // The proxy app that will process this subrequest when run() is called.
    app: Arc<dyn Subrequest + Send + Sync>,
    // The detached session this subrequest runs on.
    session: Box<HttpSession>,
    // Context describing the subrequest (e.g. its body mode).
    sub_req_ctx: Box<SubrequestCtx>,
}
1052
1053impl PreparedSubrequest {
1054    pub async fn run(self) {
1055        self.app
1056            .process_subrequest(self.session, self.sub_req_ctx)
1057            .await
1058    }
1059
1060    pub fn session(&self) -> &HttpSession {
1061        self.session.as_ref()
1062    }
1063
1064    pub fn session_mut(&mut self) -> &mut HttpSession {
1065        self.session.deref_mut()
1066    }
1067}
1068
1069impl SubrequestSpawner {
1070    /// Create a new [`SubrequestSpawner`].
1071    pub fn new(app: Arc<dyn Subrequest + Send + Sync>) -> SubrequestSpawner {
1072        SubrequestSpawner { app }
1073    }
1074
1075    /// Spawn a background subrequest and return a join handle.
1076    // TODO: allow configuring the subrequest session before use
1077    pub fn spawn_background_subrequest(
1078        &self,
1079        session: &HttpSession,
1080        ctx: SubrequestCtx,
1081    ) -> tokio::task::JoinHandle<()> {
1082        let new_app = self.app.clone(); // Clone the Arc
1083        let (mut session, handle) = subrequest::create_session(session);
1084        if ctx.body_mode() == BodyMode::NoBody {
1085            session
1086                .as_subrequest_mut()
1087                .expect("created subrequest session")
1088                .clear_request_body_headers();
1089        }
1090        let sub_req_ctx = Box::new(ctx);
1091        handle.drain_tasks();
1092        tokio::spawn(async move {
1093            new_app
1094                .process_subrequest(Box::new(session), sub_req_ctx)
1095                .await;
1096        })
1097    }
1098
1099    /// Create a subrequest that listens to `HttpTask`s sent from the returned `Sender`
1100    /// and sends `HttpTask`s to the returned `Receiver`.
1101    ///
1102    /// To run that subrequest, call `run()`.
1103    // TODO: allow configuring the subrequest session before use
1104    pub fn create_subrequest(
1105        &self,
1106        session: &HttpSession,
1107        ctx: SubrequestCtx,
1108    ) -> (PreparedSubrequest, SubrequestHandle) {
1109        let new_app = self.app.clone(); // Clone the Arc
1110        let (mut session, handle) = subrequest::create_session(session);
1111        if ctx.body_mode() == BodyMode::NoBody {
1112            session
1113                .as_subrequest_mut()
1114                .expect("created subrequest session")
1115                .clear_request_body_headers();
1116        }
1117        let sub_req_ctx = Box::new(ctx);
1118        (
1119            PreparedSubrequest {
1120                app: new_app,
1121                session: Box::new(session),
1122                sub_req_ctx,
1123            },
1124            handle,
1125        )
1126    }
1127}
1128
1129#[async_trait]
1130impl<SV, C> HttpServerApp for HttpProxy<SV, C>
1131where
1132    SV: ProxyHttp + Send + Sync + 'static,
1133    <SV as ProxyHttp>::CTX: Send + Sync,
1134    C: custom::Connector,
1135{
1136    async fn process_new_http(
1137        self: &Arc<Self>,
1138        session: HttpSession,
1139        shutdown: &ShutdownWatch,
1140    ) -> Option<ReusedHttpStream> {
1141        let session = Box::new(session);
1142
1143        // TODO: keepalive pool, use stack
1144        let mut session = match self.handle_new_request(session).await {
1145            Some(downstream_session) => Session::new(
1146                downstream_session,
1147                &self.downstream_modules,
1148                self.shutdown_flag.clone(),
1149            ),
1150            None => return None, // bad request
1151        };
1152
1153        if *shutdown.borrow() {
1154            // stop downstream from reusing if this service is shutting down soon
1155            session.set_keepalive(None);
1156        }
1157
1158        let ctx = self.inner.new_ctx();
1159        self.process_request(session, ctx).await
1160    }
1161
1162    async fn http_cleanup(&self) {
1163        self.shutdown_flag.store(true, Ordering::Release);
1164        // Notify all keepalived requests blocking on read_request() to abort
1165        self.shutdown.notify_waiters();
1166    }
1167
1168    fn server_options(&self) -> Option<&HttpServerOptions> {
1169        self.server_options.as_ref()
1170    }
1171
1172    fn h2_options(&self) -> Option<H2Options> {
1173        self.h2_options.clone()
1174    }
1175    async fn process_custom_session(
1176        self: Arc<Self>,
1177        stream: Stream,
1178        shutdown: &ShutdownWatch,
1179    ) -> Option<Stream> {
1180        let app = self.clone();
1181
1182        let Some(process_custom_session) = app.process_custom_session.as_ref() else {
1183            warn!("custom was called on an empty on_custom");
1184            return None;
1185        };
1186
1187        process_custom_session(self.clone(), stream, shutdown).await
1188    }
1189
1190    // TODO implement h2_options
1191}
1192
1193use pingora_core::services::listening::Service;
1194
1195/// Create an [`HttpProxy`] without wrapping it in a [`Service`].
1196///
1197/// This is useful when you need to integrate `HttpProxy` into a custom accept loop,
1198/// for example when implementing SNI-based routing that decides between TLS passthrough
1199/// and TLS termination on a single port.
1200///
1201/// The returned `HttpProxy` is fully initialized and ready to process requests via
1202/// [`HttpServerApp::process_new_http()`].
1203///
1204/// # Example
1205///
1206/// ```ignore
1207/// use pingora_proxy::http_proxy;
1208/// use std::sync::Arc;
1209///
1210/// // Create the proxy
1211/// let proxy = Arc::new(http_proxy(&server_conf, my_proxy_app));
1212///
1213/// // In your custom accept loop:
1214/// loop {
1215///     let (stream, addr) = listener.accept().await?;
1216///
1217///     // Peek SNI, decide routing...
1218///     if should_terminate_tls {
1219///         let tls_stream = my_acceptor.accept(stream).await?;
1220///         let session = HttpSession::new_http1(Box::new(tls_stream));
1221///         proxy.process_new_http(session, &shutdown).await;
1222///     }
1223/// }
1224/// ```
1225pub fn http_proxy<SV>(conf: &Arc<ServerConf>, inner: SV) -> HttpProxy<SV>
1226where
1227    SV: ProxyHttp,
1228{
1229    let mut proxy = HttpProxy::new(inner, conf.clone());
1230    proxy.handle_init_modules();
1231    proxy
1232}
1233
/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
///
/// The service is named "Pingora HTTP Proxy Service"; use
/// [http_proxy_service_with_name] to choose a different name.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV, ()>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}
1243
1244/// Create a [Service] from the user implemented [ProxyHttp].
1245///
1246/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
1247pub fn http_proxy_service_with_name<SV>(
1248    conf: &Arc<ServerConf>,
1249    inner: SV,
1250    name: &str,
1251) -> Service<HttpProxy<SV, ()>>
1252where
1253    SV: ProxyHttp,
1254{
1255    let mut proxy = HttpProxy::new(inner, conf.clone());
1256    proxy.handle_init_modules();
1257    Service::new(name.to_string(), proxy)
1258}
1259
1260/// Create a [Service] from the user implemented [ProxyHttp].
1261///
1262/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
1263pub fn http_proxy_service_with_name_custom<SV, C>(
1264    conf: &Arc<ServerConf>,
1265    inner: SV,
1266    name: &str,
1267    connector: C,
1268    on_custom: ProcessCustomSession<SV, C>,
1269) -> Service<HttpProxy<SV, C>>
1270where
1271    SV: ProxyHttp + Send + Sync + 'static,
1272    SV::CTX: Send + Sync + 'static,
1273    C: custom::Connector,
1274{
1275    let mut proxy = HttpProxy::new_custom(inner, conf.clone(), connector, Some(on_custom), None);
1276    proxy.handle_init_modules();
1277
1278    Service::new(name.to_string(), proxy)
1279}
1280
/// A builder for a [Service] that can be used to create a [HttpProxy] instance
///
/// The [ProxyServiceBuilder] can be used to construct a [HttpProxy] service with a custom name,
/// connector, and custom session handler.
///
pub struct ProxyServiceBuilder<SV, C>
where
    SV: ProxyHttp + Send + Sync + 'static,
    SV::CTX: Send + Sync + 'static,
    C: custom::Connector,
{
    // Server configuration shared with the constructed proxy.
    conf: Arc<ServerConf>,
    // The user's ProxyHttp implementation.
    inner: SV,
    // Service name; defaults to "Pingora HTTP Proxy Service".
    name: String,
    // Upstream connector; `()` until replaced via custom().
    connector: C,
    // Optional custom session handler, set together with the connector.
    custom: Option<ProcessCustomSession<SV, C>>,
    // Optional HTTP server options applied to the proxy.
    server_options: Option<HttpServerOptions>,
}
1299
1300impl<SV> ProxyServiceBuilder<SV, ()>
1301where
1302    SV: ProxyHttp + Send + Sync + 'static,
1303    SV::CTX: Send + Sync + 'static,
1304{
1305    /// Create a new [ProxyServiceBuilder] with the given [ServerConf] and [ProxyHttp]
1306    /// implementation.
1307    ///
1308    /// The returned builder can be used to construct a [HttpProxy] service with a custom name,
1309    /// connector, and custom session handler.
1310    ///
1311    /// The [ProxyServiceBuilder] will default to using the [ProxyHttp] implementation and no custom
1312    /// session handler.
1313    ///
1314    pub fn new(conf: &Arc<ServerConf>, inner: SV) -> Self {
1315        ProxyServiceBuilder {
1316            conf: conf.clone(),
1317            inner,
1318            name: "Pingora HTTP Proxy Service".into(),
1319            connector: (),
1320            custom: None,
1321            server_options: None,
1322        }
1323    }
1324}
1325
1326impl<SV, C> ProxyServiceBuilder<SV, C>
1327where
1328    SV: ProxyHttp + Send + Sync + 'static,
1329    SV::CTX: Send + Sync + 'static,
1330    C: custom::Connector,
1331{
1332    /// Sets the name of the [HttpProxy] service.
1333    pub fn name(mut self, name: impl AsRef<str>) -> Self {
1334        self.name = name.as_ref().to_owned();
1335        self
1336    }
1337
1338    /// Set a custom connector and custom session handler for the [ProxyServiceBuilder].
1339    ///
1340    /// The custom connector is used to establish a connection to the upstream server.
1341    ///
1342    /// The custom session handler is used to handle custom protocol specific logic
1343    /// between the proxy and the upstream server.
1344    ///
1345    /// Returns a new [ProxyServiceBuilder] with the custom connector and session handler.
1346    pub fn custom<C2: custom::Connector>(
1347        self,
1348        connector: C2,
1349        on_custom: ProcessCustomSession<SV, C2>,
1350    ) -> ProxyServiceBuilder<SV, C2> {
1351        let Self {
1352            conf,
1353            inner,
1354            name,
1355            server_options,
1356            ..
1357        } = self;
1358        ProxyServiceBuilder {
1359            conf,
1360            inner,
1361            name,
1362            connector,
1363            custom: Some(on_custom),
1364            server_options,
1365        }
1366    }
1367
1368    /// Set the server options for the [ProxyServiceBuilder].
1369    ///
1370    /// Returns a new [ProxyServiceBuilder] with the server options set.
1371    pub fn server_options(mut self, options: HttpServerOptions) -> Self {
1372        self.server_options = Some(options);
1373        self
1374    }
1375
1376    /// Builds a new [Service] from the [ProxyServiceBuilder].
1377    ///
1378    /// This function takes ownership of the [ProxyServiceBuilder] and returns a new [Service] with
1379    /// a fully initialized [HttpProxy].
1380    ///
1381    /// The returned [Service] is ready to be used by a [pingora_core::server::Server].
1382    pub fn build(self) -> Service<HttpProxy<SV, C>> {
1383        let Self {
1384            conf,
1385            inner,
1386            name,
1387            connector,
1388            custom,
1389            server_options,
1390        } = self;
1391
1392        let mut proxy = HttpProxy::new_custom(inner, conf, connector, custom, server_options);
1393
1394        proxy.handle_init_modules();
1395        Service::new(name, proxy)
1396    }
1397}