Skip to main content

pingap_proxy/
server.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "tracing")]
16use super::tracing::{
17    initialize_telemetry, inject_telemetry_headers, set_otel_request_attrs,
18    set_otel_upstream_attrs, update_otel_cache_attrs,
19};
20use super::{LOG_TARGET, ServerConf, set_append_proxy_headers};
21use crate::ServerLocationsProvider;
22use async_trait::async_trait;
23use bstr::ByteSlice;
24use bytes::Bytes;
25use bytes::BytesMut;
26use http::StatusCode;
27use pingap_acme::handle_lets_encrypt;
28use pingap_certificate::CertificateProvider;
29use pingap_certificate::{GlobalCertificate, TlsSettingParams};
30use pingap_config::ConfigManager;
31use pingap_core::BackgroundTask;
32#[cfg(feature = "tracing")]
33use pingap_core::HttpResponse;
34use pingap_core::LocationInstance;
35use pingap_core::PluginProvider;
36use pingap_core::{
37    CompressionStat, Ctx, PluginStep, RequestPluginResult,
38    ResponseBodyPluginResult, ResponsePluginResult, get_cache_key,
39};
40use pingap_core::{HTTP_HEADER_NAME_X_REQUEST_ID, get_digest_detail};
41use pingap_core::{Plugin, new_internal_error};
42use pingap_location::{Location, LocationProvider};
43use pingap_logger::{Parser, parse_access_log_directive};
44#[cfg(feature = "tracing")]
45use pingap_otel::{KeyValue, trace::Span};
46#[cfg(feature = "tracing")]
47use pingap_performance::{
48    Prometheus, new_prometheus, new_prometheus_push_service,
49};
50use pingap_performance::{accept_request, end_request};
51use pingap_upstream::{Upstream, UpstreamProvider};
52use pingora::apps::HttpServerOptions;
53use pingora::cache::cache_control::DirectiveValue;
54use pingora::cache::cache_control::{CacheControl, InterpretCacheControl};
55use pingora::cache::filters::resp_cacheable;
56use pingora::cache::key::CacheHashKey;
57use pingora::cache::{
58    CacheKey, CacheMetaDefaults, NoCacheReason, RespCacheable,
59};
60use pingora::http::{RequestHeader, ResponseHeader};
61use pingora::listeners::TcpSocketOptions;
62use pingora::modules::http::HttpModules;
63use pingora::modules::http::compression::{
64    ResponseCompression, ResponseCompressionBuilder,
65};
66use pingora::modules::http::grpc_web::{GrpcWeb, GrpcWebBridge};
67use pingora::protocols::Digest;
68use pingora::protocols::http::error_resp;
69use pingora::proxy::{FailToProxy, HttpProxy, http_proxy_service};
70use pingora::proxy::{ProxyHttp, Session};
71use pingora::server::configuration;
72use pingora::services::listening::Service;
73use pingora::upstreams::peer::{HttpPeer, Peer};
74use scopeguard::defer;
75use snafu::Snafu;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
79use std::time::{Duration, Instant};
80use tokio::sync::mpsc::Sender;
81use tracing::{debug, error, info};
82
/// Errors produced while constructing or starting a proxy [`Server`].
#[derive(Debug, Snafu)]
pub enum Error {
    /// Catch-all error carrying a category tag (e.g. "prometheus", "tls")
    /// plus a human-readable message.
    #[snafu(display("Common error, category: {category}, {message}"))]
    Common { category: String, message: String },
}
/// Module-local result alias defaulting the error type to [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
89
#[inline]
/// Encodes the time elapsed since `started_at` as a strictly negative
/// millisecond offset, suitable for later reconstruction by
/// [`get_latency`]. A zero elapsed time is mapped to `-1` so the result
/// is never ambiguous with "no offset recorded" (non-negative values).
pub fn get_start_time(started_at: &Instant) -> i32 {
    match started_at.elapsed().as_millis() as i32 {
        0 => -1,
        elapsed => -elapsed,
    }
}
99
#[inline]
/// Reconstructs a latency (in milliseconds) from a negative start-time
/// offset previously produced by [`get_start_time`].
///
/// Returns `None` when no offset was recorded or when the stored value is
/// non-negative (which means "not started" in this encoding).
pub fn get_latency(started_at: &Instant, value: &Option<i32>) -> Option<i32> {
    match value {
        Some(offset) if *offset < 0 => {
            Some(started_at.elapsed().as_millis() as i32 + *offset)
        },
        _ => None,
    }
}
112
/// Core HTTP proxy server implementation that handles request processing, caching, and monitoring.
/// Manages server configuration, connection lifecycle, and integration with various modules.
pub struct Server {
    /// Server name identifier used for logging and metrics
    name: String,

    /// Whether this instance serves admin endpoints and functionality
    admin: bool,

    /// Comma-separated list of listening addresses (e.g. "127.0.0.1:8080,127.0.0.1:8081")
    addr: String,

    /// Counter tracking total number of accepted connections since server start
    accepted: AtomicU64,

    /// Counter tracking number of currently active request processing operations
    processing: AtomicI32,

    /// Optional parser for customizing access log format and output
    log_parser: Option<Parser>,

    /// HTML/JSON template used for rendering error responses
    error_template: String,

    /// Number of worker threads for request processing. None uses default.
    threads: Option<usize>,

    /// OpenSSL cipher list string for TLS connections
    tls_cipher_list: Option<String>,

    /// TLS 1.3 cipher suites configuration
    tls_ciphersuites: Option<String>,

    /// Minimum TLS protocol version to accept (e.g. "TLSv1.2")
    tls_min_version: Option<String>,

    /// Maximum TLS protocol version to accept
    tls_max_version: Option<String>,

    /// Whether HTTP/2 protocol is enabled
    enabled_h2: bool,

    /// Whether Let's Encrypt certificate automation is enabled
    lets_encrypt_enabled: bool,

    /// Whether to use global certificate store for TLS
    global_certificates: bool,

    /// TCP socket configuration options (keepalive, TCP fastopen etc)
    tcp_socket_options: Option<TcpSocketOptions>,

    /// Prometheus metrics registry when metrics collection is enabled
    #[cfg(feature = "tracing")]
    prometheus: Option<Arc<Prometheus>>,

    /// Whether to push metrics to remote Prometheus pushgateway
    /// (true when the configured metrics value contains a "://" scheme)
    prometheus_push_mode: bool,

    /// Prometheus metrics endpoint path (pull mode) or push gateway URL (push mode)
    #[cfg(feature = "tracing")]
    prometheus_metrics: String,

    /// Whether OpenTelemetry tracing is enabled
    #[cfg(feature = "tracing")]
    enabled_otel: bool,

    /// List of enabled modules (e.g. "grpc-web")
    modules: Option<Vec<String>>,

    /// Whether to enable server-timing header
    enable_server_timing: bool,

    /// Read timeout applied to each downstream (client) session
    downstream_read_timeout: Option<Duration>,
    /// Write timeout applied to each downstream (client) session
    downstream_write_timeout: Option<Duration>,

    /// Maps this server's name to its ordered list of location names
    server_locations_provider: Arc<dyn ServerLocationsProvider>,
    /// Resolves plugin instances by name
    plugin_provider: Arc<dyn PluginProvider>,

    /// Resolves location instances by name
    location_provider: Arc<dyn LocationProvider>,

    /// Resolves upstream instances by name
    upstream_provider: Arc<dyn UpstreamProvider>,

    /// Supplies TLS certificates (used with the global certificate store)
    certificate_provider: Arc<dyn CertificateProvider>,

    /// Access to application configuration (used e.g. by ACME handling)
    config_manager: Arc<ConfigManager>,

    /// Channel for asynchronously writing access log lines
    access_logger: Option<Sender<BytesMut>>,
}
210
/// Bundle of pingora services produced by [`Server::run`].
pub struct ServerServices {
    /// The listening HTTP proxy service wrapping this [`Server`]
    pub lb: Service<HttpProxy<Server>>,
}
214
/// Cache metadata defaults: entries without explicit freshness information
/// get a 1 second TTL.
const META_DEFAULTS: CacheMetaDefaults =
    CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);

/// Lazily-built, reusable 500 Internal Server Error response header.
static HTTP_500_RESPONSE: LazyLock<ResponseHeader> =
    LazyLock::new(|| error_resp::gen_error_response(500));
