1use crate::{Plugin, real_now_ms};
16use ahash::AHashMap;
17use bytes::BytesMut;
18use http::StatusCode;
19use http::Uri;
20use http::{HeaderName, HeaderValue};
21#[cfg(feature = "tracing")]
22use opentelemetry::{
23 Context,
24 global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
25 trace::{SpanKind, TraceContextExt, Tracer},
26};
27use pingora::cache::CacheKey;
28use pingora::http::RequestHeader;
29use pingora::protocols::Digest;
30use pingora::protocols::TimingDigest;
31use pingora::proxy::Session;
32use pingora_limits::inflight::Guard;
33use std::fmt::Write;
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37const SECOND: u64 = 1_000;
39const MINUTE: u64 = 60 * SECOND;
40const HOUR: u64 = 60 * MINUTE;
41
42#[inline]
43pub fn format_duration(buf: &mut BytesMut, ms: u64) {
46 if ms < SECOND {
47 buf.extend_from_slice(itoa::Buffer::new().format(ms).as_bytes());
49 buf.extend_from_slice(b"ms");
50 } else if ms < MINUTE {
51 buf.extend_from_slice(
53 itoa::Buffer::new().format(ms / SECOND).as_bytes(),
54 );
55 let value = (ms % SECOND) / 100;
56 if value != 0 {
57 buf.extend_from_slice(b".");
58 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
59 }
60 buf.extend_from_slice(b"s");
61 } else if ms < HOUR {
62 buf.extend_from_slice(
64 itoa::Buffer::new().format(ms / MINUTE).as_bytes(),
65 );
66 let value = ms % MINUTE * 10 / MINUTE;
67 if value != 0 {
68 buf.extend_from_slice(b".");
69 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
70 }
71 buf.extend_from_slice(b"m");
72 } else {
73 buf.extend_from_slice(itoa::Buffer::new().format(ms / HOUR).as_bytes());
75 let value = ms % HOUR * 10 / HOUR;
76 if value != 0 {
77 buf.extend_from_slice(b".");
78 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
79 }
80 buf.extend_from_slice(b"h");
81 }
82}
83
/// Selects which side of the proxy a modification applies to.
///
/// The variants carry no data, so the type is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModifiedMode {
    /// Selected by the exact string `"upstream"`.
    Upstream,
    /// Selected by every other string.
    Response,
}

impl From<&str> for ModifiedMode {
    /// Parses a mode name.
    ///
    /// Only the exact, case-sensitive string `"upstream"` maps to
    /// [`ModifiedMode::Upstream`]; any other input falls back to
    /// [`ModifiedMode::Response`].
    fn from(value: &str) -> Self {
        if value == "upstream" {
            ModifiedMode::Upstream
        } else {
            ModifiedMode::Response
        }
    }
}
98
99pub trait ModifyResponseBody: Sync + Send {
101 fn handle(
103 &mut self,
104 session: &Session,
105 body: &mut Option<bytes::Bytes>,
106 end_of_stream: bool,
107 ) -> pingora::Result<()>;
108 fn name(&self) -> String {
110 "unknown".to_string()
111 }
112}
113
/// Information about the client-side connection of a request.
///
/// All fields start out empty / zero via `Default`.
#[derive(Default)]
pub struct ConnectionInfo {
    /// Identifier of the connection (logged as `connection_id`).
    pub id: usize,
    /// Client IP address, when known.
    pub client_ip: Option<String>,
    /// Remote peer address, when known.
    pub remote_addr: Option<String>,
    /// Remote peer port, when known.
    pub remote_port: Option<u16>,
    /// Local server address, when known.
    pub server_addr: Option<String>,
    /// Local server port, when known.
    pub server_port: Option<u16>,
    /// TLS protocol version of the connection, if any (logged as
    /// `tls_version`).
    pub tls_version: Option<String>,
    /// TLS cipher of the connection, if any (logged as `tls_cipher`).
    pub tls_cipher: Option<String>,
    /// Whether the connection was reused (logged as `connection_reused`).
    pub reused: bool,
}
136
/// Timing measurements collected while a request is processed.
///
/// The numeric fields are written to logs as milliseconds (the `_human` log
/// keys render them through `format_duration`).
pub struct Timing {
    /// Moment this value was created; used to compute the total service time.
    pub created_at: Instant,
    /// How long the client connection has existed.
    pub connection_duration: u64,
    /// Client-side TLS handshake time.
    pub tls_handshake: Option<i32>,
    /// Time spent establishing the upstream connection.
    pub upstream_connect: Option<i32>,
    /// TCP part of the upstream connection time.
    pub upstream_tcp_connect: Option<i32>,
    /// TLS handshake part of the upstream connection time.
    pub upstream_tls_handshake: Option<i32>,
    /// Upstream processing time.
    pub upstream_processing: Option<i32>,
    /// Upstream response time.
    pub upstream_response: Option<i32>,
    /// How long the upstream connection has existed.
    pub upstream_connection_duration: Option<u64>,
    /// Time spent looking up the cache.
    pub cache_lookup: Option<i32>,
    /// Time spent on the cache lock.
    pub cache_lock: Option<i32>,
}

