Skip to main content

pingap_core/
ctx.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{Plugin, real_now_ms};
16use ahash::AHashMap;
17use bytes::BytesMut;
18use http::StatusCode;
19use http::Uri;
20use http::{HeaderName, HeaderValue};
21#[cfg(feature = "tracing")]
22use opentelemetry::{
23    Context,
24    global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
25    trace::{SpanKind, TraceContextExt, Tracer},
26};
27use pingora::cache::CacheKey;
28use pingora::http::RequestHeader;
29use pingora::protocols::Digest;
30use pingora::protocols::TimingDigest;
31use pingora::proxy::Session;
32use pingora_limits::inflight::Guard;
33use std::fmt::Write;
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
// Unit sizes expressed in milliseconds; used by `format_duration` to pick
// the display unit.
/// Milliseconds in one second.
const SECOND: u64 = 1_000;
/// Milliseconds in one minute.
const MINUTE: u64 = 60 * SECOND;
/// Milliseconds in one hour.
const HOUR: u64 = 60 * MINUTE;
41
42#[inline]
43/// Format the duration in human readable format, checking smaller units first.
44/// e.g., ms, s, m, h.
45pub fn format_duration(buf: &mut BytesMut, ms: u64) {
46    if ms < SECOND {
47        // Format as milliseconds if less than a second.
48        buf.extend_from_slice(itoa::Buffer::new().format(ms).as_bytes());
49        buf.extend_from_slice(b"ms");
50    } else if ms < MINUTE {
51        // Format as seconds with one decimal place if less than a minute.
52        buf.extend_from_slice(
53            itoa::Buffer::new().format(ms / SECOND).as_bytes(),
54        );
55        let value = (ms % SECOND) / 100;
56        if value != 0 {
57            buf.extend_from_slice(b".");
58            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
59        }
60        buf.extend_from_slice(b"s");
61    } else if ms < HOUR {
62        // Format as minutes with one decimal place if less than an hour.
63        buf.extend_from_slice(
64            itoa::Buffer::new().format(ms / MINUTE).as_bytes(),
65        );
66        let value = ms % MINUTE * 10 / MINUTE;
67        if value != 0 {
68            buf.extend_from_slice(b".");
69            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
70        }
71        buf.extend_from_slice(b"m");
72    } else {
73        // Format as hours with one decimal place.
74        buf.extend_from_slice(itoa::Buffer::new().format(ms / HOUR).as_bytes());
75        let value = ms % HOUR * 10 / HOUR;
76        if value != 0 {
77            buf.extend_from_slice(b".");
78            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
79        }
80        buf.extend_from_slice(b"h");
81    }
82}
83
/// Mode selector parsed from a configuration string.
///
/// NOTE(review): the variant names suggest the stage a modification is
/// applied at (upstream vs. response); the consumers are outside this file,
/// so confirm against them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModifiedMode {
    /// Selected by the exact string `"upstream"`.
    Upstream,
    /// Selected by any other string; this is the fallback.
    Response,
}

