1use crate::{Plugin, real_now_ms};
16use ahash::AHashMap;
17use bytes::BytesMut;
18use http::StatusCode;
19use http::Uri;
20use http::{HeaderName, HeaderValue};
21#[cfg(feature = "tracing")]
22use opentelemetry::{
23 Context,
24 global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
25 trace::{SpanKind, TraceContextExt, Tracer},
26};
27use pingora::cache::CacheKey;
28use pingora::http::RequestHeader;
29use pingora::protocols::Digest;
30use pingora::protocols::TimingDigest;
31use pingora::proxy::Session;
32use pingora_limits::inflight::Guard;
33use std::fmt::Write;
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
/// Number of milliseconds in one second.
const SECOND: u64 = 1_000;
/// Number of milliseconds in one minute.
const MINUTE: u64 = 60 * SECOND;
/// Number of milliseconds in one hour.
const HOUR: u64 = 60 * MINUTE;
41
42#[inline]
43pub fn format_duration(buf: &mut BytesMut, ms: u64) {
46 if ms < SECOND {
47 buf.extend_from_slice(itoa::Buffer::new().format(ms).as_bytes());
49 buf.extend_from_slice(b"ms");
50 } else if ms < MINUTE {
51 buf.extend_from_slice(
53 itoa::Buffer::new().format(ms / SECOND).as_bytes(),
54 );
55 let value = (ms % SECOND) / 100;
56 if value != 0 {
57 buf.extend_from_slice(b".");
58 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
59 }
60 buf.extend_from_slice(b"s");
61 } else if ms < HOUR {
62 buf.extend_from_slice(
64 itoa::Buffer::new().format(ms / MINUTE).as_bytes(),
65 );
66 let value = ms % MINUTE * 10 / MINUTE;
67 if value != 0 {
68 buf.extend_from_slice(b".");
69 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
70 }
71 buf.extend_from_slice(b"m");
72 } else {
73 buf.extend_from_slice(itoa::Buffer::new().format(ms / HOUR).as_bytes());
75 let value = ms % HOUR * 10 / HOUR;
76 if value != 0 {
77 buf.extend_from_slice(b".");
78 buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
79 }
80 buf.extend_from_slice(b"h");
81 }
82}
83
/// Mode selector parsed from a configuration-style string.
///
/// NOTE(review): the variant names suggest the mode decides whether a
/// modification applies to the upstream exchange or to the response sent to
/// the client — confirm against the code that consumes this enum.
///
/// The type is a plain field-less enum, so it derives the full set of cheap
/// value traits (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`); this lets it
/// be logged, copied and used in `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModifiedMode {
    /// Selected by the exact string `"upstream"`.
    Upstream,
    /// Selected by every other string (the fallback).
    Response,
}

impl From<&str> for ModifiedMode {
    /// Converts a string into a mode.
    ///
    /// Only the exact (case-sensitive) value `"upstream"` maps to
    /// [`ModifiedMode::Upstream`]; anything else, including the empty string,
    /// maps to [`ModifiedMode::Response`].
    fn from(value: &str) -> Self {
        match value {
            "upstream" => ModifiedMode::Upstream,
            _ => ModifiedMode::Response,
        }
    }
}
98
99pub trait ModifyResponseBody: Sync + Send {
101 fn handle(
103 &mut self,
104 session: &Session,
105 body: &mut Option<bytes::Bytes>,
106 end_of_stream: bool,
107 ) -> pingora::Result<()>;
108 fn name(&self) -> String {
110 "unknown".to_string()
111 }
112}
113
/// Details about the downstream (client-facing) connection of a request.
///
/// Every field starts at its `Default` value (`0`, `None` or `false`).
#[derive(Default)]
pub struct ConnectionInfo {
    /// Connection identifier; written for the `connection_id` log key.
    pub id: usize,
    /// Client IP, if known (presumably the resolved real client address —
    /// confirm with the code that fills it).
    pub client_ip: Option<String>,
    /// Remote address of the connection, if known.
    pub remote_addr: Option<String>,
    /// Remote port of the connection, if known.
    pub remote_port: Option<u16>,
    /// Server-side address of the connection, if known.
    pub server_addr: Option<String>,
    /// Server-side port of the connection, if known.
    pub server_port: Option<u16>,
    /// TLS version string; written for the `tls_version` log key.
    pub tls_version: Option<String>,
    /// TLS cipher string; written for the `tls_cipher` log key.
    pub tls_cipher: Option<String>,
    /// Whether the connection was reused; written as `true`/`false` for the
    /// `connection_reused` log key.
    pub reused: bool,
}
136
/// Timing measurements collected while a request is processed.
///
/// The numeric fields are rendered as milliseconds by
/// `Ctx::append_log_value` (the `*_human` log keys feed them to
/// `format_duration`, which takes milliseconds).
pub struct Timing {
    /// Instant at which this value was created; the `service_time` log key
    /// and the `total` Server-Timing metric are its elapsed time.
    pub created_at: Instant,
    /// Duration of the downstream connection (`connection_time` log key).
    pub connection_duration: u64,
    /// Downstream TLS handshake time (`tls_handshake_time` log key).
    pub tls_handshake: Option<i32>,
    /// Time spent connecting to the upstream; negative values are treated as
    /// "not available" by `Ctx::get_upstream_connect_time`.
    pub upstream_connect: Option<i32>,
    /// TCP part of the upstream connect time; filled by
    /// `Ctx::update_upstream_timing_from_digest`.
    pub upstream_tcp_connect: Option<i32>,
    /// TLS part of the upstream connect time; filled by
    /// `Ctx::update_upstream_timing_from_digest`.
    pub upstream_tls_handshake: Option<i32>,
    /// Upstream processing time; negative values are treated as "not
    /// available" by `Ctx::get_upstream_processing_time`.
    pub upstream_processing: Option<i32>,
    /// Upstream response time; negative values are treated as "not
    /// available" by `Ctx::get_upstream_response_time`.
    pub upstream_response: Option<i32>,
    /// Age of the upstream connection, taken from the connection digest.
    pub upstream_connection_duration: Option<u64>,
    /// Cache lookup time (`cache.lookup` Server-Timing metric).
    pub cache_lookup: Option<i32>,
    /// Cache lock time (`cache.lock` Server-Timing metric).
    pub cache_lock: Option<i32>,
}

