use crate::{Plugin, real_now_ms};
use ahash::AHashMap;
use bytes::BytesMut;
use http::StatusCode;
use http::Uri;
use http::{HeaderName, HeaderValue};
#[cfg(feature = "tracing")]
use opentelemetry::{
Context,
global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
trace::{SpanKind, TraceContextExt, Tracer},
};
use pingora::cache::CacheKey;
use pingora::http::RequestHeader;
use pingora::protocols::Digest;
use pingora::protocols::TimingDigest;
use pingora::proxy::Session;
use pingora_limits::inflight::Guard;
use std::fmt::Write;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
const SECOND: u64 = 1_000;
const MINUTE: u64 = 60 * SECOND;
const HOUR: u64 = 60 * MINUTE;
#[inline]
pub fn format_duration(buf: &mut BytesMut, ms: u64) {
if ms < SECOND {
buf.extend_from_slice(itoa::Buffer::new().format(ms).as_bytes());
buf.extend_from_slice(b"ms");
} else if ms < MINUTE {
buf.extend_from_slice(
itoa::Buffer::new().format(ms / SECOND).as_bytes(),
);
let value = (ms % SECOND) / 100;
if value != 0 {
buf.extend_from_slice(b".");
buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
}
buf.extend_from_slice(b"s");
} else if ms < HOUR {
buf.extend_from_slice(
itoa::Buffer::new().format(ms / MINUTE).as_bytes(),
);
let value = ms % MINUTE * 10 / MINUTE;
if value != 0 {
buf.extend_from_slice(b".");
buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
}
buf.extend_from_slice(b"m");
} else {
buf.extend_from_slice(itoa::Buffer::new().format(ms / HOUR).as_bytes());
let value = ms % HOUR * 10 / HOUR;
if value != 0 {
buf.extend_from_slice(b".");
buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
}
buf.extend_from_slice(b"h");
}
}
#[derive(PartialEq)]
pub enum ModifiedMode {
Upstream,
Response,
}
impl From<&str> for ModifiedMode {
fn from(value: &str) -> Self {
match value {
"upstream" => ModifiedMode::Upstream,
_ => ModifiedMode::Response,
}
}
}
pub trait ModifyResponseBody: Sync + Send {
fn handle(
&mut self,
session: &Session,
body: &mut Option<bytes::Bytes>,
end_of_stream: bool,
) -> pingora::Result<()>;
fn name(&self) -> String {
"unknown".to_string()
}
}
#[derive(Default)]
pub struct ConnectionInfo {
pub id: usize,
pub client_ip: Option<String>,
pub remote_addr: Option<String>,
pub remote_port: Option<u16>,
pub server_addr: Option<String>,
pub server_port: Option<u16>,
pub tls_version: Option<String>,
pub tls_cipher: Option<String>,
pub reused: bool,
}
pub struct Timing {
pub created_at: Instant,
pub connection_duration: u64,
pub tls_handshake: Option<i32>,
pub upstream_connect: Option<i32>,
pub upstream_tcp_connect: Option<i32>,
pub upstream_tls_handshake: Option<i32>,
pub upstream_processing: Option<i32>,
pub upstream_response: Option<i32>,
pub upstream_connection_duration: Option<u64>,
pub cache_lookup: Option<i32>,
pub cache_lock: Option<i32>,
}
impl Default for Timing {
fn default() -> Self {
Self {
created_at: Instant::now(),
connection_duration: 0,
tls_handshake: None,
upstream_connect: None,
upstream_tcp_connect: None,
upstream_tls_handshake: None,
upstream_processing: None,
upstream_response: None,
upstream_connection_duration: None,
cache_lookup: None,
cache_lock: None,
}
}
}
pub trait UpstreamInstance: Send + Sync {
fn on_transport_failure(&self, address: &str);
fn on_response(&self, address: &str, status: StatusCode);
fn completed(&self) -> i32;
}
pub trait LocationInstance: Send + Sync {
fn name(&self) -> &str;
fn upstream(&self) -> &str;
fn rewrite(
&self,
header: &mut RequestHeader,
variables: Option<AHashMap<String, String>>,
) -> (bool, Option<AHashMap<String, String>>);
fn headers(&self) -> Option<&Vec<(HeaderName, HeaderValue, bool)>>;
fn client_body_size_limit(&self) -> usize;
fn on_request(&self) -> pingora::Result<(u64, i32)>;
fn on_response(&self);
}
#[derive(Default)]
pub struct UpstreamInfo {
pub upstream_instance: Option<Arc<dyn UpstreamInstance>>,
pub location_instance: Option<Arc<dyn LocationInstance>>,
pub location: Arc<str>,
pub name: Arc<str>,
pub address: String,
pub reused: bool,
pub processing_count: Option<i32>,
pub connected_count: Option<i32>,
pub status: Option<StatusCode>,
pub retries: u8,
pub max_retries: Option<u8>,
pub max_retry_window: Option<Duration>,
}
#[derive(Default)]
pub struct RequestState {
pub request_id: Option<String>,
pub status: Option<StatusCode>,
pub payload_size: usize,
pub guard: Option<Guard>,
pub processing_count: i32,
pub accepted_count: u64,
pub location_processing_count: i32,
pub location_accepted_count: u64,
}
#[derive(Default)]
pub struct CacheInfo {
pub namespace: Option<String>,
pub keys: Option<Vec<String>>,
pub check_cache_control: bool,
pub max_ttl: Option<Duration>,
pub reading_count: Option<u32>,
pub writing_count: Option<u32>,
}
#[derive(Default)]
pub struct Features {
pub variables: Option<AHashMap<String, String>>,
pub plugin_processing_times: Option<Vec<(String, u32)>>,
pub compression_stat: Option<CompressionStat>,
pub modify_body_handlers:
Option<AHashMap<String, Box<dyn ModifyResponseBody>>>,
#[cfg(feature = "tracing")]
pub otel_tracer: Option<OtelTracer>,
#[cfg(feature = "tracing")]
pub upstream_span: Option<BoxedSpan>,
}
#[derive(Default)]
pub struct CompressionStat {
pub algorithm: String,
pub in_bytes: usize,
pub out_bytes: usize,
pub duration: Duration,
}
impl CompressionStat {
pub fn ratio(&self) -> f64 {
if self.out_bytes == 0 {
return 0.0;
}
(self.in_bytes as f64) / (self.out_bytes as f64)
}
}
#[cfg(feature = "tracing")]
pub struct OtelTracer {
pub tracer: BoxedTracer,
pub http_request_span: BoxedSpan,
}
#[cfg(feature = "tracing")]
impl OtelTracer {
#[inline]
pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
self.tracer
.span_builder(name.to_string())
.with_kind(SpanKind::Client)
.start_with_context(
&self.tracer,
&Context::current().with_remote_span_context(
self.http_request_span.span_context().clone(),
),
)
}
}
#[derive(Default)]
pub struct Ctx {
pub conn: ConnectionInfo,
pub upstream: UpstreamInfo,
pub timing: Timing,
pub state: RequestState,
pub cache: Option<CacheInfo>,
pub features: Option<Features>,
pub plugins: Option<Vec<(String, Arc<dyn Plugin>)>>,
}
#[derive(Debug, Default)]
pub struct DigestDetail {
pub connection_reused: bool,
pub connection_time: u64,
pub tcp_established: u64,
pub tls_established: u64,
pub tls_version: Option<String>,
pub tls_cipher: Option<String>,
}
#[inline]
pub(crate) fn timing_to_ms(timing: Option<&Option<TimingDigest>>) -> u64 {
match timing {
Some(Some(item)) => item
.established_ts
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
_ => 0,
}
}
#[inline]
pub fn get_digest_detail(digest: &Digest) -> DigestDetail {
let tcp_established = timing_to_ms(digest.timing_digest.first());
let mut connection_time = 0;
let now = real_now_ms();
if tcp_established > 0 && tcp_established < now {
connection_time = now - tcp_established;
}
let connection_reused = connection_time > 100;
let Some(ssl_digest) = &digest.ssl_digest else {
return DigestDetail {
connection_reused,
tcp_established,
connection_time,
..Default::default()
};
};
DigestDetail {
connection_reused,
tcp_established,
connection_time,
tls_established: timing_to_ms(digest.timing_digest.last()),
tls_version: Some(ssl_digest.version.to_string()),
tls_cipher: Some(ssl_digest.cipher.to_string()),
}
}
impl Ctx {
pub fn new() -> Self {
Self {
..Default::default()
}
}
#[inline]
pub fn add_variable(&mut self, key: &str, value: &str) {
let features = self.features.get_or_insert_default();
let variables = features.variables.get_or_insert_with(AHashMap::new);
variables.insert(key.to_string(), value.to_string());
}
#[inline]
pub fn extend_variables(&mut self, values: AHashMap<String, String>) {
let features = self.features.get_or_insert_default();
if let Some(variables) = features.variables.as_mut() {
variables.extend(values);
} else {
features.variables = Some(values);
}
}
#[inline]
pub fn get_variable(&self, key: &str) -> Option<&str> {
self.features
.as_ref()?
.variables
.as_ref()?
.get(key)
.map(|v| v.as_str())
}
#[inline]
pub fn add_modify_body_handler(
&mut self,
name: &str,
handler: Box<dyn ModifyResponseBody>,
) {
let features = self.features.get_or_insert_default();
let handlers = features
.modify_body_handlers
.get_or_insert_with(AHashMap::new);
handlers.insert(name.to_string(), handler);
}
#[inline]
pub fn get_modify_body_handler(
&mut self,
name: &str,
) -> Option<&mut Box<dyn ModifyResponseBody>> {
self.features
.as_mut()
.and_then(|f| f.modify_body_handlers.as_mut())
.and_then(|h| h.get_mut(name))
}
#[inline]
fn get_time_field(&self, field: Option<i32>) -> Option<u32> {
if let Some(value) = field
&& value >= 0
{
return Some(value as u32);
}
None
}
#[inline]
pub fn get_upstream_response_time(&self) -> Option<u32> {
self.get_time_field(self.timing.upstream_response)
}
#[inline]
pub fn get_upstream_connect_time(&self) -> Option<u32> {
self.get_time_field(self.timing.upstream_connect)
}
#[inline]
pub fn get_upstream_processing_time(&self) -> Option<u32> {
self.get_time_field(self.timing.upstream_processing)
}
#[inline]
pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
let features = self.features.get_or_insert_default();
let times = features
.plugin_processing_times
.get_or_insert_with(|| Vec::with_capacity(5));
if let Some(item) = times.iter_mut().find(|item| item.0 == name) {
item.1 += time;
} else {
times.push((name.to_string(), time));
}
}
#[inline]
pub fn append_log_value(&self, buf: &mut BytesMut, key: &str) {
macro_rules! append_time {
($val:expr) => {
if let Some(ms) = $val {
buf.extend(itoa::Buffer::new().format(ms).as_bytes());
}
};
($val:expr, human) => {
if let Some(ms) = $val {
format_duration(buf, ms as u64);
}
};
}
match key {
"connection_id" => {
buf.extend(itoa::Buffer::new().format(self.conn.id).as_bytes());
},
"upstream_reused" => {
if self.upstream.reused {
buf.extend(b"true");
} else {
buf.extend(b"false");
}
},
"upstream_status" => {
if let Some(status) = &self.upstream.status {
buf.extend_from_slice(status.as_str().as_bytes());
} else {
buf.extend_from_slice(b"-");
}
},
"upstream_addr" => buf.extend(self.upstream.address.as_bytes()),
"processing" => buf.extend(
itoa::Buffer::new()
.format(self.state.processing_count)
.as_bytes(),
),
"upstream_connected" => {
if let Some(value) = self.upstream.connected_count {
buf.extend(itoa::Buffer::new().format(value).as_bytes());
}
},
"upstream_connect_time" => {
append_time!(self.get_upstream_connect_time())
},
"upstream_connect_time_human" => {
append_time!(self.get_upstream_connect_time(), human)
},
"upstream_processing_time" => {
append_time!(self.get_upstream_processing_time())
},
"upstream_processing_time_human" => {
append_time!(self.get_upstream_processing_time(), human)
},
"upstream_response_time" => {
append_time!(self.get_upstream_response_time())
},
"upstream_response_time_human" => {
append_time!(self.get_upstream_response_time(), human)
},
"upstream_tcp_connect_time" => {
append_time!(self.timing.upstream_tcp_connect)
},
"upstream_tcp_connect_time_human" => {
append_time!(self.timing.upstream_tcp_connect, human)
},
"upstream_tls_handshake_time" => {
append_time!(self.timing.upstream_tls_handshake)
},
"upstream_tls_handshake_time_human" => {
append_time!(self.timing.upstream_tls_handshake, human)
},
"upstream_connection_time" => {
append_time!(self.timing.upstream_connection_duration)
},
"upstream_connection_time_human" => {
append_time!(self.timing.upstream_connection_duration, human)
},
"connection_time" => {
append_time!(Some(self.timing.connection_duration))
},
"connection_time_human" => {
append_time!(Some(self.timing.connection_duration), human)
},
"location" => {
if !self.upstream.location.is_empty() {
buf.extend(self.upstream.location.as_bytes())
}
},
"connection_reused" => {
if self.conn.reused {
buf.extend(b"true");
} else {
buf.extend(b"false");
}
},
"tls_version" => {
if let Some(value) = &self.conn.tls_version {
buf.extend(value.as_bytes());
}
},
"tls_cipher" => {
if let Some(value) = &self.conn.tls_cipher {
buf.extend(value.as_bytes());
}
},
"tls_handshake_time" => append_time!(self.timing.tls_handshake),
"tls_handshake_time_human" => {
append_time!(self.timing.tls_handshake, human)
},
"compression_time" => {
if let Some(feature) = &self.features
&& let Some(value) = &feature.compression_stat
{
append_time!(Some(value.duration.as_millis() as u64))
}
},
"compression_time_human" => {
if let Some(feature) = &self.features
&& let Some(value) = &feature.compression_stat
{
append_time!(Some(value.duration.as_millis() as u64), human)
}
},
"compression_ratio" => {
if let Some(feature) = &self.features
&& let Some(value) = &feature.compression_stat
{
buf.extend(format!("{:.1}", value.ratio()).as_bytes());
}
},
"cache_lookup_time" => {
append_time!(self.timing.cache_lookup)
},
"cache_lookup_time_human" => {
append_time!(self.timing.cache_lookup, human)
},
"cache_lock_time" => {
append_time!(self.timing.cache_lock)
},
"cache_lock_time_human" => {
append_time!(self.timing.cache_lock, human)
},
"service_time" => {
append_time!(Some(self.timing.created_at.elapsed().as_millis()))
},
"service_time_human" => {
append_time!(
Some(self.timing.created_at.elapsed().as_millis()),
human
)
},
_ => {},
}
}
pub fn generate_server_timing(&self) -> String {
let mut timing_str = String::with_capacity(200);
let mut first = true;
macro_rules! add_timing {
($name:expr, $dur:expr) => {
if !first {
timing_str.push_str(", ");
}
let _ = write!(&mut timing_str, "{};dur={}", $name, $dur);
first = false;
};
}
let mut upstream_time = 0;
if let Some(time) = self.get_upstream_connect_time() {
upstream_time += time;
add_timing!("upstream.connect", time);
}
if let Some(time) = self.get_upstream_processing_time() {
upstream_time += time;
add_timing!("upstream.processing", time);
}
if upstream_time > 0 {
add_timing!("upstream", upstream_time);
}
let mut cache_time = 0;
if let Some(time) = self.timing.cache_lookup {
cache_time += time;
add_timing!("cache.lookup", time);
}
if let Some(time) = self.timing.cache_lock {
cache_time += time;
add_timing!("cache.lock", time);
}
if cache_time > 0 {
add_timing!("cache", cache_time);
}
if let Some(features) = &self.features
&& let Some(times) = &features.plugin_processing_times
{
let mut plugin_time: u32 = 0;
for (name, time) in times {
if *time == 0 {
continue;
}
plugin_time += time;
let mut plugin_name = String::with_capacity(7 + name.len());
plugin_name.push_str("plugin.");
plugin_name.push_str(name);
add_timing!(&plugin_name, time);
}
if plugin_time > 0 {
add_timing!("plugin", plugin_time);
}
}
let service_time = self.timing.created_at.elapsed().as_millis();
if !first {
timing_str.push_str(", ");
}
let _ = write!(&mut timing_str, "total;dur={}", service_time);
timing_str
}
#[inline]
pub fn push_cache_key(&mut self, key: String) {
let cache_info = self.cache.get_or_insert_default();
cache_info
.keys
.get_or_insert_with(|| Vec::with_capacity(2))
.push(key);
}
#[inline]
pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
let cache_info = self.cache.get_or_insert_default();
cache_info
.keys
.get_or_insert_with(|| Vec::with_capacity(keys.len() + 2))
.extend(keys);
}
#[inline]
pub fn update_upstream_timing_from_digest(
&mut self,
digest: &Digest,
reused: bool,
) {
let detail = get_digest_detail(digest);
self.timing.upstream_connection_duration = Some(detail.connection_time);
if reused {
return;
}
let upstream_connect_time =
self.timing.upstream_connect.unwrap_or_default();
let mut upstream_tcp_connect = upstream_connect_time;
if detail.tls_established > detail.tcp_established {
let latency =
(detail.tls_established - detail.tcp_established) as i32;
upstream_tcp_connect -= latency;
self.timing.upstream_tls_handshake = Some(latency);
}
if upstream_tcp_connect > 0 {
self.timing.upstream_tcp_connect = Some(upstream_tcp_connect);
}
}
}
pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
let Some(cache_info) = &ctx.cache else {
return CacheKey::new("", "", "");
};
let namespace = cache_info.namespace.as_ref().map_or("", |v| v);
let key = if let Some(keys) = &cache_info.keys {
let mut key_buf = String::with_capacity(
keys.iter().map(|s| s.len() + 1).sum::<usize>()
+ method.len()
+ 1
+ uri.to_string().len(),
);
for (i, k) in keys.iter().enumerate() {
if i > 0 {
key_buf.push(':');
}
key_buf.push_str(k);
}
let _ = write!(&mut key_buf, ":{method}:{uri}");
key_buf
} else {
format!("{method}:{uri}")
};
CacheKey::new(namespace, key, "")
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use bytes::BytesMut;
use pingora::protocols::tls::SslDigest;
use pingora::protocols::tls::SslDigestExtension;
use pretty_assertions::assert_eq;
use std::{sync::Arc, time::Duration};
#[test]
fn test_ctx_new() {
let ctx = Ctx::new();
let elapsed_ms = ctx.timing.created_at.elapsed().as_millis();
assert!(elapsed_ms < 100, "created_at should be a recent timestamp");
assert!(ctx.cache.is_none());
assert!(ctx.features.is_none());
assert_eq!(ctx.conn.id, 0);
}
#[test]
fn test_add_and_get_variable() {
let mut ctx = Ctx::new();
assert!(
ctx.get_variable("key1").is_none(),
"Should be None before adding"
);
ctx.add_variable("key1", "value1");
ctx.add_variable("key2", "value2");
assert_eq!(ctx.get_variable("key1"), Some("value1"));
assert_eq!(ctx.get_variable("key2"), Some("value2"));
assert_eq!(ctx.get_variable("nonexistent"), None);
}
#[test]
fn test_get_time_field() {
let mut ctx = Ctx::new();
ctx.timing.upstream_response = Some(100);
assert_eq!(ctx.get_upstream_response_time(), Some(100));
ctx.timing.upstream_response = Some(-1);
assert_eq!(
ctx.get_upstream_response_time(),
None,
"Time exceeding one hour should be None"
);
ctx.timing.upstream_response = None;
assert_eq!(ctx.get_upstream_response_time(), None);
}
#[test]
fn test_append_log_value_coverage() {
let mut ctx = Ctx::new();
let mut buf = BytesMut::new();
ctx.append_log_value(&mut buf, "unknown_key");
assert!(buf.is_empty(), "Unknown key should not append anything");
buf = BytesMut::new();
ctx.conn.reused = true;
ctx.append_log_value(&mut buf, "connection_reused");
assert_eq!(&buf[..], b"true");
ctx.conn.tls_version = Some("TLSv1.3".to_string());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "tls_version");
assert_eq!(&buf[..], b"TLSv1.3");
coarsetime::Clock::update();
std::thread::sleep(Duration::from_millis(11));
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "service_time");
coarsetime::Clock::update();
let service_time: u64 =
std::str::from_utf8(&buf[..]).unwrap().parse().unwrap();
assert!(service_time >= 10, "Service time should be at least 10ms");
}
#[test]
fn test_get_cache_key() {
let method = "GET";
let uri = Uri::from_static("https://example.com/path");
let ctx_no_cache = Ctx::new();
let key1 = get_cache_key(&ctx_no_cache, method, &uri);
assert_eq!(key1.namespace_str(), Some(""));
assert_eq!(key1.primary_key_str(), Some(""));
let mut ctx_with_ns = Ctx::new();
ctx_with_ns.cache = Some(CacheInfo {
namespace: Some("my-ns".to_string()),
..Default::default()
});
let key2 = get_cache_key(&ctx_with_ns, method, &uri);
assert_eq!(key2.namespace_str(), Some("my-ns"));
assert_eq!(
key2.primary_key_str(),
Some("GET:https://example.com/path")
);
let mut ctx_with_keys = Ctx::new();
ctx_with_keys.cache = Some(CacheInfo {
namespace: Some("my-ns".to_string()),
keys: Some(vec!["user-123".to_string(), "desktop".to_string()]),
..Default::default()
});
let key3 = get_cache_key(&ctx_with_keys, method, &uri);
assert_eq!(key3.namespace_str(), Some("my-ns"));
assert_eq!(
key3.primary_key_str(),
Some("user-123:desktop:GET:https://example.com/path")
);
}
#[test]
fn test_generate_server_timing() {
let mut ctx = Ctx::new();
ctx.timing.upstream_connect = Some(1);
ctx.timing.upstream_processing = Some(2);
ctx.timing.cache_lookup = Some(6);
ctx.timing.cache_lock = Some(7);
ctx.add_plugin_processing_time("plugin1", 100);
let timing_header = ctx.generate_server_timing();
assert!(timing_header.contains("upstream.connect;dur=1"));
assert!(timing_header.contains("upstream.processing;dur=2"));
assert!(timing_header.contains("upstream;dur=3"));
assert!(timing_header.contains("cache.lookup;dur=6"));
assert!(timing_header.contains("cache.lock;dur=7"));
assert!(timing_header.contains("cache;dur=13"));
assert!(timing_header.contains("plugin.plugin1;dur=100"));
assert!(timing_header.contains("plugin;dur=100"));
assert!(timing_header.contains("total;dur="));
}
#[test]
fn test_format_duration() {
let mut buf = BytesMut::new();
format_duration(&mut buf, (3600 + 3500) * 1000);
assert_eq!(b"1.9h", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, (3600 + 1800) * 1000);
assert_eq!(b"1.5h", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, (3600 + 100) * 1000);
assert_eq!(b"1h", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, (60 + 50) * 1000);
assert_eq!(b"1.8m", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, (60 + 2) * 1000);
assert_eq!(b"1m", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, 1000);
assert_eq!(b"1s", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, 512);
assert_eq!(b"512ms", buf.as_ref());
buf = BytesMut::new();
format_duration(&mut buf, 1112);
assert_eq!(b"1.1s", buf.as_ref());
}
#[test]
fn test_add_variable() {
let mut ctx = Ctx::new();
ctx.add_variable("key1", "value1");
ctx.add_variable("key2", "value2");
ctx.extend_variables(AHashMap::from([
("key3".to_string(), "value3".to_string()),
("key4".to_string(), "value4".to_string()),
]));
let variables =
ctx.features.as_ref().unwrap().variables.as_ref().unwrap();
assert_eq!(variables.get("key1"), Some(&"value1".to_string()));
assert_eq!(variables.get("key2"), Some(&"value2".to_string()));
assert_eq!(variables.get("key3"), Some(&"value3".to_string()));
assert_eq!(variables.get("key4"), Some(&"value4".to_string()));
}
#[test]
fn test_cache_key() {
let mut ctx = Ctx::new();
ctx.push_cache_key("key1".to_string());
ctx.extend_cache_keys(vec!["key2".to_string(), "key3".to_string()]);
assert_eq!(
vec!["key1".to_string(), "key2".to_string(), "key3".to_string()],
ctx.cache.unwrap().keys.unwrap()
);
let mut ctx = Ctx::new();
ctx.cache.get_or_insert_default();
let key = get_cache_key(
&ctx,
"GET",
&Uri::from_static("https://example.com/path"),
);
assert_eq!(key.namespace_str(), Some(""));
assert_eq!(key.primary_key_str(), Some("GET:https://example.com/path"));
}
#[test]
fn test_state() {
let mut ctx = Ctx::new();
let mut buf = BytesMut::new();
ctx.conn.id = 10;
ctx.append_log_value(&mut buf, "connection_id");
assert_eq!(b"10", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_reused");
assert_eq!(b"false", buf.as_ref());
buf = BytesMut::new();
ctx.upstream.reused = true;
ctx.append_log_value(&mut buf, "upstream_reused");
assert_eq!(b"true", buf.as_ref());
buf = BytesMut::new();
ctx.upstream.address = "192.168.1.1:80".to_string();
ctx.append_log_value(&mut buf, "upstream_addr");
assert_eq!(b"192.168.1.1:80", buf.as_ref());
buf = BytesMut::new();
ctx.upstream.status = Some(StatusCode::CREATED);
ctx.append_log_value(&mut buf, "upstream_status");
assert_eq!(b"201", buf.as_ref());
buf = BytesMut::new();
ctx.state.processing_count = 10;
ctx.append_log_value(&mut buf, "processing");
assert_eq!(b"10", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_connect = Some(1);
ctx.append_log_value(&mut buf, "upstream_connect_time");
assert_eq!(b"1", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_connect_time_human");
assert_eq!(b"1ms", buf.as_ref());
buf = BytesMut::new();
ctx.upstream.connected_count = Some(30);
ctx.append_log_value(&mut buf, "upstream_connected");
assert_eq!(b"30", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_processing = Some(2);
ctx.append_log_value(&mut buf, "upstream_processing_time");
assert_eq!(b"2", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_processing_time_human");
assert_eq!(b"2ms", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_response = Some(3);
ctx.append_log_value(&mut buf, "upstream_response_time");
assert_eq!(b"3", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_response_time_human");
assert_eq!(b"3ms", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_tcp_connect = Some(100);
ctx.append_log_value(&mut buf, "upstream_tcp_connect_time");
assert_eq!(b"100", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_tcp_connect_time_human");
assert_eq!(b"100ms", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_tls_handshake = Some(110);
ctx.append_log_value(&mut buf, "upstream_tls_handshake_time");
assert_eq!(b"110", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_tls_handshake_time_human");
assert_eq!(b"110ms", buf.as_ref());
buf = BytesMut::new();
ctx.timing.upstream_connection_duration = Some(120);
ctx.append_log_value(&mut buf, "upstream_connection_time");
assert_eq!(b"120", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "upstream_connection_time_human");
assert_eq!(b"120ms", buf.as_ref());
buf = BytesMut::new();
ctx.upstream.location = "pingap".to_string().into();
ctx.append_log_value(&mut buf, "location");
assert_eq!(b"pingap", buf.as_ref());
buf = BytesMut::new();
ctx.timing.connection_duration = 4;
ctx.append_log_value(&mut buf, "connection_time");
assert_eq!(b"4", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "connection_time_human");
assert_eq!(b"4ms", buf.as_ref());
buf = BytesMut::new();
ctx.conn.reused = false;
ctx.append_log_value(&mut buf, "connection_reused");
assert_eq!(b"false", buf.as_ref());
buf = BytesMut::new();
ctx.conn.reused = true;
ctx.append_log_value(&mut buf, "connection_reused");
assert_eq!(b"true", buf.as_ref());
buf = BytesMut::new();
ctx.conn.tls_version = Some("TLSv1.3".to_string());
ctx.append_log_value(&mut buf, "tls_version");
assert_eq!(b"TLSv1.3", buf.as_ref());
buf = BytesMut::new();
ctx.conn.tls_cipher =
Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
ctx.append_log_value(&mut buf, "tls_cipher");
assert_eq!(b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", buf.as_ref());
buf = BytesMut::new();
ctx.timing.tls_handshake = Some(101);
ctx.append_log_value(&mut buf, "tls_handshake_time");
assert_eq!(b"101", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "tls_handshake_time_human");
assert_eq!(b"101ms", buf.as_ref());
{
let features = ctx.features.get_or_insert_default();
features.compression_stat = Some(CompressionStat {
in_bytes: 1024,
out_bytes: 500,
duration: Duration::from_millis(5),
..Default::default()
})
}
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "compression_time");
assert_eq!(b"5", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "compression_time_human");
assert_eq!(b"5ms", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "compression_ratio");
assert_eq!(b"2.0", buf.as_ref());
buf = BytesMut::new();
ctx.timing.cache_lookup = Some(6);
ctx.append_log_value(&mut buf, "cache_lookup_time");
assert_eq!(b"6", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "cache_lookup_time_human");
assert_eq!(b"6ms", buf.as_ref());
buf = BytesMut::new();
ctx.timing.cache_lock = Some(7);
ctx.append_log_value(&mut buf, "cache_lock_time");
assert_eq!(b"7", buf.as_ref());
buf = BytesMut::new();
ctx.append_log_value(&mut buf, "cache_lock_time_human");
assert_eq!(b"7ms", buf.as_ref());
}
#[test]
fn test_add_plugin_processing_time() {
let mut ctx = Ctx::new();
ctx.add_plugin_processing_time("plugin1", 100);
ctx.add_plugin_processing_time("plugin2", 200);
assert_eq!(
ctx.features.unwrap().plugin_processing_times,
Some(vec![
("plugin1".to_string(), 100),
("plugin2".to_string(), 200)
])
);
}
#[test]
fn test_get_digest_detail() {
let mut digest = Digest::default();
let detail = get_digest_detail(&digest);
assert_eq!(detail.connection_reused, false);
assert_eq!(detail.connection_time, 0);
assert_eq!(detail.tcp_established, 0);
assert_eq!(detail.tls_established, 0);
assert_eq!(detail.tls_version, None);
assert_eq!(detail.tls_cipher, None);
digest.timing_digest.push(Some(TimingDigest {
established_ts: SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(5))
.unwrap(),
}));
digest.timing_digest.push(Some(TimingDigest {
established_ts: SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(3))
.unwrap(),
}));
digest.ssl_digest = Some(Arc::new(SslDigest {
version: "1.3".into(),
cipher: "123".into(),
organization: Some("cloudflare".to_string()),
serial_number: Some(
"0x00000000000000000000000000000abc".to_string(),
),
cert_digest: vec![],
extension: SslDigestExtension::default(),
}));
let detail = get_digest_detail(&digest);
assert_eq!(detail.connection_reused, true);
assert_eq!(detail.tcp_established, 5000);
assert_eq!(detail.tls_established, 3000);
assert_eq!(detail.tls_version, Some("1.3".to_string()));
assert_eq!(detail.tls_cipher, Some("123".to_string()));
}
#[test]
fn test_modify_body_handler() {
let mut ctx = Ctx::default();
struct TestHandler {}
impl ModifyResponseBody for TestHandler {
fn handle(
&mut self,
_session: &Session,
body: &mut Option<bytes::Bytes>,
_end_of_stream: bool,
) -> pingora::Result<()> {
*body = Some(Bytes::from("test"));
Ok(())
}
}
ctx.add_modify_body_handler("test", Box::new(TestHandler {}));
assert_eq!(true, ctx.get_modify_body_handler("test").is_some());
}
}