impl From<&str> for ModifiedMode {
    /// Converts a string into a mode.
    ///
    /// Only the exact, case-sensitive value `"upstream"` maps to
    /// [`ModifiedMode::Upstream`]; every other input (including the empty
    /// string) maps to [`ModifiedMode::Response`].
    fn from(value: &str) -> Self {
        match value {
            "upstream" => ModifiedMode::Upstream,
            _ => ModifiedMode::Response,
        }
    }
}
98
99/// Trait for modifying the response body.
100pub trait ModifyResponseBody: Sync + Send {
101    /// Handles the modification of response body data.
102    fn handle(
103        &mut self,
104        session: &Session,
105        body: &mut Option<bytes::Bytes>,
106        end_of_stream: bool,
107    ) -> pingora::Result<()>;
108    /// Returns the name of the modifier.
109    fn name(&self) -> String {
110        "unknown".to_string()
111    }
112}
113
/// Information about a single client connection.
///
/// All fields default to empty values (`0`, `None`, `false`) via the derived
/// `Default` implementation.
#[derive(Default)]
pub struct ConnectionInfo {
    /// A unique identifier for the connection.
    pub id: usize,
    /// The IP address of the client.
    pub client_ip: Option<String>,
    /// The remote address of the client connection.
    pub remote_addr: Option<String>,
    /// The remote port of the client connection.
    pub remote_port: Option<u16>,
    /// The server address the client connected to.
    pub server_addr: Option<String>,
    /// The server port the client connected to.
    pub server_port: Option<u16>,
    /// The TLS version used for the connection, if any.
    pub tls_version: Option<String>,
    /// The TLS cipher used for the connection, if any.
    pub tls_cipher: Option<String>,
    /// Indicates whether the connection was reused (e.g., HTTP keep-alive).
    pub reused: bool,
}
136
/// All timing-related metrics for the request lifecycle.
// `Default` is implemented by hand rather than derived: `Instant` has no
// `Default` implementation, so `created_at` is initialized with
// `Instant::now()`.
pub struct Timing {
    /// Monotonic instant captured when this value was created; elapsed time
    /// since this instant is reported as the service time.
    pub created_at: Instant,
    /// The total duration of the client connection in milliseconds.
    /// May be large for reused connections.
    pub connection_duration: u64,
    /// The duration of the TLS handshake with the client in milliseconds.
    pub tls_handshake: Option<i32>,
    /// The total duration to connect to the upstream server in milliseconds.
    pub upstream_connect: Option<i32>,
    /// The duration of the TCP connection to the upstream server in milliseconds.
    pub upstream_tcp_connect: Option<i32>,
    /// The duration of the TLS handshake with the upstream server in milliseconds.
    pub upstream_tls_handshake: Option<i32>,
    /// The duration the upstream server took to process the request in milliseconds.
    pub upstream_processing: Option<i32>,
    /// The duration from sending the request to receiving the upstream response in milliseconds.
    pub upstream_response: Option<i32>,
    /// The total duration of the upstream connection in milliseconds.
    pub upstream_connection_duration: Option<u64>,
    /// The duration of the cache lookup in milliseconds.
    pub cache_lookup: Option<i32>,
    /// The duration spent waiting for a cache lock in milliseconds.
    pub cache_lock: Option<i32>,
}
164
impl Default for Timing {
    /// Creates a `Timing` whose `created_at` is the current instant, with a
    /// zero connection duration and every optional measurement unset.
    fn default() -> Self {
        Self {
            // Start the clock now; later readers use `created_at.elapsed()`.
            created_at: Instant::now(),
            connection_duration: 0,
            tls_handshake: None,
            upstream_connect: None,
            upstream_tcp_connect: None,
            upstream_tls_handshake: None,
            upstream_processing: None,
            upstream_response: None,
            upstream_connection_duration: None,
            cache_lookup: None,
            cache_lock: None,
        }
    }
}
182
/// Trait for upstream instance, used to handle the upstream instance lifecycle.
///
/// Implementations live outside this file; the notes below are based on the
/// method names and signatures only.
pub trait UpstreamInstance: Send + Sync {
    /// Callback taking the upstream `address`; by its name, reports a
    /// transport-level failure for that address.
    fn on_transport_failure(&self, address: &str);
    /// Callback taking the upstream `address` and the response `status`.
    fn on_response(&self, address: &str, status: StatusCode);
    /// Completion callback returning a counter value.
    /// NOTE(review): the meaning of the returned `i32` is not visible here —
    /// confirm against the implementation.
    fn completed(&self) -> i32;
}
189
/// Trait for location instance
pub trait LocationInstance: Send + Sync {
    /// Get location's name
    fn name(&self) -> &str;
    /// Get the upstream of location
    fn upstream(&self) -> &str;
    /// Rewrite the request url
    ///
    /// Takes the request header to modify and an optional variables map, and
    /// returns a flag together with an optional variables map.
    /// NOTE(review): the flag presumably tells whether the URL was changed —
    /// confirm against the implementation.
    fn rewrite(
        &self,
        header: &mut RequestHeader,
        variables: Option<AHashMap<String, String>>,
    ) -> (bool, Option<AHashMap<String, String>>);
    /// Returns the proxy header to upstream
    ///
    /// Each entry is a header name, a header value and a boolean flag.
    /// NOTE(review): the meaning of the flag is not visible in this file.
    fn headers(&self) -> Option<&Vec<(HeaderName, HeaderValue, bool)>>;
    /// Returns the client body size limit
    fn client_body_size_limit(&self) -> usize;
    /// Called when the request is received from the client
    /// Returns
    /// `Result<(u64, i32)>` - A tuple containing:
    ///   - The new total number of accepted requests (u64)
    ///   - The new number of currently processing requests (i32)
    fn on_request(&self) -> pingora::Result<(u64, i32)>;
    /// Called when the response is received from the upstream
    fn on_response(&self);
}
215
/// Information about the upstream (backend) server.
///
/// All fields start empty (`None`, empty string, `false`, `0`) via the
/// derived `Default` implementation.
#[derive(Default)]
pub struct UpstreamInfo {
    /// Upstream instance
    pub upstream_instance: Option<Arc<dyn UpstreamInstance>>,
    /// Location instance
    pub location_instance: Option<Arc<dyn LocationInstance>>,
    /// The location (route) that directed the request to this upstream.
    pub location: Arc<str>,
    /// The name of the upstream server or group.
    pub name: Arc<str>,
    /// The address of the upstream server.
    pub address: String,
    /// Indicates if the connection to the upstream was reused.
    pub reused: bool,
    /// The number of requests currently being processed by the upstream.
    pub processing_count: Option<i32>,
    /// The current number of active connections to the upstream.
    pub connected_count: Option<i32>,
    /// The HTTP status code of upstream response.
    pub status: Option<StatusCode>,
    /// The number of retries for failed connections.
    pub retries: u8,
    /// Maximum number of retries for failed connections.
    pub max_retries: Option<u8>,
    /// The maximum total time window allowed for an operation and all of its subsequent retries.
    ///
    /// The timer starts from the beginning of the **initial attempt**. Once this time window
    /// is exceeded, no more retries will be initiated, even if the maximum number of
    /// retries (`max_retries`) has not been reached.
    ///
    /// If set to `None`, there is no time limit for the retry process.
    pub max_retry_window: Option<Duration>,
}
250
/// State related to the current request being processed.
///
/// Counters start at zero and optional values at `None` via the derived
/// `Default` implementation.
#[derive(Default)]
pub struct RequestState {
    /// A unique identifier for the request.
    pub request_id: Option<String>,
    /// The HTTP status code of the response.
    pub status: Option<StatusCode>,
    /// The size of the request payload in bytes.
    pub payload_size: usize,
    /// A guard for rate limiting, if applicable.
    pub guard: Option<Guard>,
    /// The total number of requests currently being processed by the service.
    pub processing_count: i32,
    /// The total number of requests accepted by the service.
    pub accepted_count: u64,
    /// The number of requests currently being processed for this location.
    pub location_processing_count: i32,
    /// The total number of requests accepted for this location.
    pub location_accepted_count: u64,
}
271
/// All cache-related configuration and statistics for a request.
///
/// `Ctx` stores this behind an `Option`, so it only exists once a cache
/// related value has been set.
#[derive(Default)]
pub struct CacheInfo {
    /// The namespace for cache entries.
    pub namespace: Option<String>,
    /// The list of keys used to generate the final cache key.
    pub keys: Option<Vec<String>>,
    /// Whether to respect Cache-Control headers.
    pub check_cache_control: bool,
    /// The maximum time-to-live for cache entries.
    pub max_ttl: Option<Duration>,
    /// The number of cache read operations performed.
    pub reading_count: Option<u32>,
    /// The number of cache write operations performed.
    pub writing_count: Option<u32>,
}
288
/// Optional features like tracing, plugins, and response modifications.
///
/// Every field is optional and is filled in lazily by the `Ctx` helper
/// methods that need it.
#[derive(Default)]
pub struct Features {
    /// A map of custom variables for request processing.
    pub variables: Option<AHashMap<String, String>>,
    /// A list of plugin names and their processing times in milliseconds.
    pub plugin_processing_times: Option<Vec<(String, u32)>>,
    /// Statistics about response compression.
    pub compression_stat: Option<CompressionStat>,
    /// A map of plugin names and their response body handlers.
    pub modify_body_handlers:
        Option<AHashMap<String, Box<dyn ModifyResponseBody>>>,
    /// OpenTelemetry tracer for distributed tracing (available with the "tracing" feature).
    #[cfg(feature = "tracing")]
    pub otel_tracer: Option<OtelTracer>,
    /// OpenTelemetry span for the upstream request (available with the "tracing" feature).
    #[cfg(feature = "tracing")]
    pub upstream_span: Option<BoxedSpan>,
}
308
/// Figures collected while compressing a response.
#[derive(Default)]
pub struct CompressionStat {
    /// Name of the compression algorithm (e.g., "gzip", "br").
    pub algorithm: String,
    /// Number of bytes fed into the compressor.
    pub in_bytes: usize,
    /// Number of bytes produced by the compressor.
    pub out_bytes: usize,
    /// Time spent compressing.
    pub duration: Duration,
}