impl Default for Timing {
    /// Builds an empty timing record whose `created_at` is the current
    /// instant; the connection duration is `0` and every optional
    /// measurement is `None`.
    fn default() -> Self {
        // `Instant` has no `Default`, which is why this impl is hand-written.
        let created_at = Instant::now();
        Timing {
            created_at,
            connection_duration: 0,
            tls_handshake: None,
            upstream_connect: None,
            upstream_tcp_connect: None,
            upstream_tls_handshake: None,
            upstream_processing: None,
            upstream_response: None,
            upstream_connection_duration: None,
            cache_lookup: None,
            cache_lock: None,
        }
    }
}
182
/// Callbacks exposed by an upstream so the request context can report
/// outcomes back to it.
///
/// NOTE(review): the semantics below are inferred from the signatures only;
/// confirm with the implementors.
pub trait UpstreamInstance: Send + Sync {
    /// Reports a transport-level failure for the backend at `address`.
    fn on_transport_failure(&self, address: &str);
    /// Reports that the backend at `address` answered with `status`.
    fn on_response(&self, address: &str, status: StatusCode);
    /// Signals completion and returns an `i32` (presumably a request
    /// counter for the upstream — TODO confirm).
    fn completed(&self) -> i32;
}
189
/// Interface of a matched location as seen by the request context.
///
/// NOTE(review): the semantics below are inferred from the signatures only;
/// confirm with the implementors.
pub trait LocationInstance: Send + Sync {
    /// Name of the location.
    fn name(&self) -> &str;
    /// Name of the upstream this location refers to.
    fn upstream(&self) -> &str;
    /// Rewrites the request `header`, optionally using and returning
    /// variables.
    ///
    /// Returns a flag (presumably "the header was changed" — TODO confirm)
    /// together with the resulting variables, if any.
    fn rewrite(
        &self,
        header: &mut RequestHeader,
        variables: Option<AHashMap<String, String>>,
    ) -> (bool, Option<AHashMap<String, String>>);
    /// Headers configured for the location, if any. The meaning of the
    /// `bool` in each tuple is not visible here — TODO confirm.
    fn headers(&self) -> Option<&Vec<(HeaderName, HeaderValue, bool)>>;
    /// Limit for the client body size.
    fn client_body_size_limit(&self) -> usize;
    /// Called when a request enters the location; may fail.
    ///
    /// Returns a `(u64, i32)` pair (presumably the accepted and processing
    /// counters mirrored in `RequestState` — TODO confirm).
    fn on_request(&self) -> pingora::Result<(u64, i32)>;
    /// Called when the response for a request of this location is produced.
    fn on_response(&self);
}
215
/// Information about the upstream selected for a request.
///
/// Every field starts at its `Default` value.
#[derive(Default)]
pub struct UpstreamInfo {
    /// Handle to the upstream, if one was selected.
    pub upstream_instance: Option<Arc<dyn UpstreamInstance>>,
    /// Handle to the matched location, if any.
    pub location_instance: Option<Arc<dyn LocationInstance>>,
    /// Location name; written for the `location` log key when non-empty.
    pub location: Arc<str>,
    /// Upstream name.
    pub name: Arc<str>,
    /// Upstream address; written for the `upstream_addr` log key.
    pub address: String,
    /// Whether the upstream connection was reused; written as
    /// `true`/`false` for the `upstream_reused` log key.
    pub reused: bool,
    /// Processing counter of the upstream, if known.
    pub processing_count: Option<i32>,
    /// Connected counter of the upstream; written for the
    /// `upstream_connected` log key when present.
    pub connected_count: Option<i32>,
    /// Status returned by the upstream; written for the `upstream_status`
    /// log key (`-` when absent).
    pub status: Option<StatusCode>,
    /// Number of retries performed so far.
    pub retries: u8,
    /// Maximum number of retries, if limited.
    pub max_retries: Option<u8>,
    /// Time window during which retries are allowed, if limited
    /// (presumably measured from the start of the request — TODO confirm).
    pub max_retry_window: Option<Duration>,
}
250
/// Mutable per-request state.
///
/// Every field starts at its `Default` value.
#[derive(Default)]
pub struct RequestState {
    /// Request identifier, if one was assigned.
    pub request_id: Option<String>,
    /// Status associated with the request, if already decided.
    pub status: Option<StatusCode>,
    /// Size of the request payload (presumably in bytes — TODO confirm).
    pub payload_size: usize,
    /// In-flight limiter guard held while the request is alive, if any.
    pub guard: Option<Guard>,
    /// Processing counter; written for the `processing` log key.
    pub processing_count: i32,
    /// Accepted counter.
    pub accepted_count: u64,
    /// Processing counter of the matched location.
    pub location_processing_count: i32,
    /// Accepted counter of the matched location.
    pub location_accepted_count: u64,
}
271
/// Cache related settings and statistics of a request.
///
/// Every field starts at its `Default` value.
#[derive(Default)]
pub struct CacheInfo {
    /// Cache namespace; `get_cache_key` uses an empty namespace when `None`.
    pub namespace: Option<String>,
    /// Extra key components; `get_cache_key` joins them with `:` in front of
    /// the method and URI.
    pub keys: Option<Vec<String>>,
    /// Flag for cache-control checking (presumably whether the response's
    /// `Cache-Control` header is honoured — TODO confirm).
    pub check_cache_control: bool,
    /// Upper bound for the cache TTL, if any.
    pub max_ttl: Option<Duration>,
    /// Cache reading counter, if known.
    pub reading_count: Option<u32>,
    /// Cache writing counter, if known.
    pub writing_count: Option<u32>,
}
288
/// Optional, lazily allocated extras of a request context.
///
/// `Ctx` stores this behind an `Option` and creates it on first use (see the
/// `get_or_insert_default` calls in the `Ctx` methods).
#[derive(Default)]
pub struct Features {
    /// Request variables, managed through `Ctx::add_variable`,
    /// `Ctx::extend_variables` and `Ctx::get_variable`.
    pub variables: Option<AHashMap<String, String>>,
    /// Accumulated processing time per plugin as `(name, time)` pairs,
    /// maintained by `Ctx::add_plugin_processing_time`.
    pub plugin_processing_times: Option<Vec<(String, u32)>>,
    /// Compression statistics; feeds the `compression_*` log keys.
    pub compression_stat: Option<CompressionStat>,
    /// Response body handlers keyed by name, managed through
    /// `Ctx::add_modify_body_handler` and `Ctx::get_modify_body_handler`.
    pub modify_body_handlers:
        Option<AHashMap<String, Box<dyn ModifyResponseBody>>>,
    /// OpenTelemetry tracer state (only with the `tracing` feature).
    #[cfg(feature = "tracing")]
    pub otel_tracer: Option<OtelTracer>,
    /// Span covering the upstream call (only with the `tracing` feature).
    #[cfg(feature = "tracing")]
    pub upstream_span: Option<BoxedSpan>,
}
308
/// Statistics about the compression applied to a response.
#[derive(Default)]
pub struct CompressionStat {
    /// Name of the compression algorithm.
    pub algorithm: String,
    /// Input size; the numerator of [`CompressionStat::ratio`].
    pub in_bytes: usize,
    /// Output size; the denominator of [`CompressionStat::ratio`].
    pub out_bytes: usize,
    /// Time spent compressing; feeds the `compression_time` log keys.
    pub duration: Duration,
}