220
/// Shared application dependencies injected into [`Server::new`].
#[derive(Clone)]
pub struct AppContext {
    /// Channel for asynchronously writing access log lines
    pub logger: Option<Sender<BytesMut>>,
    /// Access to the application configuration
    pub config_manager: Arc<ConfigManager>,
    /// Maps a server name to its ordered list of location names
    pub server_locations_provider: Arc<dyn ServerLocationsProvider>,
    /// Resolves location instances by name
    pub location_provider: Arc<dyn LocationProvider>,
    /// Resolves upstream instances by name
    pub upstream_provider: Arc<dyn UpstreamProvider>,
    /// Resolves plugin instances by name
    pub plugin_provider: Arc<dyn PluginProvider>,
    /// Supplies TLS certificates
    pub certificate_provider: Arc<dyn CertificateProvider>,
}
231
232impl Server {
233    /// Creates a new HTTP proxy server instance with the given configuration.
234    /// Initializes all server components including:
235    /// - TCP socket options
236    /// - TLS settings
237    /// - Prometheus metrics (if enabled)
238    /// - Threading configuration
239    pub fn new(conf: &ServerConf, ctx: AppContext) -> Result<Self> {
240        debug!(target: LOG_TARGET, config = conf.to_string(), "new server");
241        let mut p = None;
242        let (access_log, _) =
243            parse_access_log_directive(conf.access_log.as_ref());
244        if let Some(access_log) = access_log {
245            p = Some(Parser::from(access_log.as_str()));
246        }
247        let tcp_socket_options = if conf.tcp_fastopen.is_some()
248            || conf.tcp_keepalive.is_some()
249            || conf.reuse_port.is_some()
250        {
251            let mut opts = TcpSocketOptions::default();
252            opts.tcp_fastopen = conf.tcp_fastopen;
253            opts.tcp_keepalive.clone_from(&conf.tcp_keepalive);
254            opts.so_reuseport = conf.reuse_port;
255            Some(opts)
256        } else {
257            None
258        };
259        let prometheus_metrics =
260            conf.prometheus_metrics.clone().unwrap_or_default();
261        #[cfg(feature = "tracing")]
262        let prometheus = if prometheus_metrics.is_empty() {
263            None
264        } else {
265            let p = new_prometheus(&conf.name).map_err(|e| Error::Common {
266                category: "prometheus".to_string(),
267                message: e.to_string(),
268            })?;
269            Some(Arc::new(p))
270        };
271        let s = Server {
272            name: conf.name.clone(),
273            admin: conf.admin,
274            accepted: AtomicU64::new(0),
275            processing: AtomicI32::new(0),
276            addr: conf.addr.clone(),
277            log_parser: p,
278            error_template: conf.error_template.clone(),
279            tls_cipher_list: conf.tls_cipher_list.clone(),
280            tls_ciphersuites: conf.tls_ciphersuites.clone(),
281            tls_min_version: conf.tls_min_version.clone(),
282            tls_max_version: conf.tls_max_version.clone(),
283            threads: conf.threads,
284            lets_encrypt_enabled: false,
285            global_certificates: conf.global_certificates,
286            enabled_h2: conf.enabled_h2,
287            tcp_socket_options,
288            prometheus_push_mode: prometheus_metrics.contains("://"),
289            #[cfg(feature = "tracing")]
290            enabled_otel: conf.otlp_exporter.is_some(),
291            #[cfg(feature = "tracing")]
292            prometheus_metrics,
293            #[cfg(feature = "tracing")]
294            prometheus,
295            enable_server_timing: conf.enable_server_timing,
296            modules: conf.modules.clone(),
297            downstream_read_timeout: conf.downstream_read_timeout,
298            downstream_write_timeout: conf.downstream_write_timeout,
299            server_locations_provider: ctx.server_locations_provider,
300            location_provider: ctx.location_provider,
301            upstream_provider: ctx.upstream_provider,
302            plugin_provider: ctx.plugin_provider,
303            certificate_provider: ctx.certificate_provider,
304            access_logger: ctx.logger,
305            config_manager: ctx.config_manager,
306        };
307        Ok(s)
308    }
    /// Enable the Let's Encrypt proxy plugin so ACME challenges at
    /// `/.well-known/acme-challenge` are answered by this server.
    pub fn enable_lets_encrypt(&mut self) {
        self.lets_encrypt_enabled = true;
    }
    /// Returns the background task that periodically pushes prometheus
    /// metrics to the configured push gateway.
    ///
    /// Returns `None` when push mode is not configured, when the `tracing`
    /// feature is disabled, when no registry exists, or when constructing
    /// the push service fails (the failure is logged, not propagated).
    pub fn get_prometheus_push_service(
        &self,
    ) -> Option<Box<dyn BackgroundTask>> {
        if !self.prometheus_push_mode {
            return None;
        }
        cfg_if::cfg_if! {
            if #[cfg(feature = "tracing")] {
                let Some(prometheus) = &self.prometheus else {
                    return None;
                };
                match new_prometheus_push_service(
                    &self.name,
                    &self.prometheus_metrics,
                    prometheus.clone(),
                ) {
                    Ok(service) => Some(service),
                    Err(e) => {
                        // a broken push configuration disables pushing but
                        // must not prevent the server from starting
                        error!(
                            target: LOG_TARGET,
                            error = %e,
                            name = self.name,
                            "new prometheus push service fail"
                        );
                        None
                    },
                }
            } else {
               None
            }
        }
    }
348
    /// Starts the server and sets up TCP/TLS listening endpoints.
    /// - Configures listeners for each address
    /// - Sets up TLS if enabled
    /// - Initializes HTTP/2 support (h2c on plaintext listeners)
    /// - Configures thread pool
    ///
    /// Consumes `self`: the server becomes the app logic of the returned
    /// pingora proxy service.
    pub fn run(
        self,
        conf: Arc<configuration::ServerConf>,
    ) -> Result<ServerServices> {
        let addr = self.addr.clone();
        let tcp_socket_options = self.tcp_socket_options.clone();

        let name = self.name.clone();
        let mut dynamic_cert = None;
        // tls: resolve certificates dynamically via the provider
        if self.global_certificates {
            dynamic_cert =
                Some(GlobalCertificate::new(self.certificate_provider.clone()));
        }

        let is_tls = dynamic_cert.is_some();

        let enabled_h2 = self.enabled_h2;
        let threads = if let Some(threads) = self.threads {
            // use cpus when set threads:0
            let value = if threads == 0 {
                num_cpus::get()
            } else {
                threads
            };
            Some(value)
        } else {
            None
        };

        info!(
            target: LOG_TARGET,
            name,
            addr,
            threads,
            is_tls,
            h2 = enabled_h2,
            tcp_socket_options = format!("{:?}", tcp_socket_options),
            "server is listening"
        );
        // copy TLS parameters out before `self` is moved into the service
        let cipher_list = self.tls_cipher_list.clone();
        let cipher_suites = self.tls_ciphersuites.clone();
        let tls_min_version = self.tls_min_version.clone();
        let tls_max_version = self.tls_max_version.clone();
        let mut lb = http_proxy_service(&conf, self);
        // use h2c if not tls and enable http2
        if !is_tls && enabled_h2 {
            if let Some(http_logic) = lb.app_logic_mut() {
                let mut http_server_options = HttpServerOptions::default();
                http_server_options.h2c = true;
                http_logic.server_options = Some(http_server_options);
            }
        }
        lb.threads = threads;
        // support listen multi address
        for addr in addr.split(',') {
            // tls listener when a dynamic certificate resolver exists
            if let Some(dynamic_cert) = &dynamic_cert {
                let tls_settings = dynamic_cert
                    .new_tls_settings(&TlsSettingParams {
                        server_name: name.clone(),
                        enabled_h2,
                        cipher_list: cipher_list.clone(),
                        cipher_suites: cipher_suites.clone(),
                        tls_min_version: tls_min_version.clone(),
                        tls_max_version: tls_max_version.clone(),
                    })
                    .map_err(|e| Error::Common {
                        category: "tls".to_string(),
                        message: e.to_string(),
                    })?;
                lb.add_tls_with_settings(
                    addr,
                    tcp_socket_options.clone(),
                    tls_settings,
                );
            } else if let Some(opt) = &tcp_socket_options {
                lb.add_tcp_with_settings(addr, opt.clone());
            } else {
                lb.add_tcp(addr);
            }
        }
        Ok(ServerServices { lb })
    }
