pingora_proxy/
lib.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/Response scanning, modification or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of an HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains
//! the callbacks to be invoked at each stage of an HTTP request.
//!
//! The service can then be passed into [`http_proxy_service()`] for a
//! [pingora_core::server::Server] to run it.
//!
//! See `examples/load_balancer.rs` for a detailed example.
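//!
//! A minimal sketch of that pattern (the `MyProxy` name and the hard-coded upstream
//! address are illustrative, not part of this crate):
//!
//! ```ignore
//! use async_trait::async_trait;
//! use pingora_core::upstreams::peer::HttpPeer;
//! use pingora_error::Result;
//! use pingora_proxy::{ProxyHttp, Session};
//!
//! pub struct MyProxy;
//!
//! #[async_trait]
//! impl ProxyHttp for MyProxy {
//!     type CTX = ();
//!     fn new_ctx(&self) -> Self::CTX {}
//!
//!     async fn upstream_peer(
//!         &self,
//!         _session: &mut Session,
//!         _ctx: &mut Self::CTX,
//!     ) -> Result<Box<HttpPeer>> {
//!         // send every request to the same origin over TLS
//!         Ok(Box::new(HttpPeer::new(
//!             ("1.1.1.1", 443),
//!             true,
//!             "one.one.one.one".to_string(),
//!         )))
//!     }
//! }
//! ```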

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;

use pingora_cache::NoCacheReason;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
/// The maximum number of times a single request may be sent upstream before giving up
const MAX_RETRIES: usize = 16;
/// The buffer size of the channels that pass [HttpTask]s between the downstream and
/// upstream sides of the proxy
const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
mod subrequest;

use subrequest::Ctx as SubReqCtx;

pub use proxy_purge::PurgeStatus;
pub use proxy_trait::ProxyHttp;

/// Convenient re-exports of the types needed to implement and run a proxy
pub mod prelude {
    pub use crate::{http_proxy_service, ProxyHttp, Session};
}

/// The concrete type that holds the user-defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV> {
    inner: SV, // TODO: name it better than inner
    client_upstream: Connector,
    shutdown: Notify,
    pub server_options: Option<HttpServerOptions>,
    pub downstream_modules: HttpModules,
}

impl<SV> HttpProxy<SV> {
    fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
        HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
            server_options: None,
            downstream_modules: HttpModules::new(),
        }
    }

    fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1: read the request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, drop the connection to stop more requests from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully got a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                e.as_down();
                error!("Failed to proxy: {}", e);
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session.respond_error(400).await;
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        Some(downstream_session)
    }

    // returns whether the server (downstream) session can be reused, and any error encountered
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. the origin says so or B. the origin sends an
                            // invalid response, which usually means its h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .map_or(true, |alpn| alpn.get_min_http_version() == 1)
                                {
                                    // mark the peer as preferring h1 so that all following
                                    // requests will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

    fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<()>
    where
        SV: ProxyHttp,
    {
        match task {
            HttpTask::Header(header, _eos) => {
                self.inner.upstream_response_filter(session, header, ctx)
            }
            HttpTask::Body(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx),
            HttpTask::Trailer(Some(trailers)) => self
                .inner
                .upstream_response_trailer_filter(session, trailers, ctx)?,
            _ => {
                // task does not support a filter
            }
        }
        Ok(())
    }

    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<&Error>,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner.logging(&mut session, error, ctx).await;

        if reuse {
            // TODO: log error
            session.downstream_session.finish().await.ok().flatten()
        } else {
            None
        }
    }
}

use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;

/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the
/// proxy behavior.
pub struct Session {
    /// the HTTP session to downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    // the context from the parent request
    subrequest_ctx: Option<Box<SubReqCtx>>,
    // downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
}

impl Session {
    fn new(
        downstream_session: impl Into<Box<HttpSession>>,
        downstream_modules: &HttpModules,
    ) -> Self {
        Session {
            downstream_session: downstream_session.into(),
            cache: HttpCache::new(),
            // disable both upstream and downstream compression
            upstream_compression: ResponseCompressionCtx::new(0, false, false),
            ignore_downstream_range: false,
            subrequest_ctx: None,
            downstream_modules_ctx: downstream_modules.build_ctx(),
        }
    }

    /// Create a new [Session] from the given [Stream]
    ///
    /// This function is mostly used for testing and mocking.
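    ///
    /// A sketch of constructing one in a test (`mock_io` is a stand-in for any type
    /// implementing the I/O traits behind [Stream]; it is not provided by this crate):
    ///
    /// ```ignore
    /// let mut session = Session::new_h1(Box::new(mock_io));
    /// assert!(session.read_request().await.unwrap());
    /// ```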
    pub fn new_h1(stream: Stream) -> Self {
        let modules = HttpModules::new();
        Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
    }

    /// Create a new [Session] from the given [Stream] with modules
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
        Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
    }

    /// Get a mutable reference to the downstream [HttpSession]
    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }

    /// Get a shared reference to the downstream [HttpSession]
    pub fn as_downstream(&self) -> &HttpSession {
        &self.downstream_session
    }

    /// Write an HTTP response with the given error code to the downstream
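    ///
    /// A sketch of typical usage inside [ProxyHttp::request_filter] (the surrounding
    /// trait impl is assumed, not shown):
    ///
    /// ```ignore
    /// // reject the request early and tell the proxy a response was already sent
    /// session.respond_error(403).await?;
    /// return Ok(true);
    /// ```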
    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
        let resp = HttpSession::generate_error(error);
        self.write_response_header(Box::new(resp), true)
            .await
            .unwrap_or_else(|e| {
                self.downstream_session.set_keepalive(None);
                error!("failed to send error response to downstream: {e}");
            });
        Ok(())
    }

    /// Write the given HTTP response header to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_header], this function also
    /// invokes the filter modules.
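    ///
    /// A sketch (the status code and header values are illustrative):
    ///
    /// ```ignore
    /// let mut resp = ResponseHeader::build(200, None)?;
    /// resp.insert_header("Content-Length", "0")?;
    /// session.write_response_header(Box::new(resp), true).await?;
    /// ```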
    pub async fn write_response_header(
        &mut self,
        mut resp: Box<ResponseHeader>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_header_filter(&mut resp, end_of_stream)
            .await?;
        self.downstream_session.write_response_header(resp).await
    }

    /// Write the given HTTP response body chunk to the downstream
    ///
    /// Unlike directly calling [HttpSession::write_response_body], this function also
    /// invokes the filter modules.
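    ///
    /// A sketch of writing a final body chunk (the payload is illustrative):
    ///
    /// ```ignore
    /// session
    ///     .write_response_body(Some(Bytes::from_static(b"hello")), true)
    ///     .await?;
    /// ```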
    pub async fn write_response_body(
        &mut self,
        mut body: Option<Bytes>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_body_filter(&mut body, end_of_stream)?;

        if body.is_none() && !end_of_stream {
            return Ok(());
        }

        let data = body.unwrap_or_default();
        self.downstream_session
            .write_response_body(data, end_of_stream)
            .await
    }

    /// Write the given [HttpTask]s to the downstream, running the corresponding
    /// downstream filter modules on each task first
    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            match task {
                HttpTask::Header(resp, end) => {
                    self.downstream_modules_ctx
                        .response_header_filter(resp, *end)
                        .await?;
                }
                HttpTask::Body(data, end) => {
                    self.downstream_modules_ctx
                        .response_body_filter(data, *end)?;
                }
                HttpTask::Trailer(trailers) => {
                    if let Some(buf) = self
                        .downstream_modules_ctx
                        .response_trailer_filter(trailers)?
                    {
                        // Write the trailers into the body if the filter
                        // returns a buffer.
                        //
                        // Note, this will not work if end of stream has already
                        // been seen or we've written content-length bytes.
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                _ => { /* Done or Failed */ }
            }
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }
}

impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &self.downstream_session
    }
}

impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }
}

use std::ops::{Deref, DerefMut};

impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &self.downstream_session
    }
}

impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.downstream_session
    }
}
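
// Because of the Deref impls above, methods of the underlying [HttpSession] can be
// called on a [Session] directly. A sketch:
//
//     let value = session.req_header().headers.get("host");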

// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});

impl<SV> HttpProxy<SV> {
    async fn process_request(
        self: &Arc<Self>,
        mut session: Session,
        mut ctx: <SV as ProxyHttp>::CTX,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if let Err(e) = self
            .inner
            .early_request_filter(&mut session, &mut ctx)
            .await
        {
            self.handle_error(&mut session, &mut ctx, e, "Failed in early request filter:")
                .await;
            return None;
        }

        let req = session.downstream_session.req_header_mut();

        // Built-in downstream request filters go first
        if let Err(e) = session
            .downstream_modules_ctx
            .request_header_filter(req)
            .await
        {
            self.handle_error(
                &mut session,
                &mut ctx,
                e,
                "Failed in downstream modules request filter:",
            )
            .await;
            return None;
        }

        match self.inner.request_filter(&mut session, &mut ctx).await {
            Ok(response_sent) => {
                if response_sent {
                    // TODO: log error
                    self.inner.logging(&mut session, None, &mut ctx).await;
                    return session.downstream_session.finish().await.ok().flatten();
                }
                /* else continue */
            }
            Err(e) => {
                self.handle_error(&mut session, &mut ctx, e, "Failed to filter request:")
                    .await;
                return None;
            }
        }

        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
            // cache hit
            return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
        }
        // either uncacheable, or cache miss

        // decide if the request is allowed to go to upstream
        match self
            .inner
            .proxy_upstream_filter(&mut session, &mut ctx)
            .await
        {
            Ok(proxy_to_upstream) => {
                if !proxy_to_upstream {
                    // The hook can choose to write its own response, but if it doesn't, we respond
                    // with a generic 502
                    if session.response_written().is_none() {
                        match session.write_response_header_ref(&BAD_GATEWAY).await {
                            Ok(()) => {}
                            Err(e) => {
                                self.handle_error(
                                    &mut session,
                                    &mut ctx,
                                    e,
                                    "Error responding with Bad Gateway:",
                                )
                                .await;

                                return None;
                            }
                        }
                    }

                    return self.finish(session, &mut ctx, false, None).await;
                }
                /* else continue */
            }
            Err(e) => {
                self.handle_error(
                    &mut session,
                    &mut ctx,
                    e,
                    "Error deciding if we should proxy to upstream:",
                )
                .await;
                return None;
            }
        }

        let mut retries: usize = 0;

        let mut server_reuse = false;
        let mut proxy_error: Option<Box<Error>> = None;

        while retries < MAX_RETRIES {
            retries += 1;

            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
            server_reuse = reuse;

            match e {
                Some(error) => {
                    let retry = error.retry();
                    proxy_error = Some(error);
                    if !retry {
                        break;
                    }
                    // only log errors that will be retried here; the final error is logged below
                    warn!(
                        "Failed to proxy: {}, tries: {}, retry: {}, {}",
                        proxy_error.as_ref().unwrap(),
                        retries,
                        retry,
                        self.inner.request_summary(&session, &ctx)
                    );
                }
                None => {
                    proxy_error = None;
                    break;
                }
            };
        }

        // serve stale if error
        // check both the error and the cache before calling the function because await is not cheap
        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
                .await
        } else {
            None
        };

        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
            // don't reuse the server conn if serving stale polluted it
            server_reuse = server_reuse && reuse;
            stale_cache_error
        } else {
            proxy_error
        };

        if let Some(e) = final_error.as_ref() {
            // if we have errored and are still holding a cache lock, release it
            session.cache.disable(NoCacheReason::InternalError);
            let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;

            // the final error will have > 0 status unless the downstream connection is dead
            if !self.inner.suppress_error_log(&session, &ctx, e) {
                error!(
                    "Failed to proxy: {}, status: {}, tries: {}, retry: {}, {}",
                    final_error.as_ref().unwrap(),
                    status,
                    retries,
                    false, // we never retry here
                    self.inner.request_summary(&session, &ctx)
                );
            }
        }

        // logging() will be called in finish()
        self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
            .await
    }

    async fn handle_error(
        &self,
        session: &mut Session,
        ctx: &mut <SV as ProxyHttp>::CTX,
        e: Box<Error>,
        context: &str,
    ) where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if !self.inner.suppress_error_log(session, ctx, &e) {
            error!(
                "{context} {}, {}",
                e,
                self.inner.request_summary(session, ctx)
            );
        }
        self.inner.fail_to_proxy(session, &e, ctx).await;
        self.inner.logging(session, Some(&e), ctx).await;
    }
}

/* Make process_subrequest() a trait to work around https://github.com/rust-lang/rust/issues/78649
   If process_subrequest() is implemented as a member of HttpProxy, rustc complains:

error[E0391]: cycle detected when computing type of `proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}`
   --> pingora-proxy/src/proxy_cache.rs:13:10
    |
13  |     ) -> Option<(bool, Option<Box<Error>>)>

*/
#[async_trait]
trait Subrequest {
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}

#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    ) {
        debug!("starting subrequest");
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return, // bad request
        };

        // There is no real downstream to keep alive, but it doesn't matter what is set here
        // because the dummy connection will be dropped at the end of this fn anyway.
        session.set_keepalive(None);

        session.subrequest_ctx.replace(sub_req_ctx);
        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}

#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return None, // bad request
        };

        if *shutdown.borrow() {
            // stop downstream from reusing if this service is shutting down soon
            session.set_keepalive(None);
        } else {
            // default 60s
            session.set_keepalive(Some(60));
        }

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    async fn http_cleanup(&self) {
        // Notify all keepalived requests blocking on read_request() to abort
        self.shutdown.notify_waiters();

        // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
    }

    fn server_options(&self) -> Option<&HttpServerOptions> {
        self.server_options.as_ref()
    }

    // TODO implement h2_options
}

use pingora_core::services::listening::Service;

/// Create a [Service] from the user-implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
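///
/// A sketch of hosting the returned service (`MyProxy` stands in for the user's
/// [ProxyHttp] implementation; the listen address is illustrative):
///
/// ```ignore
/// use pingora_core::server::Server;
/// use pingora_proxy::http_proxy_service;
///
/// let mut server = Server::new(None).unwrap();
/// server.bootstrap();
///
/// let mut proxy = http_proxy_service(&server.configuration, MyProxy);
/// proxy.add_tcp("0.0.0.0:8080");
/// server.add_service(proxy);
/// server.run_forever();
/// ```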
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}

/// Create a [Service] from the user-implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    Service::new(name.to_string(), proxy)
}