pingap_core/
ctx.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::now_ms;
16use ahash::AHashMap;
17use bytes::{Bytes, BytesMut};
18use http::StatusCode;
19use http::Uri;
20#[cfg(feature = "full")]
21use opentelemetry::{
22    global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
23    trace::{SpanKind, TraceContextExt, Tracer},
24    Context,
25};
26use pingora::cache::CacheKey;
27use pingora_limits::inflight::Guard;
28use std::time::Duration;
29
/// Number of milliseconds in one second.
const SECOND: u64 = 1_000;
/// Number of milliseconds in one minute.
const MINUTE: u64 = SECOND * 60;
/// Number of milliseconds in one hour.
const HOUR: u64 = MINUTE * 60;
33
34#[inline]
35/// Format the duration in human readable format
36fn format_duration(mut buf: BytesMut, ms: u64) -> BytesMut {
37    if ms >= HOUR {
38        buf.extend(itoa::Buffer::new().format(ms / HOUR).as_bytes());
39        let value = ms % HOUR * 10 / HOUR;
40        if value != 0 {
41            buf.extend(b".");
42            buf.extend(itoa::Buffer::new().format(value).as_bytes());
43        }
44        buf.extend(b"h");
45    } else if ms >= MINUTE {
46        buf.extend(itoa::Buffer::new().format(ms / MINUTE).as_bytes());
47        let value = ms % MINUTE * 10 / MINUTE;
48        if value != 0 {
49            buf.extend(b".");
50            buf.extend(itoa::Buffer::new().format(value).as_bytes());
51        }
52        buf.extend(b"m");
53    } else if ms >= SECOND {
54        buf.extend(itoa::Buffer::new().format(ms / SECOND).as_bytes());
55        let value = (ms % SECOND) / 100;
56        if value != 0 {
57            buf.extend(b".");
58            buf.extend(itoa::Buffer::new().format(value).as_bytes());
59        }
60        buf.extend(b"s");
61    } else {
62        buf.extend(itoa::Buffer::new().format(ms).as_bytes());
63        buf.extend(b"ms");
64    }
65    buf
66}
67
68/// Trait for modifying the response body
69pub trait ModifyResponseBody: Sync + Send {
70    fn handle(&self, data: Bytes) -> Bytes;
71}
72
/// Statistics about response compression operations
pub struct CompressionStat {
    /// Size of the data before compression in bytes
    pub in_bytes: usize,
    /// Size of the data after compression in bytes
    pub out_bytes: usize,
    /// Time taken to perform the compression operation
    pub duration: Duration,
}

impl CompressionStat {
    /// Returns the compression ratio, i.e. `in_bytes / out_bytes`.
    ///
    /// A value above `1.0` means the output is smaller than the input.
    /// When `out_bytes` is zero the ratio is undefined; `0.0` is returned
    /// instead of `inf`/`NaN` so that the value can always be formatted as a
    /// plain number.
    pub fn ratio(&self) -> f64 {
        if self.out_bytes == 0 {
            return 0.0;
        }
        (self.in_bytes as f64) / (self.out_bytes as f64)
    }
}
88
/// OpenTelemetry tracer together with the span of the incoming HTTP request
/// (only available with the "full" feature).
#[cfg(feature = "full")]
pub struct OtelTracer {
    /// Tracer used to create spans
    pub tracer: BoxedTracer,
    /// Span covering the incoming HTTP request
    pub http_request_span: BoxedSpan,
}