438    /// Handles requests to the admin interface.
439    /// Processes admin-specific plugins and returns response if handled.
440    async fn serve_admin(
441        &self,
442        session: &mut Session,
443        ctx: &mut Ctx,
444    ) -> pingora::Result<bool> {
445        if let Some(plugin) = self.plugin_provider.get("pingap:admin") {
446            let result = plugin
447                .handle_request(PluginStep::Request, session, ctx)
448                .await?;
449            if let RequestPluginResult::Respond(resp) = result {
450                ctx.state.status = Some(resp.status);
451                resp.send(session).await?;
452                return Ok(true);
453            }
454        }
455        Ok(false)
456    }
    /// Populates the per-request context from the downstream connection:
    /// connection id, TLS/timing digest details, accepted/processing
    /// counters, and the remote/server addresses. Also applies the
    /// configured downstream read/write timeouts to the session.
    #[inline]
    fn initialize_context(&self, session: &mut Session, ctx: &mut Ctx) {
        session.set_read_timeout(self.downstream_read_timeout);
        session.set_write_timeout(self.downstream_write_timeout);

        if let Some(stream) = session.stream() {
            ctx.conn.id = stream.id() as usize;
        }
        // get digest of timing and tls
        if let Some(digest) = session.digest() {
            let digest_detail = get_digest_detail(digest);
            ctx.timing.connection_duration = digest_detail.connection_time;
            ctx.conn.reused = digest_detail.connection_reused;

            // TLS handshake latency is only meaningful for a fresh
            // connection, and only when the handshake timestamp is not
            // before the TCP-established timestamp
            if !ctx.conn.reused
                && digest_detail.tls_established
                    >= digest_detail.tcp_established
            {
                let latency = digest_detail.tls_established
                    - digest_detail.tcp_established;
                ctx.timing.tls_handshake = Some(latency as i32);
            }
            ctx.conn.tls_cipher = digest_detail.tls_cipher;
            ctx.conn.tls_version = digest_detail.tls_version;
        };
        accept_request();

        // fetch_add returns the previous value, so add 1 to report counts
        // that include the current request
        ctx.state.processing_count =
            self.processing.fetch_add(1, Ordering::Relaxed) + 1;
        ctx.state.accepted_count =
            self.accepted.fetch_add(1, Ordering::Relaxed) + 1;
        if let Some((remote_addr, remote_port)) =
            pingap_core::get_remote_addr(session)
        {
            ctx.conn.remote_addr = Some(remote_addr);
            ctx.conn.remote_port = Some(remote_port);
        }
        if let Some(addr) =
            session.server_addr().and_then(|addr| addr.as_inet())
        {
            ctx.conn.server_addr = Some(addr.ip().to_string());
            ctx.conn.server_port = Some(addr.port());
        }
    }