impl Default for Timing {
    /// Builds a `Timing` stamped with the current instant and with every
    /// measurement unset (`connection_duration` starts at zero).
    fn default() -> Self {
        Self {
            // `Instant` has no `Default`, hence the hand-written impl.
            created_at: Instant::now(),
            connection_duration: 0,
            upstream_connection_duration: None,
            tls_handshake: None,
            upstream_connect: None,
            upstream_tcp_connect: None,
            upstream_tls_handshake: None,
            upstream_processing: None,
            upstream_response: None,
            cache_lookup: None,
            cache_lock: None,
        }
    }
}
182
183pub trait UpstreamInstance: Send + Sync {
185 fn on_transport_failure(&self, address: &str);
186 fn on_response(&self, address: &str, status: StatusCode);
187 fn completed(&self) -> i32;
188}
189
190pub trait LocationInstance: Send + Sync {
192 fn name(&self) -> &str;
194 fn upstream(&self) -> &str;
196 fn rewrite(
198 &self,
199 header: &mut RequestHeader,
200 variables: Option<AHashMap<String, String>>,
201 ) -> (bool, Option<AHashMap<String, String>>);
202 fn headers(&self) -> Option<&Vec<(HeaderName, HeaderValue, bool)>>;
204 fn client_body_size_limit(&self) -> usize;
206 fn on_request(&self) -> pingora::Result<(u64, i32)>;
212 fn on_response(&self);
214}
215
216#[derive(Default)]
218pub struct UpstreamInfo {
219 pub upstream_instance: Option<Arc<dyn UpstreamInstance>>,
221 pub location_instance: Option<Arc<dyn LocationInstance>>,
223 pub location: Arc<str>,
225 pub name: Arc<str>,
227 pub address: String,
229 pub reused: bool,
231 pub processing_count: Option<i32>,
233 pub connected_count: Option<i32>,
235 pub status: Option<StatusCode>,
237 pub retries: u8,
239 pub max_retries: Option<u8>,
241 pub max_retry_window: Option<Duration>,
249}
250
251#[derive(Default)]
253pub struct RequestState {
254 pub request_id: Option<String>,
256 pub status: Option<StatusCode>,
258 pub payload_size: usize,
260 pub guard: Option<Guard>,
262 pub processing_count: i32,
264 pub accepted_count: u64,
266 pub location_processing_count: i32,
268 pub location_accepted_count: u64,
270}
271
/// Cache-related settings and counters of a request.
#[derive(Default)]
pub struct CacheInfo {
    /// Cache namespace; `get_cache_key` uses an empty namespace when unset.
    pub namespace: Option<String>,
    /// Extra key components that `get_cache_key` places in front of the
    /// method and URI.
    pub keys: Option<Vec<String>>,
    /// Whether the `Cache-Control` handling should be checked.
    pub check_cache_control: bool,
    /// Upper bound for the time-to-live, if configured.
    pub max_ttl: Option<Duration>,
    /// Reading counter, if known.
    pub reading_count: Option<u32>,
    /// Writing counter, if known.
    pub writing_count: Option<u32>,
}
288
289#[derive(Default)]
291pub struct Features {
292 pub variables: Option<AHashMap<String, String>>,
294 pub plugin_processing_times: Option<Vec<(String, u32)>>,
296 pub compression_stat: Option<CompressionStat>,
298 pub modify_body_handlers:
300 Option<AHashMap<String, Box<dyn ModifyResponseBody>>>,
301 #[cfg(feature = "tracing")]
303 pub otel_tracer: Option<OtelTracer>,
304 #[cfg(feature = "tracing")]
306 pub upstream_span: Option<BoxedSpan>,
307}
308
/// Statistics gathered while compressing a response.
#[derive(Default)]
pub struct CompressionStat {
    /// Name of the compression algorithm.
    pub algorithm: String,
    /// Number of bytes fed into the compressor.
    pub in_bytes: usize,
    /// Number of bytes produced by the compressor.
    pub out_bytes: usize,
    /// Time spent compressing.
    pub duration: Duration,
}

impl CompressionStat {
    /// Returns `in_bytes / out_bytes` as a floating-point ratio.
    ///
    /// Yields `0.0` when `out_bytes` is zero, so the division never produces
    /// infinity or NaN.
    pub fn ratio(&self) -> f64 {
        match self.out_bytes {
            0 => 0.0,
            out_bytes => self.in_bytes as f64 / out_bytes as f64,
        }
    }
}
331
/// OpenTelemetry state attached to a request (only with the `tracing`
/// feature).
#[cfg(feature = "tracing")]
pub struct OtelTracer {
    /// Tracer used to create spans.
    pub tracer: BoxedTracer,
    /// Span representing the incoming HTTP request.
    pub http_request_span: BoxedSpan,
}