#[cfg(feature = "full")]
impl OtelTracer {
    /// Starts a new client-kind span called `name` whose parent context is
    /// built from the span context of `http_request_span`.
    #[inline]
    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
        let builder = self
            .tracer
            .span_builder(name.to_string())
            .with_kind(SpanKind::Client);
        let request_span_context =
            self.http_request_span.span_context().clone();
        let parent_context =
            Context::current().with_remote_span_context(request_span_context);
        builder.start_with_context(&self.tracer, &parent_context)
    }
}
110
111/// Represents the state of a request/response cycle, tracking various metrics and properties
112/// including connection details, caching information, and upstream server interactions.
113#[derive(Default)]
114pub struct Ctx {
115    /// Unique identifier for the connection, it should be unique among all existing connections of the same type
116    pub connection_id: usize,
117    /// Number of requests currently processing
118    pub processing: i32,
119    /// Total number of requests accepted
120    pub accepted: u64,
121    /// Number of requests currently processing for the current location
122    pub location_processing: i32,
123    /// Total number of requests accepted for the current location
124    pub location_accepted: u64,
125    /// Timestamp when this context was created (in milliseconds)
126    pub created_at: u64,
127    /// TLS version used by the client connection (e.g., "TLSv1.3")
128    pub tls_version: Option<String>,
129    /// TLS cipher suite used by the client connection
130    pub tls_cipher: Option<String>,
131    /// Time taken for TLS handshake with client (in milliseconds)
132    pub tls_handshake_time: Option<u64>,
133    /// HTTP status code of the response
134    pub status: Option<StatusCode>,
135    /// Total time the connection has been alive (in milliseconds)
136    /// May be large for reused connections
137    pub connection_time: u64,
138    /// Indicates if this connection is reused
139    pub connection_reused: bool,
140    /// The location handling request
141    pub location: String,
142    /// Address of the upstream server
143    pub upstream_address: String,
144    /// Client's IP address
145    pub client_ip: Option<String>,
146    /// Remote connection port
147    pub remote_port: Option<u16>,
148    /// Remote connection address
149    pub remote_addr: Option<String>,
150    /// Server's listening port
151    pub server_port: Option<u16>,
152    /// Server's address
153    pub server_addr: Option<String>,
154    /// Rate limiting guard
155    pub guard: Option<Guard>,
156    /// Unique identifier for the request
157    pub request_id: Option<String>,
158    /// Namespace for cache entries
159    pub cache_namespace: Option<String>,
160    /// Prefix for cache keys
161    pub cache_prefix: Option<String>,
162    /// Whether to check cache control headers
163    pub check_cache_control: bool,
164    /// Time spent looking up cache entries (in milliseconds)
165    pub cache_lookup_time: Option<u64>,
166    /// Time spent acquiring cache locks (in milliseconds)
167    pub cache_lock_time: Option<u64>,
168    /// Maximum time-to-live for cache entries
169    pub cache_max_ttl: Option<Duration>,
170    /// The upstream server
171    pub upstream: String,
172    /// Indicates if the upstream connection is reused
173    pub upstream_reused: bool,
174    /// Number of requests processing by upstream
175    pub upstream_processing: Option<i32>,
176    /// Time taken to establish/reuse upstream connection (in milliseconds)
177    pub upstream_connect_time: Option<u64>,
178    /// Current number of active upstream connections
179    pub upstream_connected: Option<i32>,
180    /// Time taken for TCP connection to upstream (in milliseconds)
181    pub upstream_tcp_connect_time: Option<u64>,
182    /// Time taken for TLS handshake with upstream (in milliseconds)
183    pub upstream_tls_handshake_time: Option<u64>,
184    /// Time taken by upstream server to process request (in milliseconds)
185    pub upstream_processing_time: Option<u64>,
186    /// Total time taken by upstream server (in milliseconds)
187    pub upstream_response_time: Option<u64>,
188    /// Total time the upstream connection has been alive (in milliseconds)
189    /// May be large for reused connections
190    pub upstream_connection_time: Option<u64>,
191    /// Size of the request payload in bytes
192    pub payload_size: usize,
193    /// Statistics about response compression
194    pub compression_stat: Option<CompressionStat>,
195    /// Handler for modifying response body
196    pub modify_response_body: Option<Box<dyn ModifyResponseBody>>,
197    /// Body buffer of modified response
198    pub response_body: Option<BytesMut>,
199    /// Number of cache reading operations
200    pub cache_reading: Option<u32>,
201    /// Number of cache writing operations
202    pub cache_writing: Option<u32>,
203    /// OpenTelemetry tracer (only with "full" feature)
204    #[cfg(feature = "full")]
205    pub otel_tracer: Option<OtelTracer>,
206    /// OpenTelemetry span for upstream requests (only with "full" feature)
207    #[cfg(feature = "full")]
208    pub upstream_span: Option<BoxedSpan>,
209    /// Custom variables map for request processing
210    pub variables: Option<AHashMap<String, String>>,
211    /// Plugin processing times
212    pub plugin_processing_times: Option<Vec<(String, u32)>>,
213}
214
/// One hour expressed in milliseconds; upper bound (exclusive) for upstream
/// timings that are reported by the `get_upstream_*_time` accessors.
const ONE_HOUR_MS: u64 = 3_600 * 1_000;
216
217impl Ctx {
218    /// Creates a new Ctx instance with the current timestamp and default values.
219    ///
220    /// Returns a new Ctx struct initialized with the current timestamp and all other fields
221    /// set to their default values.
222    pub fn new() -> Self {
223        Self {
224            created_at: now_ms(),
225            ..Default::default()
226        }
227    }
228
229    /// Adds a variable to the state's variables map with the given key and value.
230    /// The key will be automatically prefixed with '$' before being stored.
231    ///
232    /// # Arguments
233    /// * `key` - The variable name (will be prefixed with '$')
234    /// * `value` - The value to store for this variable
235    #[inline]
236    pub fn add_variable(&mut self, key: &str, value: &str) {
237        let key = format!("${key}");
238        if let Some(variables) = self.variables.as_mut() {
239            variables.insert(key, value.to_string());
240        } else {
241            let mut variables = AHashMap::new();
242            variables.insert(key, value.to_string());
243            self.variables = Some(variables);
244        }
245    }
246
247    /// Returns the value of a variable by key.
248    ///
249    /// # Arguments
250    /// * `key` - The key of the variable to retrieve
251    ///
252    /// Returns: Option<&str> representing the value of the variable, or None if the variable does not exist
253    #[inline]
254    pub fn get_variable(&self, key: &str) -> Option<&str> {
255        self.variables
256            .as_ref()
257            .and_then(|vars| vars.get(key).map(|v| v.as_str()))
258    }
259
260    /// Returns the upstream response time if it's less than one hour, otherwise None.
261    /// This helps filter out potentially invalid or stale timing data.
262    ///
263    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
264    #[inline]
265    pub fn get_upstream_response_time(&self) -> Option<u64> {
266        if let Some(value) = self.upstream_response_time {
267            if value < ONE_HOUR_MS {
268                return Some(value);
269            }
270        }
271        None
272    }
273
274    /// Returns the upstream connect time if it's less than one hour, otherwise None.
275    /// This helps filter out potentially invalid or stale timing data.
276    ///
277    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
278    #[inline]
279    pub fn get_upstream_connect_time(&self) -> Option<u64> {
280        if let Some(value) = self.upstream_connect_time {
281            if value < ONE_HOUR_MS {
282                return Some(value);
283            }
284        }
285        None
286    }
287
288    /// Returns the upstream processing time if it's less than one hour, otherwise None.
289    /// This helps filter out potentially invalid or stale timing data.
290    ///
291    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
292    #[inline]
293    pub fn get_upstream_processing_time(&self) -> Option<u64> {
294        if let Some(value) = self.upstream_processing_time {
295            if value < ONE_HOUR_MS {
296                return Some(value);
297            }
298        }
299        None
300    }
301
302    /// Adds a plugin processing time to the context
303    ///
304    /// # Arguments
305    /// * `name` - The name of the plugin
306    /// * `time` - The time taken by the plugin in milliseconds
307    #[inline]
308    pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
309        if let Some(times) = self.plugin_processing_times.as_mut() {
310            times.push((name.to_string(), time));
311        } else {
312            let mut times = Vec::with_capacity(5);
313            times.push((name.to_string(), time));
314            self.plugin_processing_times = Some(times);
315        }
316    }
317
318    /// Appends a formatted value to the provided buffer based on the given key.
319    /// Handles various metrics including connection info, timing data, and TLS details.
320    ///
321    /// # Arguments
322    /// * `buf` - The BytesMut buffer to append the value to
323    /// * `key` - The key identifying which state value to format and append
324    ///
325    /// Returns: The modified BytesMut buffer
326    #[inline]
327    pub fn append_value(&self, mut buf: BytesMut, key: &str) -> BytesMut {
328        match key {
329            "connection_id" => {
330                buf.extend(
331                    itoa::Buffer::new().format(self.connection_id).as_bytes(),
332                );
333            },
334            "upstream_reused" => {
335                if self.upstream_reused {
336                    buf.extend(b"true");
337                } else {
338                    buf.extend(b"false");
339                }
340            },
341            "upstream_addr" => buf.extend(self.upstream_address.as_bytes()),
342            "processing" => buf
343                .extend(itoa::Buffer::new().format(self.processing).as_bytes()),
344            "upstream_connect_time" => {
345                if let Some(ms) = self.get_upstream_connect_time() {
346                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
347                }
348            },
349            "upstream_connect_time_human" => {
350                if let Some(ms) = self.get_upstream_connect_time() {
351                    buf = format_duration(buf, ms);
352                }
353            },
354            "upstream_connected" => {
355                if let Some(value) = self.upstream_connected {
356                    buf.extend(itoa::Buffer::new().format(value).as_bytes());
357                }
358            },
359            "upstream_processing_time" => {
360                if let Some(ms) = self.get_upstream_processing_time() {
361                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
362                }
363            },
364            "upstream_processing_time_human" => {
365                if let Some(ms) = self.get_upstream_processing_time() {
366                    buf = format_duration(buf, ms);
367                }
368            },
369            "upstream_response_time" => {
370                if let Some(ms) = self.get_upstream_response_time() {
371                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
372                }
373            },
374            "upstream_response_time_human" => {
375                if let Some(ms) = self.get_upstream_response_time() {
376                    buf = format_duration(buf, ms);
377                }
378            },
379            "upstream_tcp_connect_time" => {
380                if let Some(ms) = self.upstream_tcp_connect_time {
381                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
382                }
383            },
384            "upstream_tcp_connect_time_human" => {
385                if let Some(ms) = self.upstream_tcp_connect_time {
386                    buf = format_duration(buf, ms);
387                }
388            },
389            "upstream_tls_handshake_time" => {
390                if let Some(ms) = self.upstream_tls_handshake_time {
391                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
392                }
393            },
394            "upstream_tls_handshake_time_human" => {
395                if let Some(ms) = self.upstream_tls_handshake_time {
396                    buf = format_duration(buf, ms);
397                }
398            },
399            "upstream_connection_time" => {
400                if let Some(ms) = self.upstream_connection_time {
401                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
402                }
403            },
404            "upstream_connection_time_human" => {
405                if let Some(ms) = self.upstream_connection_time {
406                    buf = format_duration(buf, ms);
407                }
408            },
409            "location" => {
410                if !self.location.is_empty() {
411                    buf.extend(self.location.as_bytes())
412                }
413            },
414            "connection_time" => {
415                buf.extend(
416                    itoa::Buffer::new().format(self.connection_time).as_bytes(),
417                );
418            },
419            "connection_time_human" => {
420                buf = format_duration(buf, self.connection_time)
421            },
422            "connection_reused" => {
423                if self.connection_reused {
424                    buf.extend(b"true");
425                } else {
426                    buf.extend(b"false");
427                }
428            },
429            "tls_version" => {
430                if let Some(value) = &self.tls_version {
431                    buf.extend(value.as_bytes());
432                }
433            },
434            "tls_cipher" => {
435                if let Some(value) = &self.tls_cipher {
436                    buf.extend(value.as_bytes());
437                }
438            },
439            "tls_handshake_time" => {
440                if let Some(ms) = self.tls_handshake_time {
441                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
442                }
443            },
444            "tls_handshake_time_human" => {
445                if let Some(value) = self.tls_handshake_time {
446                    buf = format_duration(buf, value);
447                }
448            },
449            "compression_time" => {
450                if let Some(value) = &self.compression_stat {
451                    buf.extend(
452                        itoa::Buffer::new()
453                            .format(value.duration.as_millis() as u64)
454                            .as_bytes(),
455                    );
456                }
457            },
458            "compression_time_human" => {
459                if let Some(value) = &self.compression_stat {
460                    buf =
461                        format_duration(buf, value.duration.as_millis() as u64);
462                }
463            },
464            "compression_ratio" => {
465                if let Some(value) = &self.compression_stat {
466                    buf.extend(format!("{:.1}", value.ratio()).as_bytes());
467                }
468            },
469            "cache_lookup_time" => {
470                if let Some(ms) = self.cache_lookup_time {
471                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
472                }
473            },
474            "cache_lookup_time_human" => {
475                if let Some(ms) = self.cache_lookup_time {
476                    buf = format_duration(buf, ms);
477                }
478            },
479            "cache_lock_time" => {
480                if let Some(ms) = self.cache_lock_time {
481                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
482                }
483            },
484            "cache_lock_time_human" => {
485                if let Some(ms) = self.cache_lock_time {
486                    buf = format_duration(buf, ms);
487                }
488            },
489            "service_time" => {
490                buf.extend(
491                    itoa::Buffer::new()
492                        .format(now_ms() - self.created_at)
493                        .as_bytes(),
494                );
495            },
496            "service_time_human" => {
497                buf = format_duration(buf, now_ms() - self.created_at)
498            },
499            _ => {},
500        }
501        buf
502    }
503
504    /// Generates a Server-Timing header value based on the context's timing metrics.
505    ///
506    /// The Server-Timing header allows servers to communicate performance metrics
507    /// about the request-response cycle to the client. This implementation includes
508    /// various timing metrics like connection time, processing time, and cache operations.
509    ///
510    /// Returns a String containing the formatted Server-Timing header value.
511    pub fn generate_server_timing(&self) -> String {
512        // the response header should be set before get body from upstream,
513        // so upstream response time, compression time, etc. are not included
514
515        let mut timings = Vec::new();
516
517        let mut upstream_time = 0;
518        let mut upstream_time_set = false;
519
520        // Add upstream metrics
521        if let Some(time) = self.get_upstream_connect_time() {
522            upstream_time += time;
523            upstream_time_set = true;
524            timings.push(format!("upstream.connect;dur={}", time));
525        }
526
527        if let Some(time) = self.get_upstream_processing_time() {
528            upstream_time += time;
529            upstream_time_set = true;
530            timings.push(format!("upstream.processing;dur={}", time));
531        }
532
533        if upstream_time_set {
534            timings.push(format!("upstream;dur={}", upstream_time));
535        }
536
537        let mut cache_time = 0;
538        let mut cache_time_set = false;
539        // Add cache metrics
540        if let Some(time) = self.cache_lookup_time {
541            cache_time += time;
542            cache_time_set = true;
543            timings.push(format!("cache.lookup;dur={}", time));
544        }
545
546        if let Some(time) = self.cache_lock_time {
547            cache_time += time;
548            cache_time_set = true;
549            timings.push(format!("cache.lock;dur={}", time));
550        }
551        if cache_time_set {
552            timings.push(format!("cache;dur={}", cache_time));
553        }
554
555        if let Some(times) = self.plugin_processing_times.as_ref() {
556            let mut plugin_time = 0;
557            for (name, time) in times {
558                plugin_time += time;
559                timings.push(format!("plugin.{name};dur={}", time));
560            }
561            timings.push(format!("plugin;dur={}", plugin_time));
562        }
563
564        // Add total service time
565        let service_time = now_ms() - self.created_at;
566        timings.push(format!("total;dur={}", service_time));
567
568        timings.join(", ")
569    }
570}
571
572/// Generates a cache key from the request method, URI and state context.
573/// The key includes an optional namespace and prefix if configured in the state.
574///
575/// # Arguments
576/// * `ctx` - The Ctx context containing cache configuration
577/// * `method` - The HTTP method as a string
578/// * `uri` - The request URI
579///
580/// Returns: A CacheKey combining the namespace, prefix (if any), method and URI
581pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
582    let namespace = ctx.cache_namespace.as_ref().map_or("", |v| v);
583    let key = if let Some(prefix) = &ctx.cache_prefix {
584        format!("{prefix}{method}:{uri}")
585    } else {
586        format!("{method}:{uri}")
587    };
588
589    CacheKey::new(namespace, key, "")
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595    use bytes::BytesMut;
596    use pretty_assertions::assert_eq;
597    use std::time::Duration;
598
599    #[test]
600    fn test_format_duration() {
601        let mut buf = BytesMut::new();
602        buf = format_duration(buf, (3600 + 3500) * 1000);
603        assert_eq!(b"1.9h", buf.as_ref());
604
605        buf = BytesMut::new();
606        buf = format_duration(buf, (3600 + 1800) * 1000);
607        assert_eq!(b"1.5h", buf.as_ref());
608
609        buf = BytesMut::new();
610        buf = format_duration(buf, (3600 + 100) * 1000);
611        assert_eq!(b"1h", buf.as_ref());
612
613        buf = BytesMut::new();
614        buf = format_duration(buf, (60 + 50) * 1000);
615        assert_eq!(b"1.8m", buf.as_ref());
616
617        buf = BytesMut::new();
618        buf = format_duration(buf, (60 + 2) * 1000);
619        assert_eq!(b"1m", buf.as_ref());
620
621        buf = BytesMut::new();
622        buf = format_duration(buf, 1000);
623        assert_eq!(b"1s", buf.as_ref());
624
625        buf = BytesMut::new();
626        buf = format_duration(buf, 512);
627        assert_eq!(b"512ms", buf.as_ref());
628
629        buf = BytesMut::new();
630        buf = format_duration(buf, 1112);
631        assert_eq!(b"1.1s", buf.as_ref());
632    }
633
634    #[test]
635    fn test_add_variable() {
636        let mut ctx = Ctx::new();
637        ctx.add_variable("key1", "value1");
638        ctx.add_variable("key2", "value2");
639        assert_eq!(
640            ctx.variables.clone().unwrap().get("$key1"),
641            Some(&"value1".to_string())
642        );
643        assert_eq!(
644            ctx.variables.clone().unwrap().get("$key2"),
645            Some(&"value2".to_string())
646        );
647    }
648
649    #[test]
650    fn test_now_ms() {
651        let now = now_ms();
652        assert!(now > 0);
653    }
654
655    #[test]
656    fn test_get_cache_key() {
657        let mut ctx = Ctx::new();
658        ctx.cache_namespace = Some("test".to_string());
659        let method = "GET";
660        let uri = Uri::from_static("http://example.com/");
661        let key = get_cache_key(&ctx, method, &uri);
662        assert_eq!(
663            format!("{key:?}"),
664            r#"CacheKey { namespace: "test", primary: "GET:http://example.com/", primary_bin_override: None, variance: None, user_tag: "", extensions: Extensions }"#
665        );
666
667        ctx.cache_prefix = Some("prefix_".to_string());
668        let key = get_cache_key(&ctx, method, &uri);
669        assert_eq!(
670            format!("{key:?}"),
671            r#"CacheKey { namespace: "test", primary: "prefix_GET:http://example.com/", primary_bin_override: None, variance: None, user_tag: "", extensions: Extensions }"#
672        );
673    }
674
675    #[test]
676    fn test_state() {
677        let mut ctx = Ctx::new();
678
679        ctx.connection_id = 10;
680        assert_eq!(
681            b"10",
682            ctx.append_value(BytesMut::new(), "connection_id").as_ref()
683        );
684
685        assert_eq!(
686            b"false",
687            ctx.append_value(BytesMut::new(), "upstream_reused")
688                .as_ref()
689        );
690
691        ctx.upstream_reused = true;
692        assert_eq!(
693            b"true",
694            ctx.append_value(BytesMut::new(), "upstream_reused")
695                .as_ref()
696        );
697
698        ctx.upstream_address = "192.168.1.1:80".to_string();
699        assert_eq!(
700            b"192.168.1.1:80",
701            ctx.append_value(BytesMut::new(), "upstream_addr").as_ref()
702        );
703
704        ctx.processing = 10;
705        assert_eq!(
706            b"10",
707            ctx.append_value(BytesMut::new(), "processing").as_ref()
708        );
709
710        ctx.upstream_connect_time = Some(1);
711        assert_eq!(
712            b"1",
713            ctx.append_value(BytesMut::new(), "upstream_connect_time")
714                .as_ref()
715        );
716        assert_eq!(
717            b"1ms",
718            ctx.append_value(BytesMut::new(), "upstream_connect_time_human")
719                .as_ref()
720        );
721
722        ctx.upstream_connected = Some(30);
723        assert_eq!(
724            b"30",
725            ctx.append_value(BytesMut::new(), "upstream_connected")
726                .as_ref()
727        );
728
729        ctx.upstream_processing_time = Some(2);
730        assert_eq!(
731            b"2",
732            ctx.append_value(BytesMut::new(), "upstream_processing_time")
733                .as_ref()
734        );
735        assert_eq!(
736            b"2ms",
737            ctx.append_value(BytesMut::new(), "upstream_processing_time_human")
738                .as_ref()
739        );
740
741        ctx.upstream_response_time = Some(3);
742        assert_eq!(
743            b"3",
744            ctx.append_value(BytesMut::new(), "upstream_response_time")
745                .as_ref()
746        );
747        assert_eq!(
748            b"3ms",
749            ctx.append_value(BytesMut::new(), "upstream_response_time_human")
750                .as_ref()
751        );
752
753        ctx.upstream_tcp_connect_time = Some(100);
754        assert_eq!(
755            b"100",
756            ctx.append_value(BytesMut::new(), "upstream_tcp_connect_time")
757                .as_ref()
758        );
759        assert_eq!(
760            b"100ms",
761            ctx.append_value(
762                BytesMut::new(),
763                "upstream_tcp_connect_time_human"
764            )
765            .as_ref()
766        );
767
768        ctx.upstream_tls_handshake_time = Some(110);
769        assert_eq!(
770            b"110",
771            ctx.append_value(BytesMut::new(), "upstream_tls_handshake_time")
772                .as_ref()
773        );
774        assert_eq!(
775            b"110ms",
776            ctx.append_value(
777                BytesMut::new(),
778                "upstream_tls_handshake_time_human"
779            )
780            .as_ref()
781        );
782
783        ctx.upstream_connection_time = Some(120);
784        assert_eq!(
785            b"120",
786            ctx.append_value(BytesMut::new(), "upstream_connection_time")
787                .as_ref()
788        );
789        assert_eq!(
790            b"120ms",
791            ctx.append_value(BytesMut::new(), "upstream_connection_time_human")
792                .as_ref()
793        );
794
795        ctx.location = "pingap".to_string();
796        assert_eq!(
797            b"pingap",
798            ctx.append_value(BytesMut::new(), "location").as_ref()
799        );
800
801        ctx.connection_time = 4;
802        assert_eq!(
803            b"4",
804            ctx.append_value(BytesMut::new(), "connection_time")
805                .as_ref()
806        );
807        assert_eq!(
808            b"4ms",
809            ctx.append_value(BytesMut::new(), "connection_time_human")
810                .as_ref()
811        );
812
813        assert_eq!(
814            b"false",
815            ctx.append_value(BytesMut::new(), "connection_reused")
816                .as_ref()
817        );
818        ctx.connection_reused = true;
819        assert_eq!(
820            b"true",
821            ctx.append_value(BytesMut::new(), "connection_reused")
822                .as_ref()
823        );
824
825        ctx.tls_version = Some("TLSv1.3".to_string());
826        assert_eq!(
827            b"TLSv1.3",
828            ctx.append_value(BytesMut::new(), "tls_version").as_ref()
829        );
830
831        ctx.tls_cipher =
832            Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
833        assert_eq!(
834            b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
835            ctx.append_value(BytesMut::new(), "tls_cipher").as_ref()
836        );
837
838        ctx.tls_handshake_time = Some(101);
839        assert_eq!(
840            b"101",
841            ctx.append_value(BytesMut::new(), "tls_handshake_time")
842                .as_ref()
843        );
844        assert_eq!(
845            b"101ms",
846            ctx.append_value(BytesMut::new(), "tls_handshake_time_human")
847                .as_ref()
848        );
849
850        ctx.compression_stat = Some(CompressionStat {
851            in_bytes: 1024,
852            out_bytes: 500,
853            duration: Duration::from_millis(5),
854        });
855        assert_eq!(
856            b"5",
857            ctx.append_value(BytesMut::new(), "compression_time")
858                .as_ref()
859        );
860        assert_eq!(
861            b"5ms",
862            ctx.append_value(BytesMut::new(), "compression_time_human")
863                .as_ref()
864        );
865        assert_eq!(
866            b"2.0",
867            ctx.append_value(BytesMut::new(), "compression_ratio")
868                .as_ref()
869        );
870
871        ctx.cache_lookup_time = Some(6);
872        assert_eq!(
873            b"6",
874            ctx.append_value(BytesMut::new(), "cache_lookup_time")
875                .as_ref()
876        );
877        assert_eq!(
878            b"6ms",
879            ctx.append_value(BytesMut::new(), "cache_lookup_time_human")
880                .as_ref()
881        );
882
883        ctx.cache_lock_time = Some(7);
884        assert_eq!(
885            b"7",
886            ctx.append_value(BytesMut::new(), "cache_lock_time")
887                .as_ref()
888        );
889        assert_eq!(
890            b"7ms",
891            ctx.append_value(BytesMut::new(), "cache_lock_time_human")
892                .as_ref()
893        );
894
895        ctx.created_at = now_ms() - 1;
896        assert_eq!(
897            true,
898            ctx.append_value(BytesMut::new(), "service_time_human")
899                .ends_with(b"ms")
900        );
901    }
902
903    #[test]
904    fn test_add_plugin_processing_time() {
905        let mut ctx = Ctx::new();
906        ctx.add_plugin_processing_time("plugin1", 100);
907        ctx.add_plugin_processing_time("plugin2", 200);
908        assert_eq!(
909            ctx.plugin_processing_times,
910            Some(vec![
911                ("plugin1".to_string(), 100),
912                ("plugin2".to_string(), 200)
913            ])
914        );
915    }
916
917    #[test]
918    fn test_generate_server_timing() {
919        let mut ctx = Ctx::new();
920        ctx.upstream_connect_time = Some(1);
921        ctx.upstream_processing_time = Some(2);
922        ctx.cache_lookup_time = Some(6);
923        ctx.cache_lock_time = Some(7);
924        ctx.created_at = now_ms() - 1;
925        ctx.add_plugin_processing_time("plugin1", 100);
926        ctx.add_plugin_processing_time("plugin2", 200);
927
928        // total duration sometime changes(it may be 1 or 2), so we just check the prefix
929        assert_eq!(true, ctx.generate_server_timing().starts_with("upstream.connect;dur=1, upstream.processing;dur=2, upstream;dur=3, cache.lookup;dur=6, cache.lock;dur=7, cache;dur=13, plugin.plugin1;dur=100, plugin.plugin2;dur=200, plugin;dur=300, total;dur="));
930    }
931}