1#[cfg(feature = "tracing")]
16use super::tracing::{
17 initialize_telemetry, inject_telemetry_headers, set_otel_request_attrs,
18 set_otel_upstream_attrs, update_otel_cache_attrs,
19};
20use super::{LOG_TARGET, ServerConf, set_append_proxy_headers};
21use crate::ServerLocationsProvider;
22use async_trait::async_trait;
23use bstr::ByteSlice;
24use bytes::Bytes;
25use bytes::BytesMut;
26use http::StatusCode;
27use pingap_acme::handle_lets_encrypt;
28use pingap_certificate::CertificateProvider;
29use pingap_certificate::{GlobalCertificate, TlsSettingParams};
30use pingap_config::ConfigManager;
31use pingap_core::BackgroundTask;
32#[cfg(feature = "tracing")]
33use pingap_core::HttpResponse;
34use pingap_core::LocationInstance;
35use pingap_core::PluginProvider;
36use pingap_core::{
37 CompressionStat, Ctx, PluginStep, RequestPluginResult,
38 ResponseBodyPluginResult, ResponsePluginResult, get_cache_key,
39};
40use pingap_core::{HTTP_HEADER_NAME_X_REQUEST_ID, get_digest_detail};
41use pingap_core::{Plugin, new_internal_error};
42use pingap_location::{Location, LocationProvider};
43use pingap_logger::{Parser, parse_access_log_directive};
44#[cfg(feature = "tracing")]
45use pingap_otel::{KeyValue, trace::Span};
46#[cfg(feature = "tracing")]
47use pingap_performance::{
48 Prometheus, new_prometheus, new_prometheus_push_service,
49};
50use pingap_performance::{accept_request, end_request};
51use pingap_upstream::{Upstream, UpstreamProvider};
52use pingora::apps::HttpServerOptions;
53use pingora::cache::cache_control::DirectiveValue;
54use pingora::cache::cache_control::{CacheControl, InterpretCacheControl};
55use pingora::cache::filters::resp_cacheable;
56use pingora::cache::key::CacheHashKey;
57use pingora::cache::{
58 CacheKey, CacheMetaDefaults, NoCacheReason, RespCacheable,
59};
60use pingora::http::{RequestHeader, ResponseHeader};
61use pingora::listeners::TcpSocketOptions;
62use pingora::modules::http::HttpModules;
63use pingora::modules::http::compression::{
64 ResponseCompression, ResponseCompressionBuilder,
65};
66use pingora::modules::http::grpc_web::{GrpcWeb, GrpcWebBridge};
67use pingora::protocols::Digest;
68use pingora::protocols::http::error_resp;
69use pingora::proxy::{FailToProxy, HttpProxy, http_proxy_service};
70use pingora::proxy::{ProxyHttp, Session};
71use pingora::server::configuration;
72use pingora::services::listening::Service;
73use pingora::upstreams::peer::{HttpPeer, Peer};
74use scopeguard::defer;
75use snafu::Snafu;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
79use std::time::{Duration, Instant};
80use tokio::sync::mpsc::Sender;
81use tracing::{debug, error, info};
82
83#[derive(Debug, Snafu)]
84pub enum Error {
85 #[snafu(display("Common error, category: {category}, {message}"))]
86 Common { category: String, message: String },
87}
88type Result<T, E = Error> = std::result::Result<T, E>;
89
90#[inline]
91pub fn get_start_time(started_at: &Instant) -> i32 {
92 let value = started_at.elapsed().as_millis() as i32;
94 if value == 0 {
95 return -1;
96 }
97 -value
98}
99
100#[inline]
101pub fn get_latency(started_at: &Instant, value: &Option<i32>) -> Option<i32> {
102 let Some(value) = value else {
103 return None;
104 };
105 if *value >= 0 {
106 return None;
107 }
108 let latency = started_at.elapsed().as_millis() as i32 + *value;
109
110 Some(latency)
111}
112
113pub struct Server {
116 name: String,
118
119 admin: bool,
121
122 addr: String,
124
125 accepted: AtomicU64,
127
128 processing: AtomicI32,
130
131 log_parser: Option<Parser>,
133
134 error_template: String,
136
137 threads: Option<usize>,
139
140 tls_cipher_list: Option<String>,
142
143 tls_ciphersuites: Option<String>,
145
146 tls_min_version: Option<String>,
148
149 tls_max_version: Option<String>,
151
152 enabled_h2: bool,
154
155 lets_encrypt_enabled: bool,
157
158 global_certificates: bool,
160
161 tcp_socket_options: Option<TcpSocketOptions>,
163
164 #[cfg(feature = "tracing")]
166 prometheus: Option<Arc<Prometheus>>,
167
168 prometheus_push_mode: bool,
170
171 #[cfg(feature = "tracing")]
173 prometheus_metrics: String,
174
175 #[cfg(feature = "tracing")]
177 enabled_otel: bool,
178
179 modules: Option<Vec<String>>,
181
182 enable_server_timing: bool,
184
185 downstream_read_timeout: Option<Duration>,
187 downstream_write_timeout: Option<Duration>,
189
190 server_locations_provider: Arc<dyn ServerLocationsProvider>,
192 plugin_provider: Arc<dyn PluginProvider>,
194
195 location_provider: Arc<dyn LocationProvider>,
197
198 upstream_provider: Arc<dyn UpstreamProvider>,
200
201 certificate_provider: Arc<dyn CertificateProvider>,
203
204 config_manager: Arc<ConfigManager>,
206
207 access_logger: Option<Sender<BytesMut>>,
209}
210
211pub struct ServerServices {
212 pub lb: Service<HttpProxy<Server>>,
213}
214
215const META_DEFAULTS: CacheMetaDefaults =
216 CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);
217
218static HTTP_500_RESPONSE: LazyLock<ResponseHeader> =
219 LazyLock::new(|| error_resp::gen_error_response(500));
220
221#[derive(Clone)]
222pub struct AppContext {
223 pub logger: Option<Sender<BytesMut>>,
224 pub config_manager: Arc<ConfigManager>,
225 pub server_locations_provider: Arc<dyn ServerLocationsProvider>,
226 pub location_provider: Arc<dyn LocationProvider>,
227 pub upstream_provider: Arc<dyn UpstreamProvider>,
228 pub plugin_provider: Arc<dyn PluginProvider>,
229 pub certificate_provider: Arc<dyn CertificateProvider>,
230}
231
232impl Server {
233 pub fn new(conf: &ServerConf, ctx: AppContext) -> Result<Self> {
240 debug!(target: LOG_TARGET, config = conf.to_string(), "new server");
241 let mut p = None;
242 let (access_log, _) =
243 parse_access_log_directive(conf.access_log.as_ref());
244 if let Some(access_log) = access_log {
245 p = Some(Parser::from(access_log.as_str()));
246 }
247 let tcp_socket_options = if conf.tcp_fastopen.is_some()
248 || conf.tcp_keepalive.is_some()
249 || conf.reuse_port.is_some()
250 {
251 let mut opts = TcpSocketOptions::default();
252 opts.tcp_fastopen = conf.tcp_fastopen;
253 opts.tcp_keepalive.clone_from(&conf.tcp_keepalive);
254 opts.so_reuseport = conf.reuse_port;
255 Some(opts)
256 } else {
257 None
258 };
259 let prometheus_metrics =
260 conf.prometheus_metrics.clone().unwrap_or_default();
261 #[cfg(feature = "tracing")]
262 let prometheus = if prometheus_metrics.is_empty() {
263 None
264 } else {
265 let p = new_prometheus(&conf.name).map_err(|e| Error::Common {
266 category: "prometheus".to_string(),
267 message: e.to_string(),
268 })?;
269 Some(Arc::new(p))
270 };
271 let s = Server {
272 name: conf.name.clone(),
273 admin: conf.admin,
274 accepted: AtomicU64::new(0),
275 processing: AtomicI32::new(0),
276 addr: conf.addr.clone(),
277 log_parser: p,
278 error_template: conf.error_template.clone(),
279 tls_cipher_list: conf.tls_cipher_list.clone(),
280 tls_ciphersuites: conf.tls_ciphersuites.clone(),
281 tls_min_version: conf.tls_min_version.clone(),
282 tls_max_version: conf.tls_max_version.clone(),
283 threads: conf.threads,
284 lets_encrypt_enabled: false,
285 global_certificates: conf.global_certificates,
286 enabled_h2: conf.enabled_h2,
287 tcp_socket_options,
288 prometheus_push_mode: prometheus_metrics.contains("://"),
289 #[cfg(feature = "tracing")]
290 enabled_otel: conf.otlp_exporter.is_some(),
291 #[cfg(feature = "tracing")]
292 prometheus_metrics,
293 #[cfg(feature = "tracing")]
294 prometheus,
295 enable_server_timing: conf.enable_server_timing,
296 modules: conf.modules.clone(),
297 downstream_read_timeout: conf.downstream_read_timeout,
298 downstream_write_timeout: conf.downstream_write_timeout,
299 server_locations_provider: ctx.server_locations_provider,
300 location_provider: ctx.location_provider,
301 upstream_provider: ctx.upstream_provider,
302 plugin_provider: ctx.plugin_provider,
303 certificate_provider: ctx.certificate_provider,
304 access_logger: ctx.logger,
305 config_manager: ctx.config_manager,
306 };
307 Ok(s)
308 }
309 pub fn enable_lets_encrypt(&mut self) {
312 self.lets_encrypt_enabled = true;
313 }
314 pub fn get_prometheus_push_service(
317 &self,
318 ) -> Option<Box<dyn BackgroundTask>> {
319 if !self.prometheus_push_mode {
320 return None;
321 }
322 cfg_if::cfg_if! {
323 if #[cfg(feature = "tracing")] {
324 let Some(prometheus) = &self.prometheus else {
325 return None;
326 };
327 match new_prometheus_push_service(
328 &self.name,
329 &self.prometheus_metrics,
330 prometheus.clone(),
331 ) {
332 Ok(service) => Some(service),
333 Err(e) => {
334 error!(
335 target: LOG_TARGET,
336 error = %e,
337 name = self.name,
338 "new prometheus push service fail"
339 );
340 None
341 },
342 }
343 } else {
344 None
345 }
346 }
347 }
348
349 pub fn run(
355 self,
356 conf: Arc<configuration::ServerConf>,
357 ) -> Result<ServerServices> {
358 let addr = self.addr.clone();
359 let tcp_socket_options = self.tcp_socket_options.clone();
360
361 let name = self.name.clone();
362 let mut dynamic_cert = None;
363 if self.global_certificates {
365 dynamic_cert =
366 Some(GlobalCertificate::new(self.certificate_provider.clone()));
367 }
368
369 let is_tls = dynamic_cert.is_some();
370
371 let enabled_h2 = self.enabled_h2;
372 let threads = if let Some(threads) = self.threads {
373 let value = if threads == 0 {
375 num_cpus::get()
376 } else {
377 threads
378 };
379 Some(value)
380 } else {
381 None
382 };
383
384 info!(
385 target: LOG_TARGET,
386 name,
387 addr,
388 threads,
389 is_tls,
390 h2 = enabled_h2,
391 tcp_socket_options = format!("{:?}", tcp_socket_options),
392 "server is listening"
393 );
394 let cipher_list = self.tls_cipher_list.clone();
395 let cipher_suites = self.tls_ciphersuites.clone();
396 let tls_min_version = self.tls_min_version.clone();
397 let tls_max_version = self.tls_max_version.clone();
398 let mut lb = http_proxy_service(&conf, self);
399 if !is_tls && enabled_h2 {
401 if let Some(http_logic) = lb.app_logic_mut() {
402 let mut http_server_options = HttpServerOptions::default();
403 http_server_options.h2c = true;
404 http_logic.server_options = Some(http_server_options);
405 }
406 }
407 lb.threads = threads;
408 for addr in addr.split(',') {
410 if let Some(dynamic_cert) = &dynamic_cert {
412 let tls_settings = dynamic_cert
413 .new_tls_settings(&TlsSettingParams {
414 server_name: name.clone(),
415 enabled_h2,
416 cipher_list: cipher_list.clone(),
417 cipher_suites: cipher_suites.clone(),
418 tls_min_version: tls_min_version.clone(),
419 tls_max_version: tls_max_version.clone(),
420 })
421 .map_err(|e| Error::Common {
422 category: "tls".to_string(),
423 message: e.to_string(),
424 })?;
425 lb.add_tls_with_settings(
426 addr,
427 tcp_socket_options.clone(),
428 tls_settings,
429 );
430 } else if let Some(opt) = &tcp_socket_options {
431 lb.add_tcp_with_settings(addr, opt.clone());
432 } else {
433 lb.add_tcp(addr);
434 }
435 }
436 Ok(ServerServices { lb })
437 }
438 async fn serve_admin(
441 &self,
442 session: &mut Session,
443 ctx: &mut Ctx,
444 ) -> pingora::Result<bool> {
445 if let Some(plugin) = self.plugin_provider.get("pingap:admin") {
446 let result = plugin
447 .handle_request(PluginStep::Request, session, ctx)
448 .await?;
449 if let RequestPluginResult::Respond(resp) = result {
450 ctx.state.status = Some(resp.status);
451 resp.send(session).await?;
452 return Ok(true);
453 }
454 }
455 Ok(false)
456 }
457 #[inline]
458 fn initialize_context(&self, session: &mut Session, ctx: &mut Ctx) {
459 session.set_read_timeout(self.downstream_read_timeout);
460 session.set_write_timeout(self.downstream_write_timeout);
461
462 if let Some(stream) = session.stream() {
463 ctx.conn.id = stream.id() as usize;
464 }
465 if let Some(digest) = session.digest() {
467 let digest_detail = get_digest_detail(digest);
468 ctx.timing.connection_duration = digest_detail.connection_time;
469 ctx.conn.reused = digest_detail.connection_reused;
470
471 if !ctx.conn.reused
472 && digest_detail.tls_established
473 >= digest_detail.tcp_established
474 {
475 let latency = digest_detail.tls_established
476 - digest_detail.tcp_established;
477 ctx.timing.tls_handshake = Some(latency as i32);
478 }
479 ctx.conn.tls_cipher = digest_detail.tls_cipher;
480 ctx.conn.tls_version = digest_detail.tls_version;
481 };
482 accept_request();
483
484 ctx.state.processing_count =
485 self.processing.fetch_add(1, Ordering::Relaxed) + 1;
486 ctx.state.accepted_count =
487 self.accepted.fetch_add(1, Ordering::Relaxed) + 1;
488 if let Some((remote_addr, remote_port)) =
489 pingap_core::get_remote_addr(session)
490 {
491 ctx.conn.remote_addr = Some(remote_addr);
492 ctx.conn.remote_port = Some(remote_port);
493 }
494 if let Some(addr) =
495 session.server_addr().and_then(|addr| addr.as_inet())
496 {
497 ctx.conn.server_addr = Some(addr.ip().to_string());
498 ctx.conn.server_port = Some(addr.port());
499 }
500 }
501
502 #[inline]
503 async fn find_and_apply_location(
504 &self,
505 session: &mut Session,
506 ctx: &mut Ctx,
507 ) -> pingora::Result<()> {
508 let header = session.req_header();
509 let host = pingap_core::get_host(header).unwrap_or_default();
510 let path = header.uri.path();
511
512 let Some(locations) = self.server_locations_provider.get(&self.name)
514 else {
515 return Ok(());
516 };
517
518 let matched_info = locations.iter().find_map(|name| {
520 let location = self.location_provider.get(name)?;
521 let (matched, captures) = location.match_host_path(host, path);
522 if matched {
523 Some((location, captures))
524 } else {
525 None
526 }
527 });
528
529 let Some((location, captures)) = matched_info else {
530 return Ok(());
531 };
532
533 ctx.upstream.location = location.name.clone();
535 ctx.upstream.location_instance = Some(location.clone());
536 ctx.upstream.max_retries = location.max_retries;
537 ctx.upstream.max_retry_window = location.max_retry_window;
538 if let Some(captures) = captures {
539 ctx.extend_variables(captures);
540 }
541
542 debug!(
543 target: LOG_TARGET,
544 "variables: {:?}",
545 ctx.features.as_ref().map(|item| &item.variables)
546 );
547
548 #[cfg(feature = "tracing")]
550 if let Some(prom) = &self.prometheus {
551 prom.before(&ctx.upstream.location);
552 }
553
554 location
556 .validate_content_length(header)
557 .map_err(|e| new_internal_error(413, e))?;
558
559 let (accepted, processing) = location.on_request()?;
561 ctx.state.location_accepted_count = accepted;
562 ctx.state.location_processing_count = processing;
563
564 if location.support_grpc_web() {
566 let grpc_web = session
567 .downstream_modules_ctx
568 .get_mut::<GrpcWebBridge>()
569 .ok_or_else(|| {
570 new_internal_error(
571 500,
572 "grpc web bridge module should be added",
573 )
574 })?;
575 grpc_web.init();
576 }
577
578 ctx.plugins = self.get_context_plugins(location.clone(), session);
580 let _ = self
581 .handle_request_plugin(PluginStep::EarlyRequest, session, ctx)
582 .await?;
583
584 Ok(())
585 }
586
587 #[inline]
588 async fn handle_admin_request(
589 &self,
590 session: &mut Session,
591 ctx: &mut Ctx,
592 ) -> Option<pingora::Result<bool>> {
593 if self.admin {
594 match self.serve_admin(session, ctx).await {
595 Ok(true) => return Some(Ok(true)), Ok(false) => {}, Err(e) => return Some(Err(e)), }
599 }
600 None }
602 #[inline]
603 async fn handle_acme_challenge(
604 &self,
605 session: &mut Session,
606 ctx: &mut Ctx,
607 ) -> Option<pingora::Result<bool>> {
608 if self.lets_encrypt_enabled {
609 return match handle_lets_encrypt(
610 self.config_manager.clone(),
611 session,
612 ctx,
613 )
614 .await
615 {
616 Ok(true) => Some(Ok(true)), Ok(false) => Some(Ok(false)), Err(e) => Some(Err(e)),
619 };
620 }
621 None }
623 #[inline]
624 #[cfg(feature = "tracing")]
625 async fn handle_metrics_request(
626 &self,
627 session: &mut Session,
628 _ctx: &mut Ctx,
629 ) -> Option<pingora::Result<bool>> {
630 let header = session.req_header();
631 let should_handle = !self.prometheus_push_mode
632 && self.prometheus.is_some()
633 && header.uri.path() == self.prometheus_metrics;
634
635 if should_handle {
636 let prom = self.prometheus.as_ref().unwrap(); let result = async {
638 let body =
639 prom.metrics().map_err(|e| new_internal_error(500, e))?;
640 HttpResponse::text(body).send(session).await?;
641 Ok(true)
642 }
643 .await;
644 return Some(result);
645 }
646 None
647 }
648 #[inline]
649 async fn handle_standard_request(
650 &self,
651 session: &mut Session,
652 ctx: &mut Ctx,
653 ) -> pingora::Result<bool> {
654 let Some(location) = &ctx.upstream.location_instance else {
655 let header = session.req_header();
656 let host = pingap_core::get_host(header).unwrap_or_default();
657 let message = format!(
658 "No matching location, host:{host} path:{}",
659 header.uri.path()
660 );
661 return Err(pingap_core::new_internal_error(500, message));
662 };
663
664 debug!(
665 target: LOG_TARGET,
666 server = self.name,
667 location = location.name(),
668 "location is matched"
669 );
670
671 let variables = if let Some(features) = &mut ctx.features {
672 features.variables.take()
673 } else {
674 None
675 };
676
677 let (_, capture_variables) =
678 location.rewrite(session.req_header_mut(), variables);
679 if let Some(capture_variables) = capture_variables {
680 ctx.extend_variables(capture_variables);
681 }
682
683 if self
684 .handle_request_plugin(PluginStep::Request, session, ctx)
685 .await?
686 {
687 return Ok(true);
688 }
689
690 Ok(false)
691 }
692}
693
694const MODULE_GRPC_WEB: &str = "grpc-web";
695
696impl Server {
697 #[inline]
698 fn get_context_plugins(
699 &self,
700 location: Arc<Location>,
701 session: &Session,
702 ) -> Option<Vec<(String, Arc<dyn Plugin>)>> {
703 if session.is_upgrade_req() {
704 return None;
705 }
706 let plugins = location.plugins.as_ref()?;
707
708 let location_plugins: Vec<_> = plugins
709 .iter()
710 .filter_map(|name| {
711 self.plugin_provider
712 .get(name)
713 .map(|plugin| (name.clone(), plugin))
714 })
715 .collect();
716
717 (!location_plugins.is_empty()).then_some(location_plugins)
719 }
720
721 #[inline]
724 pub async fn handle_request_plugin(
725 &self,
726 step: PluginStep,
727 session: &mut Session,
728 ctx: &mut Ctx,
729 ) -> pingora::Result<bool> {
730 let plugins = match ctx.plugins.take() {
731 Some(p) => p,
732 None => return Ok(false), };
734 if plugins.is_empty() {
735 return Ok(false);
736 }
737
738 let result = async {
739 let mut request_done = false;
740 for (name, plugin) in plugins.iter() {
741 let now = Instant::now();
742 let result = plugin.handle_request(step, session, ctx).await?;
743 let elapsed = now.elapsed().as_millis() as u32;
744
745 let mut record_time = |msg: &str| {
747 debug!(
748 target: LOG_TARGET,
749 name,
750 elapsed,
751 step = step.to_string(),
752 "{msg}"
753 );
754 ctx.add_plugin_processing_time(name, elapsed);
755 };
756
757 match result {
758 RequestPluginResult::Skipped => {
759 continue;
760 },
761 RequestPluginResult::Respond(resp) => {
762 record_time("request plugin create new response");
763 if resp.status.as_u16() < 900 {
765 ctx.state.status = Some(resp.status);
766 resp.send(session).await?;
767 }
768 request_done = true;
769 break;
770 },
771 RequestPluginResult::Continue => {
772 record_time("request plugin run and continue request");
773 },
774 }
775 }
776 Ok(request_done)
777 }
778 .await;
779 ctx.plugins = Some(plugins);
780 result
781 }
782
783 #[inline]
785 pub async fn handle_response_plugin(
786 &self,
787 session: &mut Session,
788 ctx: &mut Ctx,
789 upstream_response: &mut ResponseHeader,
790 ) -> pingora::Result<()> {
791 let plugins = match ctx.plugins.take() {
792 Some(p) => p,
793 None => return Ok(()), };
795 if plugins.is_empty() {
796 return Ok(());
797 }
798
799 let result = async {
800 for (name, plugin) in plugins.iter() {
801 let now = Instant::now();
802 if let ResponsePluginResult::Modified = plugin
803 .handle_response(session, ctx, upstream_response)
804 .await?
805 {
806 let elapsed = now.elapsed().as_millis() as u32;
807 debug!(
808 target: LOG_TARGET,
809 name, elapsed, "response plugin modify headers"
810 );
811 ctx.add_plugin_processing_time(name, elapsed);
812 };
813 }
814 Ok(())
815 }
816 .await;
817 ctx.plugins = Some(plugins);
818 result
819 }
820
821 #[inline]
822 pub fn handle_upstream_response_plugin(
823 &self,
824 session: &mut Session,
825 ctx: &mut Ctx,
826 upstream_response: &mut ResponseHeader,
827 ) -> pingora::Result<()> {
828 let plugins = match ctx.plugins.take() {
829 Some(p) => p,
830 None => return Ok(()), };
832 if plugins.is_empty() {
833 return Ok(());
834 }
835
836 let result = {
837 for (name, plugin) in plugins.iter() {
838 let now = Instant::now();
839 if let ResponsePluginResult::Modified = plugin
840 .handle_upstream_response(session, ctx, upstream_response)?
841 {
842 let elapsed = now.elapsed().as_millis() as u32;
843 debug!(
844 target: LOG_TARGET,
845 name,
846 elapsed,
847 "upstream response plugin modify headers"
848 );
849 ctx.add_plugin_processing_time(name, elapsed);
850 };
851 }
852 Ok(())
853 };
854 ctx.plugins = Some(plugins);
855 result
856 }
857
858 #[inline]
859 pub fn handle_upstream_response_body_plugin(
860 &self,
861 session: &mut Session,
862 ctx: &mut Ctx,
863 body: &mut Option<bytes::Bytes>,
864 end_of_stream: bool,
865 ) -> pingora::Result<()> {
866 let plugins = match ctx.plugins.take() {
867 Some(p) => p,
868 None => return Ok(()), };
870 if plugins.is_empty() {
871 return Ok(());
872 }
873
874 let result = {
875 for (name, plugin) in plugins.iter() {
876 let now = Instant::now();
877 match plugin.handle_upstream_response_body(
878 session,
879 ctx,
880 body,
881 end_of_stream,
882 )? {
883 ResponseBodyPluginResult::PartialReplaced
884 | ResponseBodyPluginResult::FullyReplaced => {
885 let elapsed = now.elapsed().as_millis() as u32;
886 ctx.add_plugin_processing_time(name, elapsed);
887 debug!(
888 target: LOG_TARGET,
889 name, elapsed, "response body plugin modify body"
890 );
891 },
892 _ => {},
893 }
894 }
895 Ok(())
896 };
897 ctx.plugins = Some(plugins);
898 result
899 }
900
901 #[inline]
902 pub fn handle_response_body_plugin(
903 &self,
904 session: &mut Session,
905 ctx: &mut Ctx,
906 body: &mut Option<bytes::Bytes>,
907 end_of_stream: bool,
908 ) -> pingora::Result<()> {
909 let plugins = match ctx.plugins.take() {
910 Some(p) => p,
911 None => return Ok(()), };
913 if plugins.is_empty() {
914 return Ok(());
915 }
916 let result = {
917 for (name, plugin) in plugins.iter() {
918 let now = Instant::now();
919 match plugin.handle_response_body(
920 session,
921 ctx,
922 body,
923 end_of_stream,
924 )? {
925 ResponseBodyPluginResult::PartialReplaced
926 | ResponseBodyPluginResult::FullyReplaced => {
927 let elapsed = now.elapsed().as_millis() as u32;
928 ctx.add_plugin_processing_time(name, elapsed);
929 debug!(
930 target: LOG_TARGET,
931 name, elapsed, "response body plugin modify body"
932 );
933 },
934 _ => {},
935 }
936 }
937 Ok(())
938 };
939 ctx.plugins = Some(plugins);
940 result
941 }
942
943 fn process_cache_control(
944 &self,
945 c: &mut CacheControl,
946 max_ttl: Option<Duration>,
947 ) -> Result<(), NoCacheReason> {
948 if c.no_cache() || c.no_store() || c.private() {
950 return Err(NoCacheReason::OriginNotCache);
951 }
952
953 if c.max_age().ok().flatten().unwrap_or_default() == 0 {
955 return Err(NoCacheReason::OriginNotCache);
956 }
957
958 if let Some(d) = max_ttl {
960 if c.fresh_duration().unwrap_or_default() > d {
961 let s_maxage_value =
963 itoa::Buffer::new().format(d.as_secs()).as_bytes().to_vec();
964 c.directives.insert(
965 "s-maxage".to_string(),
966 Some(DirectiveValue(s_maxage_value)),
967 );
968 }
969 }
970
971 Ok(())
972 }
973
974 #[inline]
975 fn handle_cache_headers(
976 &self,
977 session: &Session,
978 upstream_response: &mut ResponseHeader,
979 ctx: &mut Ctx,
980 ) {
981 let cache_status = session.cache.phase().as_str();
982 let _ = upstream_response.insert_header("x-cache-status", cache_status);
983
984 let lookup_duration_str = self.process_cache_timing(
986 session.cache.lookup_duration(),
987 "x-cache-lookup",
988 upstream_response,
989 &mut ctx.timing.cache_lookup,
990 );
991
992 let lock_duration_str = self.process_cache_timing(
994 session.cache.lock_duration(),
995 "x-cache-lock",
996 upstream_response,
997 &mut ctx.timing.cache_lock,
998 );
999
1000 #[cfg(not(feature = "tracing"))]
1001 {
1002 let _ = lookup_duration_str;
1003 let _ = lock_duration_str;
1004 }
1005
1006 #[cfg(feature = "tracing")]
1008 update_otel_cache_attrs(
1009 ctx,
1010 cache_status,
1011 lookup_duration_str,
1012 lock_duration_str,
1013 );
1014 }
1015
1016 #[inline]
1017 fn process_cache_timing(
1018 &self,
1019 duration_opt: Option<Duration>,
1020 header_name: &'static str,
1021 resp: &mut ResponseHeader,
1022 ctx_field: &mut Option<i32>,
1023 ) -> String {
1024 if let Some(d) = duration_opt {
1025 let ms = d.as_millis() as i32;
1026
1027 let mut buffer = itoa::Buffer::new();
1029 let mut value_bytes = Vec::with_capacity(6);
1030 value_bytes.extend_from_slice(buffer.format(ms).as_bytes());
1031 value_bytes.extend_from_slice(b"ms");
1032
1033 let _ = resp.insert_header(header_name, value_bytes);
1034 *ctx_field = Some(ms);
1035
1036 #[cfg(feature = "tracing")]
1037 return humantime::Duration::from(d).to_string();
1038 }
1039 String::new()
1040 }
1041}
1042
1043#[inline]
1044fn get_upstream_with_variables(
1045 upstream: &str,
1046 ctx: &Ctx,
1047 upstreams: &dyn UpstreamProvider,
1048) -> Option<Arc<Upstream>> {
1049 let key = upstream
1050 .strip_prefix('$')
1051 .and_then(|var_name| ctx.get_variable(var_name))
1052 .unwrap_or(upstream);
1053 upstreams.get(key)
1054}
1055
1056#[async_trait]
1057impl ProxyHttp for Server {
1058 type CTX = Ctx;
1059 fn new_ctx(&self) -> Self::CTX {
1060 debug!(target: LOG_TARGET, "new ctx");
1061 Ctx::new()
1062 }
1063 fn init_downstream_modules(&self, modules: &mut HttpModules) {
1064 debug!(target: LOG_TARGET, "--> init downstream modules");
1065 defer!(debug!(target: LOG_TARGET, "<-- init downstream modules"););
1066 modules.add_module(ResponseCompressionBuilder::enable(0));
1068
1069 self.modules.iter().flatten().for_each(|item| {
1070 if item == MODULE_GRPC_WEB {
1071 modules.add_module(Box::new(GrpcWeb));
1072 }
1073 });
1074 }
1075 async fn early_request_filter(
1084 &self,
1085 session: &mut Session,
1086 ctx: &mut Self::CTX,
1087 ) -> pingora::Result<()>
1088 where
1089 Self::CTX: Send + Sync,
1090 {
1091 debug!(target: LOG_TARGET, "--> early request filter");
1092 defer!(debug!(target: LOG_TARGET, "<-- early request filter"););
1093
1094 self.initialize_context(session, ctx);
1095 #[cfg(feature = "tracing")]
1096 if self.enabled_otel {
1097 initialize_telemetry(&self.name, session, ctx);
1098 }
1099 self.find_and_apply_location(session, ctx).await?;
1100
1101 Ok(())
1102 }
1103 async fn request_filter(
1111 &self,
1112 session: &mut Session,
1113 ctx: &mut Self::CTX,
1114 ) -> pingora::Result<bool>
1115 where
1116 Self::CTX: Send + Sync,
1117 {
1118 debug!(target: LOG_TARGET, "--> request filter");
1119 defer!(debug!(target: LOG_TARGET, "<-- request filter"););
1120 if let Some(result) = self.handle_admin_request(session, ctx).await {
1123 return result;
1124 }
1125 if let Some(result) = self.handle_acme_challenge(session, ctx).await {
1127 return result;
1128 }
1129 #[cfg(feature = "tracing")]
1131 if let Some(result) = self.handle_metrics_request(session, ctx).await {
1132 return result;
1133 }
1134
1135 self.handle_standard_request(session, ctx).await
1136 }
1137
1138 async fn proxy_upstream_filter(
1141 &self,
1142 session: &mut Session,
1143 ctx: &mut Self::CTX,
1144 ) -> pingora::Result<bool>
1145 where
1146 Self::CTX: Send + Sync,
1147 {
1148 debug!(target: LOG_TARGET, "--> proxy upstream filter");
1149 defer!(debug!(target: LOG_TARGET, "<-- proxy upstream filter"););
1150 let done = self
1151 .handle_request_plugin(PluginStep::ProxyUpstream, session, ctx)
1152 .await?;
1153
1154 if done {
1155 return Ok(false);
1156 }
1157 Ok(true)
1158 }
1159
1160 async fn upstream_peer(
1163 &self,
1164 session: &mut Session,
1165 ctx: &mut Ctx,
1166 ) -> pingora::Result<Box<HttpPeer>> {
1167 debug!(target: LOG_TARGET, "--> upstream peer");
1168 defer!(debug!(target: LOG_TARGET, "<-- upstream peer"););
1169
1170 let peer = ctx
1171 .upstream
1172 .location_instance
1173 .clone()
1174 .as_ref()
1175 .and_then(|location| {
1176 let upstream = if ctx.upstream.name.is_empty() {
1177 get_upstream_with_variables(
1178 location.upstream(),
1179 ctx,
1180 self.upstream_provider.as_ref(),
1181 )?
1182 } else {
1183 get_upstream_with_variables(
1185 &ctx.upstream.name,
1186 ctx,
1187 self.upstream_provider.as_ref(),
1188 )?
1189 };
1190 ctx.upstream.upstream_instance = Some(upstream.clone());
1191 Some(upstream)
1192 })
1193 .and_then(|upstream| {
1194 ctx.upstream.connected_count = upstream.connected();
1195 ctx.upstream.name = upstream.name.clone();
1196 #[cfg(feature = "tracing")]
1197 if let Some(features) = &ctx.features {
1198 if let Some(tracer) = &features.otel_tracer {
1199 let name = format!("upstream.{}", &upstream.name);
1200 let mut span = tracer.new_upstream_span(&name);
1201 span.set_attribute(KeyValue::new(
1202 "upstream.connected",
1203 ctx.upstream.connected_count.unwrap_or_default()
1204 as i64,
1205 ));
1206 let features = ctx.features.get_or_insert_default();
1207 features.upstream_span = Some(span);
1208 }
1209 }
1210 upstream
1211 .new_http_peer(session, &ctx.conn.client_ip)
1212 .inspect(|peer| {
1213 ctx.upstream.address = peer.address().to_string();
1214 })
1215 })
1216 .ok_or_else(|| {
1217 new_internal_error(
1218 503,
1219 format!(
1220 "No available upstream for {}",
1221 &ctx.upstream.location
1222 ),
1223 )
1224 })?;
1225
1226 ctx.timing.upstream_connect =
1228 Some(get_start_time(&ctx.timing.created_at));
1229
1230 Ok(Box::new(peer))
1231 }
1232 async fn connected_to_upstream(
1235 &self,
1236 _session: &mut Session,
1237 reused: bool,
1238 _peer: &HttpPeer,
1239 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
1240 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1241 digest: Option<&Digest>,
1242 ctx: &mut Self::CTX,
1243 ) -> pingora::Result<()>
1244 where
1245 Self::CTX: Send + Sync,
1246 {
1247 debug!(target: LOG_TARGET, "--> connected to upstream");
1248 defer!(debug!(target: LOG_TARGET, "<-- connected to upstream"););
1249 ctx.timing.upstream_connect =
1250 get_latency(&ctx.timing.created_at, &ctx.timing.upstream_connect);
1251 if let Some(digest) = digest {
1252 ctx.update_upstream_timing_from_digest(digest, reused);
1253 }
1254 ctx.upstream.reused = reused;
1255
1256 ctx.timing.upstream_processing =
1258 Some(get_start_time(&ctx.timing.created_at));
1259
1260 Ok(())
1261 }
1262 fn fail_to_connect(
1263 &self,
1264 _session: &mut Session,
1265 peer: &HttpPeer,
1266 ctx: &mut Self::CTX,
1267 mut e: Box<pingora::Error>,
1268 ) -> Box<pingora::Error> {
1269 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1270 upstream_instance.on_transport_failure(&peer.address().to_string());
1271 }
1272 let Some(max_retries) = ctx.upstream.max_retries else {
1273 return e;
1274 };
1275 if ctx.upstream.retries >= max_retries {
1276 return e;
1277 }
1278 if let Some(max_retry_window) = ctx.upstream.max_retry_window {
1279 if ctx.timing.created_at.elapsed() > max_retry_window {
1280 return e;
1281 }
1282 }
1283 ctx.upstream.retries += 1;
1284 e.set_retry(true);
1285 e
1286 }
1287 async fn upstream_request_filter(
1290 &self,
1291 session: &mut Session,
1292 upstream_response: &mut RequestHeader,
1293 ctx: &mut Self::CTX,
1294 ) -> pingora::Result<()>
1295 where
1296 Self::CTX: Send + Sync,
1297 {
1298 debug!(target: LOG_TARGET, "--> upstream request filter");
1299 defer!(debug!(target: LOG_TARGET, "<-- upstream request filter"););
1300 set_append_proxy_headers(session, ctx, upstream_response);
1301 Ok(())
1302 }
1303 async fn request_body_filter(
1306 &self,
1307 _session: &mut Session,
1308 body: &mut Option<Bytes>,
1309 _end_of_stream: bool,
1310 ctx: &mut Self::CTX,
1311 ) -> pingora::Result<()>
1312 where
1313 Self::CTX: Send + Sync,
1314 {
1315 debug!(target: LOG_TARGET, "--> request body filter");
1316 defer!(debug!(target: LOG_TARGET, "<-- request body filter"););
1317 if let Some(buf) = body {
1318 ctx.state.payload_size += buf.len();
1319 if let Some(location) = &ctx.upstream.location_instance {
1320 let size = location.client_body_size_limit();
1321 if size > 0 && ctx.state.payload_size > size {
1322 return Err(new_internal_error(
1323 413,
1324 format!("Request Entity Too Large, max:{size}"),
1325 ));
1326 }
1327 }
1328 }
1329 Ok(())
1330 }
1331 fn cache_key_callback(
1338 &self,
1339 session: &Session,
1340 ctx: &mut Self::CTX,
1341 ) -> pingora::Result<CacheKey> {
1342 debug!(target: LOG_TARGET, "--> cache key callback");
1343 defer!(debug!(target: LOG_TARGET, "<-- cache key callback"););
1344 let key = get_cache_key(
1345 ctx,
1346 session.req_header().method.as_ref(),
1347 &session.req_header().uri,
1348 );
1349 debug!(
1350 target: LOG_TARGET,
1351 namespace = key.namespace_str(),
1352 primary = key.primary_key_str(),
1353 user_tag = key.user_tag(),
1354 "cache key callback"
1355 );
1356 Ok(key)
1357 }
1358
1359 fn response_cache_filter(
1366 &self,
1367 _session: &Session,
1368 resp: &ResponseHeader,
1369 ctx: &mut Self::CTX,
1370 ) -> pingora::Result<RespCacheable> {
1371 debug!(target: LOG_TARGET, "--> response cache filter");
1372 defer!(debug!(target: LOG_TARGET, "<-- response cache filter"););
1373
1374 let (check_cache_control, max_ttl) = ctx.cache.as_ref().map_or(
1375 (false, None), |c| (c.check_cache_control, c.max_ttl),
1377 );
1378
1379 let mut cc = CacheControl::from_resp_headers(resp);
1380
1381 if let Some(c) = &mut cc {
1382 if let Err(reason) = self.process_cache_control(c, max_ttl) {
1384 return Ok(RespCacheable::Uncacheable(reason));
1385 }
1386 } else if check_cache_control {
1387 return Ok(RespCacheable::Uncacheable(
1389 NoCacheReason::OriginNotCache,
1390 ));
1391 }
1392
1393 Ok(resp_cacheable(
1394 cc.as_ref(),
1395 resp.clone(),
1396 false,
1397 &META_DEFAULTS,
1398 ))
1399 }
1400
1401 async fn response_filter(
1402 &self,
1403 session: &mut Session,
1404 upstream_response: &mut ResponseHeader,
1405 ctx: &mut Self::CTX,
1406 ) -> pingora::Result<()>
1407 where
1408 Self::CTX: Send + Sync,
1409 {
1410 debug!(target: LOG_TARGET, "--> response filter");
1411 defer!(debug!(target: LOG_TARGET, "<-- response filter"););
1412 if session.cache.enabled() {
1413 self.handle_cache_headers(session, upstream_response, ctx);
1414 }
1415
1416 self.handle_response_plugin(session, ctx, upstream_response)
1418 .await?;
1419
1420 if self.enable_server_timing {
1422 let _ = upstream_response
1423 .insert_header("server-timing", ctx.generate_server_timing());
1424 }
1425 Ok(())
1426 }
1427
1428 async fn upstream_response_filter(
1429 &self,
1430 session: &mut Session,
1431 upstream_response: &mut ResponseHeader,
1432 ctx: &mut Self::CTX,
1433 ) -> pingora::Result<()> {
1434 debug!(target: LOG_TARGET, "--> upstream response filter");
1435 defer!(debug!(target: LOG_TARGET, "<-- upstream response filter"););
1436 self.handle_upstream_response_plugin(session, ctx, upstream_response)?;
1437 #[cfg(feature = "tracing")]
1438 inject_telemetry_headers(ctx, upstream_response);
1439 ctx.upstream.status = Some(upstream_response.status);
1440
1441 if ctx.state.status.is_none() {
1442 ctx.state.status = Some(upstream_response.status);
1443 ctx.timing.upstream_response =
1445 Some(get_start_time(&ctx.timing.created_at));
1446 }
1447
1448 if let Some(id) = &ctx.state.request_id {
1449 let _ = upstream_response
1450 .insert_header(&HTTP_HEADER_NAME_X_REQUEST_ID, id);
1451 }
1452
1453 ctx.timing.upstream_processing = get_latency(
1454 &ctx.timing.created_at,
1455 &ctx.timing.upstream_processing,
1456 );
1457
1458 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1459 upstream_instance
1460 .on_response(&ctx.upstream.address, upstream_response.status);
1461 }
1462
1463 Ok(())
1464 }
1465
1466 fn upstream_response_body_filter(
1469 &self,
1470 session: &mut Session,
1471 body: &mut Option<bytes::Bytes>,
1472 end_of_stream: bool,
1473 ctx: &mut Self::CTX,
1474 ) -> pingora::Result<Option<std::time::Duration>> {
1475 debug!(target: LOG_TARGET, "--> upstream response body filter");
1476 defer!(debug!(target: LOG_TARGET, "<-- upstream response body filter"););
1477
1478 self.handle_upstream_response_body_plugin(
1479 session,
1480 ctx,
1481 body,
1482 end_of_stream,
1483 )?;
1484
1485 if end_of_stream {
1486 ctx.timing.upstream_response = get_latency(
1487 &ctx.timing.created_at,
1488 &ctx.timing.upstream_response,
1489 );
1490
1491 #[cfg(feature = "tracing")]
1492 set_otel_upstream_attrs(ctx);
1493 }
1495 Ok(None)
1496 }
1497
1498 fn response_body_filter(
1501 &self,
1502 session: &mut Session,
1503 body: &mut Option<Bytes>,
1504 end_of_stream: bool,
1505 ctx: &mut Self::CTX,
1506 ) -> pingora::Result<Option<std::time::Duration>>
1507 where
1508 Self::CTX: Send + Sync,
1509 {
1510 debug!(target: LOG_TARGET, "--> response body filter");
1511 defer!(debug!(target: LOG_TARGET, "<-- response body filter"););
1512 self.handle_response_body_plugin(session, ctx, body, end_of_stream)?;
1513 Ok(None)
1514 }
1515
1516 async fn fail_to_proxy(
1523 &self,
1524 session: &mut Session,
1525 e: &pingora::Error,
1526 ctx: &mut Self::CTX,
1527 ) -> FailToProxy
1528 where
1529 Self::CTX: Send + Sync,
1530 {
1531 debug!(target: LOG_TARGET, "--> fail to proxy");
1532 defer!(debug!(target: LOG_TARGET, "<-- fail to proxy"););
1533 let server_session = session.as_mut();
1534
1535 let code = match e.etype() {
1536 pingora::HTTPStatus(code) => *code,
1537 _ => match e.esource() {
1539 pingora::ErrorSource::Upstream => 502,
1540 pingora::ErrorSource::Downstream => match e.etype() {
1541 pingora::ErrorType::ConnectTimedout => 408,
1542 pingora::ErrorType::ConnectionClosed => 499,
1544 _ => 500,
1545 },
1546 pingora::ErrorSource::Internal
1547 | pingora::ErrorSource::Unset => 500,
1548 },
1549 };
1551 let mut resp = match code {
1552 502 => error_resp::HTTP_502_RESPONSE.clone(),
1553 400 => error_resp::HTTP_400_RESPONSE.clone(),
1554 500 => HTTP_500_RESPONSE.clone(),
1555 _ => error_resp::gen_error_response(code),
1556 };
1557
1558 let error_type = e.etype().as_str();
1559 let content = self
1560 .error_template
1561 .replace("{{version}}", pingap_util::get_pkg_version())
1562 .replace("{{content}}", &e.to_string())
1563 .replace("{{error_type}}", error_type);
1564 let buf = Bytes::from(content);
1565 ctx.state.status = Some(
1566 StatusCode::from_u16(code)
1567 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1568 );
1569 let content_type = if buf.starts_with(b"{") {
1570 "application/json; charset=utf-8"
1571 } else {
1572 "text/html; charset=utf-8"
1573 };
1574 let _ = resp.insert_header(http::header::CONTENT_TYPE, content_type);
1575 let _ = resp.insert_header("X-Pingap-EType", error_type);
1576 let _ = resp
1577 .insert_header(http::header::CONTENT_LENGTH, buf.len().to_string());
1578
1579 let user_agent = server_session
1580 .get_header(http::header::USER_AGENT)
1581 .map(|v| v.clone().to_str().unwrap_or_default().to_string());
1582
1583 error!(
1584 target: LOG_TARGET,
1585 error = %e,
1586 remote_addr = ctx.conn.remote_addr,
1587 client_ip = ctx.conn.client_ip,
1588 user_agent,
1589 error_type,
1590 path = server_session.req_header().uri.path(),
1591 "fail to proxy"
1592 );
1593
1594 server_session.set_keepalive(None);
1601
1602 server_session
1603 .write_response_header(Box::new(resp))
1604 .await
1605 .unwrap_or_else(|e| {
1606 error!(
1607 target: LOG_TARGET,
1608 error = %e,
1609 "send error response to downstream fail"
1610 );
1611 });
1612
1613 let _ = server_session.write_response_body(buf, true).await;
1614 FailToProxy {
1615 error_code: code,
1616 can_reuse_downstream: false,
1617 }
1618 }
1619 async fn logging(
1627 &self,
1628 session: &mut Session,
1629 _e: Option<&pingora::Error>,
1630 ctx: &mut Self::CTX,
1631 ) where
1632 Self::CTX: Send + Sync,
1633 {
1634 debug!(target: LOG_TARGET, "--> logging");
1635 defer!(debug!(target: LOG_TARGET, "<-- logging"););
1636 end_request();
1637 self.processing.fetch_sub(1, Ordering::Relaxed);
1638 if let Some(location) = &ctx.upstream.location_instance {
1639 location.on_response();
1640 }
1641 if let Some(upstream_instance) = &ctx.upstream.upstream_instance {
1643 ctx.upstream.processing_count = Some(upstream_instance.completed());
1644 }
1645 if ctx.state.status.is_none() {
1646 if let Some(header) = session.response_written() {
1647 ctx.state.status = Some(header.status);
1648 }
1649 }
1650 #[cfg(feature = "tracing")]
1651 if let Some(features) = ctx.features.as_mut() {
1653 if let Some(ref mut span) = features.upstream_span.as_mut() {
1654 span.end();
1655 }
1656 }
1657
1658 if let Some(c) =
1659 session.downstream_modules_ctx.get::<ResponseCompression>()
1660 {
1661 if c.is_enabled() {
1662 if let Some((algorithm, in_bytes, out_bytes, took)) =
1663 c.get_info()
1664 {
1665 let features = ctx.features.get_or_insert_default();
1666 features.compression_stat = Some(CompressionStat {
1667 algorithm: algorithm.to_string(),
1668 in_bytes,
1669 out_bytes,
1670 duration: took,
1671 });
1672 }
1673 }
1674 }
1675 #[cfg(feature = "tracing")]
1676 if let Some(prom) = &self.prometheus {
1677 if !ctx.upstream.location.is_empty() {
1679 prom.after(session, ctx);
1680 }
1681 }
1682
1683 #[cfg(feature = "tracing")]
1684 set_otel_request_attrs(session, ctx);
1685
1686 if let Some(p) = &self.log_parser {
1687 let buf = p.format(session, ctx);
1688 if let Some(logger) = &self.access_logger {
1689 let _ = logger.try_send(buf);
1690 } else {
1691 let msg = buf.as_bstr();
1692 info!(target: LOG_TARGET, "{msg}");
1693 }
1694 }
1695 }
1696}
1697
1698#[cfg(test)]
1699mod tests {
1700 use super::*;
1701 use crate::server_conf::parse_from_conf;
1702 use ahash::AHashMap;
1703 use pingap_certificate::{DynamicCertificates, TlsCertificate};
1704 use pingap_config::{PingapConfig, new_file_config_manager};
1705 use pingap_core::{CacheInfo, Ctx, UpstreamInfo};
1706 use pingap_location::LocationStats;
1707 use pingora::http::ResponseHeader;
1708 use pingora::protocols::tls::SslDigest;
1709 use pingora::protocols::tls::SslDigestExtension;
1710 use pingora::protocols::{Digest, TimingDigest};
1711 use pingora::proxy::{ProxyHttp, Session};
1712 use pingora::server::configuration;
1713 use pingora::services::Service;
1714 use pretty_assertions::assert_eq;
1715 use std::collections::HashMap;
1716 use std::sync::Arc;
1717 use std::time::{Duration, SystemTime};
1718 use tokio_test::io::Builder;
1719
1720 #[test]
1721 fn test_get_digest_detail() {
1722 let digest = Digest {
1723 timing_digest: vec![Some(TimingDigest {
1724 established_ts: SystemTime::UNIX_EPOCH
1725 .checked_add(Duration::from_secs(10))
1726 .unwrap(),
1727 })],
1728 ssl_digest: Some(Arc::new(SslDigest {
1729 cipher: "123".into(),
1730 version: "1.3".into(),
1731 organization: None,
1732 serial_number: None,
1733 cert_digest: vec![],
1734 extension: SslDigestExtension::default(),
1735 })),
1736 ..Default::default()
1737 };
1738 let result = get_digest_detail(&digest);
1739 assert_eq!(10000, result.tcp_established);
1740 assert_eq!("1.3", result.tls_version.unwrap_or_default());
1741 }
1742
1743 fn new_server() -> Server {
1745 let toml_data = r###"
1746[upstreams.charts]
1747# upstream address list
1748addrs = ["127.0.0.1:5000"]
1749
1750
1751[upstreams.diving]
1752addrs = ["127.0.0.1:5001"]
1753
1754
1755[locations.lo]
1756# upstream of location (default none)
1757upstream = "charts"
1758
1759# location match path (default none)
1760path = "/"
1761
1762# location match host, multiple domain names are separated by commas (default none)
1763host = ""
1764
1765# set headers to request (default none)
1766includes = ["proxySetHeader"]
1767
1768# add headers to request (default none)
1769proxy_add_headers = ["name:value"]
1770
1771
1772# the weigh of location (default none)
1773weight = 1024
1774
1775
1776# plugin list for location
1777plugins = ["pingap:requestId", "stats"]
1778
1779[servers.test]
1780# server linsten address, multiple addresses are separated by commas (default none)
1781addr = "0.0.0.0:6188"
1782
1783# access log format (default none)
1784access_log = "tiny"
1785
1786# the locations for server
1787locations = ["lo"]
1788
1789# the threads count for server (default 1)
1790threads = 1
1791
1792[plugins.stats]
1793value = "/stats"
1794category = "stats"
1795
1796[storages.authToken]
1797category = "secret"
1798secret = "123123"
1799value = "PLpKJqvfkjTcYTDpauJf+2JnEayP+bm+0Oe60Jk="
1800
1801[storages.proxySetHeader]
1802category = "config"
1803value = 'proxy_set_headers = ["name:value"]'
1804 "###;
1805 let pingap_conf = PingapConfig::new(toml_data.as_ref(), false).unwrap();
1806
1807 let location = Arc::new(
1808 Location::new("lo", pingap_conf.locations.get("lo").unwrap())
1809 .unwrap(),
1810 );
1811 let upstream = Arc::new(
1812 Upstream::new(
1813 "charts",
1814 pingap_conf.upstreams.get("charts").unwrap(),
1815 None,
1816 )
1817 .unwrap(),
1818 );
1819
1820 struct TmpPluginLoader {}
1821 impl PluginProvider for TmpPluginLoader {
1822 fn get(&self, _name: &str) -> Option<Arc<dyn Plugin>> {
1823 None
1824 }
1825 }
1826 struct TmpLocationLoader {
1827 location: Arc<Location>,
1828 }
1829 impl LocationProvider for TmpLocationLoader {
1830 fn get(&self, _name: &str) -> Option<Arc<Location>> {
1831 Some(self.location.clone())
1832 }
1833 fn stats(&self) -> HashMap<String, LocationStats> {
1834 HashMap::new()
1835 }
1836 }
1837 struct TmpUpstreamLoader {
1838 upstream: Arc<Upstream>,
1839 }
1840 impl UpstreamProvider for TmpUpstreamLoader {
1841 fn get(&self, _name: &str) -> Option<Arc<Upstream>> {
1842 Some(self.upstream.clone())
1843 }
1844 fn list(&self) -> Vec<(String, Arc<Upstream>)> {
1845 vec![("charts".to_string(), self.upstream.clone())]
1846 }
1847 }
1848 struct TmpServerLocationsLoader {
1849 server_locations: Arc<Vec<String>>,
1850 }
1851 impl ServerLocationsProvider for TmpServerLocationsLoader {
1852 fn get(&self, _name: &str) -> Option<Arc<Vec<String>>> {
1853 Some(self.server_locations.clone())
1854 }
1855 }
1856
1857 struct TmpCertificateLoader {}
1858 impl CertificateProvider for TmpCertificateLoader {
1859 fn get(&self, _sni: &str) -> Option<Arc<TlsCertificate>> {
1860 None
1861 }
1862 fn list(&self) -> Arc<DynamicCertificates> {
1863 Arc::new(AHashMap::new())
1864 }
1865 fn store(&self, _data: DynamicCertificates) {}
1866 }
1867
1868 let confs = parse_from_conf(pingap_conf);
1869 let file = tempfile::NamedTempFile::with_suffix(".toml").unwrap();
1870
1871 Server::new(
1872 &confs[0],
1873 AppContext {
1874 logger: None,
1875 config_manager: Arc::new(
1876 new_file_config_manager(&file.path().to_string_lossy())
1877 .unwrap(),
1878 ),
1879 server_locations_provider: Arc::new(TmpServerLocationsLoader {
1880 server_locations: Arc::new(vec!["lo".to_string()]),
1881 }),
1882 location_provider: Arc::new(TmpLocationLoader { location }),
1883 upstream_provider: Arc::new(TmpUpstreamLoader { upstream }),
1884 plugin_provider: Arc::new(TmpPluginLoader {}),
1885 certificate_provider: Arc::new(TmpCertificateLoader {}),
1886 },
1887 )
1888 .unwrap()
1889 }
1890
1891 #[test]
1892 fn test_new_server() {
1893 let server = new_server();
1894 let services = server
1895 .run(Arc::new(configuration::ServerConf::default()))
1896 .unwrap();
1897
1898 assert_eq!("Pingora HTTP Proxy Service", services.lb.name());
1899 }
1900
1901 #[tokio::test]
1902 async fn test_early_request_filter() {
1903 let server = new_server();
1904
1905 let headers = [""].join("\r\n");
1906 let input_header =
1907 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1908 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1909 let mut session = Session::new_h1(Box::new(mock_io));
1910 session.read_request().await.unwrap();
1911
1912 let mut ctx = Ctx::default();
1913 server
1914 .early_request_filter(&mut session, &mut ctx)
1915 .await
1916 .unwrap();
1917 assert_eq!("lo", ctx.upstream.location.as_ref());
1918 }
1919
1920 #[tokio::test]
1921 async fn test_request_filter() {
1922 let server = new_server();
1923
1924 let headers = [""].join("\r\n");
1925 let input_header =
1926 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1927 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1928 let mut session = Session::new_h1(Box::new(mock_io));
1929 session.read_request().await.unwrap();
1930
1931 let location = server.location_provider.get("lo").unwrap();
1932 let mut ctx = Ctx {
1933 upstream: UpstreamInfo {
1934 location: "lo".to_string().into(),
1935 location_instance: Some(location.clone()),
1936 ..Default::default()
1937 },
1938 ..Default::default()
1939 };
1940 let done = server.request_filter(&mut session, &mut ctx).await.unwrap();
1941 assert_eq!(false, done);
1942 }
1943
1944 #[tokio::test]
1945 async fn test_cache_key_callback() {
1946 let server = new_server();
1947
1948 let headers = [""].join("\r\n");
1949 let input_header =
1950 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1951 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1952 let mut session = Session::new_h1(Box::new(mock_io));
1953 session.read_request().await.unwrap();
1954
1955 let key = server
1956 .cache_key_callback(
1957 &session,
1958 &mut Ctx {
1959 cache: Some(CacheInfo {
1960 namespace: Some("pingap".to_string()),
1961 keys: Some(vec!["ss".to_string()]),
1962 ..Default::default()
1963 }),
1964 ..Default::default()
1965 },
1966 )
1967 .unwrap();
1968 assert_eq!(
1969 key.primary_key_str(),
1970 Some("ss:GET:/vicanso/pingap?size=1")
1971 );
1972 assert_eq!(key.namespace_str(), Some("pingap"));
1973 assert_eq!(
1974 r#"CacheKey { namespace: [112, 105, 110, 103, 97, 112], primary: [115, 115, 58, 71, 69, 84, 58, 47, 118, 105, 99, 97, 110, 115, 111, 47, 112, 105, 110, 103, 97, 112, 63, 115, 105, 122, 101, 61, 49], primary_bin_override: None, variance: None, user_tag: "", extensions: {} }"#,
1975 format!("{key:?}")
1976 );
1977 }
1978
1979 #[tokio::test]
1980 async fn test_response_cache_filter() {
1981 let server = new_server();
1982
1983 let headers = [""].join("\r\n");
1984 let input_header =
1985 format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
1986 let mock_io = Builder::new().read(input_header.as_bytes()).build();
1987 let mut session = Session::new_h1(Box::new(mock_io));
1988 session.read_request().await.unwrap();
1989
1990 let mut upstream_response =
1991 ResponseHeader::build_no_case(200, None).unwrap();
1992 upstream_response
1993 .append_header("Content-Type", "application/json")
1994 .unwrap();
1995 let result = server
1996 .response_cache_filter(
1997 &session,
1998 &upstream_response,
1999 &mut Ctx {
2000 cache: Some(CacheInfo {
2001 keys: Some(vec!["ss".to_string()]),
2002 ..Default::default()
2003 }),
2004 ..Default::default()
2005 },
2006 )
2007 .unwrap();
2008 assert_eq!(true, result.is_cacheable());
2009
2010 let mut upstream_response =
2011 ResponseHeader::build_no_case(200, None).unwrap();
2012 upstream_response
2013 .append_header("Cache-Control", "no-cache")
2014 .unwrap();
2015 let result = server
2016 .response_cache_filter(
2017 &session,
2018 &upstream_response,
2019 &mut Ctx {
2020 cache: Some(CacheInfo {
2021 keys: Some(vec!["ss".to_string()]),
2022 ..Default::default()
2023 }),
2024 ..Default::default()
2025 },
2026 )
2027 .unwrap();
2028 assert_eq!(false, result.is_cacheable());
2029
2030 let mut upstream_response =
2031 ResponseHeader::build_no_case(200, None).unwrap();
2032 upstream_response
2033 .append_header("Cache-Control", "no-store")
2034 .unwrap();
2035 let result = server
2036 .response_cache_filter(
2037 &session,
2038 &upstream_response,
2039 &mut Ctx {
2040 cache: Some(CacheInfo {
2041 keys: Some(vec!["ss".to_string()]),
2042 ..Default::default()
2043 }),
2044 ..Default::default()
2045 },
2046 )
2047 .unwrap();
2048 assert_eq!(false, result.is_cacheable());
2049
2050 let mut upstream_response =
2051 ResponseHeader::build_no_case(200, None).unwrap();
2052 upstream_response
2053 .append_header("Cache-Control", "private, max-age=100")
2054 .unwrap();
2055 let result = server
2056 .response_cache_filter(
2057 &session,
2058 &upstream_response,
2059 &mut Ctx {
2060 cache: Some(CacheInfo {
2061 keys: Some(vec!["ss".to_string()]),
2062 ..Default::default()
2063 }),
2064 ..Default::default()
2065 },
2066 )
2067 .unwrap();
2068 assert_eq!(false, result.is_cacheable());
2069 }
2070}