// pingap_proxy/server.rs
1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "tracing")]
16use super::tracing::{
17    initialize_telemetry, inject_telemetry_headers, set_otel_request_attrs,
18    set_otel_upstream_attrs, update_otel_cache_attrs,
19};
20use super::{LOG_TARGET, ServerConf, set_append_proxy_headers};
21use crate::ServerLocationsProvider;
22use async_trait::async_trait;
23use bstr::ByteSlice;
24use bytes::Bytes;
25use bytes::BytesMut;
26use http::StatusCode;
27use pingap_acme::handle_lets_encrypt;
28use pingap_certificate::CertificateProvider;
29use pingap_certificate::{GlobalCertificate, TlsSettingParams};
30use pingap_config::ConfigManager;
31use pingap_core::BackgroundTask;
32#[cfg(feature = "tracing")]
33use pingap_core::HttpResponse;
34use pingap_core::LocationInstance;
35use pingap_core::PluginProvider;
36use pingap_core::{
37    CompressionStat, Ctx, PluginStep, RequestPluginResult,
38    ResponseBodyPluginResult, ResponsePluginResult, get_cache_key,
39};
40use pingap_core::{HTTP_HEADER_NAME_X_REQUEST_ID, get_digest_detail};
41use pingap_core::{Plugin, new_internal_error};
42use pingap_location::{Location, LocationProvider};
43use pingap_logger::{Parser, parse_access_log_directive};
44#[cfg(feature = "tracing")]
45use pingap_otel::{KeyValue, trace::Span};
46#[cfg(feature = "tracing")]
47use pingap_performance::{
48    Prometheus, new_prometheus, new_prometheus_push_service,
49};
50use pingap_performance::{accept_request, end_request};
51use pingap_upstream::{Upstream, UpstreamProvider};
52use pingora::apps::HttpServerOptions;
53use pingora::cache::cache_control::DirectiveValue;
54use pingora::cache::cache_control::{CacheControl, InterpretCacheControl};
55use pingora::cache::filters::resp_cacheable;
56use pingora::cache::key::CacheHashKey;
57use pingora::cache::{
58    CacheKey, CacheMetaDefaults, NoCacheReason, RespCacheable,
59};
60use pingora::http::{RequestHeader, ResponseHeader};
61use pingora::listeners::TcpSocketOptions;
62use pingora::modules::http::HttpModules;
63use pingora::modules::http::compression::{
64    ResponseCompression, ResponseCompressionBuilder,
65};
66use pingora::modules::http::grpc_web::{GrpcWeb, GrpcWebBridge};
67use pingora::protocols::Digest;
68use pingora::protocols::http::error_resp;
69use pingora::proxy::{FailToProxy, HttpProxy, http_proxy_service};
70use pingora::proxy::{ProxyHttp, Session};
71use pingora::server::configuration;
72use pingora::services::listening::Service;
73use pingora::upstreams::peer::{HttpPeer, Peer};
74use scopeguard::defer;
75use snafu::Snafu;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
79use std::time::{Duration, Instant};
80use tokio::sync::mpsc::Sender;
81use tracing::{debug, error, info};
82
/// Errors raised while constructing or running the proxy server.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Generic error carrying a category tag (e.g. "tls", "prometheus")
    /// plus a human-readable message.
    #[snafu(display("Common error, category: {category}, {message}"))]
    Common { category: String, message: String },
}
/// Module-local result alias defaulting to [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
89
/// Encodes the elapsed time since `started_at` as a negative millisecond
/// offset. A zero elapsed time maps to `-1` so the returned marker is
/// always strictly negative.
#[inline]
pub fn get_start_time(started_at: &Instant) -> i32 {
    match started_at.elapsed().as_millis() as i32 {
        0 => -1,
        elapsed => -elapsed,
    }
}
99
/// Converts a stored negative start offset (as produced by
/// [`get_start_time`]) back into a latency in milliseconds measured
/// against `started_at`. Returns `None` when no offset is present or the
/// offset is non-negative (already resolved).
#[inline]
pub fn get_latency(started_at: &Instant, value: &Option<i32>) -> Option<i32> {
    match value {
        Some(offset) if *offset < 0 => {
            Some(started_at.elapsed().as_millis() as i32 + offset)
        },
        _ => None,
    }
}
112
/// Core HTTP proxy server implementation that handles request processing, caching, and monitoring.
/// Manages server configuration, connection lifecycle, and integration with various modules.
pub struct Server {
    /// Server name identifier used for logging and metrics
    name: String,

    /// Whether this instance serves admin endpoints and functionality
    admin: bool,

    /// Comma-separated list of listening addresses (e.g. "127.0.0.1:8080,127.0.0.1:8081")
    addr: String,

    /// Counter tracking total number of accepted connections since server start
    accepted: AtomicU64,

    /// Counter tracking number of currently active request processing operations
    processing: AtomicI32,

    /// Optional parser for customizing access log format and output
    log_parser: Option<Parser>,

    /// HTML/JSON template used for rendering error responses
    error_template: String,

    /// Number of worker threads for request processing. None uses default;
    /// `Some(0)` resolves to the number of logical CPUs (see `run`).
    threads: Option<usize>,

    /// OpenSSL cipher list string for TLS connections
    tls_cipher_list: Option<String>,

    /// TLS 1.3 cipher suites configuration
    tls_ciphersuites: Option<String>,

    /// Minimum TLS protocol version to accept (e.g. "TLSv1.2")
    tls_min_version: Option<String>,

    /// Maximum TLS protocol version to accept
    tls_max_version: Option<String>,

    /// Whether HTTP/2 protocol is enabled
    enabled_h2: bool,

    /// Whether Let's Encrypt certificate automation is enabled
    lets_encrypt_enabled: bool,

    /// Whether to use global certificate store for TLS
    global_certificates: bool,

    /// TCP socket configuration options (keepalive, TCP fastopen etc)
    tcp_socket_options: Option<TcpSocketOptions>,

    /// Prometheus metrics registry when metrics collection is enabled
    #[cfg(feature = "tracing")]
    prometheus: Option<Arc<Prometheus>>,

    /// Whether to push metrics to remote Prometheus pushgateway
    /// (true when the configured metrics value contains "://")
    prometheus_push_mode: bool,

    /// Prometheus metrics endpoint path or push gateway URL
    #[cfg(feature = "tracing")]
    prometheus_metrics: String,

    /// Whether OpenTelemetry tracing is enabled
    #[cfg(feature = "tracing")]
    enabled_otel: bool,

    /// List of enabled modules (e.g. "grpc-web")
    modules: Option<Vec<String>>,

    /// Whether to enable server-timing header
    enable_server_timing: bool,

    /// Read timeout applied to the downstream (client) connection
    downstream_read_timeout: Option<Duration>,
    /// Write timeout applied to the downstream (client) connection
    downstream_write_timeout: Option<Duration>,

    /// Maps this server's name to its ordered location names
    server_locations_provider: Arc<dyn ServerLocationsProvider>,
    /// Resolves plugin names to plugin instances
    plugin_provider: Arc<dyn PluginProvider>,

    /// Resolves location names to location instances
    location_provider: Arc<dyn LocationProvider>,

    /// Resolves upstream names to upstream instances
    upstream_provider: Arc<dyn UpstreamProvider>,

    /// Supplies TLS certificates
    certificate_provider: Arc<dyn CertificateProvider>,

    /// Access to the full application configuration
    config_manager: Arc<ConfigManager>,

    /// Channel for asynchronous access-log writing, if configured
    access_logger: Option<Sender<BytesMut>>,
}
210
/// Bundle of listening services produced by [`Server::run`],
/// ready to be registered with the pingora runtime.
pub struct ServerServices {
    /// The HTTP proxy service wrapping this [`Server`].
    pub lb: Service<HttpProxy<Server>>,
}
214
// Default cache metadata used when a response carries no explicit
// cache directives: a one-second default freshness (the closure),
// with the two trailing `1`s as stale values — NOTE(review): confirm
// their exact meaning against pingora's `CacheMetaDefaults::new`.
const META_DEFAULTS: CacheMetaDefaults =
    CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);

// Pre-built 500 response header, shared via `LazyLock` so it is only
// generated once and reused whenever proxying fails.
static HTTP_500_RESPONSE: LazyLock<ResponseHeader> =
    LazyLock::new(|| error_resp::gen_error_response(500));
