Skip to main content

pingap_core/
ctx.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{Plugin, real_now_ms};
16use ahash::AHashMap;
17use bytes::BytesMut;
18use http::StatusCode;
19use http::Uri;
20use http::{HeaderName, HeaderValue};
21#[cfg(feature = "tracing")]
22use opentelemetry::{
23    Context,
24    global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
25    trace::{SpanKind, TraceContextExt, Tracer},
26};
27use pingora::cache::CacheKey;
28use pingora::http::RequestHeader;
29use pingora::protocols::Digest;
30use pingora::protocols::TimingDigest;
31use pingora::proxy::Session;
32use pingora_limits::inflight::Guard;
33use std::fmt::Write;
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37// Constants for time conversions in milliseconds.
38const SECOND: u64 = 1_000;
39const MINUTE: u64 = 60 * SECOND;
40const HOUR: u64 = 60 * MINUTE;
41
42#[inline]
43/// Format the duration in human readable format, checking smaller units first.
44/// e.g., ms, s, m, h.
45pub fn format_duration(buf: &mut BytesMut, ms: u64) {
46    if ms < SECOND {
47        // Format as milliseconds if less than a second.
48        buf.extend_from_slice(itoa::Buffer::new().format(ms).as_bytes());
49        buf.extend_from_slice(b"ms");
50    } else if ms < MINUTE {
51        // Format as seconds with one decimal place if less than a minute.
52        buf.extend_from_slice(
53            itoa::Buffer::new().format(ms / SECOND).as_bytes(),
54        );
55        let value = (ms % SECOND) / 100;
56        if value != 0 {
57            buf.extend_from_slice(b".");
58            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
59        }
60        buf.extend_from_slice(b"s");
61    } else if ms < HOUR {
62        // Format as minutes with one decimal place if less than an hour.
63        buf.extend_from_slice(
64            itoa::Buffer::new().format(ms / MINUTE).as_bytes(),
65        );
66        let value = ms % MINUTE * 10 / MINUTE;
67        if value != 0 {
68            buf.extend_from_slice(b".");
69            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
70        }
71        buf.extend_from_slice(b"m");
72    } else {
73        // Format as hours with one decimal place.
74        buf.extend_from_slice(itoa::Buffer::new().format(ms / HOUR).as_bytes());
75        let value = ms % HOUR * 10 / HOUR;
76        if value != 0 {
77            buf.extend_from_slice(b".");
78            buf.extend_from_slice(itoa::Buffer::new().format(value).as_bytes());
79        }
80        buf.extend_from_slice(b"h");
81    }
82}
83
/// Selects which side a modification applies to.
///
/// Parsed from a string via [`From<&str>`]: only the exact value
/// `"upstream"` selects [`ModifiedMode::Upstream`]; every other input
/// falls back to [`ModifiedMode::Response`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModifiedMode {
    /// Selected by the string `"upstream"`.
    Upstream,
    /// The fallback for any other string.
    Response,
}

impl From<&str> for ModifiedMode {
    /// Converts a mode name into a `ModifiedMode`.
    ///
    /// The comparison is exact (case-sensitive, no trimming); unknown
    /// values map to `Response` rather than failing.
    fn from(value: &str) -> Self {
        match value {
            "upstream" => ModifiedMode::Upstream,
            _ => ModifiedMode::Response,
        }
    }
}
98
99/// Trait for modifying the response body.
100pub trait ModifyResponseBody: Sync + Send {
101    /// Handles the modification of response body data.
102    fn handle(
103        &mut self,
104        session: &Session,
105        body: &mut Option<bytes::Bytes>,
106        end_of_stream: bool,
107    ) -> pingora::Result<()>;
108    /// Returns the name of the modifier.
109    fn name(&self) -> String {
110        "unknown".to_string()
111    }
112}
113
/// Details of one client connection.
///
/// `Default` yields id `0`, `reused == false` and every optional field unset.
#[derive(Default)]
pub struct ConnectionInfo {
    /// Unique identifier of the connection.
    pub id: usize,
    /// IP address of the client.
    pub client_ip: Option<String>,
    /// Remote address of the client connection.
    pub remote_addr: Option<String>,
    /// Remote port of the client connection.
    pub remote_port: Option<u16>,
    /// Server address the client connected to.
    pub server_addr: Option<String>,
    /// Server port the client connected to.
    pub server_port: Option<u16>,
    /// TLS version of the connection, if it uses TLS.
    pub tls_version: Option<String>,
    /// TLS cipher of the connection, if it uses TLS.
    pub tls_cipher: Option<String>,
    /// Whether the connection was reused (e.g. HTTP keep-alive).
    pub reused: bool,
}
136
/// Timing metrics collected over the lifecycle of a request.
///
/// Durations are in milliseconds. `Default` is implemented by hand because
/// `created_at` is initialised with `Instant::now()`.
pub struct Timing {
    /// The instant at which this value (and so the request context) was created.
    pub created_at: Instant,
    /// Total duration of the client connection in milliseconds.
    /// May be large for reused connections.
    pub connection_duration: u64,
    /// Duration of the TLS handshake with the client in milliseconds.
    pub tls_handshake: Option<i32>,
    /// Total duration to connect to the upstream server in milliseconds.
    pub upstream_connect: Option<i32>,
    /// Duration of the TCP connection to the upstream server in milliseconds.
    pub upstream_tcp_connect: Option<i32>,
    /// Duration of the TLS handshake with the upstream server in milliseconds.
    pub upstream_tls_handshake: Option<i32>,
    /// Duration the upstream server took to process the request in milliseconds.
    pub upstream_processing: Option<i32>,
    /// Duration from sending the request to receiving the upstream response
    /// in milliseconds.
    pub upstream_response: Option<i32>,
    /// Total duration of the upstream connection in milliseconds.
    pub upstream_connection_duration: Option<u64>,
    /// Duration of the cache lookup in milliseconds.
    pub cache_lookup: Option<i32>,
    /// Duration spent waiting for a cache lock in milliseconds.
    pub cache_lock: Option<i32>,
}