impl CompressionStat {
    /// Returns `in_bytes / out_bytes` as a floating point ratio.
    ///
    /// When no output was produced (`out_bytes == 0`) the ratio is reported
    /// as `0.0` instead of dividing by zero.
    pub fn ratio(&self) -> f64 {
        match self.out_bytes {
            0 => 0.0,
            written => self.in_bytes as f64 / written as f64,
        }
    }
}
331
/// A wrapper for OpenTelemetry tracing components.
///
/// Only compiled when the "tracing" feature is enabled.
#[cfg(feature = "tracing")]
pub struct OtelTracer {
    /// The tracer instance.
    pub tracer: BoxedTracer,
    /// The main span for the incoming HTTP request; its span context is used
    /// as the parent when upstream spans are created.
    pub http_request_span: BoxedSpan,
}
340
#[cfg(feature = "tracing")]
impl OtelTracer {
    /// Starts a client-kind span named `name` whose parent is the HTTP
    /// request span held by this tracer.
    #[inline]
    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
        // Build a context carrying the request span's context so the new
        // span is linked to the main request span.
        let parent_context = Context::current().with_remote_span_context(
            self.http_request_span.span_context().clone(),
        );
        let builder = self
            .tracer
            .span_builder(name.to_string())
            .with_kind(SpanKind::Client);
        builder.start_with_context(&self.tracer, &parent_context)
    }
}
358
/// Represents the state of a request/response cycle, tracking various metrics and properties
/// including connection details, caching information, and upstream server interactions.
///
/// The derived `Default` starts the request clock, because `Timing::default()`
/// records `Instant::now()` in `created_at`.
#[derive(Default)]
pub struct Ctx {
    /// Information about the client connection.
    pub conn: ConnectionInfo,
    /// Information about the upstream server.
    pub upstream: UpstreamInfo,
    /// Timing metrics for the request lifecycle.
    pub timing: Timing,
    /// State related to the current request.
    pub state: RequestState,
    /// Cache-related information. Wrapped in Option to save memory when not in use.
    pub cache: Option<CacheInfo>,
    /// Optional features. Wrapped in Option to save memory when not in use.
    pub features: Option<Features>,
    /// Plugins for the current location
    pub plugins: Option<Vec<(String, Arc<dyn Plugin>)>>,
}
378
/// Helper struct to store connection timing and TLS details
#[derive(Debug, Default)]
pub struct DigestDetail {
    /// Whether the connection is considered reused; `get_digest_detail` sets
    /// this when `connection_time` exceeds 100 ms.
    pub connection_reused: bool,
    /// Time elapsed since the TCP connection was established, in milliseconds
    pub connection_time: u64,
    /// Timestamp when TCP connection was established, in milliseconds since
    /// the Unix epoch (0 when unavailable)
    pub tcp_established: u64,
    /// Timestamp when TLS handshake completed, in milliseconds since the
    /// Unix epoch (0 when unavailable or not using TLS)
    pub tls_established: u64,
    /// TLS protocol version if using HTTPS
    pub tls_version: Option<String>,
    /// TLS cipher suite in use if using HTTPS
    pub tls_cipher: Option<String>,
}
395
396#[inline]
397pub(crate) fn timing_to_ms(timing: Option<&Option<TimingDigest>>) -> u64 {
398    match timing {
399        Some(Some(item)) => item
400            .established_ts
401            .duration_since(SystemTime::UNIX_EPOCH)
402            .unwrap_or_default()
403            .as_millis() as u64,
404        _ => 0,
405    }
406}
407
408/// Extracts timing and TLS information from connection digest.
409/// Used for metrics and logging connection details.
410#[inline]
411pub fn get_digest_detail(digest: &Digest) -> DigestDetail {
412    let tcp_established = timing_to_ms(digest.timing_digest.first());
413    let mut connection_time = 0;
414    let now = real_now_ms();
415    if tcp_established > 0 && tcp_established < now {
416        connection_time = now - tcp_established;
417    }
418    let connection_reused = connection_time > 100;
419
420    let Some(ssl_digest) = &digest.ssl_digest else {
421        return DigestDetail {
422            connection_reused,
423            tcp_established,
424            connection_time,
425            ..Default::default()
426        };
427    };
428
429    DigestDetail {
430        connection_reused,
431        tcp_established,
432        connection_time,
433        tls_established: timing_to_ms(digest.timing_digest.last()),
434        tls_version: Some(ssl_digest.version.to_string()),
435        tls_cipher: Some(ssl_digest.cipher.to_string()),
436    }
437}
438
439impl Ctx {
440    /// Creates a new Ctx instance with the current timestamp and default values.
441    ///
442    /// Returns a new Ctx struct initialized with the current timestamp and all other fields
443    /// set to their default values.
444    pub fn new() -> Self {
445        Self {
446            ..Default::default()
447        }
448    }
449
450    /// Adds a variable to the state's variables map with the given key and value.
451    ///
452    /// # Arguments
453    /// * `key` - The variable name.
454    /// * `value` - The value to store for this variable.
455    #[inline]
456    pub fn add_variable(&mut self, key: &str, value: &str) {
457        // Lazily initialize features and variables map.
458        let features = self.features.get_or_insert_default();
459        let variables = features.variables.get_or_insert_with(AHashMap::new);
460        variables.insert(key.to_string(), value.to_string());
461    }
462
463    /// Extends the variables map with the given key-value pairs.
464    ///
465    /// # Arguments
466    /// * `values` - A HashMap containing the key-value pairs to add.
467    #[inline]
468    pub fn extend_variables(&mut self, values: AHashMap<String, String>) {
469        let features = self.features.get_or_insert_default();
470        if let Some(variables) = features.variables.as_mut() {
471            variables.extend(values);
472        } else {
473            features.variables = Some(values);
474        }
475    }
476
477    /// Returns the value of a variable by key.
478    ///
479    /// # Arguments
480    /// * `key` - The key of the variable to retrieve.
481    ///
482    /// Returns: Option<&str> representing the value of the variable, or None if the variable does not exist.
483    #[inline]
484    pub fn get_variable(&self, key: &str) -> Option<&str> {
485        self.features
486            .as_ref()?
487            .variables
488            .as_ref()?
489            .get(key)
490            .map(|v| v.as_str())
491    }
492
493    /// Adds a modify body handler to the context.
494    ///
495    /// # Arguments
496    /// * `name` - The name of the handler.
497    /// * `handler` - The handler to add.
498    #[inline]
499    pub fn add_modify_body_handler(
500        &mut self,
501        name: &str,
502        handler: Box<dyn ModifyResponseBody>,
503    ) {
504        let features = self.features.get_or_insert_default();
505        let handlers = features
506            .modify_body_handlers
507            .get_or_insert_with(AHashMap::new);
508        handlers.insert(name.to_string(), handler);
509    }
510
511    /// Returns the modify body handler by name.
512    #[inline]
513    pub fn get_modify_body_handler(
514        &mut self,
515        name: &str,
516    ) -> Option<&mut Box<dyn ModifyResponseBody>> {
517        self.features
518            .as_mut()
519            .and_then(|f| f.modify_body_handlers.as_mut())
520            .and_then(|h| h.get_mut(name))
521    }
522
523    // A private helper function to filter out time values that are too large (over an hour),
524    // which might indicate an error or uninitialized state.
525    #[inline]
526    fn get_time_field(&self, field: Option<i32>) -> Option<u32> {
527        if let Some(value) = field {
528            if value >= 0 {
529                return Some(value as u32);
530            }
531        }
532        None
533    }
534
    /// Returns the upstream response time in milliseconds.
    ///
    /// Returns: `Option<u32>` holding the value of `timing.upstream_response`,
    /// or `None` when that value is unset or negative.
    #[inline]
    pub fn get_upstream_response_time(&self) -> Option<u32> {
        self.get_time_field(self.timing.upstream_response)
    }
543
    /// Returns the upstream connect time in milliseconds.
    ///
    /// Returns: `Option<u32>` holding the value of `timing.upstream_connect`,
    /// or `None` when that value is unset or negative.
    #[inline]
    pub fn get_upstream_connect_time(&self) -> Option<u32> {
        self.get_time_field(self.timing.upstream_connect)
    }
552
    /// Returns the upstream processing time in milliseconds.
    ///
    /// Returns: `Option<u32>` holding the value of `timing.upstream_processing`,
    /// or `None` when that value is unset or negative.
    #[inline]
    pub fn get_upstream_processing_time(&self) -> Option<u32> {
        self.get_time_field(self.timing.upstream_processing)
    }
561
562    /// Adds a plugin processing time to the context.
563    ///
564    /// # Arguments
565    /// * `name` - The name of the plugin.
566    /// * `time` - The time taken by the plugin in milliseconds.
567    #[inline]
568    pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
569        // Lazily initialize features and the processing times vector.
570        let features = self.features.get_or_insert_default();
571        let times = features
572            .plugin_processing_times
573            .get_or_insert_with(|| Vec::with_capacity(5));
574        if let Some(item) = times.iter_mut().find(|item| item.0 == name) {
575            item.1 += time;
576        } else {
577            times.push((name.to_string(), time));
578        }
579    }
580
    /// Appends a formatted value to the provided log buffer based on the given key.
    /// Handles various metrics including connection info, timing data, and TLS details.
    ///
    /// Keys ending in `_human` are rendered through `format_duration`; the
    /// matching keys without that suffix are written as raw millisecond
    /// integers. Unknown keys, and most values that are not set, append
    /// nothing (`upstream_status` appends `-` when unset).
    ///
    /// # Arguments
    /// * `buf` - The BytesMut buffer to append the value to.
    /// * `key` - The key identifying which state value to format and append.
    ///
    /// The value is appended to `buf` in place; nothing is returned.
    #[inline]
    pub fn append_log_value(&self, buf: &mut BytesMut, key: &str) {
        // A macro to simplify formatting and appending optional time values.
        macro_rules! append_time {
            // Append raw milliseconds.
            ($val:expr) => {
                if let Some(ms) = $val {
                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
                }
            };
            // Append human-readable formatted time.
            // NOTE(review): for the `Option<i32>` fields passed in directly, a
            // negative value would be cast to a very large u64 here — confirm
            // those fields are never negative.
            ($val:expr, human) => {
                if let Some(ms) = $val {
                    format_duration(buf, ms as u64);
                }
            };
        }

        match key {
            "connection_id" => {
                buf.extend(itoa::Buffer::new().format(self.conn.id).as_bytes());
            },
            "upstream_reused" => {
                if self.upstream.reused {
                    buf.extend(b"true");
                } else {
                    buf.extend(b"false");
                }
            },
            "upstream_status" => {
                if let Some(status) = &self.upstream.status {
                    buf.extend_from_slice(status.as_str().as_bytes());
                } else {
                    // Placeholder when no upstream status was recorded.
                    buf.extend_from_slice(b"-");
                }
            },
            "upstream_addr" => buf.extend(self.upstream.address.as_bytes()),
            "processing" => buf.extend(
                itoa::Buffer::new()
                    .format(self.state.processing_count)
                    .as_bytes(),
            ),
            "upstream_connected" => {
                if let Some(value) = self.upstream.connected_count {
                    buf.extend(itoa::Buffer::new().format(value).as_bytes());
                }
            },

            // Timing fields
            // The connect/processing/response values go through the getters,
            // which drop negative values.
            "upstream_connect_time" => {
                append_time!(self.get_upstream_connect_time())
            },
            "upstream_connect_time_human" => {
                append_time!(self.get_upstream_connect_time(), human)
            },

            "upstream_processing_time" => {
                append_time!(self.get_upstream_processing_time())
            },
            "upstream_processing_time_human" => {
                append_time!(self.get_upstream_processing_time(), human)
            },
            "upstream_response_time" => {
                append_time!(self.get_upstream_response_time())
            },
            "upstream_response_time_human" => {
                append_time!(self.get_upstream_response_time(), human)
            },
            "upstream_tcp_connect_time" => {
                append_time!(self.timing.upstream_tcp_connect)
            },
            "upstream_tcp_connect_time_human" => {
                append_time!(self.timing.upstream_tcp_connect, human)
            },
            "upstream_tls_handshake_time" => {
                append_time!(self.timing.upstream_tls_handshake)
            },
            "upstream_tls_handshake_time_human" => {
                append_time!(self.timing.upstream_tls_handshake, human)
            },
            "upstream_connection_time" => {
                append_time!(self.timing.upstream_connection_duration)
            },
            "upstream_connection_time_human" => {
                append_time!(self.timing.upstream_connection_duration, human)
            },
            "connection_time" => {
                append_time!(Some(self.timing.connection_duration))
            },
            "connection_time_human" => {
                append_time!(Some(self.timing.connection_duration), human)
            },

            // Other fields
            "location" => {
                if !self.upstream.location.is_empty() {
                    buf.extend(self.upstream.location.as_bytes())
                }
            },
            "connection_reused" => {
                if self.conn.reused {
                    buf.extend(b"true");
                } else {
                    buf.extend(b"false");
                }
            },
            "tls_version" => {
                if let Some(value) = &self.conn.tls_version {
                    buf.extend(value.as_bytes());
                }
            },
            "tls_cipher" => {
                if let Some(value) = &self.conn.tls_cipher {
                    buf.extend(value.as_bytes());
                }
            },
            "tls_handshake_time" => append_time!(self.timing.tls_handshake),
            "tls_handshake_time_human" => {
                append_time!(self.timing.tls_handshake, human)
            },
            "compression_time" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        append_time!(Some(value.duration.as_millis() as u64))
                    }
                }
            },
            "compression_time_human" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        append_time!(
                            Some(value.duration.as_millis() as u64),
                            human
                        )
                    }
                }
            },
            "compression_ratio" => {
                if let Some(feature) = &self.features {
                    if let Some(value) = &feature.compression_stat {
                        // One decimal place, e.g. `2.5`.
                        buf.extend(format!("{:.1}", value.ratio()).as_bytes());
                    }
                }
            },
            "cache_lookup_time" => {
                append_time!(self.timing.cache_lookup)
            },
            "cache_lookup_time_human" => {
                append_time!(self.timing.cache_lookup, human)
            },
            "cache_lock_time" => {
                append_time!(self.timing.cache_lock)
            },
            "cache_lock_time_human" => {
                append_time!(self.timing.cache_lock, human)
            },
            // Service time is measured at call time from `created_at`.
            "service_time" => {
                append_time!(Some(self.timing.created_at.elapsed().as_millis()))
            },
            "service_time_human" => {
                append_time!(
                    Some(self.timing.created_at.elapsed().as_millis()),
                    human
                )
            },
            // Ignore unknown keys.
            _ => {},
        }
    }
