pingora_proxy/lib.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/Response scanning, modification, or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of an HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains
//! the callbacks to be invoked at each stage of an HTTP request.
//!
//! The service can then be passed into [`http_proxy_service()`] for a
//! [pingora_core::server::Server] to run it.
//!
//! See `examples/load_balancer.rs` for a detailed example.
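//!
//! As a minimal sketch (the `MyProxy` type and the upstream address below are illustrative,
//! not part of this crate):
//!
//! ```ignore
//! use async_trait::async_trait;
//! use pingora_core::upstreams::peer::HttpPeer;
//! use pingora_error::Result;
//! use pingora_proxy::{ProxyHttp, Session};
//!
//! pub struct MyProxy;
//!
//! #[async_trait]
//! impl ProxyHttp for MyProxy {
//!     // per-request state; use () if no state is needed
//!     type CTX = ();
//!     fn new_ctx(&self) -> Self::CTX {}
//!
//!     // decide which upstream to connect to for this request
//!     async fn upstream_peer(
//!         &self,
//!         _session: &mut Session,
//!         _ctx: &mut Self::CTX,
//!     ) -> Result<Box<HttpPeer>> {
//!         // connect to 1.1.1.1:443 over TLS with the given SNI (illustrative)
//!         Ok(Box::new(HttpPeer::new(
//!             ("1.1.1.1", 443),
//!             true,
//!             "one.one.one.one".to_string(),
//!         )))
//!     }
//! }
//! ```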

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;

use pingora_cache::NoCacheReason;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};

const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
mod subrequest;

use subrequest::Ctx as SubReqCtx;

pub use proxy_cache::range_filter::{range_header_filter, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::{FailToProxy, ProxyHttp};

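/// Convenience re-exports of the most commonly used types of this crate.
///
/// A typical import (sketch):
/// ```ignore
/// use pingora_proxy::prelude::*;
/// ```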
pub mod prelude {
    pub use crate::{http_proxy_service, ProxyHttp, Session};
}

/// The concrete type that holds the user-defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV> {
    inner: SV, // TODO: name it better than inner
    client_upstream: Connector,
    shutdown: Notify,
    pub server_options: Option<HttpServerOptions>,
    pub downstream_modules: HttpModules,
    max_retries: usize,
}

impl<SV> HttpProxy<SV> {
    fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
        HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
            server_options: None,
            downstream_modules: HttpModules::new(),
            max_retries: conf.max_retries,
        }
    }

    fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1: read the request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, drop the connection to stop more requests from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully got a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                e.as_down();
                error!("Failed to proxy: {e}");
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session
                        .respond_error(400)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        Some(downstream_session)
    }

    // Returns whether the server session can be reused, and an error if any
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. the origin says so or B. the origin sends an
                            // invalid response, which usually means the origin h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .map_or(true, |alpn| alpn.get_min_http_version() == 1)
                                {
                                    // Mark the peer as preferring h1 so that all following requests
                                    // will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

    fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<()>
    where
        SV: ProxyHttp,
    {
        match task {
            HttpTask::Header(header, _eos) => {
                self.inner.upstream_response_filter(session, header, ctx)?
            }
            HttpTask::Body(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx)?,
            HttpTask::Trailer(Some(trailers)) => self
                .inner
                .upstream_response_trailer_filter(session, trailers, ctx)?,
            _ => {
                // task does not support a filter
            }
        }
        Ok(())
    }

    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<&Error>,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner.logging(&mut session, error, ctx).await;

        if reuse {
            // TODO: log error
            session.downstream_session.finish().await.ok().flatten()
        } else {
            None
        }
    }

    fn cleanup_sub_req(&self, session: &mut Session) {
        if let Some(ctx) = session.subrequest_ctx.as_mut() {
            ctx.release_write_lock();
        }
    }
}

use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;

/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the proxy
/// behavior.
pub struct Session {
    /// the HTTP session to downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    // the context from the parent request
    pub subrequest_ctx: Option<Box<SubReqCtx>>,
    // Downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
}

impl Session {
    fn new(
        downstream_session: impl Into<Box<HttpSession>>,
        downstream_modules: &HttpModules,
    ) -> Self {
        Session {
            downstream_session: downstream_session.into(),
            cache: HttpCache::new(),
            // disable both upstream and downstream compression
            upstream_compression: ResponseCompressionCtx::new(0, false, false),
            ignore_downstream_range: false,
            subrequest_ctx: None,
            downstream_modules_ctx: downstream_modules.build_ctx(),
        }
    }

    /// Create a new [Session] from the given [Stream]
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1(stream: Stream) -> Self {
        let modules = HttpModules::new();
        Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
    }

    /// Create a new [Session] from the given [Stream] with modules
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
        Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
    }

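    /// Get a mutable reference to the downstream (client) session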
    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }

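    /// Get a reference to the downstream (client) session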
    pub fn as_downstream(&self) -> &HttpSession {
        &self.downstream_session
    }

    /// Write an HTTP response with the given error code to the downstream.
    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
        self.as_downstream_mut().respond_error(error).await
    }

    /// Write an HTTP response with the given error code and body to the downstream.
    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
        self.as_downstream_mut()
            .respond_error_with_body(error, body)
            .await
    }

    /// Write the given HTTP response header to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_header], this function also
    /// invokes the filter modules.
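    ///
    /// A short sketch of building and sending a header through this method (the header name,
    /// value, and `session` variable are illustrative):
    ///
    /// ```ignore
    /// let mut resp = ResponseHeader::build(200, None)?;
    /// resp.insert_header("Content-Type", "text/plain")?;
    /// session.write_response_header(Box::new(resp), false).await?;
    /// ```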
    pub async fn write_response_header(
        &mut self,
        mut resp: Box<ResponseHeader>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_header_filter(&mut resp, end_of_stream)
            .await?;
        self.downstream_session.write_response_header(resp).await
    }

    /// Write the given HTTP response body chunk to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_body], this function also
    /// invokes the filter modules.
    pub async fn write_response_body(
        &mut self,
        mut body: Option<Bytes>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_body_filter(&mut body, end_of_stream)?;

        if body.is_none() && !end_of_stream {
            return Ok(());
        }

        let data = body.unwrap_or_default();
        self.downstream_session
            .write_response_body(data, end_of_stream)
            .await
    }

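    /// Write the given [HttpTask]s to the downstream, running the corresponding downstream
    /// filter modules on each task first.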
    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            match task {
                HttpTask::Header(resp, end) => {
                    self.downstream_modules_ctx
                        .response_header_filter(resp, *end)
                        .await?;
                }
                HttpTask::Body(data, end) => {
                    self.downstream_modules_ctx
                        .response_body_filter(data, *end)?;
                }
                HttpTask::Trailer(trailers) => {
                    if let Some(buf) = self
                        .downstream_modules_ctx
                        .response_trailer_filter(trailers)?
                    {
                        // Write the trailers into the body if the filter
                        // returns a buffer.
                        //
                        // Note, this will not work if end of stream has already
                        // been seen or we've written content-length bytes.
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                HttpTask::Done => {
                    // `Done` can be sent in certain response paths to mark the end
                    // of the response if not already done via trailers or a body with
                    // the end flag set.
                    // If the filter returns body bytes on Done,
                    // write them into the response.
                    //
                    // Note, this will not work if end of stream has already
                    // been seen or we've written content-length bytes.
                    if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                _ => { /* Failed */ }
            }
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }
}

impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &self.downstream_session
    }
}

impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }
}

use std::ops::{Deref, DerefMut};

impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &self.downstream_session
    }
}

impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.downstream_session
    }
}

// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});

impl<SV> HttpProxy<SV> {
    async fn process_request(
        self: &Arc<Self>,
        mut session: Session,
        mut ctx: <SV as ProxyHttp>::CTX,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if let Err(e) = self
            .inner
            .early_request_filter(&mut session, &mut ctx)
            .await
        {
            return self
                .handle_error(session, &mut ctx, e, "Failed in early request filter:")
                .await;
        }

        let req = session.downstream_session.req_header_mut();

        // Built-in downstream request filters go first
        if let Err(e) = session
            .downstream_modules_ctx
            .request_header_filter(req)
            .await
        {
            return self
                .handle_error(
                    session,
                    &mut ctx,
                    e,
                    "Failed in downstream modules request filter:",
                )
                .await;
        }

        match self.inner.request_filter(&mut session, &mut ctx).await {
            Ok(response_sent) => {
                if response_sent {
                    // TODO: log error
                    self.inner.logging(&mut session, None, &mut ctx).await;
                    self.cleanup_sub_req(&mut session);
                    return session.downstream_session.finish().await.ok().flatten();
                }
                /* else continue */
            }
            Err(e) => {
                return self
                    .handle_error(session, &mut ctx, e, "Failed to filter request:")
                    .await;
            }
        }

        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
            // cache hit
            return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
        }
        // either uncacheable, or cache miss

        // there should not be a write lock in the sub req ctx after this point
        self.cleanup_sub_req(&mut session);

        // decide if the request is allowed to go to upstream
        match self
            .inner
            .proxy_upstream_filter(&mut session, &mut ctx)
            .await
        {
            Ok(proxy_to_upstream) => {
                if !proxy_to_upstream {
                    // The hook can choose to write its own response, but if it doesn't, we respond
                    // with a generic 502
                    if session.cache.enabled() {
                        // drop the cache lock that this request may be holding onto
                        session.cache.disable(NoCacheReason::DeclinedToUpstream);
                    }
                    if session.response_written().is_none() {
                        match session.write_response_header_ref(&BAD_GATEWAY).await {
                            Ok(()) => {}
                            Err(e) => {
                                return self
                                    .handle_error(
                                        session,
                                        &mut ctx,
                                        e,
                                        "Error responding with Bad Gateway:",
                                    )
                                    .await;
                            }
                        }
                    }

                    return self.finish(session, &mut ctx, true, None).await;
                }
                /* else continue */
            }
            Err(e) => {
                if session.cache.enabled() {
                    session.cache.disable(NoCacheReason::InternalError);
                }

                return self
                    .handle_error(
                        session,
                        &mut ctx,
                        e,
                        "Error deciding if we should proxy to upstream:",
                    )
                    .await;
            }
        }

        let mut retries: usize = 0;

        let mut server_reuse = false;
        let mut proxy_error: Option<Box<Error>> = None;

        while retries < self.max_retries {
            retries += 1;

            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
            server_reuse = reuse;

            match e {
                Some(error) => {
                    let retry = error.retry();
                    proxy_error = Some(error);
                    if !retry {
                        break;
                    }
                    // only log errors that will be retried here; the final error will be logged below
                    warn!(
                        "Failed to proxy: {}, tries: {}, retry: {}, {}",
                        proxy_error.as_ref().unwrap(),
                        retries,
                        retry,
                        self.inner.request_summary(&session, &ctx)
                    );
                }
                None => {
                    proxy_error = None;
                    break;
                }
            };
        }

        // serve stale if error
        // Check both the error and the cache before calling the function because await is not cheap
        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
                .await
        } else {
            None
        };

        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
            // don't reuse the server conn if serving stale polluted it
            server_reuse = server_reuse && reuse;
            stale_cache_error
        } else {
            proxy_error
        };

        if let Some(e) = final_error.as_ref() {
            // If we have errored and are still holding a cache lock, release it.
            if session.cache.enabled() {
                let reason = if *e.esource() == ErrorSource::Upstream {
                    NoCacheReason::UpstreamError
                } else {
                    NoCacheReason::InternalError
                };
                session.cache.disable(reason);
            }
            let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;

            // the final error will have > 0 status unless the downstream connection is dead
            if !self.inner.suppress_error_log(&session, &ctx, e) {
                error!(
                    "Failed to proxy: {}, status: {}, tries: {}, retry: {}, {}",
                    final_error.as_ref().unwrap(),
                    res.error_code,
                    retries,
                    false, // we never retry here
                    self.inner.request_summary(&session, &ctx)
                );
            }
        }

        // logging() will be called in finish()
        self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
            .await
    }

    async fn handle_error(
        &self,
        mut session: Session,
        ctx: &mut <SV as ProxyHttp>::CTX,
        e: Box<Error>,
        context: &str,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
        if !self.inner.suppress_error_log(&session, ctx, &e) {
            error!(
                "{context} {}, status: {}, {}",
                e,
                res.error_code,
                self.inner.request_summary(&session, ctx)
            );
        }
        self.inner.logging(&mut session, Some(&e), ctx).await;
        self.cleanup_sub_req(&mut session);

        if res.can_reuse_downstream {
            session.downstream_session.finish().await.ok().flatten()
        } else {
            None
        }
    }
}

/* Make process_subrequest() a trait to work around https://github.com/rust-lang/rust/issues/78649
   if process_subrequest() is implemented as a member of HttpProxy, rust complains

error[E0391]: cycle detected when computing type of `proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}`
   --> pingora-proxy/src/proxy_cache.rs:13:10
    |
13  |     ) -> Option<(bool, Option<Box<Error>>)>

*/
#[async_trait]
trait Subrequest {
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}

#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    ) {
        debug!("starting subrequest");
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return, // bad request
        };

        // there is no real downstream to keep alive, but it doesn't matter what is set here
        // because the dummy connection will be dropped at the end of this fn
        session.set_keepalive(None);

        session.subrequest_ctx.replace(sub_req_ctx);
        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}

#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return None, // bad request
        };

        if *shutdown.borrow() {
            // stop downstream from reusing the connection if this service is shutting down soon
            session.set_keepalive(None);
        } else {
            // default 60s
            session.set_keepalive(Some(60));
        }

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    async fn http_cleanup(&self) {
        // Notify all keepalived requests blocking on read_request() to abort
        self.shutdown.notify_waiters();

        // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
    }

    fn server_options(&self) -> Option<&HttpServerOptions> {
        self.server_options.as_ref()
    }

    // TODO: implement h2_options
}

use pingora_core::services::listening::Service;

/// Create a [Service] from the user-implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
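///
/// A sketch of typical usage (the `MyProxy` type and the listen address are illustrative):
///
/// ```ignore
/// use pingora_core::server::Server;
///
/// let mut server = Server::new(None).unwrap();
/// server.bootstrap();
///
/// let mut proxy = http_proxy_service(&server.configuration, MyProxy);
/// proxy.add_tcp("0.0.0.0:8080");
///
/// server.add_service(proxy);
/// server.run_forever();
/// ```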
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}

/// Create a [Service] from the user-implemented [ProxyHttp] with a custom service name.
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    Service::new(name.to_string(), proxy)
}