220
/// Shared dependencies injected into [`Server::new`]. Cloning is cheap:
/// every member is a shared handle (`Arc` or channel sender).
#[derive(Clone)]
pub struct AppContext {
    /// Channel for asynchronous access-log writing, if configured.
    pub logger: Option<Sender<BytesMut>>,
    /// Access to the full application configuration.
    pub config_manager: Arc<ConfigManager>,
    /// Maps a server name to its ordered location names.
    pub server_locations_provider: Arc<dyn ServerLocationsProvider>,
    /// Resolves location names to location instances.
    pub location_provider: Arc<dyn LocationProvider>,
    /// Resolves upstream names to upstream instances.
    pub upstream_provider: Arc<dyn UpstreamProvider>,
    /// Resolves plugin names to plugin instances.
    pub plugin_provider: Arc<dyn PluginProvider>,
    /// Supplies TLS certificates.
    pub certificate_provider: Arc<dyn CertificateProvider>,
}
231
232impl Server {
233    /// Creates a new HTTP proxy server instance with the given configuration.
234    /// Initializes all server components including:
235    /// - TCP socket options
236    /// - TLS settings
237    /// - Prometheus metrics (if enabled)
238    /// - Threading configuration
239    pub fn new(conf: &ServerConf, ctx: AppContext) -> Result<Self> {
240        debug!(target: LOG_TARGET, config = conf.to_string(), "new server");
241        let mut p = None;
242        let (access_log, _) =
243            parse_access_log_directive(conf.access_log.as_ref());
244        if let Some(access_log) = access_log {
245            p = Some(Parser::from(access_log.as_str()));
246        }
247        let tcp_socket_options = if conf.tcp_fastopen.is_some()
248            || conf.tcp_keepalive.is_some()
249            || conf.reuse_port.is_some()
250        {
251            let mut opts = TcpSocketOptions::default();
252            opts.tcp_fastopen = conf.tcp_fastopen;
253            opts.tcp_keepalive.clone_from(&conf.tcp_keepalive);
254            opts.so_reuseport = conf.reuse_port;
255            Some(opts)
256        } else {
257            None
258        };
259        let prometheus_metrics =
260            conf.prometheus_metrics.clone().unwrap_or_default();
261        #[cfg(feature = "tracing")]
262        let prometheus = if prometheus_metrics.is_empty() {
263            None
264        } else {
265            let p = new_prometheus(&conf.name).map_err(|e| Error::Common {
266                category: "prometheus".to_string(),
267                message: e.to_string(),
268            })?;
269            Some(Arc::new(p))
270        };
271        let s = Server {
272            name: conf.name.clone(),
273            admin: conf.admin,
274            accepted: AtomicU64::new(0),
275            processing: AtomicI32::new(0),
276            addr: conf.addr.clone(),
277            log_parser: p,
278            error_template: conf.error_template.clone(),
279            tls_cipher_list: conf.tls_cipher_list.clone(),
280            tls_ciphersuites: conf.tls_ciphersuites.clone(),
281            tls_min_version: conf.tls_min_version.clone(),
282            tls_max_version: conf.tls_max_version.clone(),
283            threads: conf.threads,
284            lets_encrypt_enabled: false,
285            global_certificates: conf.global_certificates,
286            enabled_h2: conf.enabled_h2,
287            tcp_socket_options,
288            prometheus_push_mode: prometheus_metrics.contains("://"),
289            #[cfg(feature = "tracing")]
290            enabled_otel: conf.otlp_exporter.is_some(),
291            #[cfg(feature = "tracing")]
292            prometheus_metrics,
293            #[cfg(feature = "tracing")]
294            prometheus,
295            enable_server_timing: conf.enable_server_timing,
296            modules: conf.modules.clone(),
297            downstream_read_timeout: conf.downstream_read_timeout,
298            downstream_write_timeout: conf.downstream_write_timeout,
299            server_locations_provider: ctx.server_locations_provider,
300            location_provider: ctx.location_provider,
301            upstream_provider: ctx.upstream_provider,
302            plugin_provider: ctx.plugin_provider,
303            certificate_provider: ctx.certificate_provider,
304            access_logger: ctx.logger,
305            config_manager: ctx.config_manager,
306        };
307        Ok(s)
308    }
    /// Enable the Let's Encrypt proxy plugin so ACME challenges at
    /// the `/.well-known/acme-challenge` path are answered by this server
    /// (see `handle_acme_challenge`).
    pub fn enable_lets_encrypt(&mut self) {
        self.lets_encrypt_enabled = true;
    }
    /// Get the prometheus push background task if push mode is configured.
    /// Returns `None` when push mode is off, the `tracing` feature is
    /// compiled out, no registry exists, or the push service fails to build
    /// (the failure is logged, not propagated).
    pub fn get_prometheus_push_service(
        &self,
    ) -> Option<Box<dyn BackgroundTask>> {
        // Push mode is selected when the metrics value looks like a URL.
        if !self.prometheus_push_mode {
            return None;
        }
        cfg_if::cfg_if! {
            if #[cfg(feature = "tracing")] {
                // Without a registry there is nothing to push.
                let Some(prometheus) = &self.prometheus else {
                    return None;
                };
                match new_prometheus_push_service(
                    &self.name,
                    &self.prometheus_metrics,
                    prometheus.clone(),
                ) {
                    Ok(service) => Some(service),
                    Err(e) => {
                        // Best-effort: log and disable push rather than
                        // failing server startup.
                        error!(
                            target: LOG_TARGET,
                            error = %e,
                            name = self.name,
                            "new prometheus push service fail"
                        );
                        None
                    },
                }
            } else {
                // Metrics support compiled out: push service unavailable.
                None
            }
        }
    }