impl Default for Timing {
    /// Starts the clock now and leaves every measurement unset
    /// (`connection_duration` starts at zero).
    fn default() -> Self {
        Self {
            created_at: Instant::now(),
            connection_duration: 0,
            tls_handshake: None,
            upstream_connect: None,
            upstream_tcp_connect: None,
            upstream_tls_handshake: None,
            upstream_processing: None,
            upstream_response: None,
            upstream_connection_duration: None,
            cache_lookup: None,
            cache_lock: None,
        }
    }
}
182
183/// Trait for upstream instance, used to handle the upstream instance lifecycle.
184pub trait UpstreamInstance: Send + Sync {
185    fn on_transport_failure(&self, address: &str);
186    fn on_response(&self, address: &str, status: StatusCode);
187    fn completed(&self) -> i32;
188}
189
190/// Trait for location instance
191pub trait LocationInstance: Send + Sync {
192    /// Get location's name
193    fn name(&self) -> &str;
194    /// Get the upstream of location
195    fn upstream(&self) -> &str;
196    /// Rewrite the request url
197    fn rewrite(
198        &self,
199        header: &mut RequestHeader,
200        variables: Option<AHashMap<String, String>>,
201    ) -> (bool, Option<AHashMap<String, String>>);
202    /// Returns the proxy header to upstream
203    fn headers(&self) -> Option<&Vec<(HeaderName, HeaderValue, bool)>>;
204    /// Returns the client body size limit
205    fn client_body_size_limit(&self) -> usize;
206    /// Called when the request is received from the client
207    /// Returns
208    /// `Result<(u64, i32)>` - A tuple containing:
209    ///   - The new total number of accepted requests (u64)
210    ///   - The new number of currently processing requests (i32)
211    fn on_request(&self) -> pingora::Result<(u64, i32)>;
212    /// Called when the response is received from the upstream
213    fn on_response(&self);
214}
215
216/// Information about the upstream (backend) server.
217#[derive(Default)]
218pub struct UpstreamInfo {
219    /// Upstream instance
220    pub upstream_instance: Option<Arc<dyn UpstreamInstance>>,
221    /// Location instance
222    pub location_instance: Option<Arc<dyn LocationInstance>>,
223    /// The location (route) that directed the request to this upstream.
224    pub location: Arc<str>,
225    /// The name of the upstream server or group.
226    pub name: Arc<str>,
227    /// The address of the upstream server.
228    pub address: String,
229    /// Indicates if the connection to the upstream was reused.
230    pub reused: bool,
231    /// The number of requests currently being processed by the upstream.
232    pub processing_count: Option<i32>,
233    /// The current number of active connections to the upstream.
234    pub connected_count: Option<i32>,
235    /// The HTTP status code of upstream response.
236    pub status: Option<StatusCode>,
237    /// The number of retries for failed connections.
238    pub retries: u8,
239    /// Maximum number of retries for failed connections.
240    pub max_retries: Option<u8>,
241    /// The maximum total time window allowed for an operation and all of its subsequent retries.
242    ///
243    /// The timer starts from the beginning of the **initial attempt**. Once this time window
244    /// is exceeded, no more retries will be initiated, even if the maximum number of
245    /// retries (`max_retries`) has not been reached.
246    ///
247    /// If set to `None`, there is no time limit for the retry process.
248    pub max_retry_window: Option<Duration>,
249}
250
251/// State related to the current request being processed.
252#[derive(Default)]
253pub struct RequestState {
254    /// A unique identifier for the request.
255    pub request_id: Option<String>,
256    /// The HTTP status code of the response.
257    pub status: Option<StatusCode>,
258    /// The size of the request payload in bytes.
259    pub payload_size: usize,
260    /// A guard for rate limiting, if applicable.
261    pub guard: Option<Guard>,
262    /// The total number of requests currently being processed by the service.
263    pub processing_count: i32,
264    /// The total number of requests accepted by the service.
265    pub accepted_count: u64,
266    /// The number of requests currently being processed for this location.
267    pub location_processing_count: i32,
268    /// The total number of requests accepted for this location.
269    pub location_accepted_count: u64,
270}
271
/// Cache configuration and statistics attached to a request.
#[derive(Default)]
pub struct CacheInfo {
    /// Namespace for cache entries.
    pub namespace: Option<String>,
    /// Key components used to generate the final cache key.
    pub keys: Option<Vec<String>>,
    /// Whether to respect `Cache-Control` headers.
    pub check_cache_control: bool,
    /// Maximum time-to-live for cache entries.
    pub max_ttl: Option<Duration>,
    /// Number of cache read operations performed.
    pub reading_count: Option<u32>,
    /// Number of cache write operations performed.
    pub writing_count: Option<u32>,
}
288
289/// Optional features like tracing, plugins, and response modifications.
290#[derive(Default)]
291pub struct Features {
292    /// A map of custom variables for request processing.
293    pub variables: Option<AHashMap<String, String>>,
294    /// A list of plugin names and their processing times in milliseconds.
295    pub plugin_processing_times: Option<Vec<(String, u32)>>,
296    /// Statistics about response compression.
297    pub compression_stat: Option<CompressionStat>,
298    /// A map of plugin names and their response body handlers.
299    pub modify_body_handlers:
300        Option<AHashMap<String, Box<dyn ModifyResponseBody>>>,
301    /// OpenTelemetry tracer for distributed tracing (available with the "tracing" feature).
302    #[cfg(feature = "tracing")]
303    pub otel_tracer: Option<OtelTracer>,
304    /// OpenTelemetry span for the upstream request (available with the "tracing" feature).
305    #[cfg(feature = "tracing")]
306    pub upstream_span: Option<BoxedSpan>,
307}
308
/// Statistics about a response compression operation.
#[derive(Default)]
pub struct CompressionStat {
    /// Algorithm used for compression (e.g. "gzip", "br").
    pub algorithm: String,
    /// Size of the data before compression, in bytes.
    pub in_bytes: usize,
    /// Size of the data after compression, in bytes.
    pub out_bytes: usize,
    /// Time taken by the compression operation.
    pub duration: Duration,
}