impl CompressionStat {
    /// Returns `in_bytes / out_bytes` as a floating point ratio.
    ///
    /// When `out_bytes` is zero the ratio is reported as `0.0` instead of
    /// dividing by zero.
    pub fn ratio(&self) -> f64 {
        match self.out_bytes {
            0 => 0.0,
            compressed => self.in_bytes as f64 / compressed as f64,
        }
    }
}
331
/// OpenTelemetry state attached to a request (only with the `tracing`
/// feature).
#[cfg(feature = "tracing")]
pub struct OtelTracer {
    /// Tracer used to create new spans.
    pub tracer: BoxedTracer,
    /// Span of the HTTP request; parent of the spans created by
    /// [`OtelTracer::new_upstream_span`].
    pub http_request_span: BoxedSpan,
}

#[cfg(feature = "tracing")]
impl OtelTracer {
    /// Starts a client-kind span called `name` whose parent is the span
    /// context of `http_request_span`.
    #[inline]
    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
        // Attach the request span's context to the current context so the
        // new span is created as its child.
        let request_span_context = self.http_request_span.span_context().clone();
        let parent =
            Context::current().with_remote_span_context(request_span_context);

        let builder = self
            .tracer
            .span_builder(name.to_string())
            .with_kind(SpanKind::Client);
        builder.start_with_context(&self.tracer, &parent)
    }
}
358
/// Per-request context that carries connection, upstream, timing and state
/// information through request processing.
///
/// Rarely needed parts (`cache`, `features`, `plugins`) are kept behind
/// `Option` and only allocated when used.
#[derive(Default)]
pub struct Ctx {
    /// Downstream connection details.
    pub conn: ConnectionInfo,
    /// Details about the selected upstream.
    pub upstream: UpstreamInfo,
    /// Timing measurements.
    pub timing: Timing,
    /// Mutable request state.
    pub state: RequestState,
    /// Cache settings; `None` until a cache key or cache option is set.
    pub cache: Option<CacheInfo>,
    /// Optional extras; `None` until one of them is first used.
    pub features: Option<Features>,
    /// Plugins attached to the request as `(name, plugin)` pairs, if any.
    pub plugins: Option<Vec<(String, Arc<dyn Plugin>)>>,
}
378
/// Connection facts extracted from a pingora `Digest` by
/// `get_digest_detail`.
#[derive(Debug, Default)]
pub struct DigestDetail {
    /// `true` when `connection_time` is greater than 100 ms.
    pub connection_reused: bool,
    /// Milliseconds between `tcp_established` and now (0 when unknown).
    pub connection_time: u64,
    /// Timestamp of the first timing entry, in milliseconds since the Unix
    /// epoch (0 when unknown).
    pub tcp_established: u64,
    /// Timestamp of the last timing entry, in milliseconds since the Unix
    /// epoch; only filled when the digest carries TLS information.
    pub tls_established: u64,
    /// TLS version, when the digest carries TLS information.
    pub tls_version: Option<String>,
    /// TLS cipher, when the digest carries TLS information.
    pub tls_cipher: Option<String>,
}
395
396#[inline]
397pub(crate) fn timing_to_ms(timing: Option<&Option<TimingDigest>>) -> u64 {
398 match timing {
399 Some(Some(item)) => item
400 .established_ts
401 .duration_since(SystemTime::UNIX_EPOCH)
402 .unwrap_or_default()
403 .as_millis() as u64,
404 _ => 0,
405 }
406}
407
408#[inline]
411pub fn get_digest_detail(digest: &Digest) -> DigestDetail {
412 let tcp_established = timing_to_ms(digest.timing_digest.first());
413 let mut connection_time = 0;
414 let now = real_now_ms();
415 if tcp_established > 0 && tcp_established < now {
416 connection_time = now - tcp_established;
417 }
418 let connection_reused = connection_time > 100;
419
420 let Some(ssl_digest) = &digest.ssl_digest else {
421 return DigestDetail {
422 connection_reused,
423 tcp_established,
424 connection_time,
425 ..Default::default()
426 };
427 };
428
429 DigestDetail {
430 connection_reused,
431 tcp_established,
432 connection_time,
433 tls_established: timing_to_ms(digest.timing_digest.last()),
434 tls_version: Some(ssl_digest.version.to_string()),
435 tls_cipher: Some(ssl_digest.cipher.to_string()),
436 }
437}
438
439impl Ctx {
440 pub fn new() -> Self {
445 Self {
446 ..Default::default()
447 }
448 }
449
450 #[inline]
456 pub fn add_variable(&mut self, key: &str, value: &str) {
457 let features = self.features.get_or_insert_default();
459 let variables = features.variables.get_or_insert_with(AHashMap::new);
460 variables.insert(key.to_string(), value.to_string());
461 }
462
463 #[inline]
468 pub fn extend_variables(&mut self, values: AHashMap<String, String>) {
469 let features = self.features.get_or_insert_default();
470 if let Some(variables) = features.variables.as_mut() {
471 variables.extend(values);
472 } else {
473 features.variables = Some(values);
474 }
475 }
476
477 #[inline]
484 pub fn get_variable(&self, key: &str) -> Option<&str> {
485 self.features
486 .as_ref()?
487 .variables
488 .as_ref()?
489 .get(key)
490 .map(|v| v.as_str())
491 }
492
493 #[inline]
499 pub fn add_modify_body_handler(
500 &mut self,
501 name: &str,
502 handler: Box<dyn ModifyResponseBody>,
503 ) {
504 let features = self.features.get_or_insert_default();
505 let handlers = features
506 .modify_body_handlers
507 .get_or_insert_with(AHashMap::new);
508 handlers.insert(name.to_string(), handler);
509 }
510
511 #[inline]
513 pub fn get_modify_body_handler(
514 &mut self,
515 name: &str,
516 ) -> Option<&mut Box<dyn ModifyResponseBody>> {
517 self.features
518 .as_mut()
519 .and_then(|f| f.modify_body_handlers.as_mut())
520 .and_then(|h| h.get_mut(name))
521 }
522
523 #[inline]
526 fn get_time_field(&self, field: Option<i32>) -> Option<u32> {
527 if let Some(value) = field {
528 if value >= 0 {
529 return Some(value as u32);
530 }
531 }
532 None
533 }
534
535 #[inline]
540 pub fn get_upstream_response_time(&self) -> Option<u32> {
541 self.get_time_field(self.timing.upstream_response)
542 }
543
544 #[inline]
549 pub fn get_upstream_connect_time(&self) -> Option<u32> {
550 self.get_time_field(self.timing.upstream_connect)
551 }
552
553 #[inline]
558 pub fn get_upstream_processing_time(&self) -> Option<u32> {
559 self.get_time_field(self.timing.upstream_processing)
560 }
561
562 #[inline]
568 pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
569 let features = self.features.get_or_insert_default();
571 let times = features
572 .plugin_processing_times
573 .get_or_insert_with(|| Vec::with_capacity(5));
574 if let Some(item) = times.iter_mut().find(|item| item.0 == name) {
575 item.1 += time;
576 } else {
577 times.push((name.to_string(), time));
578 }
579 }
580
    /// Appends the textual value of the log field `key` to `buf`.
    ///
    /// Unknown keys append nothing. Optional values that are not set also
    /// append nothing, except `upstream_status`, which appends `-`.
    /// Keys ending in `_human` render the same value as their plain
    /// counterpart, but through `format_duration` (e.g. `1ms`, `1.1s`)
    /// instead of as a bare number.
    #[inline]
    pub fn append_log_value(&self, buf: &mut BytesMut, key: &str) {
        // Local helper for optional timing values:
        // - `append_time!(opt)` writes the raw integer when present;
        // - `append_time!(opt, human)` writes it through `format_duration`.
        macro_rules! append_time {
            ($val:expr) => {
                if let Some(ms) = $val {
                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
                }
            };
            ($val:expr, human) => {
                if let Some(ms) = $val {
                    // NOTE(review): several callers pass `Option<i32>`; a
                    // negative value would wrap to a huge number in this
                    // cast — confirm such values are never stored.
                    format_duration(buf, ms as u64);
                }
            };
        }

        match key {
            "connection_id" => {
                buf.extend(itoa::Buffer::new().format(self.conn.id).as_bytes());
            },
            "upstream_reused" => {
                if self.upstream.reused {
                    buf.extend(b"true");
                } else {
                    buf.extend(b"false");
                }
            },
            "upstream_status" => {
                // The only field that emits a placeholder when unset.
                if let Some(status) = &self.upstream.status {
                    buf.extend_from_slice(status.as_str().as_bytes());
                } else {
                    buf.extend_from_slice(b"-");
                }
            },
            "upstream_addr" => buf.extend(self.upstream.address.as_bytes()),
            "processing" => buf.extend(
                itoa::Buffer::new()
                    .format(self.state.processing_count)
                    .as_bytes(),
            ),
            "upstream_connected" => {
                if let Some(value) = self.upstream.connected_count {
                    buf.extend(itoa::Buffer::new().format(value).as_bytes());
                }
            },

            // Upstream timings that go through the getters skip negative
            // values (see `get_time_field`).
            "upstream_connect_time" => {
                append_time!(self.get_upstream_connect_time())
            },
            "upstream_connect_time_human" => {
                append_time!(self.get_upstream_connect_time(), human)
            },

            "upstream_processing_time" => {
                append_time!(self.get_upstream_processing_time())
            },
            "upstream_processing_time_human" => {
                append_time!(self.get_upstream_processing_time(), human)
            },
            "upstream_response_time" => {
                append_time!(self.get_upstream_response_time())
            },
            "upstream_response_time_human" => {
                append_time!(self.get_upstream_response_time(), human)
            },
            // The remaining timings are read straight from `self.timing`.
            "upstream_tcp_connect_time" => {
                append_time!(self.timing.upstream_tcp_connect)
            },
            "upstream_tcp_connect_time_human" => {
                append_time!(self.timing.upstream_tcp_connect, human)
            },
            "upstream_tls_handshake_time" => {
                append_time!(self.timing.upstream_tls_handshake)
            },
            "upstream_tls_handshake_time_human" => {
                append_time!(self.timing.upstream_tls_handshake, human)
            },
            "upstream_connection_time" => {
                append_time!(self.timing.upstream_connection_duration)
            },
            "upstream_connection_time_human" => {
                append_time!(self.timing.upstream_connection_duration, human)
            },
            // `connection_duration` is not optional, so it is always written.
            "connection_time" => {
                append_time!(Some(self.timing.connection_duration))
            },
            "connection_time_human" => {
                append_time!(Some(self.timing.connection_duration), human)
            },

            "location" => {
                if !self.upstream.location.is_empty() {
                    buf.extend(self.upstream.location.as_bytes())
                }
            },
            "connection_reused" => {
                if self.conn.reused {
                    buf.extend(b"true");
                } else {
                    buf.extend(b"false");
                }
            },
            "tls_version" => {
                if let Some(value) = &self.conn.tls_version {
                    buf.extend(value.as_bytes());
                }
            },
            "tls_cipher" => {
                if let Some(value) = &self.conn.tls_cipher {
                    buf.extend(value.as_bytes());
                }
            },
            "tls_handshake_time" => append_time!(self.timing.tls_handshake),
            "tls_handshake_time_human" => {
                append_time!(self.timing.tls_handshake, human)
            },
            // Compression values exist only when compression statistics
            // were recorded in the features.
            "compression_time" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        append_time!(Some(value.duration.as_millis() as u64))
                    }
                }
            },
            "compression_time_human" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        append_time!(
                            Some(value.duration.as_millis() as u64),
                            human
                        )
                    }
                }
            },
            "compression_ratio" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        // One decimal place, e.g. `2.0`.
                        buf.extend(format!("{:.1}", value.ratio()).as_bytes());
                    }
                }
            },
            "cache_lookup_time" => {
                append_time!(self.timing.cache_lookup)
            },
            "cache_lookup_time_human" => {
                append_time!(self.timing.cache_lookup, human)
            },
            "cache_lock_time" => {
                append_time!(self.timing.cache_lock)
            },
            "cache_lock_time_human" => {
                append_time!(self.timing.cache_lock, human)
            },
            // Time elapsed since the context (its `Timing`) was created.
            "service_time" => {
                append_time!(Some(self.timing.created_at.elapsed().as_millis()))
            },
            "service_time_human" => {
                append_time!(
                    Some(self.timing.created_at.elapsed().as_millis()),
                    human
                )
            },
            // Unknown keys are ignored.
            _ => {},
        }
    }