758
759    /// Generates a Server-Timing header value based on the context's timing metrics.
760    ///
761    /// The Server-Timing header allows servers to communicate performance metrics
762    /// about the request-response cycle to the client. This implementation includes
763    /// various timing metrics like connection time, processing time, and cache operations.
764    ///
765    /// Returns a String containing the formatted Server-Timing header value.
766    pub fn generate_server_timing(&self) -> String {
767        let mut timing_str = String::with_capacity(200);
768        // Flag to track if this is the first timing entry, to handle commas correctly.
769        let mut first = true;
770
771        // Macro to add a timing entry to the string.
772        macro_rules! add_timing {
773            ($name:expr, $dur:expr) => {
774                if !first {
775                    timing_str.push_str(", ");
776                }
777                // Ignore the write! result as it's unlikely to fail with a String.
778                let _ = write!(&mut timing_str, "{};dur={}", $name, $dur);
779                first = false;
780            };
781        }
782
783        // Aggregate and add upstream timings.
784        let mut upstream_time = 0;
785        if let Some(time) = self.get_upstream_connect_time() {
786            upstream_time += time;
787            add_timing!("upstream.connect", time);
788        }
789        if let Some(time) = self.get_upstream_processing_time() {
790            upstream_time += time;
791            add_timing!("upstream.processing", time);
792        }
793        if upstream_time > 0 {
794            add_timing!("upstream", upstream_time);
795        }
796
797        // Aggregate and add cache timings.
798        let mut cache_time = 0;
799        if let Some(time) = self.timing.cache_lookup {
800            cache_time += time;
801            add_timing!("cache.lookup", time);
802        }
803        if let Some(time) = self.timing.cache_lock {
804            cache_time += time;
805            add_timing!("cache.lock", time);
806        }
807        if cache_time > 0 {
808            add_timing!("cache", cache_time);
809        }
810
811        // Aggregate and add plugin timings.
812        if let Some(features) = &self.features {
813            if let Some(times) = &features.plugin_processing_times {
814                let mut plugin_time: u32 = 0;
815                for (name, time) in times {
816                    if *time == 0 {
817                        continue;
818                    }
819                    plugin_time += time;
820                    let mut plugin_name = String::with_capacity(7 + name.len());
821                    plugin_name.push_str("plugin.");
822                    plugin_name.push_str(name);
823                    add_timing!(&plugin_name, time);
824                }
825                if plugin_time > 0 {
826                    add_timing!("plugin", plugin_time);
827                }
828            }
829        }
830
831        // Add the total service time, which is always present.
832        let service_time = self.timing.created_at.elapsed().as_millis();
833        // Add a separator if other timings were already added.
834        if !first {
835            timing_str.push_str(", ");
836        }
837        // Write the final timing directly.
838        let _ = write!(&mut timing_str, "total;dur={}", service_time);
839
840        timing_str
841    }
842
843    /// Pushes a single cache key component to the context.
844    #[inline]
845    pub fn push_cache_key(&mut self, key: String) {
846        let cache_info = self.cache.get_or_insert_default();
847        cache_info
848            .keys
849            .get_or_insert_with(|| Vec::with_capacity(2))
850            .push(key);
851    }
852
853    /// Extends the cache key components with a vector of keys.
854    #[inline]
855    pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
856        let cache_info = self.cache.get_or_insert_default();
857        cache_info
858            .keys
859            .get_or_insert_with(|| Vec::with_capacity(keys.len() + 2))
860            .extend(keys);
861    }
862    /// Updates the upstream timing from the digest.
863    #[inline]
864    pub fn update_upstream_timing_from_digest(
865        &mut self,
866        digest: &Digest,
867        reused: bool,
868    ) {
869        let detail = get_digest_detail(digest);
870        self.timing.upstream_connection_duration = Some(detail.connection_time);
871        if reused {
872            return;
873        }
874
875        let upstream_connect_time =
876            self.timing.upstream_connect.unwrap_or_default();
877        // upstream tcp, tls(if https) connect time
878        let mut upstream_tcp_connect = upstream_connect_time;
879        if detail.tls_established > detail.tcp_established {
880            let latency =
881                (detail.tls_established - detail.tcp_established) as i32;
882            upstream_tcp_connect -= latency;
883            self.timing.upstream_tls_handshake = Some(latency);
884        }
885        if upstream_tcp_connect > 0 {
886            self.timing.upstream_tcp_connect = Some(upstream_tcp_connect);
887        }
888    }
889}
890
891/// Generates a cache key from the request method, URI and state context.
892/// The key includes an optional namespace and other key components if configured in the context.
893///
894/// # Arguments
895/// * `ctx` - The Ctx context containing cache configuration.
896/// * `method` - The HTTP method as a string.
897/// * `uri` - The request URI.
898///
899/// Returns: A CacheKey combining the namespace, custom keys (if any), method and URI.
900pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
901    let Some(cache_info) = &ctx.cache else {
902        // Return an empty key if cache is not configured for this context.
903        return CacheKey::new("", "", "");
904    };
905    let namespace = cache_info.namespace.as_ref().map_or("", |v| v);
906    let key = if let Some(keys) = &cache_info.keys {
907        // Pre-allocate string capacity to avoid reallocations.
908        let mut key_buf = String::with_capacity(
909            keys.iter().map(|s| s.len() + 1).sum::<usize>()
910                + method.len()
911                + 1
912                + uri.to_string().len(),
913        );
914
915        // Join custom key components with ':'.
916        for (i, k) in keys.iter().enumerate() {
917            if i > 0 {
918                key_buf.push(':');
919            }
920            key_buf.push_str(k);
921        }
922        // Use write! macro to efficiently concatenate the method and URI.
923        let _ = write!(&mut key_buf, ":{method}:{uri}");
924        key_buf
925    } else {
926        // If no custom keys, use "METHOD:URI" as the key.
927        format!("{method}:{uri}")
928    };
929
930    CacheKey::new(namespace, key, "")
931}
932
933#[cfg(test)]
934mod tests {
935    use super::*;
936    use bytes::Bytes;
937    use bytes::BytesMut;
938    use pingora::protocols::tls::SslDigest;
939    use pingora::protocols::tls::SslDigestExtension;
940    use pretty_assertions::assert_eq;
941    use std::{sync::Arc, time::Duration};
942
/// A new context starts with a fresh creation time and empty optional state.
#[test]
fn test_ctx_new() {
    let ctx = Ctx::new();

    // The creation time must lie within the last 100ms.
    let age_ms = ctx.timing.created_at.elapsed().as_millis();
    assert!(age_ms < 100, "created_at should be a recent timestamp");

    // Everything else keeps its default.
    assert!(ctx.cache.is_none());
    assert!(ctx.features.is_none());
    assert_eq!(ctx.conn.id, 0);
}
955
/// Variables are absent until added and can be read back afterwards.
#[test]
fn test_add_and_get_variable() {
    let mut ctx = Ctx::new();
    assert!(
        ctx.get_variable("key1").is_none(),
        "Should be None before adding"
    );

    let pairs = [("key1", "value1"), ("key2", "value2")];
    for (key, value) in pairs {
        ctx.add_variable(key, value);
    }

    for (key, value) in pairs {
        assert_eq!(ctx.get_variable(key), Some(value));
    }
    assert_eq!(ctx.get_variable("nonexistent"), None);
}
972
/// Tests the helper functions for getting filtered time values: a
/// non-negative time is returned as is, while a negative or missing time
/// yields `None`.
#[test]
fn test_get_time_field() {
    let mut ctx = Ctx::new();

    // Test with a valid time.
    ctx.timing.upstream_response = Some(100);
    assert_eq!(ctx.get_upstream_response_time(), Some(100));

    // Test with a negative time.
    ctx.timing.upstream_response = Some(-1);
    assert_eq!(
        ctx.get_upstream_response_time(),
        None,
        "Negative time should be None"
    );

    // Test with None.
    ctx.timing.upstream_response = None;
    assert_eq!(ctx.get_upstream_response_time(), None);
}
994
/// Exercises `append_log_value` with an unknown key, a boolean field, an
/// optional string field and the computed `service_time`.
#[test]
fn test_append_log_value_coverage() {
    let mut ctx = Ctx::new();

    // Renders the log value for a key into a fresh buffer.
    macro_rules! logged {
        ($key:expr) => {{
            let mut out = BytesMut::new();
            ctx.append_log_value(&mut out, $key);
            out
        }};
    }

    // Unknown keys leave the buffer untouched.
    let buf = logged!("unknown_key");
    assert!(buf.is_empty(), "Unknown key should not append anything");

    // Boolean values.
    ctx.conn.reused = true;
    let buf = logged!("connection_reused");
    assert_eq!(&buf[..], b"true");

    // Optional string values.
    ctx.conn.tls_version = Some("TLSv1.3".to_string());
    let buf = logged!("tls_version");
    assert_eq!(&buf[..], b"TLSv1.3");

    // service_time is computed at call time, so it must cover the sleep.
    coarsetime::Clock::update();
    std::thread::sleep(Duration::from_millis(11));
    let buf = logged!("service_time");
    coarsetime::Clock::update();
    let service_time: u64 =
        std::str::from_utf8(&buf[..]).unwrap().parse().unwrap();
    assert!(service_time >= 10, "Service time should be at least 10ms");
}
1026
/// Checks `get_cache_key` for a context without cache info, with only a
/// namespace, and with a namespace plus custom key components.
#[test]
fn test_get_cache_key() {
    let method = "GET";
    let uri = Uri::from_static("https://example.com/path");

    // Builds a context whose cache settings are exactly `info`.
    let ctx_with_cache = |info: CacheInfo| {
        let mut ctx = Ctx::new();
        ctx.cache = Some(info);
        ctx
    };

    // Case 1: No cache info in context.
    let plain = get_cache_key(&Ctx::new(), method, &uri);
    assert_eq!(plain.namespace_str(), Some(""));
    assert_eq!(plain.primary_key_str(), Some(""));

    // Case 2: Cache info with namespace but no keys.
    let namespaced_ctx = ctx_with_cache(CacheInfo {
        namespace: Some("my-ns".to_string()),
        ..Default::default()
    });
    let namespaced = get_cache_key(&namespaced_ctx, method, &uri);
    assert_eq!(namespaced.namespace_str(), Some("my-ns"));
    assert_eq!(
        namespaced.primary_key_str(),
        Some("GET:https://example.com/path")
    );

    // Case 3: Cache info with namespace and multiple keys.
    let keyed_ctx = ctx_with_cache(CacheInfo {
        namespace: Some("my-ns".to_string()),
        keys: Some(vec!["user-123".to_string(), "desktop".to_string()]),
        ..Default::default()
    });
    let keyed = get_cache_key(&keyed_ctx, method, &uri);
    assert_eq!(keyed.namespace_str(), Some("my-ns"));
    assert_eq!(
        keyed.primary_key_str(),
        Some("user-123:desktop:GET:https://example.com/path")
    );
}
1066
/// Verifies the exact sequence of entries in the generated `Server-Timing`
/// value. Only the `total` entry depends on wall-clock time, so it is
/// checked for its prefix and for carrying a numeric duration.
#[test]
fn test_generate_server_timing() {
    let mut ctx = Ctx::new();
    ctx.timing.upstream_connect = Some(1);
    ctx.timing.upstream_processing = Some(2);
    ctx.timing.cache_lookup = Some(6);
    ctx.timing.cache_lock = Some(7);
    ctx.add_plugin_processing_time("plugin1", 100);

    let timing_header = ctx.generate_server_timing();

    // Splitting on ", " checks the separator as well as the ordering.
    let entries: Vec<&str> = timing_header.split(", ").collect();
    let (total, metrics) = entries
        .split_last()
        .expect("the header always contains the total entry");

    assert_eq!(
        metrics.to_vec(),
        vec![
            "upstream.connect;dur=1",
            "upstream.processing;dur=2",
            "upstream;dur=3",
            "cache.lookup;dur=6",
            "cache.lock;dur=7",
            "cache;dur=13",
            "plugin.plugin1;dur=100",
            "plugin;dur=100",
        ]
    );

    let total_ms = total
        .strip_prefix("total;dur=")
        .expect("the last entry should be the total service time");
    assert!(
        total_ms.parse::<u64>().is_ok(),
        "total duration should be an integer, got {total_ms:?}"
    );
}
1091
/// `format_duration` picks the largest fitting unit and prints at most one
/// decimal place, dropping it when it would be zero.
#[test]
fn test_format_duration() {
    // (input in milliseconds, expected rendering)
    let cases: [(u64, &str); 8] = [
        ((3600 + 3500) * 1000, "1.9h"),
        ((3600 + 1800) * 1000, "1.5h"),
        ((3600 + 100) * 1000, "1h"),
        ((60 + 50) * 1000, "1.8m"),
        ((60 + 2) * 1000, "1m"),
        (1000, "1s"),
        (512, "512ms"),
        (1112, "1.1s"),
    ];

    for (ms, expected) in cases {
        let mut buf = BytesMut::new();
        format_duration(&mut buf, ms);
        assert_eq!(expected.as_bytes(), &buf[..]);
    }
}
1126
/// Variables added one at a time and in bulk end up in the same map.
#[test]
fn test_add_variable() {
    let mut ctx = Ctx::new();
    ctx.add_variable("key1", "value1");
    ctx.add_variable("key2", "value2");

    let bulk = AHashMap::from([
        ("key3".to_string(), "value3".to_string()),
        ("key4".to_string(), "value4".to_string()),
    ]);
    ctx.extend_variables(bulk);

    let variables =
        ctx.features.as_ref().unwrap().variables.as_ref().unwrap();
    // Keys are stored exactly as given; no '$' prefix is added.
    let expected = [
        ("key1", "value1"),
        ("key2", "value2"),
        ("key3", "value3"),
        ("key4", "value4"),
    ];
    for (key, value) in expected {
        assert_eq!(variables.get(key), Some(&value.to_string()));
    }
}
1145
/// Key components accumulate in insertion order, and a context with default
/// cache info yields a plain `METHOD:URI` key with an empty namespace.
#[test]
fn test_cache_key() {
    let mut ctx = Ctx::new();
    ctx.push_cache_key("key1".to_string());
    ctx.extend_cache_keys(vec!["key2".to_string(), "key3".to_string()]);

    let expected: Vec<String> =
        ["key1", "key2", "key3"].iter().map(|k| k.to_string()).collect();
    let collected = ctx.cache.unwrap().keys.unwrap();
    assert_eq!(expected, collected);

    // Cache info present, but without namespace or custom keys.
    let mut ctx = Ctx::new();
    ctx.cache.get_or_insert_default();
    let uri = Uri::from_static("https://example.com/path");
    let key = get_cache_key(&ctx, "GET", &uri);
    assert_eq!(key.namespace_str(), Some(""));
    assert_eq!(key.primary_key_str(), Some("GET:https://example.com/path"));
}
1166
/// Walks through the log keys understood by `append_log_value`, setting the
/// backing field first and then checking the rendered bytes, including the
/// `_human` variants of the timing keys.
#[test]
fn test_state() {
    let mut ctx = Ctx::new();

    // Renders `$key` into a fresh buffer and compares it with `$expected`.
    macro_rules! expect_log {
        ($key:expr, $expected:expr) => {{
            let mut buf = BytesMut::new();
            ctx.append_log_value(&mut buf, $key);
            assert_eq!($expected, buf.as_ref());
        }};
    }

    // Connection and upstream identity.
    ctx.conn.id = 10;
    expect_log!("connection_id", b"10");

    expect_log!("upstream_reused", b"false");
    ctx.upstream.reused = true;
    expect_log!("upstream_reused", b"true");

    ctx.upstream.address = "192.168.1.1:80".to_string();
    expect_log!("upstream_addr", b"192.168.1.1:80");

    ctx.upstream.status = Some(StatusCode::CREATED);
    expect_log!("upstream_status", b"201");

    ctx.state.processing_count = 10;
    expect_log!("processing", b"10");

    // Upstream timings.
    ctx.timing.upstream_connect = Some(1);
    expect_log!("upstream_connect_time", b"1");
    expect_log!("upstream_connect_time_human", b"1ms");

    ctx.upstream.connected_count = Some(30);
    expect_log!("upstream_connected", b"30");

    ctx.timing.upstream_processing = Some(2);
    expect_log!("upstream_processing_time", b"2");
    expect_log!("upstream_processing_time_human", b"2ms");

    ctx.timing.upstream_response = Some(3);
    expect_log!("upstream_response_time", b"3");
    expect_log!("upstream_response_time_human", b"3ms");

    ctx.timing.upstream_tcp_connect = Some(100);
    expect_log!("upstream_tcp_connect_time", b"100");
    expect_log!("upstream_tcp_connect_time_human", b"100ms");

    ctx.timing.upstream_tls_handshake = Some(110);
    expect_log!("upstream_tls_handshake_time", b"110");
    expect_log!("upstream_tls_handshake_time_human", b"110ms");

    ctx.timing.upstream_connection_duration = Some(120);
    expect_log!("upstream_connection_time", b"120");
    expect_log!("upstream_connection_time_human", b"120ms");

    ctx.upstream.location = "pingap".to_string().into();
    expect_log!("location", b"pingap");

    // Downstream connection.
    ctx.timing.connection_duration = 4;
    expect_log!("connection_time", b"4");
    expect_log!("connection_time_human", b"4ms");

    ctx.conn.reused = false;
    expect_log!("connection_reused", b"false");
    ctx.conn.reused = true;
    expect_log!("connection_reused", b"true");

    // TLS details.
    ctx.conn.tls_version = Some("TLSv1.3".to_string());
    expect_log!("tls_version", b"TLSv1.3");

    ctx.conn.tls_cipher =
        Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
    expect_log!("tls_cipher", b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");

    ctx.timing.tls_handshake = Some(101);
    expect_log!("tls_handshake_time", b"101");
    expect_log!("tls_handshake_time_human", b"101ms");

    // Compression statistics.
    ctx.features.get_or_insert_default().compression_stat =
        Some(CompressionStat {
            in_bytes: 1024,
            out_bytes: 500,
            duration: Duration::from_millis(5),
            ..Default::default()
        });
    expect_log!("compression_time", b"5");
    expect_log!("compression_time_human", b"5ms");
    expect_log!("compression_ratio", b"2.0");

    // Cache timings.
    ctx.timing.cache_lookup = Some(6);
    expect_log!("cache_lookup_time", b"6");
    expect_log!("cache_lookup_time_human", b"6ms");

    ctx.timing.cache_lock = Some(7);
    expect_log!("cache_lock_time", b"7");
    expect_log!("cache_lock_time_human", b"7ms");
}
1343
/// Plugin processing times are recorded as `(name, time)` pairs in the order
/// they were added.
#[test]
fn test_add_plugin_processing_time() {
    let mut ctx = Ctx::new();
    for (name, time) in [("plugin1", 100), ("plugin2", 200)] {
        ctx.add_plugin_processing_time(name, time);
    }

    let recorded = ctx.features.unwrap().plugin_processing_times;
    let expected = Some(vec![
        ("plugin1".to_string(), 100),
        ("plugin2".to_string(), 200),
    ]);
    assert_eq!(recorded, expected);
}
1357
/// `get_digest_detail` reports defaults for an empty digest and picks up the
/// timing and TLS information once they are filled in.
#[test]
fn test_get_digest_detail() {
    let mut digest = Digest::default();

    // Nothing recorded yet: every field is at its default.
    let empty = get_digest_detail(&digest);
    assert!(!empty.connection_reused);
    assert_eq!(empty.connection_time, 0);
    assert_eq!(empty.tcp_established, 0);
    assert_eq!(empty.tls_established, 0);
    assert_eq!(empty.tls_version, None);
    assert_eq!(empty.tls_cipher, None);

    // Timing entry established `secs` seconds after the Unix epoch.
    let established_at = |secs: u64| {
        Some(TimingDigest {
            established_ts: SystemTime::UNIX_EPOCH
                .checked_add(Duration::from_secs(secs))
                .unwrap(),
        })
    };
    digest.timing_digest.push(established_at(5));
    digest.timing_digest.push(established_at(3));
    digest.ssl_digest = Some(Arc::new(SslDigest {
        version: "1.3".into(),
        cipher: "123".into(),
        organization: Some("cloudflare".to_string()),
        serial_number: Some(
            "0x00000000000000000000000000000abc".to_string(),
        ),
        cert_digest: vec![],
        extension: SslDigestExtension::default(),
    }));

    let filled = get_digest_detail(&digest);
    assert!(filled.connection_reused);
    assert_eq!(filled.tcp_established, 5000);
    assert_eq!(filled.tls_established, 3000);
    assert_eq!(filled.tls_version, Some("1.3".to_string()));
    assert_eq!(filled.tls_cipher, Some("123".to_string()));
}
1396
/// A response body handler registered under a name can be looked up again.
#[test]
fn test_modify_body_handler() {
    // Minimal handler that replaces any body with the bytes "test".
    struct TestHandler {}
    impl ModifyResponseBody for TestHandler {
        fn handle(
            &mut self,
            _session: &Session,
            body: &mut Option<bytes::Bytes>,
            _end_of_stream: bool,
        ) -> pingora::Result<()> {
            *body = Some(Bytes::from("test"));
            Ok(())
        }
    }

    let mut ctx = Ctx::default();
    ctx.add_modify_body_handler("test", Box::new(TestHandler {}));

    let registered = ctx.get_modify_body_handler("test");
    assert!(registered.is_some());
}
1417}