#[cfg(feature = "tracing")]
impl OtelTracer {
    /// Starts a client-kind span called `name`.
    ///
    /// The new span is started in a context derived from the current one
    /// with the span context of `http_request_span` installed as the remote
    /// span context, which links it to the request span.
    #[inline]
    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
        let request_context = self.http_request_span.span_context().clone();
        let parent =
            Context::current().with_remote_span_context(request_context);
        self.tracer
            .span_builder(name.to_string())
            .with_kind(SpanKind::Client)
            .start_with_context(&self.tracer, &parent)
    }
}
358
359#[derive(Default)]
362pub struct Ctx {
363 pub conn: ConnectionInfo,
365 pub upstream: UpstreamInfo,
367 pub timing: Timing,
369 pub state: RequestState,
371 pub cache: Option<CacheInfo>,
373 pub features: Option<Features>,
375 pub plugins: Option<Vec<(String, Arc<dyn Plugin>)>>,
377}
378
/// Connection facts extracted from a connection digest by
/// `get_digest_detail`.
#[derive(Debug, Default)]
pub struct DigestDetail {
    /// `true` when the connection has existed for more than 100 ms.
    pub connection_reused: bool,
    /// Age of the connection in milliseconds (0 when unknown).
    pub connection_time: u64,
    /// Unix timestamp in milliseconds at which TCP was established
    /// (0 when unknown).
    pub tcp_established: u64,
    /// Unix timestamp in milliseconds at which TLS was established
    /// (0 when unknown or when there is no TLS).
    pub tls_established: u64,
    /// TLS protocol version, when the connection uses TLS.
    pub tls_version: Option<String>,
    /// TLS cipher, when the connection uses TLS.
    pub tls_cipher: Option<String>,
}
395
396#[inline]
397pub(crate) fn timing_to_ms(timing: Option<&Option<TimingDigest>>) -> u64 {
398 match timing {
399 Some(Some(item)) => item
400 .established_ts
401 .duration_since(SystemTime::UNIX_EPOCH)
402 .unwrap_or_default()
403 .as_millis() as u64,
404 _ => 0,
405 }
406}
407
408#[inline]
411pub fn get_digest_detail(digest: &Digest) -> DigestDetail {
412 let tcp_established = timing_to_ms(digest.timing_digest.first());
413 let mut connection_time = 0;
414 let now = real_now_ms();
415 if tcp_established > 0 && tcp_established < now {
416 connection_time = now - tcp_established;
417 }
418 let connection_reused = connection_time > 100;
419
420 let Some(ssl_digest) = &digest.ssl_digest else {
421 return DigestDetail {
422 connection_reused,
423 tcp_established,
424 connection_time,
425 ..Default::default()
426 };
427 };
428
429 DigestDetail {
430 connection_reused,
431 tcp_established,
432 connection_time,
433 tls_established: timing_to_ms(digest.timing_digest.last()),
434 tls_version: Some(ssl_digest.version.to_string()),
435 tls_cipher: Some(ssl_digest.cipher.to_string()),
436 }
437}
438
439impl Ctx {
440 pub fn new() -> Self {
445 Self {
446 ..Default::default()
447 }
448 }
449
450 #[inline]
456 pub fn add_variable(&mut self, key: &str, value: &str) {
457 let features = self.features.get_or_insert_default();
459 let variables = features.variables.get_or_insert_with(AHashMap::new);
460 variables.insert(key.to_string(), value.to_string());
461 }
462
463 #[inline]
468 pub fn extend_variables(&mut self, values: AHashMap<String, String>) {
469 let features = self.features.get_or_insert_default();
470 if let Some(variables) = features.variables.as_mut() {
471 variables.extend(values);
472 } else {
473 features.variables = Some(values);
474 }
475 }
476
477 #[inline]
484 pub fn get_variable(&self, key: &str) -> Option<&str> {
485 self.features
486 .as_ref()?
487 .variables
488 .as_ref()?
489 .get(key)
490 .map(|v| v.as_str())
491 }
492
493 #[inline]
499 pub fn add_modify_body_handler(
500 &mut self,
501 name: &str,
502 handler: Box<dyn ModifyResponseBody>,
503 ) {
504 let features = self.features.get_or_insert_default();
505 let handlers = features
506 .modify_body_handlers
507 .get_or_insert_with(AHashMap::new);
508 handlers.insert(name.to_string(), handler);
509 }
510
511 #[inline]
513 pub fn get_modify_body_handler(
514 &mut self,
515 name: &str,
516 ) -> Option<&mut Box<dyn ModifyResponseBody>> {
517 self.features
518 .as_mut()
519 .and_then(|f| f.modify_body_handlers.as_mut())
520 .and_then(|h| h.get_mut(name))
521 }
522
523 #[inline]
526 fn get_time_field(&self, field: Option<i32>) -> Option<u32> {
527 if let Some(value) = field
528 && value >= 0
529 {
530 return Some(value as u32);
531 }
532 None
533 }
534
535 #[inline]
540 pub fn get_upstream_response_time(&self) -> Option<u32> {
541 self.get_time_field(self.timing.upstream_response)
542 }
543
544 #[inline]
549 pub fn get_upstream_connect_time(&self) -> Option<u32> {
550 self.get_time_field(self.timing.upstream_connect)
551 }
552
553 #[inline]
558 pub fn get_upstream_processing_time(&self) -> Option<u32> {
559 self.get_time_field(self.timing.upstream_processing)
560 }
561
562 #[inline]
568 pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
569 let features = self.features.get_or_insert_default();
571 let times = features
572 .plugin_processing_times
573 .get_or_insert_with(|| Vec::with_capacity(5));
574 if let Some(item) = times.iter_mut().find(|item| item.0 == name) {
575 item.1 += time;
576 } else {
577 times.push((name.to_string(), time));
578 }
579 }
580
581 #[inline]
590 pub fn append_log_value(&self, buf: &mut BytesMut, key: &str) {
591 macro_rules! append_time {
593 ($val:expr) => {
595 if let Some(ms) = $val {
596 buf.extend(itoa::Buffer::new().format(ms).as_bytes());
597 }
598 };
599 ($val:expr, human) => {
601 if let Some(ms) = $val {
602 format_duration(buf, ms as u64);
603 }
604 };
605 }
606
607 match key {
608 "connection_id" => {
609 buf.extend(itoa::Buffer::new().format(self.conn.id).as_bytes());
610 },
611 "upstream_reused" => {
612 if self.upstream.reused {
613 buf.extend(b"true");
614 } else {
615 buf.extend(b"false");
616 }
617 },
618 "upstream_status" => {
619 if let Some(status) = &self.upstream.status {
620 buf.extend_from_slice(status.as_str().as_bytes());
621 } else {
622 buf.extend_from_slice(b"-");
623 }
624 },
625 "upstream_addr" => buf.extend(self.upstream.address.as_bytes()),
626 "processing" => buf.extend(
627 itoa::Buffer::new()
628 .format(self.state.processing_count)
629 .as_bytes(),
630 ),
631 "upstream_connected" => {
632 if let Some(value) = self.upstream.connected_count {
633 buf.extend(itoa::Buffer::new().format(value).as_bytes());
634 }
635 },
636
637 "upstream_connect_time" => {
639 append_time!(self.get_upstream_connect_time())
640 },
641 "upstream_connect_time_human" => {
642 append_time!(self.get_upstream_connect_time(), human)
643 },
644
645 "upstream_processing_time" => {
646 append_time!(self.get_upstream_processing_time())
647 },
648 "upstream_processing_time_human" => {
649 append_time!(self.get_upstream_processing_time(), human)
650 },
651 "upstream_response_time" => {
652 append_time!(self.get_upstream_response_time())
653 },
654 "upstream_response_time_human" => {
655 append_time!(self.get_upstream_response_time(), human)
656 },
657 "upstream_tcp_connect_time" => {
658 append_time!(self.timing.upstream_tcp_connect)
659 },
660 "upstream_tcp_connect_time_human" => {
661 append_time!(self.timing.upstream_tcp_connect, human)
662 },
663 "upstream_tls_handshake_time" => {
664 append_time!(self.timing.upstream_tls_handshake)
665 },
666 "upstream_tls_handshake_time_human" => {
667 append_time!(self.timing.upstream_tls_handshake, human)
668 },
669 "upstream_connection_time" => {
670 append_time!(self.timing.upstream_connection_duration)
671 },
672 "upstream_connection_time_human" => {
673 append_time!(self.timing.upstream_connection_duration, human)
674 },
675 "connection_time" => {
676 append_time!(Some(self.timing.connection_duration))
677 },
678 "connection_time_human" => {
679 append_time!(Some(self.timing.connection_duration), human)
680 },
681
682 "location" => {
684 if !self.upstream.location.is_empty() {
685 buf.extend(self.upstream.location.as_bytes())
686 }
687 },
688 "connection_reused" => {
689 if self.conn.reused {
690 buf.extend(b"true");
691 } else {
692 buf.extend(b"false");
693 }
694 },
695 "tls_version" => {
696 if let Some(value) = &self.conn.tls_version {
697 buf.extend(value.as_bytes());
698 }
699 },
700 "tls_cipher" => {
701 if let Some(value) = &self.conn.tls_cipher {
702 buf.extend(value.as_bytes());
703 }
704 },
705 "tls_handshake_time" => append_time!(self.timing.tls_handshake),
706 "tls_handshake_time_human" => {
707 append_time!(self.timing.tls_handshake, human)
708 },
709 "compression_time" => {
710 if let Some(feature) = &self.features
711 && let Some(value) = &feature.compression_stat
712 {
713 append_time!(Some(value.duration.as_millis() as u64))
714 }
715 },
716 "compression_time_human" => {
717 if let Some(feature) = &self.features
718 && let Some(value) = &feature.compression_stat
719 {
720 append_time!(Some(value.duration.as_millis() as u64), human)
721 }
722 },
723 "compression_ratio" => {
724 if let Some(feature) = &self.features
725 && let Some(value) = &feature.compression_stat
726 {
727 buf.extend(format!("{:.1}", value.ratio()).as_bytes());
728 }
729 },
730 "cache_lookup_time" => {
731 append_time!(self.timing.cache_lookup)
732 },
733 "cache_lookup_time_human" => {
734 append_time!(self.timing.cache_lookup, human)
735 },
736 "cache_lock_time" => {
737 append_time!(self.timing.cache_lock)
738 },
739 "cache_lock_time_human" => {
740 append_time!(self.timing.cache_lock, human)
741 },
742 "service_time" => {
743 append_time!(Some(self.timing.created_at.elapsed().as_millis()))
744 },
745 "service_time_human" => {
746 append_time!(
747 Some(self.timing.created_at.elapsed().as_millis()),
748 human
749 )
750 },
751 _ => {},
753 }
754 }
755
756 pub fn generate_server_timing(&self) -> String {
764 let mut timing_str = String::with_capacity(200);
765 let mut first = true;
767
768 macro_rules! add_timing {
770 ($name:expr, $dur:expr) => {
771 if !first {
772 timing_str.push_str(", ");
773 }
774 let _ = write!(&mut timing_str, "{};dur={}", $name, $dur);
776 first = false;
777 };
778 }
779
780 let mut upstream_time = 0;
782 if let Some(time) = self.get_upstream_connect_time() {
783 upstream_time += time;
784 add_timing!("upstream.connect", time);
785 }
786 if let Some(time) = self.get_upstream_processing_time() {
787 upstream_time += time;
788 add_timing!("upstream.processing", time);
789 }
790 if upstream_time > 0 {
791 add_timing!("upstream", upstream_time);
792 }
793
794 let mut cache_time = 0;
796 if let Some(time) = self.timing.cache_lookup {
797 cache_time += time;
798 add_timing!("cache.lookup", time);
799 }
800 if let Some(time) = self.timing.cache_lock {
801 cache_time += time;
802 add_timing!("cache.lock", time);
803 }
804 if cache_time > 0 {
805 add_timing!("cache", cache_time);
806 }
807
808 if let Some(features) = &self.features
810 && let Some(times) = &features.plugin_processing_times
811 {
812 let mut plugin_time: u32 = 0;
813 for (name, time) in times {
814 if *time == 0 {
815 continue;
816 }
817 plugin_time += time;
818 let mut plugin_name = String::with_capacity(7 + name.len());
819 plugin_name.push_str("plugin.");
820 plugin_name.push_str(name);
821 add_timing!(&plugin_name, time);
822 }
823 if plugin_time > 0 {
824 add_timing!("plugin", plugin_time);
825 }
826 }
827
828 let service_time = self.timing.created_at.elapsed().as_millis();
830 if !first {
832 timing_str.push_str(", ");
833 }
834 let _ = write!(&mut timing_str, "total;dur={}", service_time);
836
837 timing_str
838 }
839
840 #[inline]
842 pub fn push_cache_key(&mut self, key: String) {
843 let cache_info = self.cache.get_or_insert_default();
844 cache_info
845 .keys
846 .get_or_insert_with(|| Vec::with_capacity(2))
847 .push(key);
848 }
849
850 #[inline]
852 pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
853 let cache_info = self.cache.get_or_insert_default();
854 cache_info
855 .keys
856 .get_or_insert_with(|| Vec::with_capacity(keys.len() + 2))
857 .extend(keys);
858 }
859 #[inline]
861 pub fn update_upstream_timing_from_digest(
862 &mut self,
863 digest: &Digest,
864 reused: bool,
865 ) {
866 let detail = get_digest_detail(digest);
867 self.timing.upstream_connection_duration = Some(detail.connection_time);
868 if reused {
869 return;
870 }
871
872 let upstream_connect_time =
873 self.timing.upstream_connect.unwrap_or_default();
874 let mut upstream_tcp_connect = upstream_connect_time;
876 if detail.tls_established > detail.tcp_established {
877 let latency =
878 (detail.tls_established - detail.tcp_established) as i32;
879 upstream_tcp_connect -= latency;
880 self.timing.upstream_tls_handshake = Some(latency);
881 }
882 if upstream_tcp_connect > 0 {
883 self.timing.upstream_tcp_connect = Some(upstream_tcp_connect);
884 }
885 }
886}
887
888pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
898 let Some(cache_info) = &ctx.cache else {
899 return CacheKey::new("", "", "");
901 };
902 let namespace = cache_info.namespace.as_ref().map_or("", |v| v);
903 let key = if let Some(keys) = &cache_info.keys {
904 let mut key_buf = String::with_capacity(
906 keys.iter().map(|s| s.len() + 1).sum::<usize>()
907 + method.len()
908 + 1
909 + uri.to_string().len(),
910 );
911
912 for (i, k) in keys.iter().enumerate() {
914 if i > 0 {
915 key_buf.push(':');
916 }
917 key_buf.push_str(k);
918 }
919 let _ = write!(&mut key_buf, ":{method}:{uri}");
921 key_buf
922 } else {
923 format!("{method}:{uri}")
925 };
926
927 CacheKey::new(namespace, key, "")
928}
929
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use bytes::BytesMut;
    use pingora::protocols::tls::SslDigest;
    use pingora::protocols::tls::SslDigestExtension;
    use pretty_assertions::assert_eq;
    use std::{sync::Arc, time::Duration};

    /// A new context is freshly timestamped and has no optional state.
    #[test]
    fn test_ctx_new() {
        let ctx = Ctx::new();
        let elapsed_ms = ctx.timing.created_at.elapsed().as_millis();
        assert!(elapsed_ms < 100, "created_at should be a recent timestamp");
        assert!(ctx.cache.is_none());
        assert!(ctx.features.is_none());
        assert_eq!(ctx.conn.id, 0);
    }

    /// Variables can be stored and read back; unknown keys yield `None`.
    #[test]
    fn test_add_and_get_variable() {
        let mut ctx = Ctx::new();
        assert!(
            ctx.get_variable("key1").is_none(),
            "Should be None before adding"
        );

        ctx.add_variable("key1", "value1");
        ctx.add_variable("key2", "value2");

        assert_eq!(ctx.get_variable("key1"), Some("value1"));
        assert_eq!(ctx.get_variable("key2"), Some("value2"));
        assert_eq!(ctx.get_variable("nonexistent"), None);
    }

    /// Timing getters pass through non-negative values and hide the rest.
    #[test]
    fn test_get_time_field() {
        let mut ctx = Ctx::new();

        ctx.timing.upstream_response = Some(100);
        assert_eq!(ctx.get_upstream_response_time(), Some(100));

        ctx.timing.upstream_response = Some(-1);
        assert_eq!(
            ctx.get_upstream_response_time(),
            None,
            "Negative time should be None"
        );

        ctx.timing.upstream_response = None;
        assert_eq!(ctx.get_upstream_response_time(), None);
    }

    /// Spot checks for `append_log_value`, including an unknown key and the
    /// elapsed service time.
    #[test]
    fn test_append_log_value_coverage() {
        let mut ctx = Ctx::new();
        let mut buf = BytesMut::new();
        ctx.append_log_value(&mut buf, "unknown_key");
        assert!(buf.is_empty(), "Unknown key should not append anything");

        buf = BytesMut::new();
        ctx.conn.reused = true;
        ctx.append_log_value(&mut buf, "connection_reused");
        assert_eq!(&buf[..], b"true");

        ctx.conn.tls_version = Some("TLSv1.3".to_string());
        buf = BytesMut::new();
        ctx.append_log_value(&mut buf, "tls_version");
        assert_eq!(&buf[..], b"TLSv1.3");

        coarsetime::Clock::update();
        std::thread::sleep(Duration::from_millis(11));
        buf = BytesMut::new();
        ctx.append_log_value(&mut buf, "service_time");
        coarsetime::Clock::update();
        let service_time: u64 =
            std::str::from_utf8(&buf[..]).unwrap().parse().unwrap();
        assert!(service_time >= 10, "Service time should be at least 10ms");
    }

    /// Cache keys with no cache info, with a namespace only, and with extra
    /// key components.
    #[test]
    fn test_get_cache_key() {
        let method = "GET";
        let uri = Uri::from_static("https://example.com/path");

        let ctx_no_cache = Ctx::new();
        let key1 = get_cache_key(&ctx_no_cache, method, &uri);
        assert_eq!(key1.namespace_str(), Some(""));
        assert_eq!(key1.primary_key_str(), Some(""));

        let mut ctx_with_ns = Ctx::new();
        ctx_with_ns.cache = Some(CacheInfo {
            namespace: Some("my-ns".to_string()),
            ..Default::default()
        });
        let key2 = get_cache_key(&ctx_with_ns, method, &uri);
        assert_eq!(key2.namespace_str(), Some("my-ns"));
        assert_eq!(
            key2.primary_key_str(),
            Some("GET:https://example.com/path")
        );

        let mut ctx_with_keys = Ctx::new();
        ctx_with_keys.cache = Some(CacheInfo {
            namespace: Some("my-ns".to_string()),
            keys: Some(vec!["user-123".to_string(), "desktop".to_string()]),
            ..Default::default()
        });
        let key3 = get_cache_key(&ctx_with_keys, method, &uri);
        assert_eq!(key3.namespace_str(), Some("my-ns"));
        assert_eq!(
            key3.primary_key_str(),
            Some("user-123:desktop:GET:https://example.com/path")
        );
    }

    /// The server timing string contains every populated entry and the
    /// per-group sums.
    #[test]
    fn test_generate_server_timing() {
        let mut ctx = Ctx::new();
        ctx.timing.upstream_connect = Some(1);
        ctx.timing.upstream_processing = Some(2);
        ctx.timing.cache_lookup = Some(6);
        ctx.timing.cache_lock = Some(7);
        ctx.add_plugin_processing_time("plugin1", 100);

        let timing_header = ctx.generate_server_timing();

        for expected in [
            "upstream.connect;dur=1",
            "upstream.processing;dur=2",
            "upstream;dur=3",
            "cache.lookup;dur=6",
            "cache.lock;dur=7",
            "cache;dur=13",
            "plugin.plugin1;dur=100",
            "plugin;dur=100",
            "total;dur=",
        ] {
            assert!(
                timing_header.contains(expected),
                "missing `{expected}` in `{timing_header}`"
            );
        }
    }

    /// Human-readable durations across all units.
    #[test]
    fn test_format_duration() {
        let cases: [(u64, &[u8]); 8] = [
            ((3600 + 3500) * 1000, b"1.9h"),
            ((3600 + 1800) * 1000, b"1.5h"),
            ((3600 + 100) * 1000, b"1h"),
            ((60 + 50) * 1000, b"1.8m"),
            ((60 + 2) * 1000, b"1m"),
            (1000, b"1s"),
            (512, b"512ms"),
            (1112, b"1.1s"),
        ];
        for (ms, expected) in cases {
            let mut buf = BytesMut::new();
            format_duration(&mut buf, ms);
            assert_eq!(expected, &buf[..]);
        }
    }

    /// `add_variable` and `extend_variables` write into the same map.
    #[test]
    fn test_add_variable() {
        let mut ctx = Ctx::new();
        ctx.add_variable("key1", "value1");
        ctx.add_variable("key2", "value2");
        ctx.extend_variables(AHashMap::from([
            ("key3".to_string(), "value3".to_string()),
            ("key4".to_string(), "value4".to_string()),
        ]));
        let variables =
            ctx.features.as_ref().unwrap().variables.as_ref().unwrap();
        assert_eq!(variables.get("key1"), Some(&"value1".to_string()));
        assert_eq!(variables.get("key2"), Some(&"value2".to_string()));
        assert_eq!(variables.get("key3"), Some(&"value3".to_string()));
        assert_eq!(variables.get("key4"), Some(&"value4".to_string()));
    }

    /// Cache key components accumulate in order; default cache info yields
    /// a plain `method:uri` key.
    #[test]
    fn test_cache_key() {
        let mut ctx = Ctx::new();
        ctx.push_cache_key("key1".to_string());
        ctx.extend_cache_keys(vec!["key2".to_string(), "key3".to_string()]);
        assert_eq!(
            vec!["key1".to_string(), "key2".to_string(), "key3".to_string()],
            ctx.cache.unwrap().keys.unwrap()
        );

        let mut ctx = Ctx::new();
        ctx.cache.get_or_insert_default();
        let key = get_cache_key(
            &ctx,
            "GET",
            &Uri::from_static("https://example.com/path"),
        );
        assert_eq!(key.namespace_str(), Some(""));
        assert_eq!(key.primary_key_str(), Some("GET:https://example.com/path"));
    }

    /// Walks through the log keys of `append_log_value`, setting each
    /// field right before it is checked.
    #[test]
    fn test_state() {
        let mut ctx = Ctx::new();

        // Renders `$key` into a fresh buffer and compares the bytes.
        macro_rules! assert_log {
            ($key:expr, $want:expr) => {{
                let mut buf = BytesMut::new();
                ctx.append_log_value(&mut buf, $key);
                assert_eq!($want, &buf[..]);
            }};
        }

        ctx.conn.id = 10;
        assert_log!("connection_id", b"10");

        assert_log!("upstream_reused", b"false");
        ctx.upstream.reused = true;
        assert_log!("upstream_reused", b"true");

        ctx.upstream.address = "192.168.1.1:80".to_string();
        assert_log!("upstream_addr", b"192.168.1.1:80");

        ctx.upstream.status = Some(StatusCode::CREATED);
        assert_log!("upstream_status", b"201");

        ctx.state.processing_count = 10;
        assert_log!("processing", b"10");

        ctx.timing.upstream_connect = Some(1);
        assert_log!("upstream_connect_time", b"1");
        assert_log!("upstream_connect_time_human", b"1ms");

        ctx.upstream.connected_count = Some(30);
        assert_log!("upstream_connected", b"30");

        ctx.timing.upstream_processing = Some(2);
        assert_log!("upstream_processing_time", b"2");
        assert_log!("upstream_processing_time_human", b"2ms");

        ctx.timing.upstream_response = Some(3);
        assert_log!("upstream_response_time", b"3");
        assert_log!("upstream_response_time_human", b"3ms");

        ctx.timing.upstream_tcp_connect = Some(100);
        assert_log!("upstream_tcp_connect_time", b"100");
        assert_log!("upstream_tcp_connect_time_human", b"100ms");

        ctx.timing.upstream_tls_handshake = Some(110);
        assert_log!("upstream_tls_handshake_time", b"110");
        assert_log!("upstream_tls_handshake_time_human", b"110ms");

        ctx.timing.upstream_connection_duration = Some(120);
        assert_log!("upstream_connection_time", b"120");
        assert_log!("upstream_connection_time_human", b"120ms");

        ctx.upstream.location = "pingap".to_string().into();
        assert_log!("location", b"pingap");

        ctx.timing.connection_duration = 4;
        assert_log!("connection_time", b"4");
        assert_log!("connection_time_human", b"4ms");

        ctx.conn.reused = false;
        assert_log!("connection_reused", b"false");
        ctx.conn.reused = true;
        assert_log!("connection_reused", b"true");

        ctx.conn.tls_version = Some("TLSv1.3".to_string());
        assert_log!("tls_version", b"TLSv1.3");

        ctx.conn.tls_cipher =
            Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
        assert_log!("tls_cipher", b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");

        ctx.timing.tls_handshake = Some(101);
        assert_log!("tls_handshake_time", b"101");
        assert_log!("tls_handshake_time_human", b"101ms");

        ctx.features.get_or_insert_default().compression_stat =
            Some(CompressionStat {
                in_bytes: 1024,
                out_bytes: 500,
                duration: Duration::from_millis(5),
                ..Default::default()
            });
        assert_log!("compression_time", b"5");
        assert_log!("compression_time_human", b"5ms");
        assert_log!("compression_ratio", b"2.0");

        ctx.timing.cache_lookup = Some(6);
        assert_log!("cache_lookup_time", b"6");
        assert_log!("cache_lookup_time_human", b"6ms");

        ctx.timing.cache_lock = Some(7);
        assert_log!("cache_lock_time", b"7");
        assert_log!("cache_lock_time_human", b"7ms");
    }

    /// Plugin times are recorded per plugin, in insertion order.
    #[test]
    fn test_add_plugin_processing_time() {
        let mut ctx = Ctx::new();
        ctx.add_plugin_processing_time("plugin1", 100);
        ctx.add_plugin_processing_time("plugin2", 200);
        assert_eq!(
            ctx.features.unwrap().plugin_processing_times,
            Some(vec![
                ("plugin1".to_string(), 100),
                ("plugin2".to_string(), 200)
            ])
        );
    }

    /// Digest details for an empty digest and for one with timing and TLS
    /// information.
    #[test]
    fn test_get_digest_detail() {
        let mut digest = Digest::default();
        let detail = get_digest_detail(&digest);
        assert!(!detail.connection_reused);
        assert_eq!(detail.connection_time, 0);
        assert_eq!(detail.tcp_established, 0);
        assert_eq!(detail.tls_established, 0);
        assert_eq!(detail.tls_version, None);
        assert_eq!(detail.tls_cipher, None);

        digest.timing_digest.push(Some(TimingDigest {
            established_ts: SystemTime::UNIX_EPOCH
                .checked_add(Duration::from_secs(5))
                .unwrap(),
        }));
        digest.timing_digest.push(Some(TimingDigest {
            established_ts: SystemTime::UNIX_EPOCH
                .checked_add(Duration::from_secs(3))
                .unwrap(),
        }));
        digest.ssl_digest = Some(Arc::new(SslDigest {
            version: "1.3".into(),
            cipher: "123".into(),
            organization: Some("cloudflare".to_string()),
            serial_number: Some(
                "0x00000000000000000000000000000abc".to_string(),
            ),
            cert_digest: vec![],
            extension: SslDigestExtension::default(),
        }));
        let detail = get_digest_detail(&digest);
        assert!(detail.connection_reused);
        assert_eq!(detail.tcp_established, 5000);
        assert_eq!(detail.tls_established, 3000);
        assert_eq!(detail.tls_version, Some("1.3".to_string()));
        assert_eq!(detail.tls_cipher, Some("123".to_string()));
    }

    /// A registered body handler can be looked up by name.
    #[test]
    fn test_modify_body_handler() {
        let mut ctx = Ctx::default();

        struct TestHandler {}
        impl ModifyResponseBody for TestHandler {
            fn handle(
                &mut self,
                _session: &Session,
                body: &mut Option<bytes::Bytes>,
                _end_of_stream: bool,
            ) -> pingora::Result<()> {
                *body = Some(Bytes::from("test"));
                Ok(())
            }
        }

        ctx.add_modify_body_handler("test", Box::new(TestHandler {}));
        assert!(ctx.get_modify_body_handler("test").is_some());
    }
}
1414}