758
759 pub fn generate_server_timing(&self) -> String {
767 let mut timing_str = String::with_capacity(200);
768 let mut first = true;
770
771 macro_rules! add_timing {
773 ($name:expr, $dur:expr) => {
774 if !first {
775 timing_str.push_str(", ");
776 }
777 let _ = write!(&mut timing_str, "{};dur={}", $name, $dur);
779 first = false;
780 };
781 }
782
783 let mut upstream_time = 0;
785 if let Some(time) = self.get_upstream_connect_time() {
786 upstream_time += time;
787 add_timing!("upstream.connect", time);
788 }
789 if let Some(time) = self.get_upstream_processing_time() {
790 upstream_time += time;
791 add_timing!("upstream.processing", time);
792 }
793 if upstream_time > 0 {
794 add_timing!("upstream", upstream_time);
795 }
796
797 let mut cache_time = 0;
799 if let Some(time) = self.timing.cache_lookup {
800 cache_time += time;
801 add_timing!("cache.lookup", time);
802 }
803 if let Some(time) = self.timing.cache_lock {
804 cache_time += time;
805 add_timing!("cache.lock", time);
806 }
807 if cache_time > 0 {
808 add_timing!("cache", cache_time);
809 }
810
811 if let Some(features) = &self.features {
813 if let Some(times) = &features.plugin_processing_times {
814 let mut plugin_time: u32 = 0;
815 for (name, time) in times {
816 if *time == 0 {
817 continue;
818 }
819 plugin_time += time;
820 let mut plugin_name = String::with_capacity(7 + name.len());
821 plugin_name.push_str("plugin.");
822 plugin_name.push_str(name);
823 add_timing!(&plugin_name, time);
824 }
825 if plugin_time > 0 {
826 add_timing!("plugin", plugin_time);
827 }
828 }
829 }
830
831 let service_time = self.timing.created_at.elapsed().as_millis();
833 if !first {
835 timing_str.push_str(", ");
836 }
837 let _ = write!(&mut timing_str, "total;dur={}", service_time);
839
840 timing_str
841 }
842
843 #[inline]
845 pub fn push_cache_key(&mut self, key: String) {
846 let cache_info = self.cache.get_or_insert_default();
847 cache_info
848 .keys
849 .get_or_insert_with(|| Vec::with_capacity(2))
850 .push(key);
851 }
852
853 #[inline]
855 pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
856 let cache_info = self.cache.get_or_insert_default();
857 cache_info
858 .keys
859 .get_or_insert_with(|| Vec::with_capacity(keys.len() + 2))
860 .extend(keys);
861 }
862 #[inline]
864 pub fn update_upstream_timing_from_digest(
865 &mut self,
866 digest: &Digest,
867 reused: bool,
868 ) {
869 let detail = get_digest_detail(digest);
870 self.timing.upstream_connection_duration = Some(detail.connection_time);
871 if reused {
872 return;
873 }
874
875 let upstream_connect_time =
876 self.timing.upstream_connect.unwrap_or_default();
877 let mut upstream_tcp_connect = upstream_connect_time;
879 if detail.tls_established > detail.tcp_established {
880 let latency =
881 (detail.tls_established - detail.tcp_established) as i32;
882 upstream_tcp_connect -= latency;
883 self.timing.upstream_tls_handshake = Some(latency);
884 }
885 if upstream_tcp_connect > 0 {
886 self.timing.upstream_tcp_connect = Some(upstream_tcp_connect);
887 }
888 }
889}
890
891pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
901 let Some(cache_info) = &ctx.cache else {
902 return CacheKey::new("", "", "");
904 };
905 let namespace = cache_info.namespace.as_ref().map_or("", |v| v);
906 let key = if let Some(keys) = &cache_info.keys {
907 let mut key_buf = String::with_capacity(
909 keys.iter().map(|s| s.len() + 1).sum::<usize>()
910 + method.len()
911 + 1
912 + uri.to_string().len(),
913 );
914
915 for (i, k) in keys.iter().enumerate() {
917 if i > 0 {
918 key_buf.push(':');
919 }
920 key_buf.push_str(k);
921 }
922 let _ = write!(&mut key_buf, ":{method}:{uri}");
924 key_buf
925 } else {
926 format!("{method}:{uri}")
928 };
929
930 CacheKey::new(namespace, key, "")
931}
932
933#[cfg(test)]
934mod tests {
935 use super::*;
936 use bytes::Bytes;
937 use bytes::BytesMut;
938 use pingora::protocols::tls::SslDigest;
939 use pingora::protocols::tls::SslDigestExtension;
940 use pretty_assertions::assert_eq;
941 use std::{sync::Arc, time::Duration};
942
943 #[test]
944 fn test_ctx_new() {
945 let ctx = Ctx::new();
946 let elapsed_ms = ctx.timing.created_at.elapsed().as_millis();
949 assert!(elapsed_ms < 100, "created_at should be a recent timestamp");
950 assert!(ctx.cache.is_none());
952 assert!(ctx.features.is_none());
953 assert_eq!(ctx.conn.id, 0);
954 }
955
956 #[test]
958 fn test_add_and_get_variable() {
959 let mut ctx = Ctx::new();
960 assert!(
961 ctx.get_variable("key1").is_none(),
962 "Should be None before adding"
963 );
964
965 ctx.add_variable("key1", "value1");
966 ctx.add_variable("key2", "value2");
967
968 assert_eq!(ctx.get_variable("key1"), Some("value1"));
969 assert_eq!(ctx.get_variable("key2"), Some("value2"));
970 assert_eq!(ctx.get_variable("nonexistent"), None);
971 }
972
973 #[test]
975 fn test_get_time_field() {
976 let mut ctx = Ctx::new();
977
978 ctx.timing.upstream_response = Some(100);
980 assert_eq!(ctx.get_upstream_response_time(), Some(100));
981
982 ctx.timing.upstream_response = Some(-1);
984 assert_eq!(
985 ctx.get_upstream_response_time(),
986 None,
987 "Time exceeding one hour should be None"
988 );
989
990 ctx.timing.upstream_response = None;
992 assert_eq!(ctx.get_upstream_response_time(), None);
993 }
994
995 #[test]
997 fn test_append_log_value_coverage() {
998 let mut ctx = Ctx::new();
999 let mut buf = BytesMut::new();
1001 ctx.append_log_value(&mut buf, "unknown_key");
1002 assert!(buf.is_empty(), "Unknown key should not append anything");
1003
1004 buf = BytesMut::new();
1006 ctx.conn.reused = true;
1007 ctx.append_log_value(&mut buf, "connection_reused");
1008 assert_eq!(&buf[..], b"true");
1009
1010 ctx.conn.tls_version = Some("TLSv1.3".to_string());
1012 buf = BytesMut::new();
1013 ctx.append_log_value(&mut buf, "tls_version");
1014 assert_eq!(&buf[..], b"TLSv1.3");
1015
1016 coarsetime::Clock::update();
1018 std::thread::sleep(Duration::from_millis(11));
1019 buf = BytesMut::new();
1020 ctx.append_log_value(&mut buf, "service_time");
1021 coarsetime::Clock::update();
1022 let service_time: u64 =
1023 std::str::from_utf8(&buf[..]).unwrap().parse().unwrap();
1024 assert!(service_time >= 10, "Service time should be at least 10ms");
1025 }
1026
1027 #[test]
1029 fn test_get_cache_key() {
1030 let method = "GET";
1031 let uri = Uri::from_static("https://example.com/path");
1032
1033 let ctx_no_cache = Ctx::new();
1035 let key1 = get_cache_key(&ctx_no_cache, method, &uri);
1036 assert_eq!(key1.namespace_str(), Some(""));
1037 assert_eq!(key1.primary_key_str(), Some(""));
1038
1039 let mut ctx_with_ns = Ctx::new();
1041 ctx_with_ns.cache = Some(CacheInfo {
1042 namespace: Some("my-ns".to_string()),
1043 ..Default::default()
1044 });
1045 let key2 = get_cache_key(&ctx_with_ns, method, &uri);
1046 assert_eq!(key2.namespace_str(), Some("my-ns"));
1047 assert_eq!(
1048 key2.primary_key_str(),
1049 Some("GET:https://example.com/path")
1050 );
1051
1052 let mut ctx_with_keys = Ctx::new();
1054 ctx_with_keys.cache = Some(CacheInfo {
1055 namespace: Some("my-ns".to_string()),
1056 keys: Some(vec!["user-123".to_string(), "desktop".to_string()]),
1057 ..Default::default()
1058 });
1059 let key3 = get_cache_key(&ctx_with_keys, method, &uri);
1060 assert_eq!(key3.namespace_str(), Some("my-ns"));
1061 assert_eq!(
1062 key3.primary_key_str(),
1063 Some("user-123:desktop:GET:https://example.com/path")
1064 );
1065 }
1066
1067 #[test]
1070 fn test_generate_server_timing() {
1071 let mut ctx = Ctx::new();
1072 ctx.timing.upstream_connect = Some(1);
1073 ctx.timing.upstream_processing = Some(2);
1074 ctx.timing.cache_lookup = Some(6);
1075 ctx.timing.cache_lock = Some(7);
1076 ctx.add_plugin_processing_time("plugin1", 100);
1077
1078 let timing_header = ctx.generate_server_timing();
1079
1080 assert!(timing_header.contains("upstream.connect;dur=1"));
1082 assert!(timing_header.contains("upstream.processing;dur=2"));
1083 assert!(timing_header.contains("upstream;dur=3"));
1084 assert!(timing_header.contains("cache.lookup;dur=6"));
1085 assert!(timing_header.contains("cache.lock;dur=7"));
1086 assert!(timing_header.contains("cache;dur=13"));
1087 assert!(timing_header.contains("plugin.plugin1;dur=100"));
1088 assert!(timing_header.contains("plugin;dur=100"));
1089 assert!(timing_header.contains("total;dur="));
1090 }
1091
1092 #[test]
1093 fn test_format_duration() {
1094 let mut buf = BytesMut::new();
1095 format_duration(&mut buf, (3600 + 3500) * 1000);
1096 assert_eq!(b"1.9h", buf.as_ref());
1097
1098 buf = BytesMut::new();
1099 format_duration(&mut buf, (3600 + 1800) * 1000);
1100 assert_eq!(b"1.5h", buf.as_ref());
1101
1102 buf = BytesMut::new();
1103 format_duration(&mut buf, (3600 + 100) * 1000);
1104 assert_eq!(b"1h", buf.as_ref());
1105
1106 buf = BytesMut::new();
1107 format_duration(&mut buf, (60 + 50) * 1000);
1108 assert_eq!(b"1.8m", buf.as_ref());
1109
1110 buf = BytesMut::new();
1111 format_duration(&mut buf, (60 + 2) * 1000);
1112 assert_eq!(b"1m", buf.as_ref());
1113
1114 buf = BytesMut::new();
1115 format_duration(&mut buf, 1000);
1116 assert_eq!(b"1s", buf.as_ref());
1117
1118 buf = BytesMut::new();
1119 format_duration(&mut buf, 512);
1120 assert_eq!(b"512ms", buf.as_ref());
1121
1122 buf = BytesMut::new();
1123 format_duration(&mut buf, 1112);
1124 assert_eq!(b"1.1s", buf.as_ref());
1125 }
1126
1127 #[test]
1128 fn test_add_variable() {
1129 let mut ctx = Ctx::new();
1130 ctx.add_variable("key1", "value1");
1131 ctx.add_variable("key2", "value2");
1132 ctx.extend_variables(AHashMap::from([
1133 ("key3".to_string(), "value3".to_string()),
1134 ("key4".to_string(), "value4".to_string()),
1135 ]));
1136 let variables =
1137 ctx.features.as_ref().unwrap().variables.as_ref().unwrap();
1138 assert_eq!(variables.get("key1"), Some(&"value1".to_string()));
1141 assert_eq!(variables.get("key2"), Some(&"value2".to_string()));
1142 assert_eq!(variables.get("key3"), Some(&"value3".to_string()));
1143 assert_eq!(variables.get("key4"), Some(&"value4".to_string()));
1144 }
1145
1146 #[test]
1147 fn test_cache_key() {
1148 let mut ctx = Ctx::new();
1149 ctx.push_cache_key("key1".to_string());
1150 ctx.extend_cache_keys(vec!["key2".to_string(), "key3".to_string()]);
1151 assert_eq!(
1152 vec!["key1".to_string(), "key2".to_string(), "key3".to_string()],
1153 ctx.cache.unwrap().keys.unwrap()
1154 );
1155
1156 let mut ctx = Ctx::new();
1157 ctx.cache.get_or_insert_default();
1158 let key = get_cache_key(
1159 &ctx,
1160 "GET",
1161 &Uri::from_static("https://example.com/path"),
1162 );
1163 assert_eq!(key.namespace_str(), Some(""));
1164 assert_eq!(key.primary_key_str(), Some("GET:https://example.com/path"));
1165 }
1166
1167 #[test]
1168 fn test_state() {
1169 let mut ctx = Ctx::new();
1170
1171 let mut buf = BytesMut::new();
1172 ctx.conn.id = 10;
1173 ctx.append_log_value(&mut buf, "connection_id");
1174 assert_eq!(b"10", buf.as_ref());
1175
1176 buf = BytesMut::new();
1177 ctx.append_log_value(&mut buf, "upstream_reused");
1178 assert_eq!(b"false", buf.as_ref());
1179
1180 buf = BytesMut::new();
1181 ctx.upstream.reused = true;
1182 ctx.append_log_value(&mut buf, "upstream_reused");
1183 assert_eq!(b"true", buf.as_ref());
1184
1185 buf = BytesMut::new();
1186 ctx.upstream.address = "192.168.1.1:80".to_string();
1187 ctx.append_log_value(&mut buf, "upstream_addr");
1188 assert_eq!(b"192.168.1.1:80", buf.as_ref());
1189
1190 buf = BytesMut::new();
1191 ctx.upstream.status = Some(StatusCode::CREATED);
1192 ctx.append_log_value(&mut buf, "upstream_status");
1193 assert_eq!(b"201", buf.as_ref());
1194
1195 buf = BytesMut::new();
1196 ctx.state.processing_count = 10;
1197 ctx.append_log_value(&mut buf, "processing");
1198 assert_eq!(b"10", buf.as_ref());
1199
1200 buf = BytesMut::new();
1201 ctx.timing.upstream_connect = Some(1);
1202 ctx.append_log_value(&mut buf, "upstream_connect_time");
1203 assert_eq!(b"1", buf.as_ref());
1204
1205 buf = BytesMut::new();
1206 ctx.append_log_value(&mut buf, "upstream_connect_time_human");
1207 assert_eq!(b"1ms", buf.as_ref());
1208
1209 buf = BytesMut::new();
1210 ctx.upstream.connected_count = Some(30);
1211 ctx.append_log_value(&mut buf, "upstream_connected");
1212 assert_eq!(b"30", buf.as_ref());
1213
1214 buf = BytesMut::new();
1215 ctx.timing.upstream_processing = Some(2);
1216 ctx.append_log_value(&mut buf, "upstream_processing_time");
1217 assert_eq!(b"2", buf.as_ref());
1218
1219 buf = BytesMut::new();
1220 ctx.append_log_value(&mut buf, "upstream_processing_time_human");
1221 assert_eq!(b"2ms", buf.as_ref());
1222
1223 buf = BytesMut::new();
1224 ctx.timing.upstream_response = Some(3);
1225 ctx.append_log_value(&mut buf, "upstream_response_time");
1226 assert_eq!(b"3", buf.as_ref());
1227
1228 buf = BytesMut::new();
1229 ctx.append_log_value(&mut buf, "upstream_response_time_human");
1230 assert_eq!(b"3ms", buf.as_ref());
1231
1232 buf = BytesMut::new();
1233 ctx.timing.upstream_tcp_connect = Some(100);
1234 ctx.append_log_value(&mut buf, "upstream_tcp_connect_time");
1235 assert_eq!(b"100", buf.as_ref());
1236
1237 buf = BytesMut::new();
1238 ctx.append_log_value(&mut buf, "upstream_tcp_connect_time_human");
1239 assert_eq!(b"100ms", buf.as_ref());
1240
1241 buf = BytesMut::new();
1242 ctx.timing.upstream_tls_handshake = Some(110);
1243 ctx.append_log_value(&mut buf, "upstream_tls_handshake_time");
1244 assert_eq!(b"110", buf.as_ref());
1245
1246 buf = BytesMut::new();
1247 ctx.append_log_value(&mut buf, "upstream_tls_handshake_time_human");
1248 assert_eq!(b"110ms", buf.as_ref());
1249
1250 buf = BytesMut::new();
1251 ctx.timing.upstream_connection_duration = Some(120);
1252 ctx.append_log_value(&mut buf, "upstream_connection_time");
1253 assert_eq!(b"120", buf.as_ref());
1254
1255 buf = BytesMut::new();
1256 ctx.append_log_value(&mut buf, "upstream_connection_time_human");
1257 assert_eq!(b"120ms", buf.as_ref());
1258
1259 buf = BytesMut::new();
1260 ctx.upstream.location = "pingap".to_string().into();
1261 ctx.append_log_value(&mut buf, "location");
1262 assert_eq!(b"pingap", buf.as_ref());
1263
1264 buf = BytesMut::new();
1265 ctx.timing.connection_duration = 4;
1266 ctx.append_log_value(&mut buf, "connection_time");
1267 assert_eq!(b"4", buf.as_ref());
1268
1269 buf = BytesMut::new();
1270 ctx.append_log_value(&mut buf, "connection_time_human");
1271 assert_eq!(b"4ms", buf.as_ref());
1272
1273 buf = BytesMut::new();
1274 ctx.conn.reused = false;
1275 ctx.append_log_value(&mut buf, "connection_reused");
1276 assert_eq!(b"false", buf.as_ref());
1277
1278 buf = BytesMut::new();
1279 ctx.conn.reused = true;
1280 ctx.append_log_value(&mut buf, "connection_reused");
1281 assert_eq!(b"true", buf.as_ref());
1282
1283 buf = BytesMut::new();
1284 ctx.conn.tls_version = Some("TLSv1.3".to_string());
1285 ctx.append_log_value(&mut buf, "tls_version");
1286 assert_eq!(b"TLSv1.3", buf.as_ref());
1287
1288 buf = BytesMut::new();
1289 ctx.conn.tls_cipher =
1290 Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
1291 ctx.append_log_value(&mut buf, "tls_cipher");
1292 assert_eq!(b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", buf.as_ref());
1293
1294 buf = BytesMut::new();
1295 ctx.timing.tls_handshake = Some(101);
1296 ctx.append_log_value(&mut buf, "tls_handshake_time");
1297 assert_eq!(b"101", buf.as_ref());
1298
1299 buf = BytesMut::new();
1300 ctx.append_log_value(&mut buf, "tls_handshake_time_human");
1301 assert_eq!(b"101ms", buf.as_ref());
1302
1303 {
1304 let features = ctx.features.get_or_insert_default();
1305 features.compression_stat = Some(CompressionStat {
1306 in_bytes: 1024,
1307 out_bytes: 500,
1308 duration: Duration::from_millis(5),
1309 ..Default::default()
1310 })
1311 }
1312
1313 buf = BytesMut::new();
1314 ctx.append_log_value(&mut buf, "compression_time");
1315 assert_eq!(b"5", buf.as_ref());
1316
1317 buf = BytesMut::new();
1318 ctx.append_log_value(&mut buf, "compression_time_human");
1319 assert_eq!(b"5ms", buf.as_ref());
1320
1321 buf = BytesMut::new();
1322 ctx.append_log_value(&mut buf, "compression_ratio");
1323 assert_eq!(b"2.0", buf.as_ref());
1324
1325 buf = BytesMut::new();
1326 ctx.timing.cache_lookup = Some(6);
1327 ctx.append_log_value(&mut buf, "cache_lookup_time");
1328 assert_eq!(b"6", buf.as_ref());
1329
1330 buf = BytesMut::new();
1331 ctx.append_log_value(&mut buf, "cache_lookup_time_human");
1332 assert_eq!(b"6ms", buf.as_ref());
1333
1334 buf = BytesMut::new();
1335 ctx.timing.cache_lock = Some(7);
1336 ctx.append_log_value(&mut buf, "cache_lock_time");
1337 assert_eq!(b"7", buf.as_ref());
1338
1339 buf = BytesMut::new();
1340 ctx.append_log_value(&mut buf, "cache_lock_time_human");
1341 assert_eq!(b"7ms", buf.as_ref());
1342 }
1343
1344 #[test]
1345 fn test_add_plugin_processing_time() {
1346 let mut ctx = Ctx::new();
1347 ctx.add_plugin_processing_time("plugin1", 100);
1348 ctx.add_plugin_processing_time("plugin2", 200);
1349 assert_eq!(
1350 ctx.features.unwrap().plugin_processing_times,
1351 Some(vec![
1352 ("plugin1".to_string(), 100),
1353 ("plugin2".to_string(), 200)
1354 ])
1355 );
1356 }
1357
1358 #[test]
1359 fn test_get_digest_detail() {
1360 let mut digest = Digest::default();
1361 let detail = get_digest_detail(&digest);
1362 assert_eq!(detail.connection_reused, false);
1363 assert_eq!(detail.connection_time, 0);
1364 assert_eq!(detail.tcp_established, 0);
1365 assert_eq!(detail.tls_established, 0);
1366 assert_eq!(detail.tls_version, None);
1367 assert_eq!(detail.tls_cipher, None);
1368
1369 digest.timing_digest.push(Some(TimingDigest {
1370 established_ts: SystemTime::UNIX_EPOCH
1371 .checked_add(Duration::from_secs(5))
1372 .unwrap(),
1373 }));
1374 digest.timing_digest.push(Some(TimingDigest {
1375 established_ts: SystemTime::UNIX_EPOCH
1376 .checked_add(Duration::from_secs(3))
1377 .unwrap(),
1378 }));
1379 digest.ssl_digest = Some(Arc::new(SslDigest {
1380 version: "1.3".into(),
1381 cipher: "123".into(),
1382 organization: Some("cloudflare".to_string()),
1383 serial_number: Some(
1384 "0x00000000000000000000000000000abc".to_string(),
1385 ),
1386 cert_digest: vec![],
1387 extension: SslDigestExtension::default(),
1388 }));
1389 let detail = get_digest_detail(&digest);
1390 assert_eq!(detail.connection_reused, true);
1391 assert_eq!(detail.tcp_established, 5000);
1392 assert_eq!(detail.tls_established, 3000);
1393 assert_eq!(detail.tls_version, Some("1.3".to_string()));
1394 assert_eq!(detail.tls_cipher, Some("123".to_string()));
1395 }
1396
1397 #[test]
1398 fn test_modify_body_handler() {
1399 let mut ctx = Ctx::default();
1400
1401 struct TestHandler {}
1402 impl ModifyResponseBody for TestHandler {
1403 fn handle(
1404 &mut self,
1405 _session: &Session,
1406 body: &mut Option<bytes::Bytes>,
1407 _end_of_stream: bool,
1408 ) -> pingora::Result<()> {
1409 *body = Some(Bytes::from("test"));
1410 Ok(())
1411 }
1412 }
1413
1414 ctx.add_modify_body_handler("test", Box::new(TestHandler {}));
1415 assert_eq!(true, ctx.get_modify_body_handler("test").is_some());
1416 }
1417}