impl CompressionStat {
    /// Returns the compression ratio, `in_bytes / out_bytes`.
    ///
    /// Yields `0.0` when `out_bytes` is zero, so the division never
    /// happens with a zero denominator.
    pub fn ratio(&self) -> f64 {
        match self.out_bytes {
            0 => 0.0,
            compressed => self.in_bytes as f64 / compressed as f64,
        }
    }
}
331
/// Bundles the OpenTelemetry tracer with the span of the incoming request.
#[cfg(feature = "tracing")]
pub struct OtelTracer {
    /// The tracer used to create spans.
    pub tracer: BoxedTracer,
    /// The main span for the incoming HTTP request.
    pub http_request_span: BoxedSpan,
}
340
#[cfg(feature = "tracing")]
impl OtelTracer {
    /// Starts a client-kind span named `name` for an upstream request.
    ///
    /// The span's parent context is the current context extended with the
    /// span context of `http_request_span`, which links the new span to the
    /// main request span.
    #[inline]
    pub fn new_upstream_span(&self, name: &str) -> BoxedSpan {
        let request_span_context = self.http_request_span.span_context().clone();
        let parent_context =
            Context::current().with_remote_span_context(request_span_context);
        self.tracer
            .span_builder(name.to_string())
            .with_kind(SpanKind::Client)
            .start_with_context(&self.tracer, &parent_context)
    }
}
358
359/// Represents the state of a request/response cycle, tracking various metrics and properties
360/// including connection details, caching information, and upstream server interactions.
361#[derive(Default)]
362pub struct Ctx {
363    /// Information about the client connection.
364    pub conn: ConnectionInfo,
365    /// Information about the upstream server.
366    pub upstream: UpstreamInfo,
367    /// Timing metrics for the request lifecycle.
368    pub timing: Timing,
369    /// State related to the current request.
370    pub state: RequestState,
371    /// Cache-related information. Wrapped in Option to save memory when not in use.
372    pub cache: Option<CacheInfo>,
373    /// Optional features. Wrapped in Option to save memory when not in use.
374    pub features: Option<Features>,
375    /// Plugins for the current location
376    pub plugins: Option<Vec<(String, Arc<dyn Plugin>)>>,
377}
378
/// Connection timing and TLS details extracted from a connection digest.
#[derive(Debug, Default)]
pub struct DigestDetail {
    /// Whether the connection was reused from a pool.
    pub connection_reused: bool,
    /// Total connection time in milliseconds.
    pub connection_time: u64,
    /// Timestamp at which the TCP connection was established.
    pub tcp_established: u64,
    /// Timestamp at which the TLS handshake completed.
    pub tls_established: u64,
    /// TLS protocol version, when the connection uses TLS.
    pub tls_version: Option<String>,
    /// TLS cipher suite, when the connection uses TLS.
    pub tls_cipher: Option<String>,
}
395
396#[inline]
397pub(crate) fn timing_to_ms(timing: Option<&Option<TimingDigest>>) -> u64 {
398    match timing {
399        Some(Some(item)) => item
400            .established_ts
401            .duration_since(SystemTime::UNIX_EPOCH)
402            .unwrap_or_default()
403            .as_millis() as u64,
404        _ => 0,
405    }
406}
407
408/// Extracts timing and TLS information from connection digest.
409/// Used for metrics and logging connection details.
410#[inline]
411pub fn get_digest_detail(digest: &Digest) -> DigestDetail {
412    let tcp_established = timing_to_ms(digest.timing_digest.first());
413    let mut connection_time = 0;
414    let now = real_now_ms();
415    if tcp_established > 0 && tcp_established < now {
416        connection_time = now - tcp_established;
417    }
418    let connection_reused = connection_time > 100;
419
420    let Some(ssl_digest) = &digest.ssl_digest else {
421        return DigestDetail {
422            connection_reused,
423            tcp_established,
424            connection_time,
425            ..Default::default()
426        };
427    };
428
429    DigestDetail {
430        connection_reused,
431        tcp_established,
432        connection_time,
433        tls_established: timing_to_ms(digest.timing_digest.last()),
434        tls_version: Some(ssl_digest.version.to_string()),
435        tls_cipher: Some(ssl_digest.cipher.to_string()),
436    }
437}
438
439impl Ctx {
440    /// Creates a new Ctx instance with the current timestamp and default values.
441    ///
442    /// Returns a new Ctx struct initialized with the current timestamp and all other fields
443    /// set to their default values.
444    pub fn new() -> Self {
445        Self {
446            ..Default::default()
447        }
448    }
449
450    /// Adds a variable to the state's variables map with the given key and value.
451    ///
452    /// # Arguments
453    /// * `key` - The variable name.
454    /// * `value` - The value to store for this variable.
455    #[inline]
456    pub fn add_variable(&mut self, key: &str, value: &str) {
457        // Lazily initialize features and variables map.
458        let features = self.features.get_or_insert_default();
459        let variables = features.variables.get_or_insert_with(AHashMap::new);
460        variables.insert(key.to_string(), value.to_string());
461    }
462
463    /// Extends the variables map with the given key-value pairs.
464    ///
465    /// # Arguments
466    /// * `values` - A HashMap containing the key-value pairs to add.
467    #[inline]
468    pub fn extend_variables(&mut self, values: AHashMap<String, String>) {
469        let features = self.features.get_or_insert_default();
470        if let Some(variables) = features.variables.as_mut() {
471            variables.extend(values);
472        } else {
473            features.variables = Some(values);
474        }
475    }
476
477    /// Returns the value of a variable by key.
478    ///
479    /// # Arguments
480    /// * `key` - The key of the variable to retrieve.
481    ///
482    /// Returns: Option<&str> representing the value of the variable, or None if the variable does not exist.
483    #[inline]
484    pub fn get_variable(&self, key: &str) -> Option<&str> {
485        self.features
486            .as_ref()?
487            .variables
488            .as_ref()?
489            .get(key)
490            .map(|v| v.as_str())
491    }
492
493    /// Adds a modify body handler to the context.
494    ///
495    /// # Arguments
496    /// * `name` - The name of the handler.
497    /// * `handler` - The handler to add.
498    #[inline]
499    pub fn add_modify_body_handler(
500        &mut self,
501        name: &str,
502        handler: Box<dyn ModifyResponseBody>,
503    ) {
504        let features = self.features.get_or_insert_default();
505        let handlers = features
506            .modify_body_handlers
507            .get_or_insert_with(AHashMap::new);
508        handlers.insert(name.to_string(), handler);
509    }
510
511    /// Returns the modify body handler by name.
512    #[inline]
513    pub fn get_modify_body_handler(
514        &mut self,
515        name: &str,
516    ) -> Option<&mut Box<dyn ModifyResponseBody>> {
517        self.features
518            .as_mut()
519            .and_then(|f| f.modify_body_handlers.as_mut())
520            .and_then(|h| h.get_mut(name))
521    }
522
523    // A private helper function to filter out time values that are too large (over an hour),
524    // which might indicate an error or uninitialized state.
525    #[inline]
526    fn get_time_field(&self, field: Option<i32>) -> Option<u32> {
527        if let Some(value) = field
528            && value >= 0
529        {
530            return Some(value as u32);
531        }
532        None
533    }
534
535    /// Returns the upstream response time if it's less than one hour, otherwise None.
536    /// This helps filter out potentially invalid or stale timing data.
537    ///
538    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour.
539    #[inline]
540    pub fn get_upstream_response_time(&self) -> Option<u32> {
541        self.get_time_field(self.timing.upstream_response)
542    }
543
544    /// Returns the upstream connect time if it's less than one hour, otherwise None.
545    /// This helps filter out potentially invalid or stale timing data.
546    ///
547    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour.
548    #[inline]
549    pub fn get_upstream_connect_time(&self) -> Option<u32> {
550        self.get_time_field(self.timing.upstream_connect)
551    }
552
553    /// Returns the upstream processing time if it's less than one hour, otherwise None.
554    /// This helps filter out potentially invalid or stale timing data.
555    ///
556    /// Returns: Option<u64> representing milliseconds, or None if time exceeds 1 hour.
557    #[inline]
558    pub fn get_upstream_processing_time(&self) -> Option<u32> {
559        self.get_time_field(self.timing.upstream_processing)
560    }
561
562    /// Adds a plugin processing time to the context.
563    ///
564    /// # Arguments
565    /// * `name` - The name of the plugin.
566    /// * `time` - The time taken by the plugin in milliseconds.
567    #[inline]
568    pub fn add_plugin_processing_time(&mut self, name: &str, time: u32) {
569        // Lazily initialize features and the processing times vector.
570        let features = self.features.get_or_insert_default();
571        let times = features
572            .plugin_processing_times
573            .get_or_insert_with(|| Vec::with_capacity(5));
574        if let Some(item) = times.iter_mut().find(|item| item.0 == name) {
575            item.1 += time;
576        } else {
577            times.push((name.to_string(), time));
578        }
579    }
580
581    /// Appends a formatted value to the provided log buffer based on the given key.
582    /// Handles various metrics including connection info, timing data, and TLS details.
583    ///
584    /// # Arguments
585    /// * `buf` - The BytesMut buffer to append the value to.
586    /// * `key` - The key identifying which state value to format and append.
587    ///
588    /// Returns: The modified BytesMut buffer.
589    #[inline]
590    pub fn append_log_value(&self, buf: &mut BytesMut, key: &str) {
591        // A macro to simplify formatting and appending optional time values.
592        macro_rules! append_time {
593            // Append raw milliseconds.
594            ($val:expr) => {
595                if let Some(ms) = $val {
596                    buf.extend(itoa::Buffer::new().format(ms).as_bytes());
597                }
598            };
599            // Append human-readable formatted time.
600            ($val:expr, human) => {
601                if let Some(ms) = $val {
602                    format_duration(buf, ms as u64);
603                }
604            };
605        }
606
607        match key {
608            "connection_id" => {
609                buf.extend(itoa::Buffer::new().format(self.conn.id).as_bytes());
610            },
611            "upstream_reused" => {
612                if self.upstream.reused {
613                    buf.extend(b"true");
614                } else {
615                    buf.extend(b"false");
616                }
617            },
618            "upstream_status" => {
619                if let Some(status) = &self.upstream.status {
620                    buf.extend_from_slice(status.as_str().as_bytes());
621                } else {
622                    buf.extend_from_slice(b"-");
623                }
624            },
625            "upstream_addr" => buf.extend(self.upstream.address.as_bytes()),
626            "processing" => buf.extend(
627                itoa::Buffer::new()
628                    .format(self.state.processing_count)
629                    .as_bytes(),
630            ),
631            "upstream_connected" => {
632                if let Some(value) = self.upstream.connected_count {
633                    buf.extend(itoa::Buffer::new().format(value).as_bytes());
634                }
635            },
636
637            // Timing fields
638            "upstream_connect_time" => {
639                append_time!(self.get_upstream_connect_time())
640            },
641            "upstream_connect_time_human" => {
642                append_time!(self.get_upstream_connect_time(), human)
643            },
644
645            "upstream_processing_time" => {
646                append_time!(self.get_upstream_processing_time())
647            },
648            "upstream_processing_time_human" => {
649                append_time!(self.get_upstream_processing_time(), human)
650            },
651            "upstream_response_time" => {
652                append_time!(self.get_upstream_response_time())
653            },
654            "upstream_response_time_human" => {
655                append_time!(self.get_upstream_response_time(), human)
656            },
657            "upstream_tcp_connect_time" => {
658                append_time!(self.timing.upstream_tcp_connect)
659            },
660            "upstream_tcp_connect_time_human" => {
661                append_time!(self.timing.upstream_tcp_connect, human)
662            },
663            "upstream_tls_handshake_time" => {
664                append_time!(self.timing.upstream_tls_handshake)
665            },
666            "upstream_tls_handshake_time_human" => {
667                append_time!(self.timing.upstream_tls_handshake, human)
668            },
669            "upstream_connection_time" => {
670                append_time!(self.timing.upstream_connection_duration)
671            },
672            "upstream_connection_time_human" => {
673                append_time!(self.timing.upstream_connection_duration, human)
674            },
675            "connection_time" => {
676                append_time!(Some(self.timing.connection_duration))
677            },
678            "connection_time_human" => {
679                append_time!(Some(self.timing.connection_duration), human)
680            },
681
682            // Other fields
683            "location" => {
684                if !self.upstream.location.is_empty() {
685                    buf.extend(self.upstream.location.as_bytes())
686                }
687            },
688            "connection_reused" => {
689                if self.conn.reused {
690                    buf.extend(b"true");
691                } else {
692                    buf.extend(b"false");
693                }
694            },
695            "tls_version" => {
696                if let Some(value) = &self.conn.tls_version {
697                    buf.extend(value.as_bytes());
698                }
699            },
700            "tls_cipher" => {
701                if let Some(value) = &self.conn.tls_cipher {
702                    buf.extend(value.as_bytes());
703                }
704            },
705            "tls_handshake_time" => append_time!(self.timing.tls_handshake),
706            "tls_handshake_time_human" => {
707                append_time!(self.timing.tls_handshake, human)
708            },
709            "compression_time" => {
710                if let Some(feature) = &self.features
711                    && let Some(value) = &feature.compression_stat
712                {
713                    append_time!(Some(value.duration.as_millis() as u64))
714                }
715            },
716            "compression_time_human" => {
717                if let Some(feature) = &self.features
718                    && let Some(value) = &feature.compression_stat
719                {
720                    append_time!(Some(value.duration.as_millis() as u64), human)
721                }
722            },
723            "compression_ratio" => {
724                if let Some(feature) = &self.features
725                    && let Some(value) = &feature.compression_stat
726                {
727                    buf.extend(format!("{:.1}", value.ratio()).as_bytes());
728                }
729            },
730            "cache_lookup_time" => {
731                append_time!(self.timing.cache_lookup)
732            },
733            "cache_lookup_time_human" => {
734                append_time!(self.timing.cache_lookup, human)
735            },
736            "cache_lock_time" => {
737                append_time!(self.timing.cache_lock)
738            },
739            "cache_lock_time_human" => {
740                append_time!(self.timing.cache_lock, human)
741            },
742            "service_time" => {
743                append_time!(Some(self.timing.created_at.elapsed().as_millis()))
744            },
745            "service_time_human" => {
746                append_time!(
747                    Some(self.timing.created_at.elapsed().as_millis()),
748                    human
749                )
750            },
751            // Ignore unknown keys.
752            _ => {},
753        }
754    }
755
756    /// Generates a Server-Timing header value based on the context's timing metrics.
757    ///
758    /// The Server-Timing header allows servers to communicate performance metrics
759    /// about the request-response cycle to the client. This implementation includes
760    /// various timing metrics like connection time, processing time, and cache operations.
761    ///
762    /// Returns a String containing the formatted Server-Timing header value.
763    pub fn generate_server_timing(&self) -> String {
764        let mut timing_str = String::with_capacity(200);
765        // Flag to track if this is the first timing entry, to handle commas correctly.
766        let mut first = true;
767
768        // Macro to add a timing entry to the string.
769        macro_rules! add_timing {
770            ($name:expr, $dur:expr) => {
771                if !first {
772                    timing_str.push_str(", ");
773                }
774                // Ignore the write! result as it's unlikely to fail with a String.
775                let _ = write!(&mut timing_str, "{};dur={}", $name, $dur);
776                first = false;
777            };
778        }
779
780        // Aggregate and add upstream timings.
781        let mut upstream_time = 0;
782        if let Some(time) = self.get_upstream_connect_time() {
783            upstream_time += time;
784            add_timing!("upstream.connect", time);
785        }
786        if let Some(time) = self.get_upstream_processing_time() {
787            upstream_time += time;
788            add_timing!("upstream.processing", time);
789        }
790        if upstream_time > 0 {
791            add_timing!("upstream", upstream_time);
792        }
793
794        // Aggregate and add cache timings.
795        let mut cache_time = 0;
796        if let Some(time) = self.timing.cache_lookup {
797            cache_time += time;
798            add_timing!("cache.lookup", time);
799        }
800        if let Some(time) = self.timing.cache_lock {
801            cache_time += time;
802            add_timing!("cache.lock", time);
803        }
804        if cache_time > 0 {
805            add_timing!("cache", cache_time);
806        }
807
808        // Aggregate and add plugin timings.
809        if let Some(features) = &self.features
810            && let Some(times) = &features.plugin_processing_times
811        {
812            let mut plugin_time: u32 = 0;
813            for (name, time) in times {
814                if *time == 0 {
815                    continue;
816                }
817                plugin_time += time;
818                let mut plugin_name = String::with_capacity(7 + name.len());
819                plugin_name.push_str("plugin.");
820                plugin_name.push_str(name);
821                add_timing!(&plugin_name, time);
822            }
823            if plugin_time > 0 {
824                add_timing!("plugin", plugin_time);
825            }
826        }
827
828        // Add the total service time, which is always present.
829        let service_time = self.timing.created_at.elapsed().as_millis();
830        // Add a separator if other timings were already added.
831        if !first {
832            timing_str.push_str(", ");
833        }
834        // Write the final timing directly.
835        let _ = write!(&mut timing_str, "total;dur={}", service_time);
836
837        timing_str
838    }
839
840    /// Pushes a single cache key component to the context.
841    #[inline]
842    pub fn push_cache_key(&mut self, key: String) {
843        let cache_info = self.cache.get_or_insert_default();
844        cache_info
845            .keys
846            .get_or_insert_with(|| Vec::with_capacity(2))
847            .push(key);
848    }
849
850    /// Extends the cache key components with a vector of keys.
851    #[inline]
852    pub fn extend_cache_keys(&mut self, keys: Vec<String>) {
853        let cache_info = self.cache.get_or_insert_default();
854        cache_info
855            .keys
856            .get_or_insert_with(|| Vec::with_capacity(keys.len() + 2))
857            .extend(keys);
858    }
859    /// Updates the upstream timing from the digest.
860    #[inline]
861    pub fn update_upstream_timing_from_digest(
862        &mut self,
863        digest: &Digest,
864        reused: bool,
865    ) {
866        let detail = get_digest_detail(digest);
867        self.timing.upstream_connection_duration = Some(detail.connection_time);
868        if reused {
869            return;
870        }
871
872        let upstream_connect_time =
873            self.timing.upstream_connect.unwrap_or_default();
874        // upstream tcp, tls(if https) connect time
875        let mut upstream_tcp_connect = upstream_connect_time;
876        if detail.tls_established > detail.tcp_established {
877            let latency =
878                (detail.tls_established - detail.tcp_established) as i32;
879            upstream_tcp_connect -= latency;
880            self.timing.upstream_tls_handshake = Some(latency);
881        }
882        if upstream_tcp_connect > 0 {
883            self.timing.upstream_tcp_connect = Some(upstream_tcp_connect);
884        }
885    }
886}
887
888/// Generates a cache key from the request method, URI and state context.
889/// The key includes an optional namespace and other key components if configured in the context.
890///
891/// # Arguments
892/// * `ctx` - The Ctx context containing cache configuration.
893/// * `method` - The HTTP method as a string.
894/// * `uri` - The request URI.
895///
896/// Returns: A CacheKey combining the namespace, custom keys (if any), method and URI.
897pub fn get_cache_key(ctx: &Ctx, method: &str, uri: &Uri) -> CacheKey {
898    let Some(cache_info) = &ctx.cache else {
899        // Return an empty key if cache is not configured for this context.
900        return CacheKey::new("", "", "");
901    };
902    let namespace = cache_info.namespace.as_ref().map_or("", |v| v);
903    let key = if let Some(keys) = &cache_info.keys {
904        // Pre-allocate string capacity to avoid reallocations.
905        let mut key_buf = String::with_capacity(
906            keys.iter().map(|s| s.len() + 1).sum::<usize>()
907                + method.len()
908                + 1
909                + uri.to_string().len(),
910        );
911
912        // Join custom key components with ':'.
913        for (i, k) in keys.iter().enumerate() {
914            if i > 0 {
915                key_buf.push(':');
916            }
917            key_buf.push_str(k);
918        }
919        // Use write! macro to efficiently concatenate the method and URI.
920        let _ = write!(&mut key_buf, ":{method}:{uri}");
921        key_buf
922    } else {
923        // If no custom keys, use "METHOD:URI" as the key.
924        format!("{method}:{uri}")
925    };
926
927    CacheKey::new(namespace, key, "")
928}
929
930#[cfg(test)]
931mod tests {
932    use super::*;
933    use bytes::Bytes;
934    use bytes::BytesMut;
935    use pingora::protocols::tls::SslDigest;
936    use pingora::protocols::tls::SslDigestExtension;
937    use pretty_assertions::assert_eq;
938    use std::{sync::Arc, time::Duration};
939
940    #[test]
941    fn test_ctx_new() {
942        let ctx = Ctx::new();
943        // Check that created_at is a recent timestamp.
944        // It should be within the last 100ms.
945        let elapsed_ms = ctx.timing.created_at.elapsed().as_millis();
946        assert!(elapsed_ms < 100, "created_at should be a recent timestamp");
947        // Check that other fields are correctly defaulted.
948        assert!(ctx.cache.is_none());
949        assert!(ctx.features.is_none());
950        assert_eq!(ctx.conn.id, 0);
951    }
952
953    /// Tests both adding and getting variables.
954    #[test]
955    fn test_add_and_get_variable() {
956        let mut ctx = Ctx::new();
957        assert!(
958            ctx.get_variable("key1").is_none(),
959            "Should be None before adding"
960        );
961
962        ctx.add_variable("key1", "value1");
963        ctx.add_variable("key2", "value2");
964
965        assert_eq!(ctx.get_variable("key1"), Some("value1"));
966        assert_eq!(ctx.get_variable("key2"), Some("value2"));
967        assert_eq!(ctx.get_variable("nonexistent"), None);
968    }
969
970    /// Tests the helper functions for getting filtered time values.
971    #[test]
972    fn test_get_time_field() {
973        let mut ctx = Ctx::new();
974
975        // Test with a valid time
976        ctx.timing.upstream_response = Some(100);
977        assert_eq!(ctx.get_upstream_response_time(), Some(100));
978
979        // Test with a time is negative
980        ctx.timing.upstream_response = Some(-1);
981        assert_eq!(
982            ctx.get_upstream_response_time(),
983            None,
984            "Time exceeding one hour should be None"
985        );
986
987        // Test with None
988        ctx.timing.upstream_response = None;
989        assert_eq!(ctx.get_upstream_response_time(), None);
990    }
991
992    /// Tests the `append_log_value` function with a wider range of keys and edge cases.
993    #[test]
994    fn test_append_log_value_coverage() {
995        let mut ctx = Ctx::new();
996        // Test an unknown key, should do nothing.
997        let mut buf = BytesMut::new();
998        ctx.append_log_value(&mut buf, "unknown_key");
999        assert!(buf.is_empty(), "Unknown key should not append anything");
1000
1001        // Test boolean values
1002        buf = BytesMut::new();
1003        ctx.conn.reused = true;
1004        ctx.append_log_value(&mut buf, "connection_reused");
1005        assert_eq!(&buf[..], b"true");
1006
1007        // Test optional string values
1008        ctx.conn.tls_version = Some("TLSv1.3".to_string());
1009        buf = BytesMut::new();
1010        ctx.append_log_value(&mut buf, "tls_version");
1011        assert_eq!(&buf[..], b"TLSv1.3");
1012
1013        // Test service_time calculation
1014        coarsetime::Clock::update();
1015        std::thread::sleep(Duration::from_millis(11));
1016        buf = BytesMut::new();
1017        ctx.append_log_value(&mut buf, "service_time");
1018        coarsetime::Clock::update();
1019        let service_time: u64 =
1020            std::str::from_utf8(&buf[..]).unwrap().parse().unwrap();
1021        assert!(service_time >= 10, "Service time should be at least 10ms");
1022    }
1023
1024    /// Tests the `get_cache_key` function's logic more thoroughly.
1025    #[test]
1026    fn test_get_cache_key() {
1027        let method = "GET";
1028        let uri = Uri::from_static("https://example.com/path");
1029
1030        // Case 1: No cache info in context.
1031        let ctx_no_cache = Ctx::new();
1032        let key1 = get_cache_key(&ctx_no_cache, method, &uri);
1033        assert_eq!(key1.namespace_str(), Some(""));
1034        assert_eq!(key1.primary_key_str(), Some(""));
1035
1036        // Case 2: Cache info with namespace but no keys.
1037        let mut ctx_with_ns = Ctx::new();
1038        ctx_with_ns.cache = Some(CacheInfo {
1039            namespace: Some("my-ns".to_string()),
1040            ..Default::default()
1041        });
1042        let key2 = get_cache_key(&ctx_with_ns, method, &uri);
1043        assert_eq!(key2.namespace_str(), Some("my-ns"));
1044        assert_eq!(
1045            key2.primary_key_str(),
1046            Some("GET:https://example.com/path")
1047        );
1048
1049        // Case 3: Cache info with namespace and multiple keys.
1050        let mut ctx_with_keys = Ctx::new();
1051        ctx_with_keys.cache = Some(CacheInfo {
1052            namespace: Some("my-ns".to_string()),
1053            keys: Some(vec!["user-123".to_string(), "desktop".to_string()]),
1054            ..Default::default()
1055        });
1056        let key3 = get_cache_key(&ctx_with_keys, method, &uri);
1057        assert_eq!(key3.namespace_str(), Some("my-ns"));
1058        assert_eq!(
1059            key3.primary_key_str(),
1060            Some("user-123:desktop:GET:https://example.com/path")
1061        );
1062    }
1063
1064    /// The original `test_generate_server_timing` is good, but this version
1065    /// is slightly more robust to minor timing variations.
1066    #[test]
1067    fn test_generate_server_timing() {
1068        let mut ctx = Ctx::new();
1069        ctx.timing.upstream_connect = Some(1);
1070        ctx.timing.upstream_processing = Some(2);
1071        ctx.timing.cache_lookup = Some(6);
1072        ctx.timing.cache_lock = Some(7);
1073        ctx.add_plugin_processing_time("plugin1", 100);
1074
1075        let timing_header = ctx.generate_server_timing();
1076
1077        // Check for the presence of each expected component.
1078        assert!(timing_header.contains("upstream.connect;dur=1"));
1079        assert!(timing_header.contains("upstream.processing;dur=2"));
1080        assert!(timing_header.contains("upstream;dur=3"));
1081        assert!(timing_header.contains("cache.lookup;dur=6"));
1082        assert!(timing_header.contains("cache.lock;dur=7"));
1083        assert!(timing_header.contains("cache;dur=13"));
1084        assert!(timing_header.contains("plugin.plugin1;dur=100"));
1085        assert!(timing_header.contains("plugin;dur=100"));
1086        assert!(timing_header.contains("total;dur="));
1087    }
1088
1089    #[test]
1090    fn test_format_duration() {
1091        let mut buf = BytesMut::new();
1092        format_duration(&mut buf, (3600 + 3500) * 1000);
1093        assert_eq!(b"1.9h", buf.as_ref());
1094
1095        buf = BytesMut::new();
1096        format_duration(&mut buf, (3600 + 1800) * 1000);
1097        assert_eq!(b"1.5h", buf.as_ref());
1098
1099        buf = BytesMut::new();
1100        format_duration(&mut buf, (3600 + 100) * 1000);
1101        assert_eq!(b"1h", buf.as_ref());
1102
1103        buf = BytesMut::new();
1104        format_duration(&mut buf, (60 + 50) * 1000);
1105        assert_eq!(b"1.8m", buf.as_ref());
1106
1107        buf = BytesMut::new();
1108        format_duration(&mut buf, (60 + 2) * 1000);
1109        assert_eq!(b"1m", buf.as_ref());
1110
1111        buf = BytesMut::new();
1112        format_duration(&mut buf, 1000);
1113        assert_eq!(b"1s", buf.as_ref());
1114
1115        buf = BytesMut::new();
1116        format_duration(&mut buf, 512);
1117        assert_eq!(b"512ms", buf.as_ref());
1118
1119        buf = BytesMut::new();
1120        format_duration(&mut buf, 1112);
1121        assert_eq!(b"1.1s", buf.as_ref());
1122    }
1123
1124    #[test]
1125    fn test_add_variable() {
1126        let mut ctx = Ctx::new();
1127        ctx.add_variable("key1", "value1");
1128        ctx.add_variable("key2", "value2");
1129        ctx.extend_variables(AHashMap::from([
1130            ("key3".to_string(), "value3".to_string()),
1131            ("key4".to_string(), "value4".to_string()),
1132        ]));
1133        let variables =
1134            ctx.features.as_ref().unwrap().variables.as_ref().unwrap();
1135        // NOTE: The current implementation in the main code doesn't add the '$' prefix automatically.
1136        // The test should reflect the actual implementation.
1137        assert_eq!(variables.get("key1"), Some(&"value1".to_string()));
1138        assert_eq!(variables.get("key2"), Some(&"value2".to_string()));
1139        assert_eq!(variables.get("key3"), Some(&"value3".to_string()));
1140        assert_eq!(variables.get("key4"), Some(&"value4".to_string()));
1141    }
1142
1143    #[test]
1144    fn test_cache_key() {
1145        let mut ctx = Ctx::new();
1146        ctx.push_cache_key("key1".to_string());
1147        ctx.extend_cache_keys(vec!["key2".to_string(), "key3".to_string()]);
1148        assert_eq!(
1149            vec!["key1".to_string(), "key2".to_string(), "key3".to_string()],
1150            ctx.cache.unwrap().keys.unwrap()
1151        );
1152
1153        let mut ctx = Ctx::new();
1154        ctx.cache.get_or_insert_default();
1155        let key = get_cache_key(
1156            &ctx,
1157            "GET",
1158            &Uri::from_static("https://example.com/path"),
1159        );
1160        assert_eq!(key.namespace_str(), Some(""));
1161        assert_eq!(key.primary_key_str(), Some("GET:https://example.com/path"));
1162    }
1163
1164    #[test]
1165    fn test_state() {
1166        let mut ctx = Ctx::new();
1167
1168        let mut buf = BytesMut::new();
1169        ctx.conn.id = 10;
1170        ctx.append_log_value(&mut buf, "connection_id");
1171        assert_eq!(b"10", buf.as_ref());
1172
1173        buf = BytesMut::new();
1174        ctx.append_log_value(&mut buf, "upstream_reused");
1175        assert_eq!(b"false", buf.as_ref());
1176
1177        buf = BytesMut::new();
1178        ctx.upstream.reused = true;
1179        ctx.append_log_value(&mut buf, "upstream_reused");
1180        assert_eq!(b"true", buf.as_ref());
1181
1182        buf = BytesMut::new();
1183        ctx.upstream.address = "192.168.1.1:80".to_string();
1184        ctx.append_log_value(&mut buf, "upstream_addr");
1185        assert_eq!(b"192.168.1.1:80", buf.as_ref());
1186
1187        buf = BytesMut::new();
1188        ctx.upstream.status = Some(StatusCode::CREATED);
1189        ctx.append_log_value(&mut buf, "upstream_status");
1190        assert_eq!(b"201", buf.as_ref());
1191
1192        buf = BytesMut::new();
1193        ctx.state.processing_count = 10;
1194        ctx.append_log_value(&mut buf, "processing");
1195        assert_eq!(b"10", buf.as_ref());
1196
1197        buf = BytesMut::new();
1198        ctx.timing.upstream_connect = Some(1);
1199        ctx.append_log_value(&mut buf, "upstream_connect_time");
1200        assert_eq!(b"1", buf.as_ref());
1201
1202        buf = BytesMut::new();
1203        ctx.append_log_value(&mut buf, "upstream_connect_time_human");
1204        assert_eq!(b"1ms", buf.as_ref());
1205
1206        buf = BytesMut::new();
1207        ctx.upstream.connected_count = Some(30);
1208        ctx.append_log_value(&mut buf, "upstream_connected");
1209        assert_eq!(b"30", buf.as_ref());
1210
1211        buf = BytesMut::new();
1212        ctx.timing.upstream_processing = Some(2);
1213        ctx.append_log_value(&mut buf, "upstream_processing_time");
1214        assert_eq!(b"2", buf.as_ref());
1215
1216        buf = BytesMut::new();
1217        ctx.append_log_value(&mut buf, "upstream_processing_time_human");
1218        assert_eq!(b"2ms", buf.as_ref());
1219
1220        buf = BytesMut::new();
1221        ctx.timing.upstream_response = Some(3);
1222        ctx.append_log_value(&mut buf, "upstream_response_time");
1223        assert_eq!(b"3", buf.as_ref());
1224
1225        buf = BytesMut::new();
1226        ctx.append_log_value(&mut buf, "upstream_response_time_human");
1227        assert_eq!(b"3ms", buf.as_ref());
1228
1229        buf = BytesMut::new();
1230        ctx.timing.upstream_tcp_connect = Some(100);
1231        ctx.append_log_value(&mut buf, "upstream_tcp_connect_time");
1232        assert_eq!(b"100", buf.as_ref());
1233
1234        buf = BytesMut::new();
1235        ctx.append_log_value(&mut buf, "upstream_tcp_connect_time_human");
1236        assert_eq!(b"100ms", buf.as_ref());
1237
1238        buf = BytesMut::new();
1239        ctx.timing.upstream_tls_handshake = Some(110);
1240        ctx.append_log_value(&mut buf, "upstream_tls_handshake_time");
1241        assert_eq!(b"110", buf.as_ref());
1242
1243        buf = BytesMut::new();
1244        ctx.append_log_value(&mut buf, "upstream_tls_handshake_time_human");
1245        assert_eq!(b"110ms", buf.as_ref());
1246
1247        buf = BytesMut::new();
1248        ctx.timing.upstream_connection_duration = Some(120);
1249        ctx.append_log_value(&mut buf, "upstream_connection_time");
1250        assert_eq!(b"120", buf.as_ref());
1251
1252        buf = BytesMut::new();
1253        ctx.append_log_value(&mut buf, "upstream_connection_time_human");
1254        assert_eq!(b"120ms", buf.as_ref());
1255
1256        buf = BytesMut::new();
1257        ctx.upstream.location = "pingap".to_string().into();
1258        ctx.append_log_value(&mut buf, "location");
1259        assert_eq!(b"pingap", buf.as_ref());
1260
1261        buf = BytesMut::new();
1262        ctx.timing.connection_duration = 4;
1263        ctx.append_log_value(&mut buf, "connection_time");
1264        assert_eq!(b"4", buf.as_ref());
1265
1266        buf = BytesMut::new();
1267        ctx.append_log_value(&mut buf, "connection_time_human");
1268        assert_eq!(b"4ms", buf.as_ref());
1269
1270        buf = BytesMut::new();
1271        ctx.conn.reused = false;
1272        ctx.append_log_value(&mut buf, "connection_reused");
1273        assert_eq!(b"false", buf.as_ref());
1274
1275        buf = BytesMut::new();
1276        ctx.conn.reused = true;
1277        ctx.append_log_value(&mut buf, "connection_reused");
1278        assert_eq!(b"true", buf.as_ref());
1279
1280        buf = BytesMut::new();
1281        ctx.conn.tls_version = Some("TLSv1.3".to_string());
1282        ctx.append_log_value(&mut buf, "tls_version");
1283        assert_eq!(b"TLSv1.3", buf.as_ref());
1284
1285        buf = BytesMut::new();
1286        ctx.conn.tls_cipher =
1287            Some("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256".to_string());
1288        ctx.append_log_value(&mut buf, "tls_cipher");
1289        assert_eq!(b"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", buf.as_ref());
1290
1291        buf = BytesMut::new();
1292        ctx.timing.tls_handshake = Some(101);
1293        ctx.append_log_value(&mut buf, "tls_handshake_time");
1294        assert_eq!(b"101", buf.as_ref());
1295
1296        buf = BytesMut::new();
1297        ctx.append_log_value(&mut buf, "tls_handshake_time_human");
1298        assert_eq!(b"101ms", buf.as_ref());
1299
1300        {
1301            let features = ctx.features.get_or_insert_default();
1302            features.compression_stat = Some(CompressionStat {
1303                in_bytes: 1024,
1304                out_bytes: 500,
1305                duration: Duration::from_millis(5),
1306                ..Default::default()
1307            })
1308        }
1309
1310        buf = BytesMut::new();
1311        ctx.append_log_value(&mut buf, "compression_time");
1312        assert_eq!(b"5", buf.as_ref());
1313
1314        buf = BytesMut::new();
1315        ctx.append_log_value(&mut buf, "compression_time_human");
1316        assert_eq!(b"5ms", buf.as_ref());
1317
1318        buf = BytesMut::new();
1319        ctx.append_log_value(&mut buf, "compression_ratio");
1320        assert_eq!(b"2.0", buf.as_ref());
1321
1322        buf = BytesMut::new();
1323        ctx.timing.cache_lookup = Some(6);
1324        ctx.append_log_value(&mut buf, "cache_lookup_time");
1325        assert_eq!(b"6", buf.as_ref());
1326
1327        buf = BytesMut::new();
1328        ctx.append_log_value(&mut buf, "cache_lookup_time_human");
1329        assert_eq!(b"6ms", buf.as_ref());
1330
1331        buf = BytesMut::new();
1332        ctx.timing.cache_lock = Some(7);
1333        ctx.append_log_value(&mut buf, "cache_lock_time");
1334        assert_eq!(b"7", buf.as_ref());
1335
1336        buf = BytesMut::new();
1337        ctx.append_log_value(&mut buf, "cache_lock_time_human");
1338        assert_eq!(b"7ms", buf.as_ref());
1339    }
1340
1341    #[test]
1342    fn test_add_plugin_processing_time() {
1343        let mut ctx = Ctx::new();
1344        ctx.add_plugin_processing_time("plugin1", 100);
1345        ctx.add_plugin_processing_time("plugin2", 200);
1346        assert_eq!(
1347            ctx.features.unwrap().plugin_processing_times,
1348            Some(vec![
1349                ("plugin1".to_string(), 100),
1350                ("plugin2".to_string(), 200)
1351            ])
1352        );
1353    }
1354
1355    #[test]
1356    fn test_get_digest_detail() {
1357        let mut digest = Digest::default();
1358        let detail = get_digest_detail(&digest);
1359        assert_eq!(detail.connection_reused, false);
1360        assert_eq!(detail.connection_time, 0);
1361        assert_eq!(detail.tcp_established, 0);
1362        assert_eq!(detail.tls_established, 0);
1363        assert_eq!(detail.tls_version, None);
1364        assert_eq!(detail.tls_cipher, None);
1365
1366        digest.timing_digest.push(Some(TimingDigest {
1367            established_ts: SystemTime::UNIX_EPOCH
1368                .checked_add(Duration::from_secs(5))
1369                .unwrap(),
1370        }));
1371        digest.timing_digest.push(Some(TimingDigest {
1372            established_ts: SystemTime::UNIX_EPOCH
1373                .checked_add(Duration::from_secs(3))
1374                .unwrap(),
1375        }));
1376        digest.ssl_digest = Some(Arc::new(SslDigest {
1377            version: "1.3".into(),
1378            cipher: "123".into(),
1379            organization: Some("cloudflare".to_string()),
1380            serial_number: Some(
1381                "0x00000000000000000000000000000abc".to_string(),
1382            ),
1383            cert_digest: vec![],
1384            extension: SslDigestExtension::default(),
1385        }));
1386        let detail = get_digest_detail(&digest);
1387        assert_eq!(detail.connection_reused, true);
1388        assert_eq!(detail.tcp_established, 5000);
1389        assert_eq!(detail.tls_established, 3000);
1390        assert_eq!(detail.tls_version, Some("1.3".to_string()));
1391        assert_eq!(detail.tls_cipher, Some("123".to_string()));
1392    }
1393
1394    #[test]
1395    fn test_modify_body_handler() {
1396        let mut ctx = Ctx::default();
1397
1398        struct TestHandler {}
1399        impl ModifyResponseBody for TestHandler {
1400            fn handle(
1401                &mut self,
1402                _session: &Session,
1403                body: &mut Option<bytes::Bytes>,
1404                _end_of_stream: bool,
1405            ) -> pingora::Result<()> {
1406                *body = Some(Bytes::from("test"));
1407                Ok(())
1408            }
1409        }
1410
1411        ctx.add_modify_body_handler("test", Box::new(TestHandler {}));
1412        assert_eq!(true, ctx.get_modify_body_handler("test").is_some());
1413    }
1414}