pingora_proxy/lib.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/response scanning, modification, or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of an HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains
//! the callbacks to be invoked at each stage of an HTTP request.
//!
//! The service can then be passed into [`http_proxy_service()`] for a
//! [pingora_core::server::Server] to run it.
//!
//! See `examples/load_balancer.rs` for a detailed example.
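//!
//! Below is a minimal sketch of such a service. The upstream address `127.0.0.1:8080` and the
//! listening address `0.0.0.0:6188` are illustrative choices, not defaults:
//!
//! ```no_run
//! use async_trait::async_trait;
//! use pingora_core::server::Server;
//! use pingora_core::upstreams::peer::HttpPeer;
//! use pingora_error::Result;
//! use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
//!
//! struct MyProxy;
//!
//! #[async_trait]
//! impl ProxyHttp for MyProxy {
//!     type CTX = ();
//!     fn new_ctx(&self) -> Self::CTX {}
//!
//!     // the only required callback: pick the upstream to connect to
//!     async fn upstream_peer(
//!         &self,
//!         _session: &mut Session,
//!         _ctx: &mut Self::CTX,
//!     ) -> Result<Box<HttpPeer>> {
//!         // plaintext (no TLS) connection, so the SNI string is unused
//!         Ok(Box::new(HttpPeer::new(("127.0.0.1", 8080), false, String::new())))
//!     }
//! }
//!
//! fn main() {
//!     let mut server = Server::new(None).unwrap();
//!     server.bootstrap();
//!     let mut proxy = http_proxy_service(&server.configuration, MyProxy);
//!     proxy.add_tcp("0.0.0.0:6188");
//!     server.add_service(proxy);
//!     server.run_forever();
//! }
//! ```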

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;

use pingora_cache::NoCacheReason;
use pingora_core::apps::{
    HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::v2::server::H2Options;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};

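// How many `HttpTask`s may be buffered in flight between the proxy's read and write halves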
const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
mod subrequest;

use subrequest::Ctx as SubReqCtx;

pub use proxy_cache::range_filter::{range_header_filter, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::{FailToProxy, ProxyHttp};

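/// Re-exports of the most commonly used types for implementing a proxy.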
pub mod prelude {
    pub use crate::{http_proxy_service, ProxyHttp, Session};
}

/// The concrete type that holds the user-defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV> {
    inner: SV, // TODO: name it better than inner
    client_upstream: Connector,
    shutdown: Notify,
    pub server_options: Option<HttpServerOptions>,
    pub h2_options: Option<H2Options>,
    pub downstream_modules: HttpModules,
    max_retries: usize,
}

impl<SV> HttpProxy<SV> {
    fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
        HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
            server_options: None,
            h2_options: None,
            downstream_modules: HttpModules::new(),
            max_retries: conf.max_retries,
        }
    }

    fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

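    // Read the request header from a new downstream session. Returns the session on
    // success, or `None` if the connection should be dropped (shutdown, EOF, or a bad
    // request that cannot be parsed).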
    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1: read the request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, drop the connection to stop more requests from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully got a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                e.as_down();
                error!("Failed to proxy: {e}");
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session
                        .respond_error(400)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        Some(downstream_session)
    }

    // Returns whether the server session can be reused, and any error encountered
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. the origin says so, or B. the origin sends an
                            // invalid response, which usually means its h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .map_or(true, |alpn| alpn.get_min_http_version() == 1)
                                {
                                    // Mark the peer as preferring h1 so that all following requests
                                    // will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

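    // Route an `HttpTask` coming from the upstream through the matching user-defined
    // upstream response filter (header, body, or trailer).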
    fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<()>
    where
        SV: ProxyHttp,
    {
        match task {
            HttpTask::Header(header, _eos) => {
                self.inner.upstream_response_filter(session, header, ctx)?
            }
            HttpTask::Body(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx)?,
            HttpTask::Trailer(Some(trailers)) => self
                .inner
                .upstream_response_trailer_filter(session, trailers, ctx)?,
            _ => {
                // task does not support a filter
            }
        }
        Ok(())
    }

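    // Run the final logging hook, then try to hand the downstream stream back for
    // keepalive reuse if `reuse` is set.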
    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<&Error>,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner.logging(&mut session, error, ctx).await;

        if reuse {
            // TODO: log error
            let persistent_settings = HttpPersistentSettings::for_session(&session);
            session
                .downstream_session
                .finish()
                .await
                .ok()
                .flatten()
                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
        } else {
            None
        }
    }

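    // Release the write lock (if any) that a subrequest context is still holding.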
    fn cleanup_sub_req(&self, session: &mut Session) {
        if let Some(ctx) = session.subrequest_ctx.as_mut() {
            ctx.release_write_lock();
        }
    }
}

use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;

/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the
/// proxy behavior.
pub struct Session {
    /// the HTTP session to the downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    /// Were the upstream request headers modified?
    pub upstream_headers_mutated_for_cache: bool,
    // the context from the parent request
    pub subrequest_ctx: Option<Box<SubReqCtx>>,
    // Downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
}

impl Session {
    fn new(
        downstream_session: impl Into<Box<HttpSession>>,
        downstream_modules: &HttpModules,
    ) -> Self {
        Session {
            downstream_session: downstream_session.into(),
            cache: HttpCache::new(),
            // disable both upstream and downstream compression
            upstream_compression: ResponseCompressionCtx::new(0, false, false),
            ignore_downstream_range: false,
            upstream_headers_mutated_for_cache: false,
            subrequest_ctx: None,
            downstream_modules_ctx: downstream_modules.build_ctx(),
        }
    }

    /// Create a new [Session] from the given [Stream]
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1(stream: Stream) -> Self {
        let modules = HttpModules::new();
        Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
    }

    /// Create a new [Session] from the given [Stream] with modules
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
        Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
    }

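    /// Get a mutable reference to the downstream session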
    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }

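    /// Get a reference to the downstream session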
    pub fn as_downstream(&self) -> &HttpSession {
        &self.downstream_session
    }

    /// Write an HTTP response with the given error code to the downstream.
    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
        self.as_downstream_mut().respond_error(error).await
    }

    /// Write an HTTP response with the given error code and body to the downstream.
    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
        self.as_downstream_mut()
            .respond_error_with_body(error, body)
            .await
    }

    /// Write the given HTTP response header to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_header], this function also
    /// invokes the filter modules.
    pub async fn write_response_header(
        &mut self,
        mut resp: Box<ResponseHeader>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_header_filter(&mut resp, end_of_stream)
            .await?;
        self.downstream_session.write_response_header(resp).await
    }

    /// Write the given HTTP response body chunk to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_body], this function also
    /// invokes the filter modules.
    pub async fn write_response_body(
        &mut self,
        mut body: Option<Bytes>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_body_filter(&mut body, end_of_stream)?;

        if body.is_none() && !end_of_stream {
            return Ok(());
        }

        let data = body.unwrap_or_default();
        self.downstream_session
            .write_response_body(data, end_of_stream)
            .await
    }

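    /// Write the given [HttpTask]s to the downstream
    ///
    /// Unlike writing tasks to the downstream session directly, this function also invokes
    /// the filter modules on each task before it is sent.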
    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            match task {
                HttpTask::Header(resp, end) => {
                    self.downstream_modules_ctx
                        .response_header_filter(resp, *end)
                        .await?;
                }
                HttpTask::Body(data, end) => {
                    self.downstream_modules_ctx
                        .response_body_filter(data, *end)?;
                }
                HttpTask::Trailer(trailers) => {
                    if let Some(buf) = self
                        .downstream_modules_ctx
                        .response_trailer_filter(trailers)?
                    {
                        // Write the trailers into the body if the filter
                        // returns a buffer.
                        //
                        // Note, this will not work if end of stream has already
                        // been seen or we've written content-length bytes.
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                HttpTask::Done => {
                    // `Done` can be sent on certain response paths to mark the end of the
                    // response, if not already marked via trailers or a body with the end
                    // flag set.
                    // If the filter returns body bytes on `Done`, write them into the
                    // response.
                    //
                    // Note, this will not work if end of stream has already
                    // been seen or we've written content-length bytes.
                    if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                _ => { /* Failed */ }
            }
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }

    /// Mark the upstream headers as modified by caching. This should lead to range filters being
    /// skipped when responding to the downstream.
    pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
        self.upstream_headers_mutated_for_cache = true;
    }

    /// Check whether the upstream headers were marked as mutated during the request.
    pub fn upstream_headers_mutated_for_cache(&self) -> bool {
        self.upstream_headers_mutated_for_cache
    }
}

impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &self.downstream_session
    }
}

impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }
}

use std::ops::{Deref, DerefMut};

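// `Session` derefs to the downstream [HttpSession], so its methods can be called on
// `Session` directly.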
impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &self.downstream_session
    }
}

impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.downstream_session
    }
}

// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});

impl<SV> HttpProxy<SV> {
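    // Drive a single request through the full proxy pipeline: early filters, downstream
    // modules, request filter, cache lookup, upstream proxying with retries, and final
    // logging.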
    async fn process_request(
        self: &Arc<Self>,
        mut session: Session,
        mut ctx: <SV as ProxyHttp>::CTX,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if let Err(e) = self
            .inner
            .early_request_filter(&mut session, &mut ctx)
            .await
        {
            return self
                .handle_error(session, &mut ctx, e, "Failed to early filter request:")
                .await;
        }

        let req = session.downstream_session.req_header_mut();

        // Built-in downstream request filters go first
        if let Err(e) = session
            .downstream_modules_ctx
            .request_header_filter(req)
            .await
        {
            return self
                .handle_error(
                    session,
                    &mut ctx,
                    e,
                    "Failed in downstream modules request filter:",
                )
                .await;
        }

        match self.inner.request_filter(&mut session, &mut ctx).await {
            Ok(response_sent) => {
                if response_sent {
                    // TODO: log error
                    self.inner.logging(&mut session, None, &mut ctx).await;
                    self.cleanup_sub_req(&mut session);
                    let persistent_settings = HttpPersistentSettings::for_session(&session);
                    return session
                        .downstream_session
                        .finish()
                        .await
                        .ok()
                        .flatten()
                        .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
                }
                /* else continue */
            }
            Err(e) => {
                return self
                    .handle_error(session, &mut ctx, e, "Failed to filter request:")
                    .await;
            }
        }

        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
            // cache hit
            return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
        }
        // either uncacheable, or cache miss

        // there should not be a write lock in the sub req ctx after this point
        self.cleanup_sub_req(&mut session);

        // decide if the request is allowed to go to the upstream
        match self
            .inner
            .proxy_upstream_filter(&mut session, &mut ctx)
            .await
        {
            Ok(proxy_to_upstream) => {
                if !proxy_to_upstream {
                    // The hook can choose to write its own response, but if it doesn't, we respond
                    // with a generic 502
                    if session.cache.enabled() {
                        // drop the cache lock that this request may be holding onto
                        session.cache.disable(NoCacheReason::DeclinedToUpstream);
                    }
                    if session.response_written().is_none() {
                        match session.write_response_header_ref(&BAD_GATEWAY).await {
                            Ok(()) => {}
                            Err(e) => {
                                return self
                                    .handle_error(
                                        session,
                                        &mut ctx,
                                        e,
                                        "Error responding with Bad Gateway:",
                                    )
                                    .await;
                            }
                        }
                    }

                    return self.finish(session, &mut ctx, true, None).await;
                }
                /* else continue */
            }
            Err(e) => {
                if session.cache.enabled() {
                    session.cache.disable(NoCacheReason::InternalError);
                }

                return self
                    .handle_error(
                        session,
                        &mut ctx,
                        e,
                        "Error deciding if we should proxy to upstream:",
                    )
                    .await;
            }
        }

        let mut retries: usize = 0;

        let mut server_reuse = false;
        let mut proxy_error: Option<Box<Error>> = None;

        while retries < self.max_retries {
            retries += 1;

            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
            server_reuse = reuse;

            match e {
                Some(error) => {
                    let retry = error.retry();
                    proxy_error = Some(error);
                    if !retry {
                        break;
                    }
                    // only log errors that will be retried here; the final error is logged below
                    warn!(
                        "Failed to proxy: {}, tries: {}, retry: {}, {}",
                        proxy_error.as_ref().unwrap(),
                        retries,
                        retry,
                        self.inner.request_summary(&session, &ctx)
                    );
                }
                None => {
                    proxy_error = None;
                    break;
                }
            };
        }

        // serve stale if error
        // Check both the error and the cache before calling the function because await is not cheap
        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
                .await
        } else {
            None
        };

        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
            // don't reuse the server conn if serving stale polluted it
            server_reuse = server_reuse && reuse;
            stale_cache_error
        } else {
            proxy_error
        };

        if let Some(e) = final_error.as_ref() {
            // If we have errored and are still holding a cache lock, release it.
            if session.cache.enabled() {
                let reason = if *e.esource() == ErrorSource::Upstream {
                    NoCacheReason::UpstreamError
                } else {
                    NoCacheReason::InternalError
                };
                session.cache.disable(reason);
            }
            let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;

            // the final error will have a status > 0 unless the downstream connection is dead
            if !self.inner.suppress_error_log(&session, &ctx, e) {
                error!(
                    "Failed to proxy: {}, status: {}, tries: {}, retry: {}, {}",
                    final_error.as_ref().unwrap(),
                    res.error_code,
                    retries,
                    false, // we never retry here
                    self.inner.request_summary(&session, &ctx)
                );
            }
        }

        // logging() will be called in finish()
        self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
            .await
    }

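    // Report a fatal error via `fail_to_proxy()` and log it, then finish the downstream
    // session, returning it for reuse only if the error handler allows it.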
    async fn handle_error(
        &self,
        mut session: Session,
        ctx: &mut <SV as ProxyHttp>::CTX,
        e: Box<Error>,
        context: &str,
    ) -> Option<ReusedHttpStream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
        if !self.inner.suppress_error_log(&session, ctx, &e) {
            error!(
                "{context} {}, status: {}, {}",
                e,
                res.error_code,
                self.inner.request_summary(&session, ctx)
            );
        }
        self.inner.logging(&mut session, Some(&e), ctx).await;
        self.cleanup_sub_req(&mut session);

        if res.can_reuse_downstream {
            let persistent_settings = HttpPersistentSettings::for_session(&session);
            session
                .downstream_session
                .finish()
                .await
                .ok()
                .flatten()
                .map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
        } else {
            None
        }
    }
}

/* Make process_subrequest() a trait to work around https://github.com/rust-lang/rust/issues/78649
   If process_subrequest() is implemented as a member of HttpProxy, rustc complains:

error[E0391]: cycle detected when computing type of `proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}`
   --> pingora-proxy/src/proxy_cache.rs:13:10
    |
13  |     ) -> Option<(bool, Option<Box<Error>>)>

*/
#[async_trait]
trait Subrequest {
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}

#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    ) {
        debug!("starting subrequest");
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return, // bad request
        };

        // There is no real downstream to keep alive, but it doesn't matter what is set here
        // because the dummy connection will be dropped at the end of this fn anyway
        session.set_keepalive(None);

        session.subrequest_ctx.replace(sub_req_ctx);
        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}

#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        _shutdown: &ShutdownWatch,
    ) -> Option<ReusedHttpStream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return None, // bad request
        };

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    async fn http_cleanup(&self) {
        // Notify all keepalived requests blocking on read_request() to abort
        self.shutdown.notify_waiters();

        // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
    }

    fn server_options(&self) -> Option<&HttpServerOptions> {
        self.server_options.as_ref()
    }

    fn h2_options(&self) -> Option<H2Options> {
        self.h2_options.clone()
    }
}

use pingora_core::services::listening::Service;

/// Create a [Service] from the user-implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}

/// Create a [Service] from the user-implemented [ProxyHttp] with a custom service name.
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    Service::new(name.to_string(), proxy)
}