501
    /// Matches the request host/path against this server's configured
    /// locations and, on a match, prepares the context: location fields,
    /// captured variables, prometheus stats, content-length validation,
    /// request limiting, the gRPC-Web bridge, and the `EarlyRequest`
    /// plugin step. A miss leaves `ctx` untouched so a later step can
    /// report "no matching location".
    #[inline]
    async fn find_and_apply_location(
        &self,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<()> {
        let header = session.req_header();
        let host = pingap_core::get_host(header).unwrap_or_default();
        let path = header.uri.path();

        // locations not found
        let Some(locations) = self.server_locations_provider.get(&self.name)
        else {
            return Ok(());
        };

        // first matching location wins; locations are checked in order
        let matched_info = locations.iter().find_map(|name| {
            let location = self.location_provider.get(name)?;
            let (matched, captures) = location.match_host_path(host, path);
            if matched {
                Some((location, captures))
            } else {
                None
            }
        });

        let Some((location, captures)) = matched_info else {
            return Ok(());
        };

        // only execute all subsequent operations after successfully matching location
        ctx.upstream.location = location.name.clone();
        ctx.upstream.location_instance = Some(location.clone());
        ctx.upstream.max_retries = location.max_retries;
        ctx.upstream.max_retry_window = location.max_retry_window;
        if let Some(captures) = captures {
            // captures from host/path matching become request variables
            ctx.extend_variables(captures);
        }

        debug!(
            target: LOG_TARGET,
            "variables: {:?}",
            ctx.features.as_ref().map(|item| &item.variables)
        );

        // set prometheus stats
        #[cfg(feature = "tracing")]
        if let Some(prom) = &self.prometheus {
            prom.before(&ctx.upstream.location);
        }

        // validate content length (rejected with 413 when too large)
        location
            .validate_content_length(header)
            .map_err(|e| new_internal_error(413, e))?;

        // limit processing
        let (accepted, processing) = location.on_request()?;
        ctx.state.location_accepted_count = accepted;
        ctx.state.location_processing_count = processing;

        // initialize gRPC Web
        if location.support_grpc_web() {
            let grpc_web = session
                .downstream_modules_ctx
                .get_mut::<GrpcWebBridge>()
                .ok_or_else(|| {
                    new_internal_error(
                        500,
                        "grpc web bridge module should be added",
                    )
                })?;
            grpc_web.init();
        }

        // initialize plugins and execute the EarlyRequest step
        ctx.plugins = self.get_context_plugins(location.clone(), session);
        let _ = self
            .handle_request_plugin(PluginStep::EarlyRequest, session, ctx)
            .await?;

        Ok(())
    }
586
587    #[inline]
588    async fn handle_admin_request(
589        &self,
590        session: &mut Session,
591        ctx: &mut Ctx,
592    ) -> Option<pingora::Result<bool>> {
593        if self.admin {
594            match self.serve_admin(session, ctx).await {
595                Ok(true) => return Some(Ok(true)), // handled
596                Ok(false) => {}, // not admin request, continue
597                Err(e) => return Some(Err(e)), // error
598            }
599        }
600        None // not admin service, continue
601    }
602    #[inline]
603    async fn handle_acme_challenge(
604        &self,
605        session: &mut Session,
606        ctx: &mut Ctx,
607    ) -> Option<pingora::Result<bool>> {
608        if self.lets_encrypt_enabled {
609            return match handle_lets_encrypt(
610                self.config_manager.clone(),
611                session,
612                ctx,
613            )
614            .await
615            {
616                Ok(true) => Some(Ok(true)),   // handle ACME request
617                Ok(false) => Some(Ok(false)), // not handle ACME request, continue
618                Err(e) => Some(Err(e)),
619            };
620        }
621        None // not enable ACME, continue
622    }
623    #[inline]
624    #[cfg(feature = "tracing")]
625    async fn handle_metrics_request(
626        &self,
627        session: &mut Session,
628        _ctx: &mut Ctx,
629    ) -> Option<pingora::Result<bool>> {
630        let header = session.req_header();
631        let should_handle = !self.prometheus_push_mode
632            && self.prometheus.is_some()
633            && header.uri.path() == self.prometheus_metrics;
634
635        if should_handle {
636            let prom = self.prometheus.as_ref().unwrap(); // is_some() 检查已通过
637            let result = async {
638                let body =
639                    prom.metrics().map_err(|e| new_internal_error(500, e))?;
640                HttpResponse::text(body).send(session).await?;
641                Ok(true)
642            }
643            .await;
644            return Some(result);
645        }
646        None
647    }
648    #[inline]
649    async fn handle_standard_request(
650        &self,
651        session: &mut Session,
652        ctx: &mut Ctx,
653    ) -> pingora::Result<bool> {
654        let Some(location) = &ctx.upstream.location_instance else {
655            let header = session.req_header();
656            let host = pingap_core::get_host(header).unwrap_or_default();
657            let message = format!(
658                "No matching location, host:{host} path:{}",
659                header.uri.path()
660            );
661            return Err(pingap_core::new_internal_error(500, message));
662        };
663
664        debug!(
665            target: LOG_TARGET,
666            server = self.name,
667            location = location.name(),
668            "location is matched"
669        );
670
671        let variables = if let Some(features) = &mut ctx.features {
672            features.variables.take()
673        } else {
674            None
675        };
676
677        let (_, capture_variables) =
678            location.rewrite(session.req_header_mut(), variables);
679        if let Some(capture_variables) = capture_variables {
680            ctx.extend_variables(capture_variables);
681        }
682
683        if self
684            .handle_request_plugin(PluginStep::Request, session, ctx)
685            .await?
686        {
687            return Ok(true);
688        }
689
690        Ok(false)
691    }
692}
693
/// Module name that enables the gRPC-Web bridge in the downstream modules.
const MODULE_GRPC_WEB: &str = "grpc-web";
695
696impl Server {
697    #[inline]
698    fn get_context_plugins(
699        &self,
700        location: Arc<Location>,
701        session: &Session,
702    ) -> Option<Vec<(String, Arc<dyn Plugin>)>> {
703        if session.is_upgrade_req() {
704            return None;
705        }
706        let plugins = location.plugins.as_ref()?;
707
708        let location_plugins: Vec<_> = plugins
709            .iter()
710            .filter_map(|name| {
711                self.plugin_provider
712                    .get(name)
713                    .map(|plugin| (name.clone(), plugin))
714            })
715            .collect();
716
717        // use then_some to handle empty collection
718        (!location_plugins.is_empty()).then_some(location_plugins)
719    }
720
    /// Executes request plugins in the configured chain
    /// Returns true if a plugin handled the request completely
    #[inline]
    pub async fn handle_request_plugin(
        &self,
        step: PluginStep,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<bool> {
        // temporarily take the chain out of ctx so each plugin can receive
        // `&mut Ctx` while the chain is being iterated; restored below
        let plugins = match ctx.plugins.take() {
            Some(p) => p,
            None => return Ok(false), // No plugins, exit early.
        };
        if plugins.is_empty() {
            return Ok(false);
        }

        // run inside an async block so `?` exits the block (not the fn)
        // and the plugin chain is always restored afterwards
        let result = async {
            let mut request_done = false;
            for (name, plugin) in plugins.iter() {
                let now = Instant::now();
                let result = plugin.handle_request(step, session, ctx).await?;
                let elapsed = now.elapsed().as_millis() as u32;

                // extract repeated logging and timing logic
                let mut record_time = |msg: &str| {
                    debug!(
                        target: LOG_TARGET,
                        name,
                        elapsed,
                        step = step.to_string(),
                        "{msg}"
                    );
                    ctx.add_plugin_processing_time(name, elapsed);
                };

                match result {
                    RequestPluginResult::Skipped => {
                        continue;
                    },
                    RequestPluginResult::Respond(resp) => {
                        record_time("request plugin create new response");
                        // ignore status >= 900: such statuses mark the
                        // request as handled without writing a response here
                        if resp.status.as_u16() < 900 {
                            ctx.state.status = Some(resp.status);
                            resp.send(session).await?;
                        }
                        request_done = true;
                        break;
                    },
                    RequestPluginResult::Continue => {
                        record_time("request plugin run and continue request");
                    },
                }
            }
            Ok(request_done)
        }
        .await;
        ctx.plugins = Some(plugins);
        result
    }
782
783    /// Run response plugins
784    #[inline]
785    pub async fn handle_response_plugin(
786        &self,
787        session: &mut Session,
788        ctx: &mut Ctx,
789        upstream_response: &mut ResponseHeader,
790    ) -> pingora::Result<()> {
791        let plugins = match ctx.plugins.take() {
792            Some(p) => p,
793            None => return Ok(()), // No plugins, exit early.
794        };
795        if plugins.is_empty() {
796            return Ok(());
797        }
798
799        let result = async {
800            for (name, plugin) in plugins.iter() {
801                let now = Instant::now();
802                if let ResponsePluginResult::Modified = plugin
803                    .handle_response(session, ctx, upstream_response)
804                    .await?
805                {
806                    let elapsed = now.elapsed().as_millis() as u32;
807                    debug!(
808                        target: LOG_TARGET,
809                        name, elapsed, "response plugin modify headers"
810                    );
811                    ctx.add_plugin_processing_time(name, elapsed);
812                };
813            }
814            Ok(())
815        }
816        .await;
817        ctx.plugins = Some(plugins);
818        result
819    }
820
821    #[inline]
822    pub fn handle_upstream_response_plugin(
823        &self,
824        session: &mut Session,
825        ctx: &mut Ctx,
826        upstream_response: &mut ResponseHeader,
827    ) -> pingora::Result<()> {
828        let plugins = match ctx.plugins.take() {
829            Some(p) => p,
830            None => return Ok(()), // No plugins, exit early.
831        };
832        if plugins.is_empty() {
833            return Ok(());
834        }
835
836        let result = {
837            for (name, plugin) in plugins.iter() {
838                let now = Instant::now();
839                if let ResponsePluginResult::Modified = plugin
840                    .handle_upstream_response(session, ctx, upstream_response)?
841                {
842                    let elapsed = now.elapsed().as_millis() as u32;
843                    debug!(
844                        target: LOG_TARGET,
845                        name,
846                        elapsed,
847                        "upstream response plugin modify headers"
848                    );
849                    ctx.add_plugin_processing_time(name, elapsed);
850                };
851            }
852            Ok(())
853        };
854        ctx.plugins = Some(plugins);
855        result
856    }
857
858    #[inline]
859    pub fn handle_upstream_response_body_plugin(
860        &self,
861        session: &mut Session,
862        ctx: &mut Ctx,
863        body: &mut Option<bytes::Bytes>,
864        end_of_stream: bool,
865    ) -> pingora::Result<()> {
866        let plugins = match ctx.plugins.take() {
867            Some(p) => p,
868            None => return Ok(()), // No plugins, exit early.
869        };
870        if plugins.is_empty() {
871            return Ok(());
872        }
873
874        let result = {
875            for (name, plugin) in plugins.iter() {
876                let now = Instant::now();
877                match plugin.handle_upstream_response_body(
878                    session,
879                    ctx,
880                    body,
881                    end_of_stream,
882                )? {
883                    ResponseBodyPluginResult::PartialReplaced
884                    | ResponseBodyPluginResult::FullyReplaced => {
885                        let elapsed = now.elapsed().as_millis() as u32;
886                        ctx.add_plugin_processing_time(name, elapsed);
887                        debug!(
888                            target: LOG_TARGET,
889                            name, elapsed, "response body plugin modify body"
890                        );
891                    },
892                    _ => {},
893                }
894            }
895            Ok(())
896        };
897        ctx.plugins = Some(plugins);
898        result
899    }
900
901    #[inline]
902    pub fn handle_response_body_plugin(
903        &self,
904        session: &mut Session,
905        ctx: &mut Ctx,
906        body: &mut Option<bytes::Bytes>,
907        end_of_stream: bool,
908    ) -> pingora::Result<()> {
909        let plugins = match ctx.plugins.take() {
910            Some(p) => p,
911            None => return Ok(()), // No plugins, exit early.
912        };
913        if plugins.is_empty() {
914            return Ok(());
915        }
916        let result = {
917            for (name, plugin) in plugins.iter() {
918                let now = Instant::now();
919                match plugin.handle_response_body(
920                    session,
921                    ctx,
922                    body,
923                    end_of_stream,
924                )? {
925                    ResponseBodyPluginResult::PartialReplaced
926                    | ResponseBodyPluginResult::FullyReplaced => {
927                        let elapsed = now.elapsed().as_millis() as u32;
928                        ctx.add_plugin_processing_time(name, elapsed);
929                        debug!(
930                            target: LOG_TARGET,
931                            name, elapsed, "response body plugin modify body"
932                        );
933                    },
934                    _ => {},
935                }
936            }
937            Ok(())
938        };
939        ctx.plugins = Some(plugins);
940        result
941    }
942
    /// Validates upstream `Cache-Control` directives and clamps the cache TTL.
    ///
    /// Returns `Err(NoCacheReason::OriginNotCache)` when the origin forbids
    /// caching (`no-cache`, `no-store`, `private`, or an effective `max-age`
    /// of 0). When `max_ttl` is configured and the response's fresh duration
    /// exceeds it, an `s-maxage` directive is inserted so freshness never
    /// exceeds the configured ceiling.
    fn process_cache_control(
        &self,
        c: &mut CacheControl,
        max_ttl: Option<Duration>,
    ) -> Result<(), NoCacheReason> {
        // no-cache, no-store, private
        if c.no_cache() || c.no_store() || c.private() {
            return Err(NoCacheReason::OriginNotCache);
        }

        // max-age=0
        // NOTE(review): a missing or unparsable max-age also falls back to 0
        // here and is therefore treated as uncacheable — confirm this is the
        // intended behavior for responses with only e.g. `public`.
        if c.max_age().ok().flatten().unwrap_or_default() == 0 {
            return Err(NoCacheReason::OriginNotCache);
        }

        // set cache max ttl
        if let Some(d) = max_ttl {
            if c.fresh_duration().unwrap_or_default() > d {
                // overwrite the s-maxage value with the configured ceiling
                // (itoa avoids a format! heap allocation for the number)
                let s_maxage_value =
                    itoa::Buffer::new().format(d.as_secs()).as_bytes().to_vec();
                c.directives.insert(
                    "s-maxage".to_string(),
                    Some(DirectiveValue(s_maxage_value)),
                );
            }
        }

        Ok(())
    }
973
    /// Adds cache observability headers to the response:
    /// `x-cache-status` (the cache phase for this request) plus
    /// `x-cache-lookup` / `x-cache-lock` timing headers when the
    /// corresponding durations are available. The timings are also stored in
    /// `ctx.timing` and, with the `tracing` feature, forwarded to the
    /// OpenTelemetry span attributes.
    #[inline]
    fn handle_cache_headers(
        &self,
        session: &Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Ctx,
    ) {
        let cache_status = session.cache.phase().as_str();
        let _ = upstream_response.insert_header("x-cache-status", cache_status);

        // process lookup duration
        let lookup_duration_str = self.process_cache_timing(
            session.cache.lookup_duration(),
            "x-cache-lookup",
            upstream_response,
            &mut ctx.timing.cache_lookup,
        );

        // process lock duration
        let lock_duration_str = self.process_cache_timing(
            session.cache.lock_duration(),
            "x-cache-lock",
            upstream_response,
            &mut ctx.timing.cache_lock,
        );

        // without tracing the human-readable strings are unused; silence the
        // unused-variable warnings
        #[cfg(not(feature = "tracing"))]
        {
            let _ = lookup_duration_str;
            let _ = lock_duration_str;
        }

        // (optional) process OpenTelemetry
        #[cfg(feature = "tracing")]
        update_otel_cache_attrs(
            ctx,
            cache_status,
            lookup_duration_str,
            lock_duration_str,
        );
    }
1015
    /// Writes a cache timing duration as a `<ms>ms` response header and
    /// records the millisecond value into the given ctx field.
    ///
    /// Returns the human-readable duration string for OpenTelemetry when the
    /// `tracing` feature is enabled; otherwise (or when `duration_opt` is
    /// `None`) returns an empty string.
    #[inline]
    fn process_cache_timing(
        &self,
        duration_opt: Option<Duration>,
        header_name: &'static str,
        resp: &mut ResponseHeader,
        ctx_field: &mut Option<i32>,
    ) -> String {
        if let Some(d) = duration_opt {
            let ms = d.as_millis() as i32;

            // use itoa to avoid format! heap memory allocation
            let mut buffer = itoa::Buffer::new();
            let mut value_bytes = Vec::with_capacity(6);
            value_bytes.extend_from_slice(buffer.format(ms).as_bytes());
            value_bytes.extend_from_slice(b"ms");

            let _ = resp.insert_header(header_name, value_bytes);
            *ctx_field = Some(ms);

            // only the tracing build needs the human-readable form
            #[cfg(feature = "tracing")]
            return humantime::Duration::from(d).to_string();
        }
        String::new()
    }
1041}
1042
1043#[inline]
1044fn get_upstream_with_variables(
1045    upstream: &str,
1046    ctx: &Ctx,
1047    upstreams: &dyn UpstreamProvider,
1048) -> Option<Arc<Upstream>> {
1049    let key = upstream
1050        .strip_prefix('$')
1051        .and_then(|var_name| ctx.get_variable(var_name))
1052        .unwrap_or(upstream);
1053    upstreams.get(key)
1054}
1055
1056#[async_trait]
1057impl ProxyHttp for Server {
1058    type CTX = Ctx;
1059    fn new_ctx(&self) -> Self::CTX {
1060        debug!(target: LOG_TARGET, "new ctx");
1061        Ctx::new()
1062    }
1063    fn init_downstream_modules(&self, modules: &mut HttpModules) {
1064        debug!(target: LOG_TARGET, "--> init downstream modules");
1065        defer!(debug!(target: LOG_TARGET, "<-- init downstream modules"););
1066        // Add disabled downstream compression module by default
1067        modules.add_module(ResponseCompressionBuilder::enable(0));
1068
1069        self.modules.iter().flatten().for_each(|item| {
1070            if item == MODULE_GRPC_WEB {
1071                modules.add_module(Box::new(GrpcWeb));
1072            }
1073        });
1074    }
1075    /// Handles early request processing before main request handling.
1076    /// Key responsibilities:
1077    /// - Sets up connection tracking and metrics
1078    /// - Records timing information
1079    /// - Initializes OpenTelemetry tracing
1080    /// - Matches request to location configuration
1081    /// - Validates request parameters
1082    /// - Initializes compression and gRPC modules if needed
1083    async fn early_request_filter(
1084        &self,
1085        session: &mut Session,
1086        ctx: &mut Self::CTX,
1087    ) -> pingora::Result<()>
1088    where
1089        Self::CTX: Send + Sync,
1090    {
1091        debug!(target: LOG_TARGET, "--> early request filter");
1092        defer!(debug!(target: LOG_TARGET, "<-- early request filter"););
1093
1094        self.initialize_context(session, ctx);
1095        #[cfg(feature = "tracing")]
1096        if self.enabled_otel {
1097            initialize_telemetry(&self.name, session, ctx);
1098        }
1099        self.find_and_apply_location(session, ctx).await?;
1100
1101        Ok(())
1102    }
1103    /// Main request processing filter.
1104    /// Handles:
1105    /// - Admin interface requests
1106    /// - Let's Encrypt certificate challenges
1107    /// - Location-specific processing
1108    /// - URL rewriting
1109    /// - Plugin execution
1110    async fn request_filter(
1111        &self,
1112        session: &mut Session,
1113        ctx: &mut Self::CTX,
1114    ) -> pingora::Result<bool>
1115    where
1116        Self::CTX: Send + Sync,
1117    {
1118        debug!(target: LOG_TARGET, "--> request filter");
1119        defer!(debug!(target: LOG_TARGET, "<-- request filter"););
1120        // try to handle special requests in order
1121        // admin route
1122        if let Some(result) = self.handle_admin_request(session, ctx).await {
1123            return result;
1124        }
1125        // acme http challengt
1126        if let Some(result) = self.handle_acme_challenge(session, ctx).await {
1127            return result;
1128        }
1129        // prometheus metrics pull request
1130        #[cfg(feature = "tracing")]
1131        if let Some(result) = self.handle_metrics_request(session, ctx).await {
1132            return result;
1133        }
1134
1135        self.handle_standard_request(session, ctx).await
1136    }
1137
1138    /// Filters requests before sending to upstream.
1139    /// Allows modifying request before proxying.
1140    async fn proxy_upstream_filter(
1141        &self,
1142        session: &mut Session,
1143        ctx: &mut Self::CTX,
1144    ) -> pingora::Result<bool>
1145    where
1146        Self::CTX: Send + Sync,
1147    {
1148        debug!(target: LOG_TARGET, "--> proxy upstream filter");
1149        defer!(debug!(target: LOG_TARGET, "<-- proxy upstream filter"););
1150        let done = self
1151            .handle_request_plugin(PluginStep::ProxyUpstream, session, ctx)
1152            .await?;
1153
1154        if done {
1155            return Ok(false);
1156        }
1157        Ok(true)
1158    }
1159
1160    /// Selects and configures the upstream peer to proxy to.
1161    /// Handles upstream connection pooling and health checking.
1162    async fn upstream_peer(
1163        &self,
1164        session: &mut Session,
1165        ctx: &mut Ctx,
1166    ) -> pingora::Result<Box<HttpPeer>> {
1167        debug!(target: LOG_TARGET, "--> upstream peer");
1168        defer!(debug!(target: LOG_TARGET, "<-- upstream peer"););
1169
1170        let peer = ctx
1171            .upstream
1172            .location_instance
1173            .clone()
1174            .as_ref()
1175            .and_then(|location| {
1176                let upstream = if ctx.upstream.name.is_empty() {
1177                    get_upstream_with_variables(
1178                        location.upstream(),
1179                        ctx,
1180                        self.upstream_provider.as_ref(),
1181                    )?
1182                } else {
1183                    // override upstream by other plugin
1184                    get_upstream_with_variables(
1185                        &ctx.upstream.name,
1186                        ctx,
1187                        self.upstream_provider.as_ref(),
1188                    )?
1189                };
1190                ctx.upstream.upstream_instance = Some(upstream.clone());
1191                Some(upstream)
1192            })
1193            .and_then(|upstream| {
1194                ctx.upstream.connected_count = upstream.connected();
1195                ctx.upstream.name = upstream.name.clone();
1196                #[cfg(feature = "tracing")]
1197                if let Some(features) = &ctx.features {
1198                    if let Some(tracer) = &features.otel_tracer {
1199                        let name = format!("upstream.{}", &upstream.name);
1200                        let mut span = tracer.new_upstream_span(&name);
1201                        span.set_attribute(KeyValue::new(
1202                            "upstream.connected",
1203                            ctx.upstream.connected_count.unwrap_or_default()
1204                                as i64,
1205                        ));
1206                        let features = ctx.features.get_or_insert_default();
1207                        features.upstream_span = Some(span);
1208                    }
1209                }
1210                upstream
1211                    .new_http_peer(session, &ctx.conn.client_ip)
1212                    .inspect(|peer| {
1213                        ctx.upstream.address = peer.address().to_string();
1214                    })
1215            })
1216            .ok_or_else(|| {
1217                new_internal_error(
1218                    503,
1219                    format!(
1220                        "No available upstream for {}",
1221                        &ctx.upstream.location
1222                    ),
1223                )
1224            })?;
1225
1226        // start connect to upstream
1227        ctx.timing.upstream_connect =
1228            Some(get_start_time(&ctx.timing.created_at));
1229
1230        Ok(Box::new(peer))
1231    }
1232    /// Called when connection is established to upstream.
1233    /// Records timing metrics and TLS details.
1234    async fn connected_to_upstream(
1235        &self,
1236        _session: &mut Session,
1237        reused: bool,
1238        _peer: &HttpPeer,
1239        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
1240        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1241        digest: Option<&Digest>,
1242        ctx: &mut Self::CTX,
1243    ) -> pingora::Result<()>
1244    where
1245        Self::CTX: Send + Sync,
1246    {
1247        debug!(target: LOG_TARGET, "--> connected to upstream");
1248        defer!(debug!(target: LOG_TARGET, "<-- connected to upstream"););
1249        ctx.timing.upstream_connect =
1250            get_latency(&ctx.timing.created_at, &ctx.timing.upstream_connect);
1251        if let Some(digest) = digest {
1252            ctx.update_upstream_timing_from_digest(digest, reused);
1253        }
1254        ctx.upstream.reused = reused;
1255
1256        // upstream start processing
1257        ctx.timing.upstream_processing =
1258            Some(get_start_time(&ctx.timing.created_at));
1259
1260        Ok(())
1261    }
1262    fn fail_to_connect(
1263        &self,
1264        _session: &mut Session,
1265        peer: &HttpPeer,
1266        ctx: &mut Self::CTX,
1267        mut e: Box<pingora::Error>,
1268    ) -> Box<pingora::Error> {
1269        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1270            upstream_instance.on_transport_failure(&peer.address().to_string());
1271        }
1272        let Some(max_retries) = ctx.upstream.max_retries else {
1273            return e;
1274        };
1275        if ctx.upstream.retries >= max_retries {
1276            return e;
1277        }
1278        if let Some(max_retry_window) = ctx.upstream.max_retry_window {
1279            if ctx.timing.created_at.elapsed() > max_retry_window {
1280                return e;
1281            }
1282        }
1283        ctx.upstream.retries += 1;
1284        e.set_retry(true);
1285        e
1286    }
1287    /// Filters upstream request before sending.
1288    /// Adds proxy headers and performs any request modifications.
1289    async fn upstream_request_filter(
1290        &self,
1291        session: &mut Session,
1292        upstream_response: &mut RequestHeader,
1293        ctx: &mut Self::CTX,
1294    ) -> pingora::Result<()>
1295    where
1296        Self::CTX: Send + Sync,
1297    {
1298        debug!(target: LOG_TARGET, "--> upstream request filter");
1299        defer!(debug!(target: LOG_TARGET, "<-- upstream request filter"););
1300        set_append_proxy_headers(session, ctx, upstream_response);
1301        Ok(())
1302    }
1303    /// Filters request body chunks before sending upstream.
1304    /// Tracks payload size and enforces size limits.
1305    async fn request_body_filter(
1306        &self,
1307        _session: &mut Session,
1308        body: &mut Option<Bytes>,
1309        _end_of_stream: bool,
1310        ctx: &mut Self::CTX,
1311    ) -> pingora::Result<()>
1312    where
1313        Self::CTX: Send + Sync,
1314    {
1315        debug!(target: LOG_TARGET, "--> request body filter");
1316        defer!(debug!(target: LOG_TARGET, "<-- request body filter"););
1317        if let Some(buf) = body {
1318            ctx.state.payload_size += buf.len();
1319            if let Some(location) = &ctx.upstream.location_instance {
1320                let size = location.client_body_size_limit();
1321                if size > 0 && ctx.state.payload_size > size {
1322                    return Err(new_internal_error(
1323                        413,
1324                        format!("Request Entity Too Large, max:{size}"),
1325                    ));
1326                }
1327            }
1328        }
1329        Ok(())
1330    }
1331    /// Generates cache keys for request caching.
1332    /// Combines:
1333    /// - Cache namespace
1334    /// - Request method
1335    /// - URL path and query
1336    /// - Optional custom prefix
1337    fn cache_key_callback(
1338        &self,
1339        session: &Session,
1340        ctx: &mut Self::CTX,
1341    ) -> pingora::Result<CacheKey> {
1342        debug!(target: LOG_TARGET, "--> cache key callback");
1343        defer!(debug!(target: LOG_TARGET, "<-- cache key callback"););
1344        let key = get_cache_key(
1345            ctx,
1346            session.req_header().method.as_ref(),
1347            &session.req_header().uri,
1348        );
1349        debug!(
1350            target: LOG_TARGET,
1351            namespace = key.namespace_str(),
1352            primary = key.primary_key_str(),
1353            user_tag = key.user_tag(),
1354            "cache key callback"
1355        );
1356        Ok(key)
1357    }
1358
1359    /// Determines if and how responses should be cached.
1360    /// Checks:
1361    /// - Cache-Control headers
1362    /// - TTL settings
1363    /// - Cache privacy settings
1364    /// - Custom cache control directives
1365    fn response_cache_filter(
1366        &self,
1367        _session: &Session,
1368        resp: &ResponseHeader,
1369        ctx: &mut Self::CTX,
1370    ) -> pingora::Result<RespCacheable> {
1371        debug!(target: LOG_TARGET, "--> response cache filter");
1372        defer!(debug!(target: LOG_TARGET, "<-- response cache filter"););
1373
1374        let (check_cache_control, max_ttl) = ctx.cache.as_ref().map_or(
1375            (false, None), // ctx.cache is None
1376            |c| (c.check_cache_control, c.max_ttl),
1377        );
1378
1379        let mut cc = CacheControl::from_resp_headers(resp);
1380
1381        if let Some(c) = &mut cc {
1382            // delegate all complex validation and modification logic to the helper function
1383            if let Err(reason) = self.process_cache_control(c, max_ttl) {
1384                return Ok(RespCacheable::Uncacheable(reason));
1385            }
1386        } else if check_cache_control {
1387            // if Cache-Control header is required but it doesn't exist or parsing fails
1388            return Ok(RespCacheable::Uncacheable(
1389                NoCacheReason::OriginNotCache,
1390            ));
1391        }
1392
1393        Ok(resp_cacheable(
1394            cc.as_ref(),
1395            resp.clone(),
1396            false,
1397            &META_DEFAULTS,
1398        ))
1399    }
1400
1401    async fn response_filter(
1402        &self,
1403        session: &mut Session,
1404        upstream_response: &mut ResponseHeader,
1405        ctx: &mut Self::CTX,
1406    ) -> pingora::Result<()>
1407    where
1408        Self::CTX: Send + Sync,
1409    {
1410        debug!(target: LOG_TARGET, "--> response filter");
1411        defer!(debug!(target: LOG_TARGET, "<-- response filter"););
1412        if session.cache.enabled() {
1413            self.handle_cache_headers(session, upstream_response, ctx);
1414        }
1415
1416        // call response plugin
1417        self.handle_response_plugin(session, ctx, upstream_response)
1418            .await?;
1419
1420        // add server-timing response header
1421        if self.enable_server_timing {
1422            let _ = upstream_response
1423                .insert_header("server-timing", ctx.generate_server_timing());
1424        }
1425        Ok(())
1426    }
1427
1428    async fn upstream_response_filter(
1429        &self,
1430        session: &mut Session,
1431        upstream_response: &mut ResponseHeader,
1432        ctx: &mut Self::CTX,
1433    ) -> pingora::Result<()> {
1434        debug!(target: LOG_TARGET, "--> upstream response filter");
1435        defer!(debug!(target: LOG_TARGET, "<-- upstream response filter"););
1436        self.handle_upstream_response_plugin(session, ctx, upstream_response)?;
1437        #[cfg(feature = "tracing")]
1438        inject_telemetry_headers(ctx, upstream_response);
1439        ctx.upstream.status = Some(upstream_response.status);
1440
1441        if ctx.state.status.is_none() {
1442            ctx.state.status = Some(upstream_response.status);
1443            // start to get upstream response data
1444            ctx.timing.upstream_response =
1445                Some(get_start_time(&ctx.timing.created_at));
1446        }
1447
1448        if let Some(id) = &ctx.state.request_id {
1449            let _ = upstream_response
1450                .insert_header(&HTTP_HEADER_NAME_X_REQUEST_ID, id);
1451        }
1452
1453        ctx.timing.upstream_processing = get_latency(
1454            &ctx.timing.created_at,
1455            &ctx.timing.upstream_processing,
1456        );
1457
1458        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1459            upstream_instance
1460                .on_response(&ctx.upstream.address, upstream_response.status);
1461        }
1462
1463        Ok(())
1464    }
1465
1466    /// Filters upstream response body chunks.
1467    /// Records timing metrics and finalizes spans.
1468    fn upstream_response_body_filter(
1469        &self,
1470        session: &mut Session,
1471        body: &mut Option<bytes::Bytes>,
1472        end_of_stream: bool,
1473        ctx: &mut Self::CTX,
1474    ) -> pingora::Result<Option<std::time::Duration>> {
1475        debug!(target: LOG_TARGET, "--> upstream response body filter");
1476        defer!(debug!(target: LOG_TARGET, "<-- upstream response body filter"););
1477
1478        self.handle_upstream_response_body_plugin(
1479            session,
1480            ctx,
1481            body,
1482            end_of_stream,
1483        )?;
1484
1485        if end_of_stream {
1486            ctx.timing.upstream_response = get_latency(
1487                &ctx.timing.created_at,
1488                &ctx.timing.upstream_response,
1489            );
1490
1491            #[cfg(feature = "tracing")]
1492            set_otel_upstream_attrs(ctx);
1493            // self.finalize_upstream_session(ctx);
1494        }
1495        Ok(None)
1496    }
1497
1498    /// Final filter for response body before sending to client.
1499    /// Handles response body modifications and compression.
1500    fn response_body_filter(
1501        &self,
1502        session: &mut Session,
1503        body: &mut Option<Bytes>,
1504        end_of_stream: bool,
1505        ctx: &mut Self::CTX,
1506    ) -> pingora::Result<Option<std::time::Duration>>
1507    where
1508        Self::CTX: Send + Sync,
1509    {
1510        debug!(target: LOG_TARGET, "--> response body filter");
1511        defer!(debug!(target: LOG_TARGET, "<-- response body filter"););
1512        self.handle_response_body_plugin(session, ctx, body, end_of_stream)?;
1513        Ok(None)
1514    }
1515
1516    /// Handles proxy failures and generates appropriate error responses.
1517    /// Error handling for:
1518    /// - Upstream connection failures (502, 504)
1519    /// - Client timeouts (408)
1520    /// - Client disconnections (499)
1521    /// Generates error pages using configured template
1522    async fn fail_to_proxy(
1523        &self,
1524        session: &mut Session,
1525        e: &pingora::Error,
1526        ctx: &mut Self::CTX,
1527    ) -> FailToProxy
1528    where
1529        Self::CTX: Send + Sync,
1530    {
1531        debug!(target: LOG_TARGET, "--> fail to proxy");
1532        defer!(debug!(target: LOG_TARGET, "<-- fail to proxy"););
1533        let server_session = session.as_mut();
1534
1535        let code = match e.etype() {
1536            pingora::HTTPStatus(code) => *code,
1537            // spellchecker:off
1538            _ => match e.esource() {
1539                pingora::ErrorSource::Upstream => 502,
1540                pingora::ErrorSource::Downstream => match e.etype() {
1541                    pingora::ErrorType::ConnectTimedout => 408,
1542                    // client close the connection
1543                    pingora::ErrorType::ConnectionClosed => 499,
1544                    _ => 500,
1545                },
1546                pingora::ErrorSource::Internal
1547                | pingora::ErrorSource::Unset => 500,
1548            },
1549            // spellchecker:on
1550        };
1551        let mut resp = match code {
1552            502 => error_resp::HTTP_502_RESPONSE.clone(),
1553            400 => error_resp::HTTP_400_RESPONSE.clone(),
1554            500 => HTTP_500_RESPONSE.clone(),
1555            _ => error_resp::gen_error_response(code),
1556        };
1557
1558        let error_type = e.etype().as_str();
1559        let content = self
1560            .error_template
1561            .replace("{{version}}", pingap_util::get_pkg_version())
1562            .replace("{{content}}", &e.to_string())
1563            .replace("{{error_type}}", error_type);
1564        let buf = Bytes::from(content);
1565        ctx.state.status = Some(
1566            StatusCode::from_u16(code)
1567                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1568        );
1569        let content_type = if buf.starts_with(b"{") {
1570            "application/json; charset=utf-8"
1571        } else {
1572            "text/html; charset=utf-8"
1573        };
1574        let _ = resp.insert_header(http::header::CONTENT_TYPE, content_type);
1575        let _ = resp.insert_header("X-Pingap-EType", error_type);
1576        let _ = resp
1577            .insert_header(http::header::CONTENT_LENGTH, buf.len().to_string());
1578
1579        let user_agent = server_session
1580            .get_header(http::header::USER_AGENT)
1581            .map(|v| v.clone().to_str().unwrap_or_default().to_string());
1582
1583        error!(
1584            target: LOG_TARGET,
1585            error = %e,
1586            remote_addr = ctx.conn.remote_addr,
1587            client_ip = ctx.conn.client_ip,
1588            user_agent,
1589            error_type,
1590            path = server_session.req_header().uri.path(),
1591            "fail to proxy"
1592        );
1593
1594        // TODO: we shouldn't be closing downstream connections on internally generated errors
1595        // and possibly other upstream connect() errors (connection refused, timeout, etc)
1596        //
1597        // This change is only here because we DO NOT re-use downstream connections
1598        // today on these errors and we should signal to the client that pingora is dropping it
1599        // rather than a misleading the client with 'keep-alive'
1600        server_session.set_keepalive(None);
1601
1602        server_session
1603            .write_response_header(Box::new(resp))
1604            .await
1605            .unwrap_or_else(|e| {
1606                error!(
1607                    target: LOG_TARGET,
1608                    error = %e,
1609                    "send error response to downstream fail"
1610                );
1611            });
1612
1613        let _ = server_session.write_response_body(buf, true).await;
1614        FailToProxy {
1615            error_code: code,
1616            can_reuse_downstream: false,
1617        }
1618    }
1619    /// Performs request logging and cleanup after request completion.
1620    /// Handles:
1621    /// - Request counting cleanup
1622    /// - Compression statistics
1623    /// - Prometheus metrics
1624    /// - OpenTelemetry span completion
1625    /// - Access logging
1626    async fn logging(
1627        &self,
1628        session: &mut Session,
1629        _e: Option<&pingora::Error>,
1630        ctx: &mut Self::CTX,
1631    ) where
1632        Self::CTX: Send + Sync,
1633    {
1634        debug!(target: LOG_TARGET, "--> logging");
1635        defer!(debug!(target: LOG_TARGET, "<-- logging"););
1636        end_request();
1637        self.processing.fetch_sub(1, Ordering::Relaxed);
1638        if let Some(location) = &ctx.upstream.location_instance {
1639            location.on_response();
1640        }
1641        // get from cache does not connect to upstream
1642        if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1643            ctx.upstream.processing_count = Some(upstream_instance.completed());
1644        }
1645        if ctx.state.status.is_none() {
1646            if let Some(header) = session.response_written() {
1647                ctx.state.status = Some(header.status);
1648            }
1649        }
1650        #[cfg(feature = "tracing")]
1651        // enable open telemetry and proxy upstream fail
1652        if let Some(features) = ctx.features.as_mut() {
1653            if let Some(ref mut span) = features.upstream_span.as_mut() {
1654                span.end();
1655            }
1656        }
1657
1658        if let Some(c) =
1659            session.downstream_modules_ctx.get::<ResponseCompression>()
1660        {
1661            if c.is_enabled() {
1662                if let Some((algorithm, in_bytes, out_bytes, took)) =
1663                    c.get_info()
1664                {
1665                    let features = ctx.features.get_or_insert_default();
1666                    features.compression_stat = Some(CompressionStat {
1667                        algorithm: algorithm.to_string(),
1668                        in_bytes,
1669                        out_bytes,
1670                        duration: took,
1671                    });
1672                }
1673            }
1674        }
1675        #[cfg(feature = "tracing")]
1676        if let Some(prom) = &self.prometheus {
1677            // because prom.before will not be called if location is empty
1678            if !ctx.upstream.location.is_empty() {
1679                prom.after(session, ctx);
1680            }
1681        }
1682
1683        #[cfg(feature = "tracing")]
1684        set_otel_request_attrs(session, ctx);
1685
1686        if let Some(p) = &self.log_parser {
1687            let buf = p.format(session, ctx);
1688            if let Some(logger) = &self.access_logger {
1689                let _ = logger.try_send(buf);
1690            } else {
1691                let msg = buf.as_bstr();
1692                info!(target: LOG_TARGET, "{msg}");
1693            }
1694        }
1695    }
1696}
1697
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server_conf::parse_from_conf;
    use ahash::AHashMap;
    use pingap_certificate::{DynamicCertificates, TlsCertificate};
    use pingap_config::{PingapConfig, new_file_config_manager};
    use pingap_core::{CacheInfo, Ctx, UpstreamInfo};
    use pingap_location::LocationStats;
    use pingora::http::ResponseHeader;
    use pingora::protocols::tls::SslDigest;
    use pingora::protocols::tls::SslDigestExtension;
    use pingora::protocols::{Digest, TimingDigest};
    use pingora::proxy::{ProxyHttp, Session};
    use pingora::server::configuration;
    use pingora::services::Service;
    use pretty_assertions::assert_eq;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::{Duration, SystemTime};
    use tokio_test::io::Builder;

    #[test]
    fn test_get_digest_detail() {
        // Build a digest with a known TCP-established timestamp (10s after
        // the epoch) and a TLS 1.3 ssl digest, then verify both values are
        // extracted by `get_digest_detail`.
        let digest = Digest {
            timing_digest: vec![Some(TimingDigest {
                established_ts: SystemTime::UNIX_EPOCH
                    .checked_add(Duration::from_secs(10))
                    .unwrap(),
            })],
            ssl_digest: Some(Arc::new(SslDigest {
                cipher: "123".into(),
                version: "1.3".into(),
                organization: None,
                serial_number: None,
                cert_digest: vec![],
                extension: SslDigestExtension::default(),
            })),
            ..Default::default()
        };
        let result = get_digest_detail(&digest);
        // the established time is reported in milliseconds
        assert_eq!(10000, result.tcp_established);
        assert_eq!("1.3", result.tls_version.unwrap_or_default());
    }

    /// Creates a new test server instance with default configuration.
    ///
    /// The server is wired with in-memory provider stubs so no real
    /// upstream, plugin, or certificate lookup takes place.
    fn new_server() -> Server {
        let toml_data = r###"
[upstreams.charts]
# upstream address list
addrs = ["127.0.0.1:5000"]


[upstreams.diving]
addrs = ["127.0.0.1:5001"]


[locations.lo]
# upstream of location (default none)
upstream = "charts"

# location match path (default none)
path = "/"

# location match host, multiple domain names are separated by commas (default none)
host = ""

# set headers to request (default none)
includes = ["proxySetHeader"]

# add headers to request (default none)
proxy_add_headers = ["name:value"]


# the weigh of location (default none)
weight = 1024


# plugin list for location
plugins = ["pingap:requestId", "stats"]

[servers.test]
# server linsten address, multiple addresses are separated by commas (default none)
addr = "0.0.0.0:6188"

# access log format (default none)
access_log = "tiny"

# the locations for server
locations = ["lo"]

# the threads count for server (default 1)
threads = 1

[plugins.stats]
value = "/stats"
category = "stats"

[storages.authToken]
category = "secret"
secret = "123123"
value = "PLpKJqvfkjTcYTDpauJf+2JnEayP+bm+0Oe60Jk="

[storages.proxySetHeader]
category = "config"
value = 'proxy_set_headers = ["name:value"]'
        "###;
        let pingap_conf = PingapConfig::new(toml_data.as_ref(), false).unwrap();

        let location = Arc::new(
            Location::new("lo", pingap_conf.locations.get("lo").unwrap())
                .unwrap(),
        );
        let upstream = Arc::new(
            Upstream::new(
                "charts",
                pingap_conf.upstreams.get("charts").unwrap(),
                None,
            )
            .unwrap(),
        );

        // Stub providers: each returns a fixed value regardless of the
        // requested name, which is enough for the tests below.
        struct TmpPluginLoader {}
        impl PluginProvider for TmpPluginLoader {
            fn get(&self, _name: &str) -> Option<Arc<dyn Plugin>> {
                None
            }
        }
        struct TmpLocationLoader {
            location: Arc<Location>,
        }
        impl LocationProvider for TmpLocationLoader {
            fn get(&self, _name: &str) -> Option<Arc<Location>> {
                Some(self.location.clone())
            }
            fn stats(&self) -> HashMap<String, LocationStats> {
                HashMap::new()
            }
        }
        struct TmpUpstreamLoader {
            upstream: Arc<Upstream>,
        }
        impl UpstreamProvider for TmpUpstreamLoader {
            fn get(&self, _name: &str) -> Option<Arc<Upstream>> {
                Some(self.upstream.clone())
            }
            fn list(&self) -> Vec<(String, Arc<Upstream>)> {
                vec![("charts".to_string(), self.upstream.clone())]
            }
        }
        struct TmpServerLocationsLoader {
            server_locations: Arc<Vec<String>>,
        }
        impl ServerLocationsProvider for TmpServerLocationsLoader {
            fn get(&self, _name: &str) -> Option<Arc<Vec<String>>> {
                Some(self.server_locations.clone())
            }
        }

        struct TmpCertificateLoader {}
        impl CertificateProvider for TmpCertificateLoader {
            fn get(&self, _sni: &str) -> Option<Arc<TlsCertificate>> {
                None
            }
            fn list(&self) -> Arc<DynamicCertificates> {
                Arc::new(AHashMap::new())
            }
            fn store(&self, _data: DynamicCertificates) {}
        }

        let confs = parse_from_conf(pingap_conf);
        // the config manager only needs a valid toml file path
        let file = tempfile::NamedTempFile::with_suffix(".toml").unwrap();

        Server::new(
            &confs[0],
            AppContext {
                logger: None,
                config_manager: Arc::new(
                    new_file_config_manager(&file.path().to_string_lossy())
                        .unwrap(),
                ),
                server_locations_provider: Arc::new(TmpServerLocationsLoader {
                    server_locations: Arc::new(vec!["lo".to_string()]),
                }),
                location_provider: Arc::new(TmpLocationLoader { location }),
                upstream_provider: Arc::new(TmpUpstreamLoader { upstream }),
                plugin_provider: Arc::new(TmpPluginLoader {}),
                certificate_provider: Arc::new(TmpCertificateLoader {}),
            },
        )
        .unwrap()
    }

    /// Reads a canned `GET /vicanso/pingap?size=1` request from a mock
    /// connection and returns the parsed HTTP/1.1 session.
    async fn new_session() -> Session {
        let input_header = "GET /vicanso/pingap?size=1 HTTP/1.1\r\n\r\n\r\n";
        let mock_io = Builder::new().read(input_header.as_bytes()).build();
        let mut session = Session::new_h1(Box::new(mock_io));
        session.read_request().await.unwrap();
        session
    }

    /// Runs `response_cache_filter` against a 200 response carrying the
    /// given header and reports whether the response was deemed cacheable.
    fn check_cacheable(
        server: &Server,
        session: &Session,
        name: &str,
        value: &str,
    ) -> bool {
        let mut upstream_response =
            ResponseHeader::build_no_case(200, None).unwrap();
        upstream_response
            .append_header(name.to_string(), value.to_string())
            .unwrap();
        server
            .response_cache_filter(
                session,
                &upstream_response,
                &mut Ctx {
                    cache: Some(CacheInfo {
                        keys: Some(vec!["ss".to_string()]),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            )
            .unwrap()
            .is_cacheable()
    }

    #[test]
    fn test_new_server() {
        let server = new_server();
        let services = server
            .run(Arc::new(configuration::ServerConf::default()))
            .unwrap();

        assert_eq!("Pingora HTTP Proxy Service", services.lb.name());
    }

    #[tokio::test]
    async fn test_early_request_filter() {
        let server = new_server();
        let mut session = new_session().await;

        // The early filter should resolve the matching location ("lo")
        // for the incoming request.
        let mut ctx = Ctx::default();
        server
            .early_request_filter(&mut session, &mut ctx)
            .await
            .unwrap();
        assert_eq!("lo", ctx.upstream.location.as_ref());
    }

    #[tokio::test]
    async fn test_request_filter() {
        let server = new_server();
        let mut session = new_session().await;

        let location = server.location_provider.get("lo").unwrap();
        let mut ctx = Ctx {
            upstream: UpstreamInfo {
                location: "lo".to_string().into(),
                location_instance: Some(location.clone()),
                ..Default::default()
            },
            ..Default::default()
        };
        // no plugin short-circuits this request, so processing continues
        let done = server.request_filter(&mut session, &mut ctx).await.unwrap();
        assert!(!done);
    }

    #[tokio::test]
    async fn test_cache_key_callback() {
        let server = new_server();
        let session = new_session().await;

        let key = server
            .cache_key_callback(
                &session,
                &mut Ctx {
                    cache: Some(CacheInfo {
                        namespace: Some("pingap".to_string()),
                        keys: Some(vec!["ss".to_string()]),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            )
            .unwrap();
        // the primary key is composed as "<keys>:<method>:<uri>"
        assert_eq!(
            key.primary_key_str(),
            Some("ss:GET:/vicanso/pingap?size=1")
        );
        assert_eq!(key.namespace_str(), Some("pingap"));
        assert_eq!(
            r#"CacheKey { namespace: [112, 105, 110, 103, 97, 112], primary: [115, 115, 58, 71, 69, 84, 58, 47, 118, 105, 99, 97, 110, 115, 111, 47, 112, 105, 110, 103, 97, 112, 63, 115, 105, 122, 101, 61, 49], primary_bin_override: None, variance: None, user_tag: "", extensions: {} }"#,
            format!("{key:?}")
        );
    }

    #[tokio::test]
    async fn test_response_cache_filter() {
        let server = new_server();
        let session = new_session().await;

        // a plain 200 response without cache directives is cacheable
        assert!(check_cacheable(
            &server,
            &session,
            "Content-Type",
            "application/json"
        ));
        // each of these Cache-Control directives must disable caching
        assert!(!check_cacheable(
            &server, &session, "Cache-Control", "no-cache"
        ));
        assert!(!check_cacheable(
            &server, &session, "Cache-Control", "no-store"
        ));
        assert!(!check_cacheable(
            &server,
            &session,
            "Cache-Control",
            "private, max-age=100"
        ));
    }
}