1#[cfg(feature = "tracing")]
16use super::tracing::{
17 initialize_telemetry, inject_telemetry_headers, set_otel_request_attrs,
18 set_otel_upstream_attrs, update_otel_cache_attrs,
19};
20use super::{LOG_TARGET, ServerConf, set_append_proxy_headers};
21use crate::ServerLocationsProvider;
22use async_trait::async_trait;
23use bstr::ByteSlice;
24use bytes::Bytes;
25use bytes::BytesMut;
26use http::StatusCode;
27use pingap_acme::handle_lets_encrypt;
28use pingap_certificate::CertificateProvider;
29use pingap_certificate::{GlobalCertificate, TlsSettingParams};
30use pingap_config::ConfigManager;
31use pingap_core::BackgroundTask;
32#[cfg(feature = "tracing")]
33use pingap_core::HttpResponse;
34use pingap_core::LocationInstance;
35use pingap_core::PluginProvider;
36use pingap_core::{
37 CompressionStat, Ctx, PluginStep, RequestPluginResult,
38 ResponseBodyPluginResult, ResponsePluginResult, get_cache_key,
39};
40use pingap_core::{HTTP_HEADER_NAME_X_REQUEST_ID, get_digest_detail};
41use pingap_core::{Plugin, new_internal_error};
42use pingap_location::{Location, LocationProvider};
43use pingap_logger::{Parser, parse_access_log_directive};
44#[cfg(feature = "tracing")]
45use pingap_otel::{KeyValue, trace::Span};
46#[cfg(feature = "tracing")]
47use pingap_performance::{
48 Prometheus, new_prometheus, new_prometheus_push_service,
49};
50use pingap_performance::{accept_request, end_request};
51use pingap_upstream::{Upstream, UpstreamProvider};
52use pingora::apps::HttpServerOptions;
53use pingora::cache::cache_control::DirectiveValue;
54use pingora::cache::cache_control::{CacheControl, InterpretCacheControl};
55use pingora::cache::filters::resp_cacheable;
56use pingora::cache::key::CacheHashKey;
57use pingora::cache::{
58 CacheKey, CacheMetaDefaults, NoCacheReason, RespCacheable,
59};
60use pingora::http::{RequestHeader, ResponseHeader};
61use pingora::listeners::TcpSocketOptions;
62use pingora::modules::http::HttpModules;
63use pingora::modules::http::compression::{
64 ResponseCompression, ResponseCompressionBuilder,
65};
66use pingora::modules::http::grpc_web::{GrpcWeb, GrpcWebBridge};
67use pingora::protocols::Digest;
68use pingora::protocols::http::error_resp;
69use pingora::proxy::{FailToProxy, HttpProxy, http_proxy_service};
70use pingora::proxy::{ProxyHttp, Session};
71use pingora::server::configuration;
72use pingora::services::listening::Service;
73use pingora::upstreams::peer::{HttpPeer, Peer};
74use scopeguard::defer;
75use snafu::Snafu;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
79use std::time::{Duration, Instant};
80use tokio::sync::mpsc::Sender;
81use tracing::{debug, error, info};
82
83#[derive(Debug, Snafu)]
84pub enum Error {
85 #[snafu(display("Common error, category: {category}, {message}"))]
86 Common { category: String, message: String },
87}
88type Result<T, E = Error> = std::result::Result<T, E>;
89
90#[inline]
91pub fn get_start_time(started_at: &Instant) -> i32 {
92 let value = started_at.elapsed().as_millis() as i32;
94 if value == 0 {
95 return -1;
96 }
97 -value
98}
99
100#[inline]
101pub fn get_latency(started_at: &Instant, value: &Option<i32>) -> Option<i32> {
102 let Some(value) = value else {
103 return None;
104 };
105 if *value >= 0 {
106 return None;
107 }
108 let latency = started_at.elapsed().as_millis() as i32 + *value;
109
110 Some(latency)
111}
112
113pub struct Server {
116 name: String,
118
119 admin: bool,
121
122 addr: String,
124
125 accepted: AtomicU64,
127
128 processing: AtomicI32,
130
131 log_parser: Option<Parser>,
133
134 error_template: String,
136
137 threads: Option<usize>,
139
140 tls_cipher_list: Option<String>,
142
143 tls_ciphersuites: Option<String>,
145
146 tls_min_version: Option<String>,
148
149 tls_max_version: Option<String>,
151
152 enabled_h2: bool,
154
155 lets_encrypt_enabled: bool,
157
158 global_certificates: bool,
160
161 tcp_socket_options: Option<TcpSocketOptions>,
163
164 #[cfg(feature = "tracing")]
166 prometheus: Option<Arc<Prometheus>>,
167
168 prometheus_push_mode: bool,
170
171 #[cfg(feature = "tracing")]
173 prometheus_metrics: String,
174
175 #[cfg(feature = "tracing")]
177 enabled_otel: bool,
178
179 modules: Option<Vec<String>>,
181
182 enable_server_timing: bool,
184
185 downstream_read_timeout: Option<Duration>,
187 downstream_write_timeout: Option<Duration>,
189
190 server_locations_provider: Arc<dyn ServerLocationsProvider>,
192 plugin_provider: Arc<dyn PluginProvider>,
194
195 location_provider: Arc<dyn LocationProvider>,
197
198 upstream_provider: Arc<dyn UpstreamProvider>,
200
201 certificate_provider: Arc<dyn CertificateProvider>,
203
204 config_manager: Arc<ConfigManager>,
206
207 access_logger: Option<Sender<BytesMut>>,
209}
210
211pub struct ServerServices {
212 pub lb: Service<HttpProxy<Server>>,
213}
214
215const META_DEFAULTS: CacheMetaDefaults =
216 CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);
217
218static HTTP_500_RESPONSE: LazyLock<ResponseHeader> =
219 LazyLock::new(|| error_resp::gen_error_response(500));
220
221#[derive(Clone)]
222pub struct AppContext {
223 pub logger: Option<Sender<BytesMut>>,
224 pub config_manager: Arc<ConfigManager>,
225 pub server_locations_provider: Arc<dyn ServerLocationsProvider>,
226 pub location_provider: Arc<dyn LocationProvider>,
227 pub upstream_provider: Arc<dyn UpstreamProvider>,
228 pub plugin_provider: Arc<dyn PluginProvider>,
229 pub certificate_provider: Arc<dyn CertificateProvider>,
230}
231
232impl Server {
233 pub fn new(conf: &ServerConf, ctx: AppContext) -> Result<Self> {
240 debug!(target: LOG_TARGET, config = conf.to_string(), "new server");
241 let mut p = None;
242 let (access_log, _) =
243 parse_access_log_directive(conf.access_log.as_ref());
244 if let Some(access_log) = access_log {
245 p = Some(Parser::from(access_log.as_str()));
246 }
247 let tcp_socket_options = if conf.tcp_fastopen.is_some()
248 || conf.tcp_keepalive.is_some()
249 || conf.reuse_port.is_some()
250 {
251 let mut opts = TcpSocketOptions::default();
252 opts.tcp_fastopen = conf.tcp_fastopen;
253 opts.tcp_keepalive.clone_from(&conf.tcp_keepalive);
254 opts.so_reuseport = conf.reuse_port;
255 Some(opts)
256 } else {
257 None
258 };
259 let prometheus_metrics =
260 conf.prometheus_metrics.clone().unwrap_or_default();
261 #[cfg(feature = "tracing")]
262 let prometheus = if prometheus_metrics.is_empty() {
263 None
264 } else {
265 let p = new_prometheus(&conf.name).map_err(|e| Error::Common {
266 category: "prometheus".to_string(),
267 message: e.to_string(),
268 })?;
269 Some(Arc::new(p))
270 };
271 let s = Server {
272 name: conf.name.clone(),
273 admin: conf.admin,
274 accepted: AtomicU64::new(0),
275 processing: AtomicI32::new(0),
276 addr: conf.addr.clone(),
277 log_parser: p,
278 error_template: conf.error_template.clone(),
279 tls_cipher_list: conf.tls_cipher_list.clone(),
280 tls_ciphersuites: conf.tls_ciphersuites.clone(),
281 tls_min_version: conf.tls_min_version.clone(),
282 tls_max_version: conf.tls_max_version.clone(),
283 threads: conf.threads,
284 lets_encrypt_enabled: false,
285 global_certificates: conf.global_certificates,
286 enabled_h2: conf.enabled_h2,
287 tcp_socket_options,
288 prometheus_push_mode: prometheus_metrics.contains("://"),
289 #[cfg(feature = "tracing")]
290 enabled_otel: conf.otlp_exporter.is_some(),
291 #[cfg(feature = "tracing")]
292 prometheus_metrics,
293 #[cfg(feature = "tracing")]
294 prometheus,
295 enable_server_timing: conf.enable_server_timing,
296 modules: conf.modules.clone(),
297 downstream_read_timeout: conf.downstream_read_timeout,
298 downstream_write_timeout: conf.downstream_write_timeout,
299 server_locations_provider: ctx.server_locations_provider,
300 location_provider: ctx.location_provider,
301 upstream_provider: ctx.upstream_provider,
302 plugin_provider: ctx.plugin_provider,
303 certificate_provider: ctx.certificate_provider,
304 access_logger: ctx.logger,
305 config_manager: ctx.config_manager,
306 };
307 Ok(s)
308 }
309 pub fn enable_lets_encrypt(&mut self) {
312 self.lets_encrypt_enabled = true;
313 }
314 pub fn get_prometheus_push_service(
317 &self,
318 ) -> Option<Box<dyn BackgroundTask>> {
319 if !self.prometheus_push_mode {
320 return None;
321 }
322 cfg_if::cfg_if! {
323 if #[cfg(feature = "tracing")] {
324 let Some(prometheus) = &self.prometheus else {
325 return None;
326 };
327 match new_prometheus_push_service(
328 &self.name,
329 &self.prometheus_metrics,
330 prometheus.clone(),
331 ) {
332 Ok(service) => Some(service),
333 Err(e) => {
334 error!(
335 target: LOG_TARGET,
336 error = %e,
337 name = self.name,
338 "new prometheus push service fail"
339 );
340 None
341 },
342 }
343 } else {
344 None
345 }
346 }
347 }
348
349 pub fn run(
355 self,
356 conf: Arc<configuration::ServerConf>,
357 ) -> Result<ServerServices> {
358 let addr = self.addr.clone();
359 let tcp_socket_options = self.tcp_socket_options.clone();
360
361 let name = self.name.clone();
362 let mut dynamic_cert = None;
363 if self.global_certificates {
365 dynamic_cert =
366 Some(GlobalCertificate::new(self.certificate_provider.clone()));
367 }
368
369 let is_tls = dynamic_cert.is_some();
370
371 let enabled_h2 = self.enabled_h2;
372 let threads = if let Some(threads) = self.threads {
373 let value = if threads == 0 {
375 num_cpus::get()
376 } else {
377 threads
378 };
379 Some(value)
380 } else {
381 None
382 };
383
384 info!(
385 target: LOG_TARGET,
386 name,
387 addr,
388 threads,
389 is_tls,
390 h2 = enabled_h2,
391 tcp_socket_options = format!("{:?}", tcp_socket_options),
392 "server is listening"
393 );
394 let cipher_list = self.tls_cipher_list.clone();
395 let cipher_suites = self.tls_ciphersuites.clone();
396 let tls_min_version = self.tls_min_version.clone();
397 let tls_max_version = self.tls_max_version.clone();
398 let mut lb = http_proxy_service(&conf, self);
399 if !is_tls
401 && enabled_h2
402 && let Some(http_logic) = lb.app_logic_mut()
403 {
404 let mut http_server_options = HttpServerOptions::default();
405 http_server_options.h2c = true;
406 http_logic.server_options = Some(http_server_options);
407 }
408 lb.threads = threads;
409 for addr in addr.split(',') {
411 if let Some(dynamic_cert) = &dynamic_cert {
413 let tls_settings = dynamic_cert
414 .new_tls_settings(&TlsSettingParams {
415 server_name: name.clone(),
416 enabled_h2,
417 cipher_list: cipher_list.clone(),
418 cipher_suites: cipher_suites.clone(),
419 tls_min_version: tls_min_version.clone(),
420 tls_max_version: tls_max_version.clone(),
421 })
422 .map_err(|e| Error::Common {
423 category: "tls".to_string(),
424 message: e.to_string(),
425 })?;
426 lb.add_tls_with_settings(
427 addr,
428 tcp_socket_options.clone(),
429 tls_settings,
430 );
431 } else if let Some(opt) = &tcp_socket_options {
432 lb.add_tcp_with_settings(addr, opt.clone());
433 } else {
434 lb.add_tcp(addr);
435 }
436 }
437 Ok(ServerServices { lb })
438 }
439 async fn serve_admin(
442 &self,
443 session: &mut Session,
444 ctx: &mut Ctx,
445 ) -> pingora::Result<bool> {
446 if let Some(plugin) = self.plugin_provider.get("pingap:admin") {
447 let result = plugin
448 .handle_request(PluginStep::Request, session, ctx)
449 .await?;
450 if let RequestPluginResult::Respond(resp) = result {
451 ctx.state.status = Some(resp.status);
452 resp.send(session).await?;
453 return Ok(true);
454 }
455 }
456 Ok(false)
457 }
458 #[inline]
459 fn initialize_context(&self, session: &mut Session, ctx: &mut Ctx) {
460 session.set_read_timeout(self.downstream_read_timeout);
461 session.set_write_timeout(self.downstream_write_timeout);
462
463 if let Some(stream) = session.stream() {
464 ctx.conn.id = stream.id() as usize;
465 }
466 if let Some(digest) = session.digest() {
468 let digest_detail = get_digest_detail(digest);
469 ctx.timing.connection_duration = digest_detail.connection_time;
470 ctx.conn.reused = digest_detail.connection_reused;
471
472 if !ctx.conn.reused
473 && digest_detail.tls_established
474 >= digest_detail.tcp_established
475 {
476 let latency = digest_detail.tls_established
477 - digest_detail.tcp_established;
478 ctx.timing.tls_handshake = Some(latency as i32);
479 }
480 ctx.conn.tls_cipher = digest_detail.tls_cipher;
481 ctx.conn.tls_version = digest_detail.tls_version;
482 };
483 accept_request();
484
485 ctx.state.processing_count =
486 self.processing.fetch_add(1, Ordering::Relaxed) + 1;
487 ctx.state.accepted_count =
488 self.accepted.fetch_add(1, Ordering::Relaxed) + 1;
489 if let Some((remote_addr, remote_port)) =
490 pingap_core::get_remote_addr(session)
491 {
492 ctx.conn.remote_addr = Some(remote_addr);
493 ctx.conn.remote_port = Some(remote_port);
494 }
495 if let Some(addr) =
496 session.server_addr().and_then(|addr| addr.as_inet())
497 {
498 ctx.conn.server_addr = Some(addr.ip().to_string());
499 ctx.conn.server_port = Some(addr.port());
500 }
501 }
502
503 #[inline]
504 async fn find_and_apply_location(
505 &self,
506 session: &mut Session,
507 ctx: &mut Ctx,
508 ) -> pingora::Result<()> {
509 let header = session.req_header();
510 let host = pingap_core::get_host(header).unwrap_or_default();
511 let path = header.uri.path();
512
513 let Some(locations) = self.server_locations_provider.get(&self.name)
515 else {
516 return Ok(());
517 };
518
519 let matched_info = locations.iter().find_map(|name| {
521 let location = self.location_provider.get(name)?;
522 let (matched, captures) = location.match_host_path(host, path);
523 if matched {
524 Some((location, captures))
525 } else {
526 None
527 }
528 });
529
530 let Some((location, captures)) = matched_info else {
531 return Ok(());
532 };
533
534 ctx.upstream.location = location.name.clone();
536 ctx.upstream.location_instance = Some(location.clone());
537 ctx.upstream.max_retries = location.max_retries;
538 ctx.upstream.max_retry_window = location.max_retry_window;
539 if let Some(captures) = captures {
540 ctx.extend_variables(captures);
541 }
542
543 debug!(
544 target: LOG_TARGET,
545 "variables: {:?}",
546 ctx.features.as_ref().map(|item| &item.variables)
547 );
548
549 #[cfg(feature = "tracing")]
551 if let Some(prom) = &self.prometheus {
552 prom.before(&ctx.upstream.location);
553 }
554
555 location
557 .validate_content_length(header)
558 .map_err(|e| new_internal_error(413, e))?;
559
560 let (accepted, processing) = location.on_request()?;
562 ctx.state.location_accepted_count = accepted;
563 ctx.state.location_processing_count = processing;
564
565 if location.support_grpc_web() {
567 let grpc_web = session
568 .downstream_modules_ctx
569 .get_mut::<GrpcWebBridge>()
570 .ok_or_else(|| {
571 new_internal_error(
572 500,
573 "grpc web bridge module should be added",
574 )
575 })?;
576 grpc_web.init();
577 }
578
579 ctx.plugins = self.get_context_plugins(location.clone(), session);
581 let _ = self
582 .handle_request_plugin(PluginStep::EarlyRequest, session, ctx)
583 .await?;
584
585 Ok(())
586 }
587
588 #[inline]
589 async fn handle_admin_request(
590 &self,
591 session: &mut Session,
592 ctx: &mut Ctx,
593 ) -> Option<pingora::Result<bool>> {
594 if self.admin {
595 match self.serve_admin(session, ctx).await {
596 Ok(true) => return Some(Ok(true)), Ok(false) => {}, Err(e) => return Some(Err(e)), }
600 }
601 None }
603 #[inline]
604 async fn handle_acme_challenge(
605 &self,
606 session: &mut Session,
607 ctx: &mut Ctx,
608 ) -> Option<pingora::Result<bool>> {
609 if self.lets_encrypt_enabled {
610 return match handle_lets_encrypt(
611 self.config_manager.clone(),
612 session,
613 ctx,
614 )
615 .await
616 {
617 Ok(true) => Some(Ok(true)), Ok(false) => None, Err(e) => Some(Err(e)),
620 };
621 }
622 None }
624 #[inline]
625 #[cfg(feature = "tracing")]
626 async fn handle_metrics_request(
627 &self,
628 session: &mut Session,
629 _ctx: &mut Ctx,
630 ) -> Option<pingora::Result<bool>> {
631 let header = session.req_header();
632 let should_handle = !self.prometheus_push_mode
633 && self.prometheus.is_some()
634 && header.uri.path() == self.prometheus_metrics;
635
636 if should_handle {
637 let prom = self.prometheus.as_ref().unwrap(); let result = async {
639 let body =
640 prom.metrics().map_err(|e| new_internal_error(500, e))?;
641 HttpResponse::text(body).send(session).await?;
642 Ok(true)
643 }
644 .await;
645 return Some(result);
646 }
647 None
648 }
649 #[inline]
650 async fn handle_standard_request(
651 &self,
652 session: &mut Session,
653 ctx: &mut Ctx,
654 ) -> pingora::Result<bool> {
655 let Some(location) = &ctx.upstream.location_instance else {
656 let header = session.req_header();
657 let host = pingap_core::get_host(header).unwrap_or_default();
658 let message = format!(
659 "No matching location, host:{host} path:{}",
660 header.uri.path()
661 );
662 return Err(pingap_core::new_internal_error(500, message));
663 };
664
665 debug!(
666 target: LOG_TARGET,
667 server = self.name,
668 location = location.name(),
669 "location is matched"
670 );
671
672 let variables = if let Some(features) = &mut ctx.features {
673 features.variables.take()
674 } else {
675 None
676 };
677
678 let (_, capture_variables) =
679 location.rewrite(session.req_header_mut(), variables);
680 if let Some(capture_variables) = capture_variables {
681 ctx.extend_variables(capture_variables);
682 }
683
684 if self
685 .handle_request_plugin(PluginStep::Request, session, ctx)
686 .await?
687 {
688 return Ok(true);
689 }
690
691 Ok(false)
692 }
693}
694
695const MODULE_GRPC_WEB: &str = "grpc-web";
696
697impl Server {
698 #[inline]
699 fn get_context_plugins(
700 &self,
701 location: Arc<Location>,
702 session: &Session,
703 ) -> Option<Vec<(String, Arc<dyn Plugin>)>> {
704 if session.is_upgrade_req() {
705 return None;
706 }
707 let plugins = location.plugins.as_ref()?;
708
709 let location_plugins: Vec<_> = plugins
710 .iter()
711 .filter_map(|name| {
712 self.plugin_provider
713 .get(name)
714 .map(|plugin| (name.clone(), plugin))
715 })
716 .collect();
717
718 (!location_plugins.is_empty()).then_some(location_plugins)
720 }
721
722 #[inline]
725 pub async fn handle_request_plugin(
726 &self,
727 step: PluginStep,
728 session: &mut Session,
729 ctx: &mut Ctx,
730 ) -> pingora::Result<bool> {
731 let plugins = match ctx.plugins.take() {
732 Some(p) => p,
733 None => return Ok(false), };
735 if plugins.is_empty() {
736 return Ok(false);
737 }
738
739 let result = async {
740 let mut request_done = false;
741 for (name, plugin) in plugins.iter() {
742 let now = Instant::now();
743 let result = plugin.handle_request(step, session, ctx).await?;
744 let elapsed = now.elapsed().as_millis() as u32;
745
746 let mut record_time = |msg: &str| {
748 debug!(
749 target: LOG_TARGET,
750 name,
751 elapsed,
752 step = step.to_string(),
753 "{msg}"
754 );
755 ctx.add_plugin_processing_time(name, elapsed);
756 };
757
758 match result {
759 RequestPluginResult::Skipped => {
760 continue;
761 },
762 RequestPluginResult::Respond(resp) => {
763 record_time("request plugin create new response");
764 if resp.status.as_u16() < 900 {
766 ctx.state.status = Some(resp.status);
767 resp.send(session).await?;
768 }
769 request_done = true;
770 break;
771 },
772 RequestPluginResult::Continue => {
773 record_time("request plugin run and continue request");
774 },
775 }
776 }
777 Ok(request_done)
778 }
779 .await;
780 ctx.plugins = Some(plugins);
781 result
782 }
783
784 #[inline]
786 pub async fn handle_response_plugin(
787 &self,
788 session: &mut Session,
789 ctx: &mut Ctx,
790 upstream_response: &mut ResponseHeader,
791 ) -> pingora::Result<()> {
792 let plugins = match ctx.plugins.take() {
793 Some(p) => p,
794 None => return Ok(()), };
796 if plugins.is_empty() {
797 return Ok(());
798 }
799
800 let result = async {
801 for (name, plugin) in plugins.iter() {
802 let now = Instant::now();
803 if let ResponsePluginResult::Modified = plugin
804 .handle_response(session, ctx, upstream_response)
805 .await?
806 {
807 let elapsed = now.elapsed().as_millis() as u32;
808 debug!(
809 target: LOG_TARGET,
810 name, elapsed, "response plugin modify headers"
811 );
812 ctx.add_plugin_processing_time(name, elapsed);
813 };
814 }
815 Ok(())
816 }
817 .await;
818 ctx.plugins = Some(plugins);
819 result
820 }
821
822 #[inline]
823 pub fn handle_upstream_response_plugin(
824 &self,
825 session: &mut Session,
826 ctx: &mut Ctx,
827 upstream_response: &mut ResponseHeader,
828 ) -> pingora::Result<()> {
829 let plugins = match ctx.plugins.take() {
830 Some(p) => p,
831 None => return Ok(()), };
833 if plugins.is_empty() {
834 return Ok(());
835 }
836
837 let result = {
838 for (name, plugin) in plugins.iter() {
839 let now = Instant::now();
840 if let ResponsePluginResult::Modified = plugin
841 .handle_upstream_response(session, ctx, upstream_response)?
842 {
843 let elapsed = now.elapsed().as_millis() as u32;
844 debug!(
845 target: LOG_TARGET,
846 name,
847 elapsed,
848 "upstream response plugin modify headers"
849 );
850 ctx.add_plugin_processing_time(name, elapsed);
851 };
852 }
853 Ok(())
854 };
855 ctx.plugins = Some(plugins);
856 result
857 }
858
859 #[inline]
860 pub fn handle_upstream_response_body_plugin(
861 &self,
862 session: &mut Session,
863 ctx: &mut Ctx,
864 body: &mut Option<bytes::Bytes>,
865 end_of_stream: bool,
866 ) -> pingora::Result<()> {
867 let plugins = match ctx.plugins.take() {
868 Some(p) => p,
869 None => return Ok(()), };
871 if plugins.is_empty() {
872 return Ok(());
873 }
874
875 let result = {
876 for (name, plugin) in plugins.iter() {
877 let now = Instant::now();
878 match plugin.handle_upstream_response_body(
879 session,
880 ctx,
881 body,
882 end_of_stream,
883 )? {
884 ResponseBodyPluginResult::PartialReplaced
885 | ResponseBodyPluginResult::FullyReplaced => {
886 let elapsed = now.elapsed().as_millis() as u32;
887 ctx.add_plugin_processing_time(name, elapsed);
888 debug!(
889 target: LOG_TARGET,
890 name, elapsed, "response body plugin modify body"
891 );
892 },
893 _ => {},
894 }
895 }
896 Ok(())
897 };
898 ctx.plugins = Some(plugins);
899 result
900 }
901
902 #[inline]
903 pub fn handle_response_body_plugin(
904 &self,
905 session: &mut Session,
906 ctx: &mut Ctx,
907 body: &mut Option<bytes::Bytes>,
908 end_of_stream: bool,
909 ) -> pingora::Result<()> {
910 let plugins = match ctx.plugins.take() {
911 Some(p) => p,
912 None => return Ok(()), };
914 if plugins.is_empty() {
915 return Ok(());
916 }
917 let result = {
918 for (name, plugin) in plugins.iter() {
919 let now = Instant::now();
920 match plugin.handle_response_body(
921 session,
922 ctx,
923 body,
924 end_of_stream,
925 )? {
926 ResponseBodyPluginResult::PartialReplaced
927 | ResponseBodyPluginResult::FullyReplaced => {
928 let elapsed = now.elapsed().as_millis() as u32;
929 ctx.add_plugin_processing_time(name, elapsed);
930 debug!(
931 target: LOG_TARGET,
932 name, elapsed, "response body plugin modify body"
933 );
934 },
935 _ => {},
936 }
937 }
938 Ok(())
939 };
940 ctx.plugins = Some(plugins);
941 result
942 }
943
944 fn process_cache_control(
945 &self,
946 c: &mut CacheControl,
947 max_ttl: Option<Duration>,
948 ) -> Result<(), NoCacheReason> {
949 if c.no_cache() || c.no_store() || c.private() {
951 return Err(NoCacheReason::OriginNotCache);
952 }
953
954 if c.max_age().ok().flatten().unwrap_or_default() == 0 {
956 return Err(NoCacheReason::OriginNotCache);
957 }
958
959 if let Some(d) = max_ttl
961 && c.fresh_duration().unwrap_or_default() > d
962 {
963 let s_maxage_value =
965 itoa::Buffer::new().format(d.as_secs()).as_bytes().to_vec();
966 c.directives.insert(
967 "s-maxage".to_string(),
968 Some(DirectiveValue(s_maxage_value)),
969 );
970 }
971
972 Ok(())
973 }
974
975 #[inline]
976 fn handle_cache_headers(
977 &self,
978 session: &Session,
979 upstream_response: &mut ResponseHeader,
980 ctx: &mut Ctx,
981 ) {
982 let cache_status = session.cache.phase().as_str();
983 let _ = upstream_response.insert_header("x-cache-status", cache_status);
984
985 let lookup_duration_str = self.process_cache_timing(
987 session.cache.lookup_duration(),
988 "x-cache-lookup",
989 upstream_response,
990 &mut ctx.timing.cache_lookup,
991 );
992
993 let lock_duration_str = self.process_cache_timing(
995 session.cache.lock_duration(),
996 "x-cache-lock",
997 upstream_response,
998 &mut ctx.timing.cache_lock,
999 );
1000
1001 #[cfg(not(feature = "tracing"))]
1002 {
1003 let _ = lookup_duration_str;
1004 let _ = lock_duration_str;
1005 }
1006
1007 #[cfg(feature = "tracing")]
1009 update_otel_cache_attrs(
1010 ctx,
1011 cache_status,
1012 lookup_duration_str,
1013 lock_duration_str,
1014 );
1015 }
1016
1017 #[inline]
1018 fn process_cache_timing(
1019 &self,
1020 duration_opt: Option<Duration>,
1021 header_name: &'static str,
1022 resp: &mut ResponseHeader,
1023 ctx_field: &mut Option<i32>,
1024 ) -> String {
1025 if let Some(d) = duration_opt {
1026 let ms = d.as_millis() as i32;
1027
1028 let mut buffer = itoa::Buffer::new();
1030 let mut value_bytes = Vec::with_capacity(6);
1031 value_bytes.extend_from_slice(buffer.format(ms).as_bytes());
1032 value_bytes.extend_from_slice(b"ms");
1033
1034 let _ = resp.insert_header(header_name, value_bytes);
1035 *ctx_field = Some(ms);
1036
1037 #[cfg(feature = "tracing")]
1038 return humantime::Duration::from(d).to_string();
1039 }
1040 String::new()
1041 }
1042}
1043
1044#[inline]
1045fn get_upstream_with_variables(
1046 upstream: &str,
1047 ctx: &Ctx,
1048 upstreams: &dyn UpstreamProvider,
1049) -> Option<Arc<Upstream>> {
1050 let key = upstream
1051 .strip_prefix('$')
1052 .and_then(|var_name| ctx.get_variable(var_name))
1053 .unwrap_or(upstream);
1054 upstreams.get(key)
1055}
1056
1057#[async_trait]
1058impl ProxyHttp for Server {
1059 type CTX = Ctx;
1060 fn new_ctx(&self) -> Self::CTX {
1061 debug!(target: LOG_TARGET, "new ctx");
1062 Ctx::new()
1063 }
1064 fn init_downstream_modules(&self, modules: &mut HttpModules) {
1065 debug!(target: LOG_TARGET, "--> init downstream modules");
1066 defer!(debug!(target: LOG_TARGET, "<-- init downstream modules"););
1067 modules.add_module(ResponseCompressionBuilder::enable(0));
1069
1070 self.modules.iter().flatten().for_each(|item| {
1071 if item == MODULE_GRPC_WEB {
1072 modules.add_module(Box::new(GrpcWeb));
1073 }
1074 });
1075 }
1076 async fn early_request_filter(
1085 &self,
1086 session: &mut Session,
1087 ctx: &mut Self::CTX,
1088 ) -> pingora::Result<()>
1089 where
1090 Self::CTX: Send + Sync,
1091 {
1092 debug!(target: LOG_TARGET, "--> early request filter");
1093 defer!(debug!(target: LOG_TARGET, "<-- early request filter"););
1094
1095 self.initialize_context(session, ctx);
1096 #[cfg(feature = "tracing")]
1097 if self.enabled_otel {
1098 initialize_telemetry(&self.name, session, ctx);
1099 }
1100 self.find_and_apply_location(session, ctx).await?;
1101
1102 Ok(())
1103 }
1104 async fn request_filter(
1112 &self,
1113 session: &mut Session,
1114 ctx: &mut Self::CTX,
1115 ) -> pingora::Result<bool>
1116 where
1117 Self::CTX: Send + Sync,
1118 {
1119 debug!(target: LOG_TARGET, "--> request filter");
1120 defer!(debug!(target: LOG_TARGET, "<-- request filter"););
1121 if let Some(result) = self.handle_admin_request(session, ctx).await {
1124 return result;
1125 }
1126 if let Some(result) = self.handle_acme_challenge(session, ctx).await {
1128 return result;
1129 }
1130 #[cfg(feature = "tracing")]
1132 if let Some(result) = self.handle_metrics_request(session, ctx).await {
1133 return result;
1134 }
1135
1136 self.handle_standard_request(session, ctx).await
1137 }
1138
1139 async fn proxy_upstream_filter(
1142 &self,
1143 session: &mut Session,
1144 ctx: &mut Self::CTX,
1145 ) -> pingora::Result<bool>
1146 where
1147 Self::CTX: Send + Sync,
1148 {
1149 debug!(target: LOG_TARGET, "--> proxy upstream filter");
1150 defer!(debug!(target: LOG_TARGET, "<-- proxy upstream filter"););
1151 let done = self
1152 .handle_request_plugin(PluginStep::ProxyUpstream, session, ctx)
1153 .await?;
1154
1155 if done {
1156 return Ok(false);
1157 }
1158 Ok(true)
1159 }
1160
1161 async fn upstream_peer(
1164 &self,
1165 session: &mut Session,
1166 ctx: &mut Ctx,
1167 ) -> pingora::Result<Box<HttpPeer>> {
1168 debug!(target: LOG_TARGET, "--> upstream peer");
1169 defer!(debug!(target: LOG_TARGET, "<-- upstream peer"););
1170
1171 let peer = ctx
1172 .upstream
1173 .location_instance
1174 .clone()
1175 .as_ref()
1176 .and_then(|location| {
1177 let upstream = if ctx.upstream.name.is_empty() {
1178 get_upstream_with_variables(
1179 location.upstream(),
1180 ctx,
1181 self.upstream_provider.as_ref(),
1182 )?
1183 } else {
1184 get_upstream_with_variables(
1186 &ctx.upstream.name,
1187 ctx,
1188 self.upstream_provider.as_ref(),
1189 )?
1190 };
1191 ctx.upstream.upstream_instance = Some(upstream.clone());
1192 Some(upstream)
1193 })
1194 .and_then(|upstream| {
1195 ctx.upstream.connected_count = upstream.connected();
1196 ctx.upstream.name = upstream.name.clone();
1197 #[cfg(feature = "tracing")]
1198 if let Some(features) = &ctx.features
1199 && let Some(tracer) = &features.otel_tracer
1200 {
1201 let name = format!("upstream.{}", &upstream.name);
1202 let mut span = tracer.new_upstream_span(&name);
1203 span.set_attribute(KeyValue::new(
1204 "upstream.connected",
1205 ctx.upstream.connected_count.unwrap_or_default() as i64,
1206 ));
1207 let features = ctx.features.get_or_insert_default();
1208 features.upstream_span = Some(span);
1209 }
1210 upstream
1211 .new_http_peer(session, &ctx.conn.client_ip)
1212 .inspect(|peer| {
1213 ctx.upstream.address = peer.address().to_string();
1214 })
1215 })
1216 .ok_or_else(|| {
1217 new_internal_error(
1218 503,
1219 format!(
1220 "No available upstream for {}",
1221 &ctx.upstream.location
1222 ),
1223 )
1224 })?;
1225
1226 ctx.timing.upstream_connect =
1228 Some(get_start_time(&ctx.timing.created_at));
1229
1230 Ok(Box::new(peer))
1231 }
1232 async fn connected_to_upstream(
1235 &self,
1236 _session: &mut Session,
1237 reused: bool,
1238 _peer: &HttpPeer,
1239 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
1240 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1241 digest: Option<&Digest>,
1242 ctx: &mut Self::CTX,
1243 ) -> pingora::Result<()>
1244 where
1245 Self::CTX: Send + Sync,
1246 {
1247 debug!(target: LOG_TARGET, "--> connected to upstream");
1248 defer!(debug!(target: LOG_TARGET, "<-- connected to upstream"););
1249 ctx.timing.upstream_connect =
1250 get_latency(&ctx.timing.created_at, &ctx.timing.upstream_connect);
1251 if let Some(digest) = digest {
1252 ctx.update_upstream_timing_from_digest(digest, reused);
1253 }
1254 ctx.upstream.reused = reused;
1255
1256 ctx.timing.upstream_processing =
1258 Some(get_start_time(&ctx.timing.created_at));
1259
1260 Ok(())
1261 }
1262 fn fail_to_connect(
1263 &self,
1264 _session: &mut Session,
1265 peer: &HttpPeer,
1266 ctx: &mut Self::CTX,
1267 mut e: Box<pingora::Error>,
1268 ) -> Box<pingora::Error> {
1269 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1270 upstream_instance.on_transport_failure(&peer.address().to_string());
1271 }
1272 let Some(max_retries) = ctx.upstream.max_retries else {
1273 return e;
1274 };
1275 if ctx.upstream.retries >= max_retries {
1276 return e;
1277 }
1278 if let Some(max_retry_window) = ctx.upstream.max_retry_window
1279 && ctx.timing.created_at.elapsed() > max_retry_window
1280 {
1281 return e;
1282 }
1283 ctx.upstream.retries += 1;
1284 e.set_retry(true);
1285 e
1286 }
1287 async fn upstream_request_filter(
1290 &self,
1291 session: &mut Session,
1292 upstream_response: &mut RequestHeader,
1293 ctx: &mut Self::CTX,
1294 ) -> pingora::Result<()>
1295 where
1296 Self::CTX: Send + Sync,
1297 {
1298 debug!(target: LOG_TARGET, "--> upstream request filter");
1299 defer!(debug!(target: LOG_TARGET, "<-- upstream request filter"););
1300 set_append_proxy_headers(session, ctx, upstream_response);
1301 Ok(())
1302 }
1303 async fn request_body_filter(
1306 &self,
1307 _session: &mut Session,
1308 body: &mut Option<Bytes>,
1309 _end_of_stream: bool,
1310 ctx: &mut Self::CTX,
1311 ) -> pingora::Result<()>
1312 where
1313 Self::CTX: Send + Sync,
1314 {
1315 debug!(target: LOG_TARGET, "--> request body filter");
1316 defer!(debug!(target: LOG_TARGET, "<-- request body filter"););
1317 if let Some(buf) = body {
1318 ctx.state.payload_size += buf.len();
1319 if let Some(location) = &ctx.upstream.location_instance {
1320 let size = location.client_body_size_limit();
1321 if size > 0 && ctx.state.payload_size > size {
1322 return Err(new_internal_error(
1323 413,
1324 format!("Request Entity Too Large, max:{size}"),
1325 ));
1326 }
1327 }
1328 }
1329 Ok(())
1330 }
1331 fn cache_key_callback(
1338 &self,
1339 session: &Session,
1340 ctx: &mut Self::CTX,
1341 ) -> pingora::Result<CacheKey> {
1342 debug!(target: LOG_TARGET, "--> cache key callback");
1343 defer!(debug!(target: LOG_TARGET, "<-- cache key callback"););
1344 let key = get_cache_key(
1345 ctx,
1346 session.req_header().method.as_ref(),
1347 &session.req_header().uri,
1348 );
1349 debug!(
1350 target: LOG_TARGET,
1351 namespace = key.namespace_str(),
1352 primary = key.primary_key_str(),
1353 user_tag = key.user_tag(),
1354 "cache key callback"
1355 );
1356 Ok(key)
1357 }
1358
1359 fn response_cache_filter(
1366 &self,
1367 _session: &Session,
1368 resp: &ResponseHeader,
1369 ctx: &mut Self::CTX,
1370 ) -> pingora::Result<RespCacheable> {
1371 debug!(target: LOG_TARGET, "--> response cache filter");
1372 defer!(debug!(target: LOG_TARGET, "<-- response cache filter"););
1373
1374 let (check_cache_control, max_ttl) = ctx.cache.as_ref().map_or(
1375 (false, None), |c| (c.check_cache_control, c.max_ttl),
1377 );
1378
1379 let mut cc = CacheControl::from_resp_headers(resp);
1380
1381 if let Some(c) = &mut cc {
1382 if let Err(reason) = self.process_cache_control(c, max_ttl) {
1384 return Ok(RespCacheable::Uncacheable(reason));
1385 }
1386 } else if check_cache_control {
1387 return Ok(RespCacheable::Uncacheable(
1389 NoCacheReason::OriginNotCache,
1390 ));
1391 }
1392
1393 Ok(resp_cacheable(
1394 cc.as_ref(),
1395 resp.clone(),
1396 false,
1397 &META_DEFAULTS,
1398 ))
1399 }
1400
1401 async fn response_filter(
1402 &self,
1403 session: &mut Session,
1404 upstream_response: &mut ResponseHeader,
1405 ctx: &mut Self::CTX,
1406 ) -> pingora::Result<()>
1407 where
1408 Self::CTX: Send + Sync,
1409 {
1410 debug!(target: LOG_TARGET, "--> response filter");
1411 defer!(debug!(target: LOG_TARGET, "<-- response filter"););
1412 if session.cache.enabled() {
1413 self.handle_cache_headers(session, upstream_response, ctx);
1414 }
1415
1416 self.handle_response_plugin(session, ctx, upstream_response)
1418 .await?;
1419
1420 if self.enable_server_timing {
1422 let _ = upstream_response
1423 .insert_header("server-timing", ctx.generate_server_timing());
1424 }
1425 Ok(())
1426 }
1427
1428 async fn upstream_response_filter(
1429 &self,
1430 session: &mut Session,
1431 upstream_response: &mut ResponseHeader,
1432 ctx: &mut Self::CTX,
1433 ) -> pingora::Result<()> {
1434 debug!(target: LOG_TARGET, "--> upstream response filter");
1435 defer!(debug!(target: LOG_TARGET, "<-- upstream response filter"););
1436 self.handle_upstream_response_plugin(session, ctx, upstream_response)?;
1437 #[cfg(feature = "tracing")]
1438 inject_telemetry_headers(ctx, upstream_response);
1439 ctx.upstream.status = Some(upstream_response.status);
1440
1441 if ctx.state.status.is_none() {
1442 ctx.state.status = Some(upstream_response.status);
1443 ctx.timing.upstream_response =
1445 Some(get_start_time(&ctx.timing.created_at));
1446 }
1447
1448 if let Some(id) = &ctx.state.request_id {
1449 let _ = upstream_response
1450 .insert_header(&HTTP_HEADER_NAME_X_REQUEST_ID, id);
1451 }
1452
1453 ctx.timing.upstream_processing = get_latency(
1454 &ctx.timing.created_at,
1455 &ctx.timing.upstream_processing,
1456 );
1457
1458 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1459 upstream_instance
1460 .on_response(&ctx.upstream.address, upstream_response.status);
1461 }
1462
1463 Ok(())
1464 }
1465
1466 fn upstream_response_body_filter(
1469 &self,
1470 session: &mut Session,
1471 body: &mut Option<bytes::Bytes>,
1472 end_of_stream: bool,
1473 ctx: &mut Self::CTX,
1474 ) -> pingora::Result<Option<std::time::Duration>> {
1475 debug!(target: LOG_TARGET, "--> upstream response body filter");
1476 defer!(debug!(target: LOG_TARGET, "<-- upstream response body filter"););
1477
1478 self.handle_upstream_response_body_plugin(
1479 session,
1480 ctx,
1481 body,
1482 end_of_stream,
1483 )?;
1484
1485 if end_of_stream {
1486 ctx.timing.upstream_response = get_latency(
1487 &ctx.timing.created_at,
1488 &ctx.timing.upstream_response,
1489 );
1490
1491 #[cfg(feature = "tracing")]
1492 set_otel_upstream_attrs(ctx);
1493 }
1495 Ok(None)
1496 }
1497
1498 fn response_body_filter(
1501 &self,
1502 session: &mut Session,
1503 body: &mut Option<Bytes>,
1504 end_of_stream: bool,
1505 ctx: &mut Self::CTX,
1506 ) -> pingora::Result<Option<std::time::Duration>>
1507 where
1508 Self::CTX: Send + Sync,
1509 {
1510 debug!(target: LOG_TARGET, "--> response body filter");
1511 defer!(debug!(target: LOG_TARGET, "<-- response body filter"););
1512 self.handle_response_body_plugin(session, ctx, body, end_of_stream)?;
1513 Ok(None)
1514 }
1515
1516 async fn fail_to_proxy(
1523 &self,
1524 session: &mut Session,
1525 e: &pingora::Error,
1526 ctx: &mut Self::CTX,
1527 ) -> FailToProxy
1528 where
1529 Self::CTX: Send + Sync,
1530 {
1531 debug!(target: LOG_TARGET, "--> fail to proxy");
1532 defer!(debug!(target: LOG_TARGET, "<-- fail to proxy"););
1533 let server_session = session.as_mut();
1534
1535 let code = match e.etype() {
1536 pingora::HTTPStatus(code) => *code,
1537 _ => match e.esource() {
1539 pingora::ErrorSource::Upstream => 502,
1540 pingora::ErrorSource::Downstream => match e.etype() {
1541 pingora::ErrorType::ConnectTimedout => 408,
1542 pingora::ErrorType::ConnectionClosed => 499,
1544 _ => 500,
1545 },
1546 pingora::ErrorSource::Internal
1547 | pingora::ErrorSource::Unset => 500,
1548 },
1549 };
1551 let mut resp = match code {
1552 502 => error_resp::HTTP_502_RESPONSE.clone(),
1553 400 => error_resp::HTTP_400_RESPONSE.clone(),
1554 500 => HTTP_500_RESPONSE.clone(),
1555 _ => error_resp::gen_error_response(code),
1556 };
1557
1558 let error_type = e.etype().as_str();
1559 let content = self
1560 .error_template
1561 .replace("{{version}}", pingap_util::get_pkg_version())
1562 .replace("{{content}}", &e.to_string())
1563 .replace("{{error_type}}", error_type);
1564 let buf = Bytes::from(content);
1565 ctx.state.status = Some(
1566 StatusCode::from_u16(code)
1567 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1568 );
1569 let content_type = if buf.starts_with(b"{") {
1570 "application/json; charset=utf-8"
1571 } else {
1572 "text/html; charset=utf-8"
1573 };
1574 let _ = resp.insert_header(http::header::CONTENT_TYPE, content_type);
1575 let _ = resp.insert_header("X-Pingap-EType", error_type);
1576 let _ = resp
1577 .insert_header(http::header::CONTENT_LENGTH, buf.len().to_string());
1578
1579 let user_agent = server_session
1580 .get_header(http::header::USER_AGENT)
1581 .map(|v| v.clone().to_str().unwrap_or_default().to_string());
1582
1583 error!(
1584 target: LOG_TARGET,
1585 error = %e,
1586 remote_addr = ctx.conn.remote_addr,
1587 client_ip = ctx.conn.client_ip,
1588 user_agent,
1589 error_type,
1590 path = server_session.req_header().uri.path(),
1591 "fail to proxy"
1592 );
1593
1594 server_session.set_keepalive(None);
1601
1602 server_session
1603 .write_response_header(Box::new(resp))
1604 .await
1605 .unwrap_or_else(|e| {
1606 error!(
1607 target: LOG_TARGET,
1608 error = %e,
1609 "send error response to downstream fail"
1610 );
1611 });
1612
1613 let _ = server_session.write_response_body(buf, true).await;
1614 FailToProxy {
1615 error_code: code,
1616 can_reuse_downstream: false,
1617 }
1618 }
1619 async fn logging(
1627 &self,
1628 session: &mut Session,
1629 _e: Option<&pingora::Error>,
1630 ctx: &mut Self::CTX,
1631 ) where
1632 Self::CTX: Send + Sync,
1633 {
1634 debug!(target: LOG_TARGET, "--> logging");
1635 defer!(debug!(target: LOG_TARGET, "<-- logging"););
1636 end_request();
1637 self.processing.fetch_sub(1, Ordering::Relaxed);
1638 if let Some(location) = &ctx.upstream.location_instance {
1639 location.on_response();
1640 }
1641 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1643 ctx.upstream.processing_count = Some(upstream_instance.completed());
1644 }
1645 if ctx.state.status.is_none()
1646 && let Some(header) = session.response_written()
1647 {
1648 ctx.state.status = Some(header.status);
1649 }
1650 #[cfg(feature = "tracing")]
1651 if let Some(features) = ctx.features.as_mut()
1653 && let Some(ref mut span) = features.upstream_span.as_mut()
1654 {
1655 span.end();
1656 }
1657
1658 if let Some(c) =
1659 session.downstream_modules_ctx.get::<ResponseCompression>()
1660 && c.is_enabled()
1661 && let Some((algorithm, in_bytes, out_bytes, took)) = c.get_info()
1662 {
1663 let features = ctx.features.get_or_insert_default();
1664 features.compression_stat = Some(CompressionStat {
1665 algorithm: algorithm.to_string(),
1666 in_bytes,
1667 out_bytes,
1668 duration: took,
1669 });
1670 }
1671 #[cfg(feature = "tracing")]
1672 if let Some(prom) = &self.prometheus {
1673 if !ctx.upstream.location.is_empty() {
1675 prom.after(session, ctx);
1676 }
1677 }
1678
1679 #[cfg(feature = "tracing")]
1680 set_otel_request_attrs(session, ctx);
1681
1682 if let Some(p) = &self.log_parser {
1683 let buf = p.format(session, ctx);
1684 if let Some(logger) = &self.access_logger {
1685 let _ = logger.try_send(buf);
1686 } else {
1687 let msg = buf.as_bstr();
1688 info!(target: LOG_TARGET, "{msg}");
1689 }
1690 }
1691 }
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696 use super::*;
1697 use crate::server_conf::parse_from_conf;
1698 use ahash::AHashMap;
1699 use pingap_certificate::{DynamicCertificates, TlsCertificate};
1700 use pingap_config::{PingapConfig, new_file_config_manager};
1701 use pingap_core::{CacheInfo, Ctx, UpstreamInfo};
1702 use pingap_location::LocationStats;
1703 use pingora::http::ResponseHeader;
1704 use pingora::protocols::tls::SslDigest;
1705 use pingora::protocols::tls::SslDigestExtension;
1706 use pingora::protocols::{Digest, TimingDigest};
1707 use pingora::proxy::{ProxyHttp, Session};
1708 use pingora::server::configuration;
1709 use pingora::services::Service;
1710 use pretty_assertions::assert_eq;
1711 use std::collections::HashMap;
1712 use std::sync::Arc;
1713 use std::time::{Duration, SystemTime};
1714 use tokio_test::io::Builder;
1715
1716 #[test]
1717 fn test_get_digest_detail() {
1718 let digest = Digest {
1719 timing_digest: vec![Some(TimingDigest {
1720 established_ts: SystemTime::UNIX_EPOCH
1721 .checked_add(Duration::from_secs(10))
1722 .unwrap(),
1723 })],
1724 ssl_digest: Some(Arc::new(SslDigest {
1725 cipher: "123".into(),
1726 version: "1.3".into(),
1727 organization: None,
1728 serial_number: None,
1729 cert_digest: vec![],
1730 extension: SslDigestExtension::default(),
1731 })),
1732 ..Default::default()
1733 };
1734 let result = get_digest_detail(&digest);
1735 assert_eq!(10000, result.tcp_established);
1736 assert_eq!("1.3", result.tls_version.unwrap_or_default());
1737 }
1738
1739 fn new_server() -> Server {
1741 let toml_data = r###"
1742[upstreams.charts]
1743# upstream address list
1744addrs = ["127.0.0.1:5000"]
1745
1746
1747[upstreams.diving]
1748addrs = ["127.0.0.1:5001"]
1749
1750
1751[locations.lo]
1752# upstream of location (default none)
1753upstream = "charts"
1754
1755# location match path (default none)
1756path = "/"
1757
1758# location match host, multiple domain names are separated by commas (default none)
1759host = ""
1760
1761# set headers to request (default none)
1762includes = ["proxySetHeader"]
1763
1764# add headers to request (default none)
1765proxy_add_headers = ["name:value"]
1766
1767
1768# the weigh of location (default none)
1769weight = 1024
1770
1771
1772# plugin list for location
1773plugins = ["pingap:requestId", "stats"]
1774
1775[servers.test]
1776# server linsten address, multiple addresses are separated by commas (default none)
1777addr = "0.0.0.0:6188"
1778
1779# access log format (default none)
1780access_log = "tiny"
1781
1782# the locations for server
1783locations = ["lo"]
1784
1785# the threads count for server (default 1)
1786threads = 1
1787
1788[plugins.stats]
1789value = "/stats"
1790category = "stats"
1791
1792[storages.authToken]
1793category = "secret"
1794secret = "123123"
1795value = "PLpKJqvfkjTcYTDpauJf+2JnEayP+bm+0Oe60Jk="
1796
1797[storages.proxySetHeader]
1798category = "config"
1799value = 'proxy_set_headers = ["name:value"]'
1800 "###;
1801 let pingap_conf = PingapConfig::new(toml_data.as_ref(), false).unwrap();
1802
1803 let location = Arc::new(
1804 Location::new("lo", pingap_conf.locations.get("lo").unwrap())
1805 .unwrap(),
1806 );
1807 let upstream = Arc::new(
1808 Upstream::new(
1809 "charts",
1810 pingap_conf.upstreams.get("charts").unwrap(),
1811 None,
1812 )
1813 .unwrap(),
1814 );
1815
1816 struct TmpPluginLoader {}
1817 impl PluginProvider for TmpPluginLoader {
1818 fn get(&self, _name: &str) -> Option<Arc<dyn Plugin>> {
1819 None
1820 }
1821 }
1822 struct TmpLocationLoader {
1823 location: Arc<Location>,
1824 }
1825 impl LocationProvider for TmpLocationLoader {
1826 fn get(&self, _name: &str) -> Option<Arc<Location>> {
1827 Some(self.location.clone())
1828 }
1829 fn stats(&self) -> HashMap<String, LocationStats> {
1830 HashMap::new()
1831 }
1832 }
1833 struct TmpUpstreamLoader {
1834 upstream: Arc<Upstream>,
1835 }
1836 impl UpstreamProvider for TmpUpstreamLoader {
1837 fn get(&self, _name: &str) -> Option<Arc<Upstream>> {
1838 Some(self.upstream.clone())
1839 }
1840 fn list(&self) -> Vec<(String, Arc<Upstream>)> {
1841 vec![("charts".to_string(), self.upstream.clone())]
1842 }
1843 }
1844 struct TmpServerLocationsLoader {
1845 server_locations: Arc<Vec<String>>,
1846 }
1847 impl ServerLocationsProvider for TmpServerLocationsLoader {
1848 fn get(&self, _name: &str) -> Option<Arc<Vec<String>>> {
1849 Some(self.server_locations.clone())
1850 }
1851 }
1852
1853 struct TmpCertificateLoader {}
1854 impl CertificateProvider for TmpCertificateLoader {
1855 fn get(&self, _sni: &str) -> Option<Arc<TlsCertificate>> {
1856 None
1857 }
1858 fn list(&self) -> Arc<DynamicCertificates> {
1859 Arc::new(AHashMap::new())
1860 }
1861 fn store(&self, _data: DynamicCertificates) {}
1862 }
1863
1864 let confs = parse_from_conf(pingap_conf);
1865 let file = tempfile::NamedTempFile::with_suffix(".toml").unwrap();
1866
1867 Server::new(
1868 &confs[0],
1869 AppContext {
1870 logger: None,
1871 config_manager: Arc::new(
1872 new_file_config_manager(&file.path().to_string_lossy())
1873 .unwrap(),
1874 ),
1875 server_locations_provider: Arc::new(TmpServerLocationsLoader {
1876 server_locations: Arc::new(vec!["lo".to_string()]),
1877 }),
1878 location_provider: Arc::new(TmpLocationLoader { location }),
1879 upstream_provider: Arc::new(TmpUpstreamLoader { upstream }),
1880 plugin_provider: Arc::new(TmpPluginLoader {}),
1881 certificate_provider: Arc::new(TmpCertificateLoader {}),
1882 },
1883 )
1884 .unwrap()
1885 }
1886
1887 #[test]
1888 fn test_new_server() {
1889 let server = new_server();
1890 let services = server
1891 .run(Arc::new(configuration::ServerConf::default()))
1892 .unwrap();
1893
1894 assert_eq!("Pingora HTTP Proxy Service", services.lb.name());
1895 }
1896
1897 #[tokio::test]
1898 async fn test_early_request_filter() {
1899 let server = new_server();
1900
1901 let headers = [""].join("\r\n");
1902 let input_header =
1903 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1904 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1905 let mut session = Session::new_h1(Box::new(mock_io));
1906 session.read_request().await.unwrap();
1907
1908 let mut ctx = Ctx::default();
1909 server
1910 .early_request_filter(&mut session, &mut ctx)
1911 .await
1912 .unwrap();
1913 assert_eq!("lo", ctx.upstream.location.as_ref());
1914 }
1915
1916 #[tokio::test]
1917 async fn test_request_filter() {
1918 let server = new_server();
1919
1920 let headers = [""].join("\r\n");
1921 let input_header =
1922 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1923 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1924 let mut session = Session::new_h1(Box::new(mock_io));
1925 session.read_request().await.unwrap();
1926
1927 let location = server.location_provider.get("lo").unwrap();
1928 let mut ctx = Ctx {
1929 upstream: UpstreamInfo {
1930 location: "lo".to_string().into(),
1931 location_instance: Some(location.clone()),
1932 ..Default::default()
1933 },
1934 ..Default::default()
1935 };
1936 let done = server.request_filter(&mut session, &mut ctx).await.unwrap();
1937 assert_eq!(false, done);
1938 }
1939
1940 #[tokio::test]
1941 async fn test_cache_key_callback() {
1942 let server = new_server();
1943
1944 let headers = [""].join("\r\n");
1945 let input_header =
1946 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1947 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1948 let mut session = Session::new_h1(Box::new(mock_io));
1949 session.read_request().await.unwrap();
1950
1951 let key = server
1952 .cache_key_callback(
1953 &session,
1954 &mut Ctx {
1955 cache: Some(CacheInfo {
1956 namespace: Some("pingap".to_string()),
1957 keys: Some(vec!["ss".to_string()]),
1958 ..Default::default()
1959 }),
1960 ..Default::default()
1961 },
1962 )
1963 .unwrap();
1964 assert_eq!(
1965 key.primary_key_str(),
1966 Some("ss:GET:/vicanso/pingap?size=1")
1967 );
1968 assert_eq!(key.namespace_str(), Some("pingap"));
1969 assert_eq!(
1970 r#"CacheKey { namespace: [112, 105, 110, 103, 97, 112], primary: [115, 115, 58, 71, 69, 84, 58, 47, 118, 105, 99, 97, 110, 115, 111, 47, 112, 105, 110, 103, 97, 112, 63, 115, 105, 122, 101, 61, 49], primary_bin_override: None, variance: None, user_tag: "", extensions: {} }"#,
1971 format!("{key:?}")
1972 );
1973 }
1974
1975 #[tokio::test]
1976 async fn test_response_cache_filter() {
1977 let server = new_server();
1978
1979 let headers = [""].join("\r\n");
1980 let input_header =
1981 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1982 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1983 let mut session = Session::new_h1(Box::new(mock_io));
1984 session.read_request().await.unwrap();
1985
1986 let mut upstream_response =
1987 ResponseHeader::build_no_case(200, None).unwrap();
1988 upstream_response
1989 .append_header("Content-Type", "application/json")
1990 .unwrap();
1991 let result = server
1992 .response_cache_filter(
1993 &session,
1994 &upstream_response,
1995 &mut Ctx {
1996 cache: Some(CacheInfo {
1997 keys: Some(vec!["ss".to_string()]),
1998 ..Default::default()
1999 }),
2000 ..Default::default()
2001 },
2002 )
2003 .unwrap();
2004 assert_eq!(true, result.is_cacheable());
2005
2006 let mut upstream_response =
2007 ResponseHeader::build_no_case(200, None).unwrap();
2008 upstream_response
2009 .append_header("Cache-Control", "no-cache")
2010 .unwrap();
2011 let result = server
2012 .response_cache_filter(
2013 &session,
2014 &upstream_response,
2015 &mut Ctx {
2016 cache: Some(CacheInfo {
2017 keys: Some(vec!["ss".to_string()]),
2018 ..Default::default()
2019 }),
2020 ..Default::default()
2021 },
2022 )
2023 .unwrap();
2024 assert_eq!(false, result.is_cacheable());
2025
2026 let mut upstream_response =
2027 ResponseHeader::build_no_case(200, None).unwrap();
2028 upstream_response
2029 .append_header("Cache-Control", "no-store")
2030 .unwrap();
2031 let result = server
2032 .response_cache_filter(
2033 &session,
2034 &upstream_response,
2035 &mut Ctx {
2036 cache: Some(CacheInfo {
2037 keys: Some(vec!["ss".to_string()]),
2038 ..Default::default()
2039 }),
2040 ..Default::default()
2041 },
2042 )
2043 .unwrap();
2044 assert_eq!(false, result.is_cacheable());
2045
2046 let mut upstream_response =
2047 ResponseHeader::build_no_case(200, None).unwrap();
2048 upstream_response
2049 .append_header("Cache-Control", "private, max-age=100")
2050 .unwrap();
2051 let result = server
2052 .response_cache_filter(
2053 &session,
2054 &upstream_response,
2055 &mut Ctx {
2056 cache: Some(CacheInfo {
2057 keys: Some(vec!["ss".to_string()]),
2058 ..Default::default()
2059 }),
2060 ..Default::default()
2061 },
2062 )
2063 .unwrap();
2064 assert_eq!(false, result.is_cacheable());
2065 }
2066
2067 fn create_session(path: &str) -> Session {
2068 let headers = ["Host: example.com"].join("\r\n");
2069 let input_header =
2070 format!("GET {} HTTP/1.1\r\n{headers}\r\n\r\n", path);
2071 let mock_io = Builder::new().read(input_header.as_bytes()).build();
2072 Session::new_h1(Box::new(mock_io))
2073 }
2074
2075 #[tokio::test]
2076 async fn test_handle_acme_challenge_not_enabled() {
2077 let server = new_server();
2078
2079 let mut session = create_session("/");
2080 session.read_request().await.unwrap();
2081
2082 let result = server
2083 .handle_acme_challenge(&mut session, &mut Ctx::default())
2084 .await;
2085 assert!(
2086 result.is_none(),
2087 "When ACME not enabled, should return None"
2088 );
2089 }
2090
2091 #[tokio::test]
2092 async fn test_handle_acme_challenge_non_challenge_returns_none() {
2093 let mut server = new_server();
2094 server.enable_lets_encrypt();
2095
2096 let test_paths = ["/", "/api", "/test.html", "/normal/path"];
2097
2098 for path in test_paths {
2099 let mut session = create_session(path);
2100 session.read_request().await.unwrap();
2101
2102 let result = server
2103 .handle_acme_challenge(&mut session, &mut Ctx::default())
2104 .await;
2105 assert!(
2106 result.is_none(),
2107 "Path '{}' should return None to continue processing (bug returns Some(Ok(false)))",
2108 path
2109 );
2110 }
2111 }
2112}