348
349    /// Starts the server and sets up TCP/TLS listening endpoints.
350    /// - Configures listeners for each address
351    /// - Sets up TLS if enabled
352    /// - Initializes HTTP/2 support
353    /// - Configures thread pool
354    pub fn run(
355        self,
356        conf: Arc<configuration::ServerConf>,
357    ) -> Result<ServerServices> {
358        let addr = self.addr.clone();
359        let tcp_socket_options = self.tcp_socket_options.clone();
360
361        let name = self.name.clone();
362        let mut dynamic_cert = None;
363        // tls
364        if self.global_certificates {
365            dynamic_cert =
366                Some(GlobalCertificate::new(self.certificate_provider.clone()));
367        }
368
369        let is_tls = dynamic_cert.is_some();
370
371        let enabled_h2 = self.enabled_h2;
372        let threads = if let Some(threads) = self.threads {
373            // use cpus when set threads:0
374            let value = if threads == 0 {
375                num_cpus::get()
376            } else {
377                threads
378            };
379            Some(value)
380        } else {
381            None
382        };
383
384        info!(
385            target: LOG_TARGET,
386            name,
387            addr,
388            threads,
389            is_tls,
390            h2 = enabled_h2,
391            tcp_socket_options = format!("{:?}", tcp_socket_options),
392            "server is listening"
393        );
394        let cipher_list = self.tls_cipher_list.clone();
395        let cipher_suites = self.tls_ciphersuites.clone();
396        let tls_min_version = self.tls_min_version.clone();
397        let tls_max_version = self.tls_max_version.clone();
398        let mut lb = http_proxy_service(&conf, self);
399        // use h2c if not tls and enable http2
400        if !is_tls
401            && enabled_h2
402            && let Some(http_logic) = lb.app_logic_mut()
403        {
404            let mut http_server_options = HttpServerOptions::default();
405            http_server_options.h2c = true;
406            http_logic.server_options = Some(http_server_options);
407        }
408        lb.threads = threads;
409        // support listen multi address
410        for addr in addr.split(',') {
411            // tls
412            if let Some(dynamic_cert) = &dynamic_cert {
413                let tls_settings = dynamic_cert
414                    .new_tls_settings(&TlsSettingParams {
415                        server_name: name.clone(),
416                        enabled_h2,
417                        cipher_list: cipher_list.clone(),
418                        cipher_suites: cipher_suites.clone(),
419                        tls_min_version: tls_min_version.clone(),
420                        tls_max_version: tls_max_version.clone(),
421                    })
422                    .map_err(|e| Error::Common {
423                        category: "tls".to_string(),
424                        message: e.to_string(),
425                    })?;
426                lb.add_tls_with_settings(
427                    addr,
428                    tcp_socket_options.clone(),
429                    tls_settings,
430                );
431            } else if let Some(opt) = &tcp_socket_options {
432                lb.add_tcp_with_settings(addr, opt.clone());
433            } else {
434                lb.add_tcp(addr);
435            }
436        }
437        Ok(ServerServices { lb })
438    }
439    /// Handles requests to the admin interface.
440    /// Processes admin-specific plugins and returns response if handled.
441    async fn serve_admin(
442        &self,
443        session: &mut Session,
444        ctx: &mut Ctx,
445    ) -> pingora::Result<bool> {
446        if let Some(plugin) = self.plugin_provider.get("pingap:admin") {
447            let result = plugin
448                .handle_request(PluginStep::Request, session, ctx)
449                .await?;
450            if let RequestPluginResult::Respond(resp) = result {
451                ctx.state.status = Some(resp.status);
452                resp.send(session).await?;
453                return Ok(true);
454            }
455        }
456        Ok(false)
457    }
458    #[inline]
459    fn initialize_context(&self, session: &mut Session, ctx: &mut Ctx) {
460        session.set_read_timeout(self.downstream_read_timeout);
461        session.set_write_timeout(self.downstream_write_timeout);
462
463        if let Some(stream) = session.stream() {
464            ctx.conn.id = stream.id() as usize;
465        }
466        // get digest of timing and tls
467        if let Some(digest) = session.digest() {
468            let digest_detail = get_digest_detail(digest);
469            ctx.timing.connection_duration = digest_detail.connection_time;
470            ctx.conn.reused = digest_detail.connection_reused;
471
472            if !ctx.conn.reused
473                && digest_detail.tls_established
474                    >= digest_detail.tcp_established
475            {
476                let latency = digest_detail.tls_established
477                    - digest_detail.tcp_established;
478                ctx.timing.tls_handshake = Some(latency as i32);
479            }
480            ctx.conn.tls_cipher = digest_detail.tls_cipher;
481            ctx.conn.tls_version = digest_detail.tls_version;
482        };
483        accept_request();
484
485        ctx.state.processing_count =
486            self.processing.fetch_add(1, Ordering::Relaxed) + 1;
487        ctx.state.accepted_count =
488            self.accepted.fetch_add(1, Ordering::Relaxed) + 1;
489        if let Some((remote_addr, remote_port)) =
490            pingap_core::get_remote_addr(session)
491        {
492            ctx.conn.remote_addr = Some(remote_addr);
493            ctx.conn.remote_port = Some(remote_port);
494        }
495        if let Some(addr) =
496            session.server_addr().and_then(|addr| addr.as_inet())
497        {
498            ctx.conn.server_addr = Some(addr.ip().to_string());
499            ctx.conn.server_port = Some(addr.port());
500        }
501    }
502
    /// Matches the request's host and path against this server's configured
    /// locations (in declaration order) and, on a match, applies the
    /// location to the request context: records location identity and retry
    /// policy, captures path variables, validates content length, enforces
    /// the location's request limits, initializes gRPC-Web bridging, and
    /// runs the early-request plugins.
    ///
    /// Returns `Ok(())` without modifying the context when no location list
    /// or no matching location exists; the missing-location error is
    /// reported later by `handle_standard_request`.
    #[inline]
    async fn find_and_apply_location(
        &self,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<()> {
        let header = session.req_header();
        let host = pingap_core::get_host(header).unwrap_or_default();
        let path = header.uri.path();

        // locations not found
        let Some(locations) = self.server_locations_provider.get(&self.name)
        else {
            return Ok(());
        };

        // use find_map to optimize logic, performance and readability
        let matched_info = locations.iter().find_map(|name| {
            let location = self.location_provider.get(name)?;
            let (matched, captures) = location.match_host_path(host, path);
            if matched {
                Some((location, captures))
            } else {
                None
            }
        });

        let Some((location, captures)) = matched_info else {
            return Ok(());
        };

        // only execute all subsequent operations after successfully matching location
        ctx.upstream.location = location.name.clone();
        ctx.upstream.location_instance = Some(location.clone());
        ctx.upstream.max_retries = location.max_retries;
        ctx.upstream.max_retry_window = location.max_retry_window;
        if let Some(captures) = captures {
            ctx.extend_variables(captures);
        }

        debug!(
            target: LOG_TARGET,
            "variables: {:?}",
            ctx.features.as_ref().map(|item| &item.variables)
        );

        // set prometheus stats
        #[cfg(feature = "tracing")]
        if let Some(prom) = &self.prometheus {
            prom.before(&ctx.upstream.location);
        }

        // validate content length; oversized bodies become 413 errors
        location
            .validate_content_length(header)
            .map_err(|e| new_internal_error(413, e))?;

        // limit processing: record the location's accepted/processing counts
        let (accepted, processing) = location.on_request()?;
        ctx.state.location_accepted_count = accepted;
        ctx.state.location_processing_count = processing;

        // initialize gRPC Web bridging; the module must have been added
        // at service construction time, otherwise this is a 500
        if location.support_grpc_web() {
            let grpc_web = session
                .downstream_modules_ctx
                .get_mut::<GrpcWebBridge>()
                .ok_or_else(|| {
                    new_internal_error(
                        500,
                        "grpc web bridge module should be added",
                    )
                })?;
            grpc_web.init();
        }

        // initialize plugins and execute the early-request step
        ctx.plugins = self.get_context_plugins(location.clone(), session);
        let _ = self
            .handle_request_plugin(PluginStep::EarlyRequest, session, ctx)
            .await?;

        Ok(())
    }
587
588    #[inline]
589    async fn handle_admin_request(
590        &self,
591        session: &mut Session,
592        ctx: &mut Ctx,
593    ) -> Option<pingora::Result<bool>> {
594        if self.admin {
595            match self.serve_admin(session, ctx).await {
596                Ok(true) => return Some(Ok(true)), // handled
597                Ok(false) => {}, // not admin request, continue
598                Err(e) => return Some(Err(e)), // error
599            }
600        }
601        None // not admin service, continue
602    }
603    #[inline]
604    async fn handle_acme_challenge(
605        &self,
606        session: &mut Session,
607        ctx: &mut Ctx,
608    ) -> Option<pingora::Result<bool>> {
609        if self.lets_encrypt_enabled {
610            return match handle_lets_encrypt(
611                self.config_manager.clone(),
612                session,
613                ctx,
614            )
615            .await
616            {
617                Ok(true) => Some(Ok(true)), // handle ACME request
618                Ok(false) => None,          // not ACME request, continue
619                Err(e) => Some(Err(e)),
620            };
621        }
622        None // not enable ACME, continue
623    }
624    #[inline]
625    #[cfg(feature = "tracing")]
626    async fn handle_metrics_request(
627        &self,
628        session: &mut Session,
629        _ctx: &mut Ctx,
630    ) -> Option<pingora::Result<bool>> {
631        let header = session.req_header();
632        let should_handle = !self.prometheus_push_mode
633            && self.prometheus.is_some()
634            && header.uri.path() == self.prometheus_metrics;
635
636        if should_handle {
637            let prom = self.prometheus.as_ref().unwrap(); // is_some() 检查已通过
638            let result = async {
639                let body =
640                    prom.metrics().map_err(|e| new_internal_error(500, e))?;
641                HttpResponse::text(body).send(session).await?;
642                Ok(true)
643            }
644            .await;
645            return Some(result);
646        }
647        None
648    }
649    #[inline]
650    async fn handle_standard_request(
651        &self,
652        session: &mut Session,
653        ctx: &mut Ctx,
654    ) -> pingora::Result<bool> {
655        let Some(location) = &ctx.upstream.location_instance else {
656            let header = session.req_header();
657            let host = pingap_core::get_host(header).unwrap_or_default();
658            let message = format!(
659                "No matching location, host:{host} path:{}",
660                header.uri.path()
661            );
662            return Err(pingap_core::new_internal_error(500, message));
663        };
664
665        debug!(
666            target: LOG_TARGET,
667            server = self.name,
668            location = location.name(),
669            "location is matched"
670        );
671
672        let variables = if let Some(features) = &mut ctx.features {
673            features.variables.take()
674        } else {
675            None
676        };
677
678        let (_, capture_variables) =
679            location.rewrite(session.req_header_mut(), variables);
680        if let Some(capture_variables) = capture_variables {
681            ctx.extend_variables(capture_variables);
682        }
683
684        if self
685            .handle_request_plugin(PluginStep::Request, session, ctx)
686            .await?
687        {
688            return Ok(true);
689        }
690
691        Ok(false)
692    }
693}
694
/// Module name used in the server `modules` list to enable gRPC-Web bridging.
const MODULE_GRPC_WEB: &str = "grpc-web";
696
697impl Server {
698    #[inline]
699    fn get_context_plugins(
700        &self,
701        location: Arc<Location>,
702        session: &Session,
703    ) -> Option<Vec<(String, Arc<dyn Plugin>)>> {
704        if session.is_upgrade_req() {
705            return None;
706        }
707        let plugins = location.plugins.as_ref()?;
708
709        let location_plugins: Vec<_> = plugins
710            .iter()
711            .filter_map(|name| {
712                self.plugin_provider
713                    .get(name)
714                    .map(|plugin| (name.clone(), plugin))
715            })
716            .collect();
717
718        // use then_some to handle empty collection
719        (!location_plugins.is_empty()).then_some(location_plugins)
720    }
721
    /// Executes request plugins in the configured chain.
    /// Returns true if a plugin handled the request completely.
    #[inline]
    pub async fn handle_request_plugin(
        &self,
        step: PluginStep,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<bool> {
        // Take the plugin list out of `ctx` so each plugin may borrow
        // `ctx` mutably; it is restored after the loop below.
        let plugins = match ctx.plugins.take() {
            Some(p) => p,
            None => return Ok(false), // No plugins, exit early.
        };
        if plugins.is_empty() {
            return Ok(false);
        }

        // The loop runs inside an async block so that an early `?` error is
        // captured into `result` and `ctx.plugins` is still restored below.
        let result = async {
            let mut request_done = false;
            for (name, plugin) in plugins.iter() {
                let now = Instant::now();
                let result = plugin.handle_request(step, session, ctx).await?;
                let elapsed = now.elapsed().as_millis() as u32;

                // extract repeated logging and timing logic
                let mut record_time = |msg: &str| {
                    debug!(
                        target: LOG_TARGET,
                        name,
                        elapsed,
                        step = step.to_string(),
                        "{msg}"
                    );
                    ctx.add_plugin_processing_time(name, elapsed);
                };

                match result {
                    RequestPluginResult::Skipped => {
                        continue;
                    },
                    RequestPluginResult::Respond(resp) => {
                        record_time("request plugin create new response");
                        // ignore status >= 900 — no response is written for
                        // them; NOTE(review): these look like reserved
                        // internal pseudo statuses, confirm with plugin docs
                        if resp.status.as_u16() < 900 {
                            ctx.state.status = Some(resp.status);
                            resp.send(session).await?;
                        }
                        request_done = true;
                        break;
                    },
                    RequestPluginResult::Continue => {
                        record_time("request plugin run and continue request");
                    },
                }
            }
            Ok(request_done)
        }
        .await;
        ctx.plugins = Some(plugins);
        result
    }
783
784    /// Run response plugins
785    #[inline]
786    pub async fn handle_response_plugin(
787        &self,
788        session: &mut Session,
789        ctx: &mut Ctx,
790        upstream_response: &mut ResponseHeader,
791    ) -> pingora::Result<()> {
792        let plugins = match ctx.plugins.take() {
793            Some(p) => p,
794            None => return Ok(()), // No plugins, exit early.
795        };
796        if plugins.is_empty() {
797            return Ok(());
798        }
799
800        let result = async {
801            for (name, plugin) in plugins.iter() {
802                let now = Instant::now();
803                if let ResponsePluginResult::Modified = plugin
804                    .handle_response(session, ctx, upstream_response)
805                    .await?
806                {
807                    let elapsed = now.elapsed().as_millis() as u32;
808                    debug!(
809                        target: LOG_TARGET,
810                        name, elapsed, "response plugin modify headers"
811                    );
812                    ctx.add_plugin_processing_time(name, elapsed);
813                };
814            }
815            Ok(())
816        }
817        .await;
818        ctx.plugins = Some(plugins);
819        result
820    }
821
822    #[inline]
823    pub fn handle_upstream_response_plugin(
824        &self,
825        session: &mut Session,
826        ctx: &mut Ctx,
827        upstream_response: &mut ResponseHeader,
828    ) -> pingora::Result<()> {
829        let plugins = match ctx.plugins.take() {
830            Some(p) => p,
831            None => return Ok(()), // No plugins, exit early.
832        };
833        if plugins.is_empty() {
834            return Ok(());
835        }
836
837        let result = {
838            for (name, plugin) in plugins.iter() {
839                let now = Instant::now();
840                if let ResponsePluginResult::Modified = plugin
841                    .handle_upstream_response(session, ctx, upstream_response)?
842                {
843                    let elapsed = now.elapsed().as_millis() as u32;
844                    debug!(
845                        target: LOG_TARGET,
846                        name,
847                        elapsed,
848                        "upstream response plugin modify headers"
849                    );
850                    ctx.add_plugin_processing_time(name, elapsed);
851                };
852            }
853            Ok(())
854        };
855        ctx.plugins = Some(plugins);
856        result
857    }
858
859    #[inline]
860    pub fn handle_upstream_response_body_plugin(
861        &self,
862        session: &mut Session,
863        ctx: &mut Ctx,
864        body: &mut Option<bytes::Bytes>,
865        end_of_stream: bool,
866    ) -> pingora::Result<()> {
867        let plugins = match ctx.plugins.take() {
868            Some(p) => p,
869            None => return Ok(()), // No plugins, exit early.
870        };
871        if plugins.is_empty() {
872            return Ok(());
873        }
874
875        let result = {
876            for (name, plugin) in plugins.iter() {
877                let now = Instant::now();
878                match plugin.handle_upstream_response_body(
879                    session,
880                    ctx,
881                    body,
882                    end_of_stream,
883                )? {
884                    ResponseBodyPluginResult::PartialReplaced
885                    | ResponseBodyPluginResult::FullyReplaced => {
886                        let elapsed = now.elapsed().as_millis() as u32;
887                        ctx.add_plugin_processing_time(name, elapsed);
888                        debug!(
889                            target: LOG_TARGET,
890                            name, elapsed, "response body plugin modify body"
891                        );
892                    },
893                    _ => {},
894                }
895            }
896            Ok(())
897        };
898        ctx.plugins = Some(plugins);
899        result
900    }
901
902    #[inline]
903    pub fn handle_response_body_plugin(
904        &self,
905        session: &mut Session,
906        ctx: &mut Ctx,
907        body: &mut Option<bytes::Bytes>,
908        end_of_stream: bool,
909    ) -> pingora::Result<()> {
910        let plugins = match ctx.plugins.take() {
911            Some(p) => p,
912            None => return Ok(()), // No plugins, exit early.
913        };
914        if plugins.is_empty() {
915            return Ok(());
916        }
917        let result = {
918            for (name, plugin) in plugins.iter() {
919                let now = Instant::now();
920                match plugin.handle_response_body(
921                    session,
922                    ctx,
923                    body,
924                    end_of_stream,
925                )? {
926                    ResponseBodyPluginResult::PartialReplaced
927                    | ResponseBodyPluginResult::FullyReplaced => {
928                        let elapsed = now.elapsed().as_millis() as u32;
929                        ctx.add_plugin_processing_time(name, elapsed);
930                        debug!(
931                            target: LOG_TARGET,
932                            name, elapsed, "response body plugin modify body"
933                        );
934                    },
935                    _ => {},
936                }
937            }
938            Ok(())
939        };
940        ctx.plugins = Some(plugins);
941        result
942    }
943
944    fn process_cache_control(
945        &self,
946        c: &mut CacheControl,
947        max_ttl: Option<Duration>,
948    ) -> Result<(), NoCacheReason> {
949        // no-cache, no-store, private
950        if c.no_cache() || c.no_store() || c.private() {
951            return Err(NoCacheReason::OriginNotCache);
952        }
953
954        // max-age=0
955        if c.max_age().ok().flatten().unwrap_or_default() == 0 {
956            return Err(NoCacheReason::OriginNotCache);
957        }
958
959        // set cache max ttl
960        if let Some(d) = max_ttl
961            && c.fresh_duration().unwrap_or_default() > d
962        {
963            // 更新 s-maxage 的值
964            let s_maxage_value =
965                itoa::Buffer::new().format(d.as_secs()).as_bytes().to_vec();
966            c.directives.insert(
967                "s-maxage".to_string(),
968                Some(DirectiveValue(s_maxage_value)),
969            );
970        }
971
972        Ok(())
973    }
974
975    #[inline]
976    fn handle_cache_headers(
977        &self,
978        session: &Session,
979        upstream_response: &mut ResponseHeader,
980        ctx: &mut Ctx,
981    ) {
982        let cache_status = session.cache.phase().as_str();
983        let _ = upstream_response.insert_header("x-cache-status", cache_status);
984
985        // process lookup duration
986        let lookup_duration_str = self.process_cache_timing(
987            session.cache.lookup_duration(),
988            "x-cache-lookup",
989            upstream_response,
990            &mut ctx.timing.cache_lookup,
991        );
992
993        // process lock duration
994        let lock_duration_str = self.process_cache_timing(
995            session.cache.lock_duration(),
996            "x-cache-lock",
997            upstream_response,
998            &mut ctx.timing.cache_lock,
999        );
1000
1001        #[cfg(not(feature = "tracing"))]
1002        {
1003            let _ = lookup_duration_str;
1004            let _ = lock_duration_str;
1005        }
1006
1007        // (optional) process OpenTelemetry
1008        #[cfg(feature = "tracing")]
1009        update_otel_cache_attrs(
1010            ctx,
1011            cache_status,
1012            lookup_duration_str,
1013            lock_duration_str,
1014        );
1015    }
1016
1017    #[inline]
1018    fn process_cache_timing(
1019        &self,
1020        duration_opt: Option<Duration>,
1021        header_name: &'static str,
1022        resp: &mut ResponseHeader,
1023        ctx_field: &mut Option<i32>,
1024    ) -> String {
1025        if let Some(d) = duration_opt {
1026            let ms = d.as_millis() as i32;
1027
1028            // use itoa to avoid format! heap memory allocation
1029            let mut buffer = itoa::Buffer::new();
1030            let mut value_bytes = Vec::with_capacity(6);
1031            value_bytes.extend_from_slice(buffer.format(ms).as_bytes());
1032            value_bytes.extend_from_slice(b"ms");
1033
1034            let _ = resp.insert_header(header_name, value_bytes);
1035            *ctx_field = Some(ms);
1036
1037            #[cfg(feature = "tracing")]
1038            return humantime::Duration::from(d).to_string();
1039        }
1040        String::new()
1041    }
1042}
1043
1044#[inline]
1045fn get_upstream_with_variables(
1046    upstream: &str,
1047    ctx: &Ctx,
1048    upstreams: &dyn UpstreamProvider,
1049) -> Option<Arc<Upstream>> {
1050    let key = upstream
1051        .strip_prefix('$')
1052        .and_then(|var_name| ctx.get_variable(var_name))
1053        .unwrap_or(upstream);
1054    upstreams.get(key)
1055}
1056
#[async_trait]
impl ProxyHttp for Server {
    type CTX = Ctx;
    /// Creates the per-request context that is threaded through every
    /// proxy phase below.
    fn new_ctx(&self) -> Self::CTX {
        debug!(target: LOG_TARGET, "new ctx");
        Ctx::new()
    }
    /// Registers downstream HTTP modules: response compression (added at
    /// level 0, i.e. disabled by default) plus gRPC-web when that module
    /// name appears in the server's module list.
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        debug!(target: LOG_TARGET, "--> init downstream modules");
        defer!(debug!(target: LOG_TARGET, "<-- init downstream modules"););
        // Add disabled downstream compression module by default
        modules.add_module(ResponseCompressionBuilder::enable(0));

        self.modules.iter().flatten().for_each(|item| {
            if item == MODULE_GRPC_WEB {
                modules.add_module(Box::new(GrpcWeb));
            }
        });
    }
    /// Handles early request processing before main request handling.
    /// Key responsibilities:
    /// - Sets up connection tracking and metrics
    /// - Records timing information
    /// - Initializes OpenTelemetry tracing
    /// - Matches request to location configuration
    /// - Validates request parameters
    /// - Initializes compression and gRPC modules if needed
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> early request filter");
        defer!(debug!(target: LOG_TARGET, "<-- early request filter"););

        self.initialize_context(session, ctx);
        #[cfg(feature = "tracing")]
        if self.enabled_otel {
            initialize_telemetry(&self.name, session, ctx);
        }
        self.find_and_apply_location(session, ctx).await?;

        Ok(())
    }
    /// Main request processing filter.
    /// Handles:
    /// - Admin interface requests
    /// - Let's Encrypt certificate challenges
    /// - Location-specific processing
    /// - URL rewriting
    /// - Plugin execution
    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> request filter");
        defer!(debug!(target: LOG_TARGET, "<-- request filter"););
        // try to handle special requests in order
        // admin route
        if let Some(result) = self.handle_admin_request(session, ctx).await {
            return result;
        }
        // acme http challenge
        if let Some(result) = self.handle_acme_challenge(session, ctx).await {
            return result;
        }
        // prometheus metrics pull request
        #[cfg(feature = "tracing")]
        if let Some(result) = self.handle_metrics_request(session, ctx).await {
            return result;
        }

        self.handle_standard_request(session, ctx).await
    }

    /// Filters requests before sending to upstream.
    /// Allows modifying request before proxying.
    async fn proxy_upstream_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> proxy upstream filter");
        defer!(debug!(target: LOG_TARGET, "<-- proxy upstream filter"););
        let done = self
            .handle_request_plugin(PluginStep::ProxyUpstream, session, ctx)
            .await?;

        // A plugin that finished the request means the proxy step is skipped.
        if done {
            return Ok(false);
        }
        Ok(true)
    }

    /// Selects and configures the upstream peer to proxy to.
    /// Handles upstream connection pooling and health checking.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<Box<HttpPeer>> {
        debug!(target: LOG_TARGET, "--> upstream peer");
        defer!(debug!(target: LOG_TARGET, "<-- upstream peer"););

        let peer = ctx
            .upstream
            .location_instance
            .clone()
            .as_ref()
            .and_then(|location| {
                // Empty name means no plugin overrode the upstream, so use
                // the one configured on the matched location.
                let upstream = if ctx.upstream.name.is_empty() {
                    get_upstream_with_variables(
                        location.upstream(),
                        ctx,
                        self.upstream_provider.as_ref(),
                    )?
                } else {
                    // override upstream by other plugin
                    get_upstream_with_variables(
                        &ctx.upstream.name,
                        ctx,
                        self.upstream_provider.as_ref(),
                    )?
                };
                ctx.upstream.upstream_instance = Some(upstream.clone());
                Some(upstream)
            })
            .and_then(|upstream| {
                ctx.upstream.connected_count = upstream.connected();
                ctx.upstream.name = upstream.name.clone();
                #[cfg(feature = "tracing")]
                if let Some(features) = &ctx.features
                    && let Some(tracer) = &features.otel_tracer
                {
                    let name = format!("upstream.{}", &upstream.name);
                    let mut span = tracer.new_upstream_span(&name);
                    span.set_attribute(KeyValue::new(
                        "upstream.connected",
                        ctx.upstream.connected_count.unwrap_or_default() as i64,
                    ));
                    let features = ctx.features.get_or_insert_default();
                    features.upstream_span = Some(span);
                }
                upstream
                    .new_http_peer(session, &ctx.conn.client_ip)
                    .inspect(|peer| {
                        ctx.upstream.address = peer.address().to_string();
                    })
            })
            // No location, no upstream, or no healthy peer -> 503.
            .ok_or_else(|| {
                new_internal_error(
                    503,
                    format!(
                        "No available upstream for {}",
                        &ctx.upstream.location
                    ),
                )
            })?;

        // start connect to upstream
        ctx.timing.upstream_connect =
            Some(get_start_time(&ctx.timing.created_at));

        Ok(Box::new(peer))
    }
    /// Called when connection is established to upstream.
    /// Records timing metrics and TLS details.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        reused: bool,
        _peer: &HttpPeer,
        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        digest: Option<&Digest>,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> connected to upstream");
        defer!(debug!(target: LOG_TARGET, "<-- connected to upstream"););
        // Convert the connect start timestamp into an elapsed latency.
        ctx.timing.upstream_connect =
            get_latency(&ctx.timing.created_at, &ctx.timing.upstream_connect);
        if let Some(digest) = digest {
            ctx.update_upstream_timing_from_digest(digest, reused);
        }
        ctx.upstream.reused = reused;

        // upstream start processing
        ctx.timing.upstream_processing =
            Some(get_start_time(&ctx.timing.created_at));

        Ok(())
    }
    /// Decides whether a failed upstream connect may be retried.
    /// Notifies the upstream instance of the transport failure, then marks
    /// the error retryable only while both the retry count and the retry
    /// time window allow it.
    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        mut e: Box<pingora::Error>,
    ) -> Box<pingora::Error> {
        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
            upstream_instance.on_transport_failure(&peer.address().to_string());
        }
        // No retry limit configured -> never retry.
        let Some(max_retries) = ctx.upstream.max_retries else {
            return e;
        };
        if ctx.upstream.retries >= max_retries {
            return e;
        }
        // Give up once the total elapsed time exceeds the retry window.
        if let Some(max_retry_window) = ctx.upstream.max_retry_window
            && ctx.timing.created_at.elapsed() > max_retry_window
        {
            return e;
        }
        ctx.upstream.retries += 1;
        e.set_retry(true);
        e
    }
    /// Filters upstream request before sending.
    /// Adds proxy headers and performs any request modifications.
    ///
    /// NOTE(review): the `upstream_response` parameter is actually the
    /// upstream *request* header (`RequestHeader`) despite its name.
    async fn upstream_request_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut RequestHeader,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> upstream request filter");
        defer!(debug!(target: LOG_TARGET, "<-- upstream request filter"););
        set_append_proxy_headers(session, ctx, upstream_response);
        Ok(())
    }
    /// Filters request body chunks before sending upstream.
    /// Tracks payload size and enforces size limits.
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        _end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> request body filter");
        defer!(debug!(target: LOG_TARGET, "<-- request body filter"););
        if let Some(buf) = body {
            // Accumulate the body size across chunks and reject the request
            // once it exceeds the location's configured limit (0 = no limit).
            ctx.state.payload_size += buf.len();
            if let Some(location) = &ctx.upstream.location_instance {
                let size = location.client_body_size_limit();
                if size > 0 && ctx.state.payload_size > size {
                    return Err(new_internal_error(
                        413,
                        format!("Request Entity Too Large, max:{size}"),
                    ));
                }
            }
        }
        Ok(())
    }
    /// Generates cache keys for request caching.
    /// Combines:
    /// - Cache namespace
    /// - Request method
    /// - URL path and query
    /// - Optional custom prefix
    fn cache_key_callback(
        &self,
        session: &Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<CacheKey> {
        debug!(target: LOG_TARGET, "--> cache key callback");
        defer!(debug!(target: LOG_TARGET, "<-- cache key callback"););
        let key = get_cache_key(
            ctx,
            session.req_header().method.as_ref(),
            &session.req_header().uri,
        );
        debug!(
            target: LOG_TARGET,
            namespace = key.namespace_str(),
            primary = key.primary_key_str(),
            user_tag = key.user_tag(),
            "cache key callback"
        );
        Ok(key)
    }

    /// Determines if and how responses should be cached.
    /// Checks:
    /// - Cache-Control headers
    /// - TTL settings
    /// - Cache privacy settings
    /// - Custom cache control directives
    fn response_cache_filter(
        &self,
        _session: &Session,
        resp: &ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<RespCacheable> {
        debug!(target: LOG_TARGET, "--> response cache filter");
        defer!(debug!(target: LOG_TARGET, "<-- response cache filter"););

        let (check_cache_control, max_ttl) = ctx.cache.as_ref().map_or(
            (false, None), // ctx.cache is None
            |c| (c.check_cache_control, c.max_ttl),
        );

        let mut cc = CacheControl::from_resp_headers(resp);

        if let Some(c) = &mut cc {
            // delegate all complex validation and modification logic to the helper function
            if let Err(reason) = self.process_cache_control(c, max_ttl) {
                return Ok(RespCacheable::Uncacheable(reason));
            }
        } else if check_cache_control {
            // if Cache-Control header is required but it doesn't exist or parsing fails
            return Ok(RespCacheable::Uncacheable(
                NoCacheReason::OriginNotCache,
            ));
        }

        Ok(resp_cacheable(
            cc.as_ref(),
            resp.clone(),
            false,
            &META_DEFAULTS,
        ))
    }

    /// Filters the response header sent downstream: adds cache diagnostic
    /// headers, runs response plugins, and appends the server-timing header
    /// when enabled.
    async fn response_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> response filter");
        defer!(debug!(target: LOG_TARGET, "<-- response filter"););
        if session.cache.enabled() {
            self.handle_cache_headers(session, upstream_response, ctx);
        }

        // call response plugin
        self.handle_response_plugin(session, ctx, upstream_response)
            .await?;

        // add server-timing response header
        if self.enable_server_timing {
            let _ = upstream_response
                .insert_header("server-timing", ctx.generate_server_timing());
        }
        Ok(())
    }

    /// Filters the response header received from upstream: runs upstream
    /// response plugins, records status/timing in `ctx`, propagates the
    /// request id header, and notifies the upstream instance.
    async fn upstream_response_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()> {
        debug!(target: LOG_TARGET, "--> upstream response filter");
        defer!(debug!(target: LOG_TARGET, "<-- upstream response filter"););
        self.handle_upstream_response_plugin(session, ctx, upstream_response)?;
        #[cfg(feature = "tracing")]
        inject_telemetry_headers(ctx, upstream_response);
        ctx.upstream.status = Some(upstream_response.status);

        // Only the first upstream response sets the request status/timing
        // (this filter can run more than once, e.g. on retries).
        if ctx.state.status.is_none() {
            ctx.state.status = Some(upstream_response.status);
            // start to get upstream response data
            ctx.timing.upstream_response =
                Some(get_start_time(&ctx.timing.created_at));
        }

        if let Some(id) = &ctx.state.request_id {
            let _ = upstream_response
                .insert_header(&HTTP_HEADER_NAME_X_REQUEST_ID, id);
        }

        ctx.timing.upstream_processing = get_latency(
            &ctx.timing.created_at,
            &ctx.timing.upstream_processing,
        );

        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
            upstream_instance
                .on_response(&ctx.upstream.address, upstream_response.status);
        }

        Ok(())
    }

    /// Filters upstream response body chunks.
    /// Records timing metrics and finalizes spans.
    fn upstream_response_body_filter(
        &self,
        session: &mut Session,
        body: &mut Option<bytes::Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<Option<std::time::Duration>> {
        debug!(target: LOG_TARGET, "--> upstream response body filter");
        defer!(debug!(target: LOG_TARGET, "<-- upstream response body filter"););

        self.handle_upstream_response_body_plugin(
            session,
            ctx,
            body,
            end_of_stream,
        )?;

        if end_of_stream {
            // The whole upstream body has arrived; close out the timing.
            ctx.timing.upstream_response = get_latency(
                &ctx.timing.created_at,
                &ctx.timing.upstream_response,
            );

            #[cfg(feature = "tracing")]
            set_otel_upstream_attrs(ctx);
            // self.finalize_upstream_session(ctx);
        }
        Ok(None)
    }

    /// Final filter for response body before sending to client.
    /// Handles response body modifications and compression.
    fn response_body_filter(
        &self,
        session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<Option<std::time::Duration>>
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> response body filter");
        defer!(debug!(target: LOG_TARGET, "<-- response body filter"););
        self.handle_response_body_plugin(session, ctx, body, end_of_stream)?;
        Ok(None)
    }

    /// Handles proxy failures and generates appropriate error responses.
    /// Error handling for:
    /// - Upstream connection failures (502, 504)
    /// - Client timeouts (408)
    /// - Client disconnections (499)
    /// Generates error pages using configured template
    async fn fail_to_proxy(
        &self,
        session: &mut Session,
        e: &pingora::Error,
        ctx: &mut Self::CTX,
    ) -> FailToProxy
    where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> fail to proxy");
        defer!(debug!(target: LOG_TARGET, "<-- fail to proxy"););
        let server_session = session.as_mut();

        // Map the pingora error to an HTTP status code by its source/type.
        let code = match e.etype() {
            pingora::HTTPStatus(code) => *code,
            // spellchecker:off
            _ => match e.esource() {
                pingora::ErrorSource::Upstream => 502,
                pingora::ErrorSource::Downstream => match e.etype() {
                    pingora::ErrorType::ConnectTimedout => 408,
                    // client close the connection
                    pingora::ErrorType::ConnectionClosed => 499,
                    _ => 500,
                },
                pingora::ErrorSource::Internal
                | pingora::ErrorSource::Unset => 500,
            },
            // spellchecker:on
        };
        let mut resp = match code {
            502 => error_resp::HTTP_502_RESPONSE.clone(),
            400 => error_resp::HTTP_400_RESPONSE.clone(),
            500 => HTTP_500_RESPONSE.clone(),
            _ => error_resp::gen_error_response(code),
        };

        // Render the configured error template with error details.
        let error_type = e.etype().as_str();
        let content = self
            .error_template
            .replace("{{version}}", pingap_util::get_pkg_version())
            .replace("{{content}}", &e.to_string())
            .replace("{{error_type}}", error_type);
        let buf = Bytes::from(content);
        ctx.state.status = Some(
            StatusCode::from_u16(code)
                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
        );
        // A template starting with '{' is treated as a JSON error page.
        let content_type = if buf.starts_with(b"{") {
            "application/json; charset=utf-8"
        } else {
            "text/html; charset=utf-8"
        };
        let _ = resp.insert_header(http::header::CONTENT_TYPE, content_type);
        let _ = resp.insert_header("X-Pingap-EType", error_type);
        let _ = resp
            .insert_header(http::header::CONTENT_LENGTH, buf.len().to_string());

        let user_agent = server_session
            .get_header(http::header::USER_AGENT)
            .map(|v| v.clone().to_str().unwrap_or_default().to_string());

        error!(
            target: LOG_TARGET,
            error = %e,
            remote_addr = ctx.conn.remote_addr,
            client_ip = ctx.conn.client_ip,
            user_agent,
            error_type,
            path = server_session.req_header().uri.path(),
            "fail to proxy"
        );

        // TODO: we shouldn't be closing downstream connections on internally generated errors
        // and possibly other upstream connect() errors (connection refused, timeout, etc)
        //
        // This change is only here because we DO NOT re-use downstream connections
        // today on these errors and we should signal to the client that pingora is dropping it
        // rather than a misleading the client with 'keep-alive'
        server_session.set_keepalive(None);

        server_session
            .write_response_header(Box::new(resp))
            .await
            .unwrap_or_else(|e| {
                error!(
                    target: LOG_TARGET,
                    error = %e,
                    "send error response to downstream fail"
                );
            });

        let _ = server_session.write_response_body(buf, true).await;
        FailToProxy {
            error_code: code,
            can_reuse_downstream: false,
        }
    }
    /// Performs request logging and cleanup after request completion.
    /// Handles:
    /// - Request counting cleanup
    /// - Compression statistics
    /// - Prometheus metrics
    /// - OpenTelemetry span completion
    /// - Access logging
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) where
        Self::CTX: Send + Sync,
    {
        debug!(target: LOG_TARGET, "--> logging");
        defer!(debug!(target: LOG_TARGET, "<-- logging"););
        end_request();
        self.processing.fetch_sub(1, Ordering::Relaxed);
        if let Some(location) = &ctx.upstream.location_instance {
            location.on_response();
        }
        // get from cache does not connect to upstream
        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
            ctx.upstream.processing_count = Some(upstream_instance.completed());
        }
        // Fall back to the status actually written downstream when no phase
        // recorded one (e.g. cache hit served without touching upstream).
        if ctx.state.status.is_none()
            && let Some(header) = session.response_written()
        {
            ctx.state.status = Some(header.status);
        }
        #[cfg(feature = "tracing")]
        // enable open telemetry and proxy upstream fail
        if let Some(features) = ctx.features.as_mut()
            && let Some(ref mut span) = features.upstream_span.as_mut()
        {
            span.end();
        }

        if let Some(c) =
            session.downstream_modules_ctx.get::<ResponseCompression>()
            && c.is_enabled()
            && let Some((algorithm, in_bytes, out_bytes, took)) = c.get_info()
        {
            let features = ctx.features.get_or_insert_default();
            features.compression_stat = Some(CompressionStat {
                algorithm: algorithm.to_string(),
                in_bytes,
                out_bytes,
                duration: took,
            });
        }
        #[cfg(feature = "tracing")]
        if let Some(prom) = &self.prometheus {
            // because prom.before will not be called if location is empty
            if !ctx.upstream.location.is_empty() {
                prom.after(session, ctx);
            }
        }

        #[cfg(feature = "tracing")]
        set_otel_request_attrs(session, ctx);

        if let Some(p) = &self.log_parser {
            let buf = p.format(session, ctx);
            if let Some(logger) = &self.access_logger {
                let _ = logger.try_send(buf);
            } else {
                let msg = buf.as_bstr();
                info!(target: LOG_TARGET, "{msg}");
            }
        }
    }
}
1693
1694#[cfg(test)]
1695mod tests {
1696    use super::*;
1697    use crate::server_conf::parse_from_conf;
1698    use ahash::AHashMap;
1699    use pingap_certificate::{DynamicCertificates, TlsCertificate};
1700    use pingap_config::{PingapConfig, new_file_config_manager};
1701    use pingap_core::{CacheInfo, Ctx, UpstreamInfo};
1702    use pingap_location::LocationStats;
1703    use pingora::http::ResponseHeader;
1704    use pingora::protocols::tls::SslDigest;
1705    use pingora::protocols::tls::SslDigestExtension;
1706    use pingora::protocols::{Digest, TimingDigest};
1707    use pingora::proxy::{ProxyHttp, Session};
1708    use pingora::server::configuration;
1709    use pingora::services::Service;
1710    use pretty_assertions::assert_eq;
1711    use std::collections::HashMap;
1712    use std::sync::Arc;
1713    use std::time::{Duration, SystemTime};
1714    use tokio_test::io::Builder;
1715
1716    #[test]
1717    fn test_get_digest_detail() {
1718        let digest = Digest {
1719            timing_digest: vec![Some(TimingDigest {
1720                established_ts: SystemTime::UNIX_EPOCH
1721                    .checked_add(Duration::from_secs(10))
1722                    .unwrap(),
1723            })],
1724            ssl_digest: Some(Arc::new(SslDigest {
1725                cipher: "123".into(),
1726                version: "1.3".into(),
1727                organization: None,
1728                serial_number: None,
1729                cert_digest: vec![],
1730                extension: SslDigestExtension::default(),
1731            })),
1732            ..Default::default()
1733        };
1734        let result = get_digest_detail(&digest);
1735        assert_eq!(10000, result.tcp_established);
1736        assert_eq!("1.3", result.tls_version.unwrap_or_default());
1737    }
1738
    /// Creates a new test server instance with default configuration
    ///
    /// Parses an embedded TOML configuration (one server "test" listening on
    /// 0.0.0.0:6188 with a single location "lo" proxying to upstream
    /// "charts"), then wires the server up with in-memory stub providers so
    /// it can be exercised without any external configuration source.
    fn new_server() -> Server {
        let toml_data = r###"
[upstreams.charts]
# upstream address list
addrs = ["127.0.0.1:5000"]


[upstreams.diving]
addrs = ["127.0.0.1:5001"]


[locations.lo]
# upstream of location (default none)
upstream = "charts"

# location match path (default none)
path = "/"

# location match host, multiple domain names are separated by commas (default none)
host = ""

# set headers to request (default none)
includes = ["proxySetHeader"]

# add headers to request (default none)
proxy_add_headers = ["name:value"]


# the weigh of location (default none)
weight = 1024


# plugin list for location
plugins = ["pingap:requestId", "stats"]

[servers.test]
# server linsten address, multiple addresses are separated by commas (default none)
addr = "0.0.0.0:6188"

# access log format (default none)
access_log = "tiny"

# the locations for server
locations = ["lo"]

# the threads count for server (default 1)
threads = 1

[plugins.stats]
value = "/stats"
category = "stats"

[storages.authToken]
category = "secret"
secret = "123123"
value = "PLpKJqvfkjTcYTDpauJf+2JnEayP+bm+0Oe60Jk="

[storages.proxySetHeader]
category = "config"
value = 'proxy_set_headers = ["name:value"]'
        "###;
        // NOTE(review): second argument presumably toggles strict/replace
        // behavior in config parsing — confirm against `PingapConfig::new`.
        let pingap_conf = PingapConfig::new(toml_data.as_ref(), false).unwrap();

        // Pre-build the single location and its upstream so the stub
        // providers below can hand them out on every lookup.
        let location = Arc::new(
            Location::new("lo", pingap_conf.locations.get("lo").unwrap())
                .unwrap(),
        );
        let upstream = Arc::new(
            Upstream::new(
                "charts",
                pingap_conf.upstreams.get("charts").unwrap(),
                None,
            )
            .unwrap(),
        );

        // Stub plugin provider: never resolves any plugin by name.
        struct TmpPluginLoader {}
        impl PluginProvider for TmpPluginLoader {
            fn get(&self, _name: &str) -> Option<Arc<dyn Plugin>> {
                None
            }
        }
        // Stub location provider: returns the same prepared location for
        // every name and reports no stats.
        struct TmpLocationLoader {
            location: Arc<Location>,
        }
        impl LocationProvider for TmpLocationLoader {
            fn get(&self, _name: &str) -> Option<Arc<Location>> {
                Some(self.location.clone())
            }
            fn stats(&self) -> HashMap<String, LocationStats> {
                HashMap::new()
            }
        }
        // Stub upstream provider: always resolves to the "charts" upstream.
        struct TmpUpstreamLoader {
            upstream: Arc<Upstream>,
        }
        impl UpstreamProvider for TmpUpstreamLoader {
            fn get(&self, _name: &str) -> Option<Arc<Upstream>> {
                Some(self.upstream.clone())
            }
            fn list(&self) -> Vec<(String, Arc<Upstream>)> {
                vec![("charts".to_string(), self.upstream.clone())]
            }
        }
        // Stub server-locations provider: every server gets the fixed
        // location list passed in at construction.
        struct TmpServerLocationsLoader {
            server_locations: Arc<Vec<String>>,
        }
        impl ServerLocationsProvider for TmpServerLocationsLoader {
            fn get(&self, _name: &str) -> Option<Arc<Vec<String>>> {
                Some(self.server_locations.clone())
            }
        }

        // Stub certificate provider: no certificates, lookups miss, stores
        // are discarded.
        struct TmpCertificateLoader {}
        impl CertificateProvider for TmpCertificateLoader {
            fn get(&self, _sni: &str) -> Option<Arc<TlsCertificate>> {
                None
            }
            fn list(&self) -> Arc<DynamicCertificates> {
                Arc::new(AHashMap::new())
            }
            fn store(&self, _data: DynamicCertificates) {}
        }

        // Split the parsed config into per-server configurations; only the
        // first (the "test" server) is used below.
        let confs = parse_from_conf(pingap_conf);
        // Temp file backing the file-based config manager.
        let file = tempfile::NamedTempFile::with_suffix(".toml").unwrap();

        Server::new(
            &confs[0],
            AppContext {
                logger: None,
                config_manager: Arc::new(
                    new_file_config_manager(&file.path().to_string_lossy())
                        .unwrap(),
                ),
                server_locations_provider: Arc::new(TmpServerLocationsLoader {
                    server_locations: Arc::new(vec!["lo".to_string()]),
                }),
                location_provider: Arc::new(TmpLocationLoader { location }),
                upstream_provider: Arc::new(TmpUpstreamLoader { upstream }),
                plugin_provider: Arc::new(TmpPluginLoader {}),
                certificate_provider: Arc::new(TmpCertificateLoader {}),
            },
        )
        .unwrap()
    }
1886
1887    #[test]
1888    fn test_new_server() {
1889        let server = new_server();
1890        let services = server
1891            .run(Arc::new(configuration::ServerConf::default()))
1892            .unwrap();
1893
1894        assert_eq!("Pingora HTTP Proxy Service", services.lb.name());
1895    }
1896
1897    #[tokio::test]
1898    async fn test_early_request_filter() {
1899        let server = new_server();
1900
1901        let headers = [""].join("\r\n");
1902        let input_header =
1903            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1904        let mock_io = Builder::new().read(input_header.as_bytes()).build();
1905        let mut session = Session::new_h1(Box::new(mock_io));
1906        session.read_request().await.unwrap();
1907
1908        let mut ctx = Ctx::default();
1909        server
1910            .early_request_filter(&mut session, &mut ctx)
1911            .await
1912            .unwrap();
1913        assert_eq!("lo", ctx.upstream.location.as_ref());
1914    }
1915
1916    #[tokio::test]
1917    async fn test_request_filter() {
1918        let server = new_server();
1919
1920        let headers = [""].join("\r\n");
1921        let input_header =
1922            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1923        let mock_io = Builder::new().read(input_header.as_bytes()).build();
1924        let mut session = Session::new_h1(Box::new(mock_io));
1925        session.read_request().await.unwrap();
1926
1927        let location = server.location_provider.get("lo").unwrap();
1928        let mut ctx = Ctx {
1929            upstream: UpstreamInfo {
1930                location: "lo".to_string().into(),
1931                location_instance: Some(location.clone()),
1932                ..Default::default()
1933            },
1934            ..Default::default()
1935        };
1936        let done = server.request_filter(&mut session, &mut ctx).await.unwrap();
1937        assert_eq!(false, done);
1938    }
1939
1940    #[tokio::test]
1941    async fn test_cache_key_callback() {
1942        let server = new_server();
1943
1944        let headers = [""].join("\r\n");
1945        let input_header =
1946            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1947        let mock_io = Builder::new().read(input_header.as_bytes()).build();
1948        let mut session = Session::new_h1(Box::new(mock_io));
1949        session.read_request().await.unwrap();
1950
1951        let key = server
1952            .cache_key_callback(
1953                &session,
1954                &mut Ctx {
1955                    cache: Some(CacheInfo {
1956                        namespace: Some("pingap".to_string()),
1957                        keys: Some(vec!["ss".to_string()]),
1958                        ..Default::default()
1959                    }),
1960                    ..Default::default()
1961                },
1962            )
1963            .unwrap();
1964        assert_eq!(
1965            key.primary_key_str(),
1966            Some("ss:GET:/vicanso/pingap?size=1")
1967        );
1968        assert_eq!(key.namespace_str(), Some("pingap"));
1969        assert_eq!(
1970            r#"CacheKey { namespace: [112, 105, 110, 103, 97, 112], primary: [115, 115, 58, 71, 69, 84, 58, 47, 118, 105, 99, 97, 110, 115, 111, 47, 112, 105, 110, 103, 97, 112, 63, 115, 105, 122, 101, 61, 49], primary_bin_override: None, variance: None, user_tag: "", extensions: {} }"#,
1971            format!("{key:?}")
1972        );
1973    }
1974
1975    #[tokio::test]
1976    async fn test_response_cache_filter() {
1977        let server = new_server();
1978
1979        let headers = [""].join("\r\n");
1980        let input_header =
1981            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1982        let mock_io = Builder::new().read(input_header.as_bytes()).build();
1983        let mut session = Session::new_h1(Box::new(mock_io));
1984        session.read_request().await.unwrap();
1985
1986        let mut upstream_response =
1987            ResponseHeader::build_no_case(200, None).unwrap();
1988        upstream_response
1989            .append_header("Content-Type", "application/json")
1990            .unwrap();
1991        let result = server
1992            .response_cache_filter(
1993                &session,
1994                &upstream_response,
1995                &mut Ctx {
1996                    cache: Some(CacheInfo {
1997                        keys: Some(vec!["ss".to_string()]),
1998                        ..Default::default()
1999                    }),
2000                    ..Default::default()
2001                },
2002            )
2003            .unwrap();
2004        assert_eq!(true, result.is_cacheable());
2005
2006        let mut upstream_response =
2007            ResponseHeader::build_no_case(200, None).unwrap();
2008        upstream_response
2009            .append_header("Cache-Control", "no-cache")
2010            .unwrap();
2011        let result = server
2012            .response_cache_filter(
2013                &session,
2014                &upstream_response,
2015                &mut Ctx {
2016                    cache: Some(CacheInfo {
2017                        keys: Some(vec!["ss".to_string()]),
2018                        ..Default::default()
2019                    }),
2020                    ..Default::default()
2021                },
2022            )
2023            .unwrap();
2024        assert_eq!(false, result.is_cacheable());
2025
2026        let mut upstream_response =
2027            ResponseHeader::build_no_case(200, None).unwrap();
2028        upstream_response
2029            .append_header("Cache-Control", "no-store")
2030            .unwrap();
2031        let result = server
2032            .response_cache_filter(
2033                &session,
2034                &upstream_response,
2035                &mut Ctx {
2036                    cache: Some(CacheInfo {
2037                        keys: Some(vec!["ss".to_string()]),
2038                        ..Default::default()
2039                    }),
2040                    ..Default::default()
2041                },
2042            )
2043            .unwrap();
2044        assert_eq!(false, result.is_cacheable());
2045
2046        let mut upstream_response =
2047            ResponseHeader::build_no_case(200, None).unwrap();
2048        upstream_response
2049            .append_header("Cache-Control", "private, max-age=100")
2050            .unwrap();
2051        let result = server
2052            .response_cache_filter(
2053                &session,
2054                &upstream_response,
2055                &mut Ctx {
2056                    cache: Some(CacheInfo {
2057                        keys: Some(vec!["ss".to_string()]),
2058                        ..Default::default()
2059                    }),
2060                    ..Default::default()
2061                },
2062            )
2063            .unwrap();
2064        assert_eq!(false, result.is_cacheable());
2065    }
2066
2067    fn create_session(path: &str) -> Session {
2068        let headers = ["Host: example.com"].join("\r\n");
2069        let input_header =
2070            format!("GET {} HTTP/1.1\r\n{headers}\r\n\r\n", path);
2071        let mock_io = Builder::new().read(input_header.as_bytes()).build();
2072        Session::new_h1(Box::new(mock_io))
2073    }
2074
2075    #[tokio::test]
2076    async fn test_handle_acme_challenge_not_enabled() {
2077        let server = new_server();
2078
2079        let mut session = create_session("/");
2080        session.read_request().await.unwrap();
2081
2082        let result = server
2083            .handle_acme_challenge(&mut session, &mut Ctx::default())
2084            .await;
2085        assert!(
2086            result.is_none(),
2087            "When ACME not enabled, should return None"
2088        );
2089    }
2090
2091    #[tokio::test]
2092    async fn test_handle_acme_challenge_non_challenge_returns_none() {
2093        let mut server = new_server();
2094        server.enable_lets_encrypt();
2095
2096        let test_paths = ["/", "/api", "/test.html", "/normal/path"];
2097
2098        for path in test_paths {
2099            let mut session = create_session(path);
2100            session.read_request().await.unwrap();
2101
2102            let result = server
2103                .handle_acme_challenge(&mut session, &mut Ctx::default())
2104                .await;
2105            assert!(
2106                result.is_none(),
2107                "Path '{}' should return None to continue processing (bug returns Some(Ok(false)))",
2108                path
2109            );
2110        }
2111    }
2112}