pingap_core/
ctx.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::now_ms;
16use ahash::AHashMap;
17use bytes::{Bytes, BytesMut};
18use http::StatusCode;
19use http::Uri;
20#[cfg(feature = "tracing")]
21use opentelemetry::{
22    global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
23    trace::{SpanKind, TraceContextExt, Tracer},
24    Context,
25};
26use pingora::cache::CacheKey;
27use pingora_limits::inflight::Guard;
28use std::time::Duration;
29
30const SECOND: u64 = 1_000;
31const MINUTE: u64 = 60 * SECOND;
32const HOUR: u64 = 60 * MINUTE;
33
34#[inline]
35/// Format the duration in human readable format
36fn format_duration(mut buf: BytesMut, ms: u64) -> BytesMut {
37    if ms >= HOUR {
38        buf.extend(itoa::Buffer::new().format(ms / HOUR).as_bytes());
39        let value = ms % HOUR * 10 / HOUR;
40        if value != 0 {
41            buf.extend(b".");
42            buf.extend(itoa::Buffer::new().format(value).as_bytes());
43        }
44        buf.extend(b"h");
45    } else if ms >= MINUTE {
46        buf.extend(itoa::Buffer::new().format(ms / MINUTE).as_bytes());
47        let value = ms % MINUTE * 10 / MINUTE;
48        if value != 0 {
49            buf.extend(b".");
50            buf.extend(itoa::Buffer::new().format(value).as_bytes());
51        }
52        buf.extend(b"m");
53    } else if ms >= SECOND {
54        buf.extend(itoa::Buffer::new().format(ms / SECOND).as_bytes());
55        let value = (ms % SECOND) / 100;
56        if value != 0 {
57            buf.extend(b".");
58            buf.extend(itoa::Buffer::new().format(value).as_bytes());
59        }
60        buf.extend(b"s");
61    } else {
62        buf.extend(itoa::Buffer::new().format(ms).as_bytes());
63        buf.extend(b"ms");
64    }
65    buf
66}
67
68/// Trait for modifying the response body
69pub trait ModifyResponseBody: Sync + Send {
70    fn handle(&self, data: Bytes) -> pingora::Result<Bytes>;
71    fn name(&self) -> String {
72        "unknown".to_string()
73    }
74}
75
76#[derive(Default)]
77/// Statistics about response compression operations
78pub struct CompressionStat {
79    /// Algorithm used for compression
80    pub algorithm: String,
81    /// Size of the data before compression in bytes
82    pub in_bytes: usize,
83    /// Size of the data after compression in bytes
84    pub out_bytes: usize,
85    /// Time taken to perform the compression operation
86    pub duration: Duration,
87}
88
89impl CompressionStat {
90    pub fn ratio(&self) -> f64 {
91        (self.in_bytes as f64) / (self.out_bytes as f64)
92    }
93}
94
95#[cfg(feature = "tracing")]
96pub struct OtelTracer {
97    pub tracer: BoxedTracer,
98    pub http_request_span: BoxedSpan,
99}
100
101#[cfg(feature = "tracing")]
102impl OtelTracer {
103    #[inline]
104    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
105        self.tracer
106            .span_builder(name.to_string())
107            .with_kind(SpanKind::Client)
108            .start_with_context(
109                &self.tracer,
110                &Context::current().with_remote_span_context(
111                    self.http_request_span.span_context().clone(),
112                ),
113            )
114    }
115}
116
117/// Represents the state of a request/response cycle, tracking various metrics and properties
118/// including connection details, caching information, and upstream server interactions.
119#[derive(Default)]
120pub struct Ctx {
121    /// Unique identifier for the connection, it should be unique among all existing connections of the same type
122    pub connection_id: usize,
123    /// Number of requests currently processing
124    pub processing: i32,
125    /// Total number of requests accepted
126    pub accepted: u64,
127    /// Number of requests currently processing for the current location
128    pub location_processing: i32,
129    /// Total number of requests accepted for the current location
130    pub location_accepted: u64,
131    /// Timestamp when this context was created (in milliseconds)
132    pub created_at: u64,
133    /// TLS version used by the client connection (e.g., "TLSv1.3")
134    pub tls_version: Option<String>,
135    /// TLS cipher suite used by the client connection
136    pub tls_cipher: Option<String>,
137    /// Time taken for TLS handshake with client (in milliseconds)
138    pub tls_handshake_time: Option<u64>,
139    /// HTTP status code of the response
140    pub status: Option<StatusCode>,
141    /// Total time the connection has been alive (in milliseconds)
142    /// May be large for reused connections
143    pub connection_time: u64,
144    /// Indicates if this connection is reused
145    pub connection_reused: bool,
146    /// The location handling request
147    pub location: String,
148    /// Address of the upstream server
149    pub upstream_address: String,
150    /// Client's IP address
151    pub client_ip: Option<String>,
152    /// Remote connection port
153    pub remote_port: Option<u16>,
154    /// Remote connection address
155    pub remote_addr: Option<String>,
156    /// Server's listening port
157    pub server_port: Option<u16>,
158    /// Server's address
159    pub server_addr: Option<String>,
160    /// Rate limiting guard
161    pub guard: Option<Guard>,
162    /// Unique identifier for the request
163    pub request_id: Option<String>,
164    /// Namespace for cache entries
165    pub cache_namespace: Option<String>,
166    /// Cache keys
167    pub cache_keys: Option<Vec<String>>,
168    /// Whether to check cache control headers
169    pub check_cache_control: bool,
170    /// Time spent looking up cache entries (in milliseconds)
171    pub cache_lookup_time: Option<u64>,
172    /// Time spent acquiring cache locks (in milliseconds)
173    pub cache_lock_time: Option<u64>,
174    /// Maximum time-to-live for cache entries
175    pub cache_max_ttl: Option<Duration>,
176    /// The upstream server
177    pub upstream: String,
178    /// Indicates if the upstream connection is reused
179    pub upstream_reused: bool,
180    /// Number of requests processing by upstream
181    pub upstream_processing: Option<i32>,
182    /// Time taken to establish/reuse upstream connection (in milliseconds)
183    pub upstream_connect_time: Option<u64>,
184    /// Current number of active upstream connections
185    pub upstream_connected: Option<i32>,
186    /// Time taken for TCP connection to upstream (in milliseconds)
187    pub upstream_tcp_connect_time: Option<u64>,
188    /// Time taken for TLS handshake with upstream (in milliseconds)
189    pub upstream_tls_handshake_time: Option<u64>,
190    /// Time taken by upstream server to process request (in milliseconds)
191    pub upstream_processing_time: Option<u64>,
192    /// Total time taken by upstream server (in milliseconds)
193    pub upstream_response_time: Option<u64>,
194    /// Total time the upstream connection has been alive (in milliseconds)
195    /// May be large for reused connections
196    pub upstream_connection_time: Option<u64>,
197    /// Size of the request payload in bytes
198    pub payload_size: usize,
199    /// Statistics about response compression
200    pub compression_stat: Option<CompressionStat>,
201    /// Handler for modifying response body
202    pub modify_response_body: Option<Box<dyn ModifyResponseBody>>,
203    pub modify_upstream_response_body: Option<Box<dyn ModifyResponseBody>>,
204    /// Body buffer of modified response
205    pub response_body: Option<BytesMut>,
206    /// Number of cache reading operations
207    pub cache_reading: Option<u32>,
208    /// Number of cache writing operations
209    pub cache_writing: Option<u32>,
210    /// OpenTelemetry tracer (only with "full" feature)
211    #[cfg(feature = "tracing")]
212    pub otel_tracer: Option<OtelTracer>,
213    /// OpenTelemetry span for upstream requests (only with "full" feature)
214    #[cfg(feature = "tracing")]
215    pub upstream_span: Option<BoxedSpan>,
216    /// Custom variables map for request processing
217    pub variables: Option<AHashMap<String, String>>,
218    /// Plugin processing times
219    pub plugin_processing_times: Option<Vec<(String, u32)>>,
220}
221
222const ONE_HOUR_MS: u64 = 60 * 60 * 1000;
223
224impl Ctx {
225    /// Creates a new Ctx instance with the current timestamp and default values.
226    ///
227    /// Returns a new Ctx struct initialized with the current timestamp and all other fields
228    /// set to their default values.
229    pub fn new() -> Self {
230        Self {
231            created_at: now_ms(),
232            ..Default::default()
233        }
234    }
235
236    /// Adds a variable to the state's variables map with the given key and value.
237    /// The key will be automatically prefixed with '$' before being stored.
238    ///
239    /// # Arguments
240    /// * `key` - The variable name (will be prefixed with '$')
241    /// * `value` - The value to store for this variable
242    #[inline]
243    pub fn add_variable(&mut self, key: &str, value: &str) {
244        let key = format!("${key}");
245        if let Some(variables) = self.variables.as_mut() {
246            variables.insert(key, value.to_string());
247        } else {
248            let mut variables = AHashMap::new();
249            variables.insert(key, value.to_string());
250            self.variables = Some(variables);
251        }
252    }
253
254    /// Returns the value of a variable by key.
255    ///
256    /// # Arguments
257    /// * `key` - The key of the variable to retrieve
258    ///
259    /// Returns: Option<&str> representing the value of the variable, or None if the variable does not exist
260    #[inline]
261    pub fn get_variable(&self, key: &str) -> Option<&str> {
262        self.variables
263            .as_ref()
264            .and_then(|vars| vars.get(key).map(|v| v.as_str()))
265    }
266
267    /// Returns the upstream response time if it's less than one hour, otherwise None.
268    /// This helps filter out potentially invalid or stale timing data.
269    ///
270    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
271    #[inline]
272    pub fn get_upstream_response_time(&self) -> Option<u64> {
273        if let Some(value) = self.upstream_response_time {
274            if value < ONE_HOUR_MS {
275                return Some(value);
276            }
277        }
278        None
279    }
280
281    /// Returns the upstream connect time if it's less than one hour, otherwise None.
282    /// This helps filter out potentially invalid or stale timing data.
283    ///
284    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
285    #[inline]
286    pub fn get_upstream_connect_time(&self) -> Option<u64> {
287        if let Some(value) = self.upstream_connect_time {
288            if value < ONE_HOUR_MS {
289                return Some(value);
290            }
291        }
292        None
293    }
294
295    /// Returns the upstream processing time if it's less than one hour, otherwise None.
296    /// This helps filter out potentially invalid or stale timing data.
297    ///
298    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour
299    #[inline]
300    pub fn get_upstream_processing_time(&self) -> Option<u64> {
301        if let Some(value) = self.upstream_processing_time {
302            if value < ONE_HOUR_MS {
303                return Some(value);
304            }
305        }
306        None
307    }
308
309    /// Adds a plugin processing time to the context
310    ///
311    /// # Arguments
312    /// * `name` - The name of the plugin
313    /// * `time` - The time taken by the plugin in milliseconds
314    #[inline]
315    pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
316        if let Some(times) = self.plugin_processing_times.as_mut() {
317            times.push((name.to_string(), time));
318        } else {
319            let mut times = Vec::with_capacity(5);
320            times.push((name.to_string(), time));
321            self.plugin_processing_times = Some(times);
322        }
323    }
324
325    /// Appends a formatted value to the provided buffer based on the given key.
326    /// Handles various metrics including connection info, timing data, and TLS details.
327    ///
328    /// # Arguments
329    /// * `buf` - The BytesMut buffer to append the value to
330    /// * `key` - The key identifying which state value to format and append
331    ///
332    /// Returns: The modified BytesMut buffer
333    #[inline]
334    pub fn append_value(&self, mut buf: BytesMut, key: &str) -> BytesMut {
335        match key {
336            "connection_id" => {
337                buf.extend(
338                    itoa::Buffer::new().format(self.connection_id).as_bytes(),
339                );
340            },
341            "upstream_reused" => {
342                if self.upstream_reused {
343                    buf.extend(b"true");
344                } else {
345                    buf.extend(b"false");
346                }
347            },
348            "upstream_addr" => buf.extend(self.upstream_address.as_bytes()),
349            "processing" => buf
350                .extend(itoa::Buffer::new().format(self.processing).as_bytes()),
351            "upstream_connect_time" => {
352                if let Some(ms) = self.get_upstream_connect_time() {
353                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
354                }
355            },
356            "upstream_connect_time_human" => {
357                if let Some(ms) = self.get_upstream_connect_time() {
358                    buf = format_duration(buf, ms);
359                }
360            },
361            "upstream_connected" => {
362                if let Some(value) = self.upstream_connected {
363                    buf.extend(itoa::Buffer::new().format(value).as_bytes());
364                }
365            },
366            "upstream_processing_time" => {
367                if let Some(ms) = self.get_upstream_processing_time() {
368                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
369                }
370            },
371            "upstream_processing_time_human" => {
372                if let Some(ms) = self.get_upstream_processing_time() {
373                    buf = format_duration(buf, ms);
374                }
375            },
376            "upstream_response_time" => {
377                if let Some(ms) = self.get_upstream_response_time() {
378                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
379                }
380            },
381            "upstream_response_time_human" => {
382                if let Some(ms) = self.get_upstream_response_time() {
383                    buf = format_duration(buf, ms);
384                }
385            },
386            "upstream_tcp_connect_time" => {
387                if let Some(ms) = self.upstream_tcp_connect_time {
388                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
389                }
390            },
391            "upstream_tcp_connect_time_human" => {
392                if let Some(ms) = self.upstream_tcp_connect_time {
393                    buf = format_duration(buf, ms);
394                }
395            },
396            "upstream_tls_handshake_time" => {
397                if let Some(ms) = self.upstream_tls_handshake_time {
398                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
399                }
400            },
401            "upstream_tls_handshake_time_human" => {
402                if let Some(ms) = self.upstream_tls_handshake_time {
403                    buf = format_duration(buf, ms);
404                }
405            },
406            "upstream_connection_time" => {
407                if let Some(ms) = self.upstream_connection_time {
408                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
409                }
410            },
411            "upstream_connection_time_human" => {
412                if let Some(ms) = self.upstream_connection_time {
413                    buf = format_duration(buf, ms);
414                }
415            },
416            "location" => {
417                if !self.location.is_empty() {
418                    buf.extend(self.location.as_bytes())
419                }
420            },
421            "connection_time" => {
422                buf.extend(
423                    itoa::Buffer::new().format(self.connection_time).as_bytes(),
424                );
425            },
426            "connection_time_human" => {
427                buf = format_duration(buf, self.connection_time)
428            },
429            "connection_reused" => {
430                if self.connection_reused {
431                    buf.extend(b"true");
432                } else {
433                    buf.extend(b"false");
434                }
435            },
436            "tls_version" => {
437                if let Some(value) = &self.tls_version {
438                    buf.extend(value.as_bytes());
439                }
440            },
441            "tls_cipher" => {
442                if let Some(value) = &self.tls_cipher {
443                    buf.extend(value.as_bytes());
444                }
445            },
446            "tls_handshake_time" => {
447                if let Some(ms) = self.tls_handshake_time {
448                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
449                }
450            },
451            "tls_handshake_time_human" => {
452                if let Some(value) = self.tls_handshake_time {
453                    buf = format_duration(buf, value);
454                }
455            },
456            "compression_time" => {
457                if let Some(value) = &self.compression_stat {
458                    buf.extend(
459                        itoa::Buffer::new()
460                            .format(value.duration.as_millis() as u64)
461                            .as_bytes(),
462                    );
463                }
464            },
465            "compression_time_human" => {
466                if let Some(value) = &self.compression_stat {
467                    buf =
468                        format_duration(buf, value.duration.as_millis() as u64);
469                }
470            },
471            "compression_ratio" => {
472                if let Some(value) = &self.compression_stat {
473                    buf.extend(format!("{:.1}", value.ratio()).as_bytes());
474                }
475            },
476            "cache_lookup_time" => {
477                if let Some(ms) = self.cache_lookup_time {
478                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
479                }
480            },
481            "cache_lookup_time_human" => {
482                if let Some(ms) = self.cache_lookup_time {
483                    buf = format_duration(buf, ms);
484                }
485            },
486            "cache_lock_time" => {
487                if let Some(ms) = self.cache_lock_time {
488                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
489                }
490            },
491            "cache_lock_time_human" => {
492                if let Some(ms) = self.cache_lock_time {
493                    buf = format_duration(buf, ms);
494                }
495            },
496            "service_time" => {
497                buf.extend(
498                    itoa::Buffer::new()
499                        .format(now_ms() - self.created_at)
500                        .as_bytes(),
501                );
502            },
503            "service_time_human" => {
504                buf = format_duration(buf, now_ms() - self.created_at)
505            },
506            _ => {},
507        }
508        buf
509    }
510
511    /// Generates a Server-Timing header value based on the context's timing metrics.
512    ///
513    /// The Server-Timing header allows servers to communicate performance metrics
514    /// about the request-response cycle to the client. This implementation includes
515    /// various timing metrics like connection time, processing time, and cache operations.
516    ///
517    /// Returns a String containing the formatted Server-Timing header value.
518    pub fn generate_server_timing(&self) -> String {
519        // the response header should be set before get body from upstream,
520        // so upstream response time, compression time, etc. are not included
521
522        let mut timings = Vec::new();
523
524        let mut upstream_time = 0;
525        let mut upstream_time_set = false;
526
527        // Add upstream metrics
528        if let Some(time) = self.get_upstream_connect_time() {
529            upstream_time += time;
530            upstream_time_set = true;
531            timings.push(format!("upstream.connect;dur={time}"));
532        }
533
534        if let Some(time) = self.get_upstream_processing_time() {
535            upstream_time += time;
536            upstream_time_set = true;
537            timings.push(format!("upstream.processing;dur={time}"));
538        }
539
540        if upstream_time_set {
541            timings.push(format!("upstream;dur={upstream_time}"));
542        }
543
544        let mut cache_time = 0;
545        let mut cache_time_set = false;
546        // Add cache metrics
547        if let Some(time) = self.cache_lookup_time {
548            cache_time += time;
549            cache_time_set = true;
550            timings.push(format!("cache.lookup;dur={time}"));
551        }
552
553        if let Some(time) = self.cache_lock_time {
554            cache_time += time;
555            cache_time_set = true;
556            timings.push(format!("cache.lock;dur={time}"));
557        }
558        if cache_time_set {
559            timings.push(format!("cache;dur={cache_time}"));
560        }
561
562        if let Some(times) = self.plugin_processing_times.as_ref() {
563            let mut plugin_time = 0;
564            for (name, time) in times {
565                plugin_time += time;
566                timings.push(format!("plugin.{name};dur={time}"));
567            }
568            timings.push(format!("plugin;dur={plugin_time}"));
569        }
570
571        // Add total service time
572        let service_time = now_ms() - self.created_at;
573        timings.push(format!("total;dur={service_time}"));
574
575        timings.join(", ")
576    }
577    #[inline]
578    pub fn push_cache_key(&mut self, key: String) {
579        if let Some(cache_keys) = &mut self.cache_keys {
580            cache_keys.push(key.to_string());
581        } else {
582            self.cache_keys = Some(vec![key.to_string()]);
583        }
584    }
585    #[inline]
586    pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
587        if let Some(cache_keys) = &mut self.cache_keys {
588            cache_keys.extend(keys);
589        } else {
590            self.cache_keys = Some(keys);
591        }
592    }
593}
594
595/// Generates a cache key from the request method, URI and state context.
596/// The key includes an optional namespace and prefix if configured in the state.
597///
598/// # Arguments
599/// * `ctx` - The Ctx context containing cache configuration
600/// * `method` - The HTTP method as a string
601/// * `uri` - The request URI
602///
603/// Returns: A CacheKey combining the namespace, prefix (if any), method and URI
604pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
605    let namespace = ctx.cache_namespace.as_ref().map_or("", |v| v);
606    let key = if let Some(keys) = &ctx.cache_keys {
607        format!("{}:{method}:{uri}", keys.join(":"))
608    } else {
609        format!("{method}:{uri}")
610    };
611
612    CacheKey::new(namespace, key, "")
613}
614
615#[cfg(test)]
616mod tests {
617    use super::*;
618    use bytes::BytesMut;
619    use pretty_assertions::assert_eq;
620    use std::time::Duration;
621
622    #[test]
623    fn test_format_duration() {
624        let mut buf = BytesMut::new();
625        buf = format_duration(buf, (3600 + 3500) * 1000);
626        assert_eq!(b"1.9h", buf.as_ref());
627
628        buf = BytesMut::new();
629        buf = format_duration(buf, (3600 + 1800) * 1000);
630        assert_eq!(b"1.5h", buf.as_ref());
631
632        buf = BytesMut::new();
633        buf = format_duration(buf, (3600 + 100) * 1000);
634        assert_eq!(b"1h", buf.as_ref());
635
636        buf = BytesMut::new();
637        buf = format_duration(buf, (60 + 50) * 1000);
638        assert_eq!(b"1.8m", buf.as_ref());
639
640        buf = BytesMut::new();
641        buf = format_duration(buf, (60 + 2) * 1000);
642        assert_eq!(b"1m", buf.as_ref());
643
644        buf = BytesMut::new();
645        buf = format_duration(buf, 1000);
646        assert_eq!(b"1s", buf.as_ref());
647
648        buf = BytesMut::new();
649        buf = format_duration(buf, 512);
650        assert_eq!(b"512ms", buf.as_ref());
651
652        buf = BytesMut::new();
653        buf = format_duration(buf, 1112);
654        assert_eq!(b"1.1s", buf.as_ref());
655    }
656
657    #[test]
658    fn test_add_variable() {
659        let mut ctx = Ctx::new();
660        ctx.add_variable("key1", "value1");
661        ctx.add_variable("key2", "value2");
662        assert_eq!(
663            ctx.variables.clone().unwrap().get("$key1"),
664            Some(&"value1".to_string())
665        );
666        assert_eq!(
667            ctx.variables.clone().unwrap().get("$key2"),
668            Some(&"value2".to_string())
669        );
670    }
671
672    #[test]
673    fn test_now_ms() {
674        let now = now_ms();
675        assert!(now > 0);
676    }
677
678    #[test]
679    fn test_get_cache_key() {
680        let mut ctx = Ctx::new();
681        ctx.cache_namespace = Some("test".to_string());
682        let method = "GET";
683        let uri = Uri::from_static("http://example.com/");
684        let key = get_cache_key(&ctx, method, &uri);
685        assert_eq!(key.primary_key_str(), Some("GET:http://example.com/"));
686        assert_eq!(key.namespace_str(), Some("test"));
687
688        ctx.cache_keys = Some(vec!["prefix".to_string()]);
689        let key = get_cache_key(&ctx, method, &uri);
690        assert_eq!(
691            key.primary_key_str(),
692            Some("prefix:GET:http://example.com/")
693        );
694        assert_eq!(key.namespace_str(), Some("test"));
695    }
696
697    #[test]
698    fn test_state() {
699        let mut ctx = Ctx::new();
700
701        ctx.connection_id = 10;
702        assert_eq!(
703            b"10",
704            ctx.append_value(BytesMut::new(), "connection_id").as_ref()
705        );
706
707        assert_eq!(
708            b"false",
709            ctx.append_value(BytesMut::new(), "upstream_reused")
710                .as_ref()
711        );
712
713        ctx.upstream_reused = true;
714        assert_eq!(
715            b"true",
716            ctx.append_value(BytesMut::new(), "upstream_reused")
717                .as_ref()
718        );
719
720        ctx.upstream_address = "192.168.1.1:80".to_string();
721        assert_eq!(
722            b"192.168.1.1:80",
723            ctx.append_value(BytesMut::new(), "upstream_addr").as_ref()
724        );
725
726        ctx.processing = 10;
727        assert_eq!(
728            b"10",
729            ctx.append_value(BytesMut::new(), "processing").as_ref()
730        );
731
732        ctx.upstream_connect_time = Some(1);
733        assert_eq!(
734            b"1",
735            ctx.append_value(BytesMut::new(), "upstream_connect_time")
736                .as_ref()
737        );
738        assert_eq!(
739            b"1ms",
740            ctx.append_value(BytesMut::new(), "upstream_connect_time_human")
741                .as_ref()
742        );
743
744        ctx.upstream_connected = Some(30);
745        assert_eq!(
746            b"30",
747            ctx.append_value(BytesMut::new(), "upstream_connected")
748                .as_ref()
749        );
750
751        ctx.upstream_processing_time = Some(2);
752        assert_eq!(
753            b"2",
754            ctx.append_value(BytesMut::new(), "upstream_processing_time")
755                .as_ref()
756        );
757        assert_eq!(
758            b"2ms",
759            ctx.append_value(BytesMut::new(), "upstream_processing_time_human")
760                .as_ref()
761        );
762
763        ctx.upstream_response_time = Some(3);
764        assert_eq!(
765            b"3",
766            ctx.append_value(BytesMut::new(), "upstream_response_time")
767                .as_ref()
768        );
769        assert_eq!(
770            b"3ms",
771            ctx.append_value(BytesMut::new(), "upstream_response_time_human")
772                .as_ref()
773        );
774
775        ctx.upstream_tcp_connect_time = Some(100);
776        assert_eq!(
777            b"100",
778            ctx.append_value(BytesMut::new(), "upstream_tcp_connect_time")
779                .as_ref()
780        );
781        assert_eq!(
782            b"100ms",
783            ctx.append_value(
784                BytesMut::new(),
785                "upstream_tcp_connect_time_human"
786            )
787            .as_ref()
788        );
789
790        ctx.upstream_tls_handshake_time = Some(110);
791        assert_eq!(
792            b"110",
793            ctx.append_value(BytesMut::new(), "upstream_tls_handshake_time")
794                .as_ref()
795        );
796        assert_eq!(
797            b"110ms",
798            ctx.append_value(
799                BytesMut::new(),
800                "upstream_tls_handshake_time_human"
801            )
802            .as_ref()
803        );
804
805        ctx.upstream_connection_time = Some(120);
806        assert_eq!(
807            b"120",
808            ctx.append_value(BytesMut::new(), "upstream_connection_time")
809                .as_ref()
810        );
811        assert_eq!(
812            b"120ms",
813            ctx.append_value(BytesMut::new(), "upstream_connection_time_human")
814                .as_ref()
815        );
816
817        ctx.location = "pingap".to_string();
818        assert_eq!(
819            b"pingap",
820            ctx.append_value(BytesMut::new(), "location").as_ref()
821        );
822
823        ctx.connection_time = 4;
824        assert_eq!(
825            b"4",
826            ctx.append_value(BytesMut::new(), "connection_time")
827                .as_ref()
828        );
829        assert_eq!(
830            b"4ms",
831            ctx.append_value(BytesMut::new(), "connection_time_human")
832                .as_ref()
833        );
834
835        assert_eq!(
836            b"false",
837            ctx.append_value(BytesMut::new(), "connection_reused")
838                .as_ref()
839        );
840        ctx.connection_reused = true;
841        assert_eq!(
842            b"true",
843            ctx.append_value(BytesMut::new(), "connection_reused")
844                .as_ref()
845        );
846
847        ctx.tls_version = Some("TLSv1.3".to_string());
848        assert_eq!(
849            b"TLSv1.3",
850            ctx.append_value(BytesMut::new(), "tls_version").as_ref()
851        );
852
853        ctx.tls_cipher =
854            Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
855        assert_eq!(
856            b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
857            ctx.append_value(BytesMut::new(), "tls_cipher").as_ref()
858        );
859
860        ctx.tls_handshake_time = Some(101);
861        assert_eq!(
862            b"101",
863            ctx.append_value(BytesMut::new(), "tls_handshake_time")
864                .as_ref()
865        );
866        assert_eq!(
867            b"101ms",
868            ctx.append_value(BytesMut::new(), "tls_handshake_time_human")
869                .as_ref()
870        );
871
872        ctx.compression_stat = Some(CompressionStat {
873            in_bytes: 1024,
874            out_bytes: 500,
875            duration: Duration::from_millis(5),
876            ..Default::default()
877        });
878        assert_eq!(
879            b"5",
880            ctx.append_value(BytesMut::new(), "compression_time")
881                .as_ref()
882        );
883        assert_eq!(
884            b"5ms",
885            ctx.append_value(BytesMut::new(), "compression_time_human")
886                .as_ref()
887        );
888        assert_eq!(
889            b"2.0",
890            ctx.append_value(BytesMut::new(), "compression_ratio")
891                .as_ref()
892        );
893
894        ctx.cache_lookup_time = Some(6);
895        assert_eq!(
896            b"6",
897            ctx.append_value(BytesMut::new(), "cache_lookup_time")
898                .as_ref()
899        );
900        assert_eq!(
901            b"6ms",
902            ctx.append_value(BytesMut::new(), "cache_lookup_time_human")
903                .as_ref()
904        );
905
906        ctx.cache_lock_time = Some(7);
907        assert_eq!(
908            b"7",
909            ctx.append_value(BytesMut::new(), "cache_lock_time")
910                .as_ref()
911        );
912        assert_eq!(
913            b"7ms",
914            ctx.append_value(BytesMut::new(), "cache_lock_time_human")
915                .as_ref()
916        );
917
918        ctx.created_at = now_ms() - 1;
919        assert_eq!(
920            true,
921            ctx.append_value(BytesMut::new(), "service_time_human")
922                .ends_with(b"ms")
923        );
924    }
925
926    #[test]
927    fn test_add_plugin_processing_time() {
928        let mut ctx = Ctx::new();
929        ctx.add_plugin_processing_time("plugin1", 100);
930        ctx.add_plugin_processing_time("plugin2", 200);
931        assert_eq!(
932            ctx.plugin_processing_times,
933            Some(vec![
934                ("plugin1".to_string(), 100),
935                ("plugin2".to_string(), 200)
936            ])
937        );
938    }
939
940    #[test]
941    fn test_generate_server_timing() {
942        let mut ctx = Ctx::new();
943        ctx.upstream_connect_time = Some(1);
944        ctx.upstream_processing_time = Some(2);
945        ctx.cache_lookup_time = Some(6);
946        ctx.cache_lock_time = Some(7);
947        ctx.created_at = now_ms() - 1;
948        ctx.add_plugin_processing_time("plugin1", 100);
949        ctx.add_plugin_processing_time("plugin2", 200);
950
951        // total duration sometime changes(it may be 1 or 2), so we just check the prefix
952        assert_eq!(true, ctx.generate_server_timing().starts_with("upstream.connect;dur=1, upstream.processing;dur=2, upstream;dur=3, cache.lookup;dur=6, cache.lock;dur=7, cache;dur=13, plugin.plugin1;dur=100, plugin.plugin2;dur=200, plugin;dur=300, total;dur="));
953    }
954}