//! omega_cache/metrics.rs — sharded, lock-free telemetry collection for the cache.
1use crossbeam::utils::CachePadded;
2use hdrhistogram::Histogram;
3use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use std::sync::atomic::{AtomicU16, AtomicU64, AtomicUsize};
5
/// Default number of shards if not specified.
pub const DEFAULT_SHARDS: usize = 4;

/// Default capacity for the circular latency buffer.
pub const DEFAULT_LATENCY_SAMPLES: usize = 256;

/// A bitmask used to isolate the data portion of a packed 64-bit sample.
///
/// This mask covers the lower 48 bits (bits 0–47), providing a valid range
/// for values from 0 up to 281,474,976,710,655. Any bits above index 47
/// are zeroed out by this mask.
pub const SAMPLE_DATA_MASK: u64 = (1 << 48) - 1;

/// The number of bits to shift a 16-bit sequence ID to place it in the
/// most significant bits of a 64-bit word.
///
/// This shift positions the `sequence_id` (generation tag) in bits 48–63,
/// effectively separating the metadata from the sample value for atomic
/// updates and ghost data filtering.
pub const SAMPLE_SEQUENCE_ID_SHIFT: usize = 48;
26
/// Monotonic source of per-thread IDs; `Relaxed` is sufficient because only
/// uniqueness matters, not ordering between threads.
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

thread_local! {
    /// Each thread is assigned a unique ID on its first metrics call to determine
    /// its shard affinity. The ID is fixed for the thread's lifetime, so a thread
    /// always writes to the same shard.
    static THREAD_ID: usize = NEXT_ID.fetch_add(1, Relaxed);
}
34
35/// Computes the shard index for the current thread.
36///
37/// Uses a bitwise AND mask, requiring `shards` to be a power of two.
38#[inline]
39fn get_shard_index(shards: usize) -> usize {
40    let thread_id = THREAD_ID.with(|id| *id);
41    let mask = shards - 1;
42    thread_id & mask
43}
44
/// Configuration parameters for the [`Metrics`] collector.
///
/// Both fields are kept as powers of two by the constructors so that shard
/// and sample indexing can use bitwise masking instead of `%`.
#[derive(Debug)]
pub struct MetricsConfig {
    /// Total shards for metrics distribution. Must be a power of two.
    shards: usize,
    /// Capacity of the circular buffer per shard. Must be a power of two.
    latency_samples: usize,
}
53
54impl MetricsConfig {
55    /// Creates a new configuration, enforcing power-of-two constraints.
56    ///
57    /// # Parameters
58    /// - `shards`: Target shard count (rounded up to nearest $2^n$).
59    /// - `latency_samples`: Target buffer size (rounded up to nearest $2^n$).
60    #[inline]
61    pub fn new(shards: usize, latency_samples: usize) -> Self {
62        Self {
63            shards: shards.next_power_of_two(),
64            latency_samples: latency_samples.next_power_of_two(),
65        }
66    }
67}
68
69impl Default for MetricsConfig {
70    fn default() -> Self {
71        Self {
72            shards: DEFAULT_SHARDS,
73            latency_samples: DEFAULT_LATENCY_SAMPLES,
74        }
75    }
76}
77
/// The telemetry engine for the cache, providing high-concurrency event tracking.
///
/// `Metrics` is designed to be owned by the cache and serves as a thread-safe
/// sink for all operational telemetry. It supports high-frequency invocation
/// across multiple threads by utilizing a sharded, lock-free architecture.
///
/// # Concurrency & Multi-Threaded Ownership
/// Although owned by a single cache instance, `Metrics` is designed to be
/// accessed through shared references (`&self`) across all threads interacting
/// with the cache. It leverages internal mutability via atomics to allow
/// concurrent updates without requiring a `Mutex` or `RwLock` at the cache level.
///
/// # Throughput Scalability
/// To prevent metrics from becoming a point of contention in multi-core
/// environments, writes are distributed across independent [`MetricsStorage`]
/// shards. A thread-local affinity mechanism maps worker threads to specific
/// shards, localizing atomic increments and minimizing cross-core
/// synchronization overhead.
///
/// # Cache-Line Isolation
/// Each shard is explicitly wrapped in [`CachePadded`] to ensure it occupies
/// unique cache lines. This physical isolation prevents "false sharing,"
/// where unrelated updates to different shards would otherwise trigger
/// expensive CPU cache-coherency protocols and degrade cache performance.
///
/// # Operational Profile
/// - **Wait-Free Writes:** All recording operations are wait-free, ensuring
///   telemetry collection never blocks cache lookups or insertions.
/// - **Eventually Consistent Snapshots:** The [`Metrics::snapshot`] method
///   provides a point-in-time aggregation of all shards. While snapshots are
///   linear in complexity relative to shard count, recording remains $O(1)$.
#[derive(Debug)]
pub struct Metrics {
    /// Cache-line-isolated per-shard storage, indexed by thread affinity.
    shards: Vec<CachePadded<MetricsStorage>>,
    /// The (power-of-two) configuration this instance was built with.
    config: MetricsConfig,
}
116
117impl Metrics {
118    /// Initializes the metrics engine with sharded storage.
119    #[inline]
120    pub fn new(config: MetricsConfig) -> Self {
121        let shards = (0..config.shards)
122            .map(|_| CachePadded::new(MetricsStorage::new(&config)))
123            .collect::<Vec<_>>();
124
125        Self { shards, config }
126    }
127
128    /// Increments the cache hit counter for the current thread's assigned shard.
129    ///
130    /// # Concurrency
131    /// This is a **wait-free** $O(1)$ operation. It uses `Relaxed` atomic ordering,
132    /// providing high throughput at the cost of strict sequential consistency.
133    #[inline]
134    pub fn record_hit(&self) {
135        let shard_index = get_shard_index(self.config.shards);
136        self.shards[shard_index].record_hit();
137    }
138
139    /// Increments the cache miss counter for the current thread's assigned shard.
140    ///
141    /// # Concurrency
142    /// This is a **wait-free** $O(1)$ operation. It uses `Relaxed` atomic ordering,
143    /// ensuring that telemetry collection does not stall cache lookups.
144    #[inline]
145    pub fn record_miss(&self) {
146        let shard_index = get_shard_index(self.config.shards);
147        self.shards[shard_index].record_miss();
148    }
149
150    /// Records a cache entry eviction for the current thread's assigned shard.
151    ///
152    /// This should be invoked whenever an item is removed from the cache to
153    /// satisfy capacity constraints.
154    #[inline]
155    pub fn record_eviction(&self) {
156        let shard_index = get_shard_index(self.config.shards);
157        self.shards[shard_index].record_eviction();
158    }
159
160    /// Records a latency measurement into the sampler assigned to the current thread's shard.
161    ///
162    /// This method captures execution timing (e.g., lookup duration or insertion time)
163    /// without blocking the calling thread or incurring the overhead of a global lock.
164    ///
165    /// # Concurrency & Progress
166    /// This is a **wait-free** operation. It utilizes a thread-local shard lookup followed
167    /// by an atomic fetch-and-add on the shard's write cursor. This ensures that even
168    /// under extreme write contention, every thread makes independent progress.
169    ///
170    /// # Sampling Behavior
171    /// Latency is recorded into a lossy, circular buffer ([`Sampler`]). If the buffer
172    /// for the current shard is full, the oldest sample is overwritten. This "lossy"
173    /// property is a deliberate design choice to bound memory usage and prioritize
174    /// write throughput over absolute data retention.
175    ///
176    ///
177    ///
178    /// # Arguments
179    /// * `latency` - The raw timing value to record. Units (ns, us, ms) should remain
180    ///   consistent across all calls. A value of `0` is ignored during the [`snapshot()`]
181    ///   aggregation to prevent uninitialized data from skewing percentiles.
182    #[inline]
183    pub fn record_latency(&self, latency: u64) {
184        let shard_index = get_shard_index(self.config.shards);
185        self.shards[shard_index].record_latency(latency);
186    }
187
188    /// Performs a non-destructive aggregation of all sharded metrics into a
189    /// consistent [`MetricsSnapshot`].
190    ///
191    /// # Performance
192    /// This method is $O(S + N)$, where $S$ is the number of shards and $N$ is
193    /// the total capacity of the latency samplers. Because it iterates over
194    /// all shards and populates an [`HdrHistogram`], it is significantly more
195    /// expensive than recording methods and should typically be called from
196    /// a background reporting thread.
197    ///
198    /// # Consistency
199    /// The snapshot provides a **point-in-time** view. However, because shards
200    /// are read sequentially without a global lock, the snapshot may not represent
201    /// a single atomic instant across the entire cache. This is a standard
202    /// trade-off in high-performance telemetry systems.
203    #[inline]
204    pub fn snapshot(&self) -> MetricsSnapshot {
205        let mut hit_count: u64 = 0;
206        let mut miss_count: u64 = 0;
207        let mut eviction_count: u64 = 0;
208        let mut latency_histogram = create_latency_histogram();
209
210        for metrics_storage in &self.shards {
211            hit_count = hit_count.saturating_add(metrics_storage.hit_count());
212            miss_count = miss_count.saturating_add(metrics_storage.miss_count());
213            eviction_count = eviction_count.saturating_add(metrics_storage.eviction_count());
214
215            metrics_storage.write_latency_samples(&mut latency_histogram);
216        }
217
218        MetricsSnapshot {
219            hit_count,
220            miss_count,
221            eviction_count,
222            latency_histogram,
223        }
224    }
225}
226
227impl Default for Metrics {
228    fn default() -> Self {
229        let config = MetricsConfig {
230            shards: DEFAULT_SHARDS,
231            latency_samples: DEFAULT_LATENCY_SAMPLES,
232        };
233
234        Metrics::new(config)
235    }
236}
237
/// Common latency percentiles used for performance analysis.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LatencyPercentile {
    /// The 50th percentile, or the median value.
    P50,
    /// The 90th percentile.
    P90,
    /// The 99th percentile (standard tail latency metric).
    P99,
    /// The 99.9th percentile (extreme tail latency).
    P999,
}
250
251impl LatencyPercentile {
252    /// Returns the float value (0.0 - 1.0) required by HdrHistogram.
253    fn as_quantile(&self) -> f64 {
254        match self {
255            Self::P50 => 0.50,
256            Self::P90 => 0.90,
257            Self::P99 => 0.99,
258            Self::P999 => 0.999,
259        }
260    }
261}
262
/// A read-only, point-in-time representation of [`Metrics`].
///
/// Created via [`Metrics::snapshot`] or through the `From<&Metrics>` impl
/// (`MetricsSnapshot::from(&metrics)`).
#[derive(Debug)]
pub struct MetricsSnapshot {
    /// Hit total aggregated across all shards at snapshot time.
    hit_count: u64,
    /// Miss total aggregated across all shards at snapshot time.
    miss_count: u64,
    /// Eviction total aggregated across all shards at snapshot time.
    eviction_count: u64,
    /// Merged latency distribution built from every shard's sampler.
    latency_histogram: Histogram<u64>,
}
273
274impl MetricsSnapshot {
275    /// Returns the total number of hits recorded at the time of the snapshot.
276    #[inline]
277    pub fn hit_count(&self) -> u64 {
278        self.hit_count
279    }
280
281    /// Returns the total number of misses recorded at the time of the snapshot.
282    #[inline]
283    pub fn miss_count(&self) -> u64 {
284        self.miss_count
285    }
286
287    /// Returns the total number of evictions recorded at the time of the snapshot.
288    #[inline]
289    pub fn eviction_count(&self) -> u64 {
290        self.eviction_count
291    }
292
293    /// Calculates the miss rate ($misses / (hits + misses)$).
294    ///
295    /// Returns `0.0` if no activity was recorded to prevent division-by-zero errors.
296    pub fn miss_rate(&self) -> f64 {
297        let total = self.hit_count + self.miss_count;
298        if total == 0 {
299            0.0
300        } else {
301            self.miss_count as f64 / total as f64
302        }
303    }
304
305    /// Calculates the hit rate ($hits / (hits + misses)$).
306    ///
307    /// Returns `0.0` if no activity was recorded.
308    pub fn hit_rate(&self) -> f64 {
309        let total = self.hit_count + self.miss_count;
310        if total == 0 {
311            0.0
312        } else {
313            self.hit_count as f64 / total as f64
314        }
315    }
316
317    /// Returns the latency value at a specific percentile using a type-safe enum.
318    ///
319    /// This is the preferred method for monitoring cache performance, as it
320    /// explicitly categorizes measurements into median, tail, and extreme tail latencies.
321    ///
322    /// # Performance
323    /// The lookup is $O(1)$ relative to the number of samples in the histogram.
324    #[inline]
325    pub fn latency(&self, percentile: LatencyPercentile) -> u64 {
326        self.latency_histogram
327            .value_at_quantile(percentile.as_quantile())
328    }
329}
330
331impl From<&Metrics> for MetricsSnapshot {
332    fn from(metrics: &Metrics) -> Self {
333        metrics.snapshot()
334    }
335}
336
/// Internal thread-safe container for a single shard's metric data.
///
/// `MetricsStorage` manages the primary counters (hits, misses, evictions) and
/// the latency sampler for a subset of the cache's traffic. It is designed to
/// be held within a [`CachePadded`] wrapper to ensure physical isolation
/// from other shards.
///
/// # Field-Level Isolation
/// To prevent internal contention between different types of events (e.g.,
/// a hit and an eviction happening on the same shard), each `AtomicU64`
/// counter is individually wrapped in [`CachePadded`]. This ensures that
/// `hit_count`, `miss_count`, and `eviction_count` reside on distinct
/// cache lines.
///
/// # Memory Ordering
/// All operations utilize [`Relaxed`] memory ordering. This is appropriate
/// for metrics where the absolute global order of increments across shards
/// is less important than the total cumulative throughput.
#[derive(Debug)]
struct MetricsStorage {
    /// Atomic counter for cache hits. Isolated to its own cache line.
    hit_count: CachePadded<AtomicU64>,
    /// Atomic counter for cache misses. Isolated to its own cache line.
    miss_count: CachePadded<AtomicU64>,
    /// Atomic counter for entry evictions. Isolated to its own cache line.
    eviction_count: CachePadded<AtomicU64>,
    /// Circular buffer for wait-free latency sampling.
    latency_sampler: Sampler,
}
368
369impl MetricsStorage {
370    /// Initializes a new shard of metric counters and a latency sampler.
371    ///
372    /// Each atomic counter is initialized to zero and isolated within
373    /// its own [`CachePadded`] block to ensure that concurrent updates to
374    /// hits, misses, and evictions do not interfere with each other.
375    fn new(config: &MetricsConfig) -> Self {
376        Self {
377            hit_count: CachePadded::new(AtomicU64::default()),
378            miss_count: CachePadded::new(AtomicU64::default()),
379            eviction_count: CachePadded::new(AtomicU64::default()),
380            latency_sampler: Sampler::new(config.latency_samples),
381        }
382    }
383
384    /// Loads the current hit count from this shard.
385    ///
386    /// # Memory Ordering
387    /// Uses [`Relaxed`] ordering, providing an eventually consistent
388    /// view of the counter without memory fence overhead.
389    #[inline]
390    fn hit_count(&self) -> u64 {
391        self.hit_count.load(Relaxed)
392    }
393
394    /// Atomically increments the hit counter.
395    ///
396    ///
397    #[inline]
398    fn record_hit(&self) {
399        self.hit_count.fetch_add(1, Relaxed);
400    }
401
402    /// Loads the current miss count from this shard.
403    #[inline]
404    fn miss_count(&self) -> u64 {
405        self.miss_count.load(Relaxed)
406    }
407
408    /// Atomically increments the miss counter.
409    #[inline]
410    fn record_miss(&self) {
411        self.miss_count.fetch_add(1, Relaxed);
412    }
413
414    /// Loads the current eviction count from this shard.
415    #[inline]
416    fn eviction_count(&self) -> u64 {
417        self.eviction_count.load(Relaxed)
418    }
419
420    /// Atomically increments the eviction counter.
421    #[inline]
422    fn record_eviction(&self) {
423        self.eviction_count.fetch_add(1, Relaxed);
424    }
425
426    /// Forwards a latency measurement to the shard's internal [`Sampler`].
427    ///
428    /// This is a wait-free operation that records timing data into
429    /// a circular buffer.
430    #[inline]
431    fn record_latency(&self, latency: u64) {
432        self.latency_sampler.record(latency);
433    }
434
435    /// Aggregates all non-zero latency samples from this shard into the
436    /// provided [`Histogram`].
437    ///
438    /// This is a non-destructive read; the sampler's state is preserved
439    /// for potential subsequent snapshots.
440    #[inline]
441    fn write_latency_samples(&self, histogram: &mut Histogram<u64>) {
442        self.latency_sampler.write_samples(histogram);
443    }
444}
445
/// A wait-free, lossy circular buffer for capturing telemetry samples.
///
/// `Sampler` provides a high-throughput mechanism for recording numeric data
/// in environments with massive write contention. It prioritizes progress
/// for producers (threads recording data) over absolute data retention.
///
/// # Concurrency Design
/// The buffer is **wait-free**. It uses a single atomic `head` cursor
/// to reserve slots in the `samples` vector. Multiple producers can
/// concurrently claim indices and store values without blocking or
/// needing to coordinate beyond a single `fetch_add`.
///
/// # Lossy Buffer Mechanics
/// Once the buffer reaches its capacity ($2^n$), the `head` index wraps
/// around, and new samples overwrite the oldest data. This ensures
/// memory usage remains constant regardless of the volume of events.
///
/// # Significant Implementation Details
/// - **Masking:** Indexing uses bitwise `& mask` instead of the remainder
///   operator `%`. This requires the internal capacity to be a power of two,
///   a constraint enforced during initialization.
/// - **Zero-Value Sentinels:** During aggregation ([`Sampler::write_samples`]),
///   values of `0` are ignored. This identifies uninitialized slots or
///   intentionally skipped samples.
/// - **Read Consistency:** Snapshotting iterates over the buffer with
///   [`Relaxed`] loads. While a snapshot is being taken, producers may
///   be overwriting values, making the snapshot an "eventually consistent"
///   view of the buffer's state.
#[derive(Debug)]
pub struct Sampler {
    /// The underlying storage for samples.
    /// Note: The values themselves are atomic to allow concurrent reads/writes.
    samples: Vec<AtomicU64>,
    /// The monotonic write cursor. Padded to prevent the "Hot Head"
    /// contention from affecting adjacent memory.
    head: CachePadded<AtomicUsize>,
    /// Generation (lap) tag packed into each sample's upper 16 bits; bumped by
    /// readers to fence off stale data from earlier buffer rotations.
    sequence_id: CachePadded<AtomicU16>,
    /// Bitmask for fast index calculation ($capacity - 1$).
    mask: usize,
}
488
489impl Sampler {
490    /// Creates a new `Sampler` with a capacity rounded to the nearest power of two.
491    ///
492    /// The actual capacity is $2^n$, ensuring that bitwise masking can be used
493    /// for fast index calculation.
494    pub fn new(capacity: usize) -> Self {
495        let len = capacity.next_power_of_two();
496        let mut values = Vec::with_capacity(len);
497        for _ in 0..len {
498            values.push(AtomicU64::new(0));
499        }
500
501        Self {
502            samples: values,
503            head: CachePadded::new(AtomicUsize::new(0)),
504            sequence_id: CachePadded::new(AtomicU16::new(1)),
505            mask: len - 1,
506        }
507    }
508
509    /// Records a value into the circular buffer using a bit-packed generation tag.
510    ///
511    /// This operation is **wait-free**, ensuring that high-frequency writers are never
512    /// blocked by concurrent readers or other writers. It uniquely tags each sample
513    /// with a `sequence_id` (lap count) to prevent "ghost reads"—where stale data
514    /// from a previous rotation of the buffer is incorrectly included in a new snapshot.
515    ///
516    /// # Bit-Packing Layout
517    /// The 64-bit atomic slot is partitioned to allow single-word updates:
518    /// * **Bits 63–48 (16 bits):** `sequence_id`. Acts as a filter to validate data "freshness."
519    /// * **Bits 47–0 (48 bits):** `data`. Supports measurements up to $2^{48} - 1$ (e.g., $\approx 281$ TB or 281 trillion nanoseconds).
520    ///
521    ///
522    ///
523    /// # Performance & Safety
524    /// * **Efficiency:** Uses a bitwise mask (`head & mask`) for indexing, avoiding
525    ///   expensive integer division. This requires the buffer size to be a power of two.
526    /// * **Memory Ordering:** Uses `Relaxed` for the index increment to minimize
527    ///   cache-line contention, while `Acquire` on the `sequence_id` ensures the writer
528    ///   is synchronized with the current global lap.
529    ///
530    /// # Examples
531    /// Since the value is masked by `SAMPLER_VALUE_MASK`, any bits higher than 47
532    /// provided in the `value` argument will be truncated to ensure the `sequence_id`
533    /// remains uncorrupted.
534    #[inline]
535    pub fn record(&self, data: u64) {
536        let head = self.head.fetch_add(1, Relaxed);
537        let sequence_id = self.sequence_id.load(Acquire);
538
539        let index = head & self.mask;
540        let packed = sample_pack(sequence_id, data);
541        self.samples[index].store(packed, Relaxed);
542    }
543
544    /// Returns the power-of-two capacity of the sampler.
545    #[inline]
546    pub fn capacity(&self) -> usize {
547        self.samples.len()
548    }
549
550    /// Aggregates samples from the buffer that match the generation active at the start of the call.
551    ///
552    /// # Ghost Data Prevention
553    /// This method implements a **generation-flip snapshot**. By atomically incrementing
554    /// the `sequence_id` at the entry point, it captures the ID of the just-completed
555    /// generation. During iteration, it filters the buffer to prevent:
556    /// * **Stale Data (Ghosts):** Samples with a tag smaller than `sequency_id` are ignored.
557    /// * **Future Data:** Samples with a tag larger than `sequency_id` (from writers that
558    ///   started after this read began) trigger an early `break`.
559    ///
560    /// # Concurrency & Performance
561    /// * **Wait-Free:** Writers are never blocked. They simply begin tagging new
562    ///   samples with the next generation ID while this reader processes the previous one.
563    /// * **Early Exit:** The `break` condition optimizes for cases where writers
564    ///   rapidly overtake the reader, preventing unnecessary iteration over "future" slots.
565    /// * **Memory Ordering:** Uses `Release` on the increment to ensure subsequent
566    ///   writes in the next generation are ordered after this snapshot's boundary.
567    ///
568    /// # Panics
569    /// Does not panic, though it assumes the buffer is not so large that the reader
570    /// cannot complete a pass before the 16-bit `sequence_id` wraps around.
571    #[inline]
572    pub fn write_samples(&self, histogram: &mut Histogram<u64>) {
573        let global_sequence_id = self.sequence_id.fetch_add(1, Release);
574
575        for sample in &self.samples {
576            let (sample_sequence_id, data) = sample_unpack(sample.load(Relaxed));
577
578            if sample_sequence_id > global_sequence_id {
579                break;
580            }
581
582            if data > 0 && sample_sequence_id == global_sequence_id {
583                let _ = histogram.record(data);
584            }
585        }
586    }
587}
588
589/// Packs a 16-bit sequence ID and a 48-bit data value into a single 64-bit word.
590///
591/// # Invariants
592/// If the provided `data` exceeds the 48-bit range ($> 2^{48}-1$), the high bits
593/// are truncated via `SAMPLE_DATA_MASK` to prevent corruption of the `sequence_id`.
594fn sample_pack(sequence_id: u16, data: u64) -> u64 {
595    (data & SAMPLE_DATA_MASK) | ((sequence_id as u64) << SAMPLE_SEQUENCE_ID_SHIFT)
596}
597
598/// Unpacks a 64-bit word into its constituent 16-bit sequence ID and 48-bit data value.
599///
600/// This is the inverse of [`sample_pack`]. It extracts the generation tag used
601/// for ghost data prevention and the actual metric value.
602fn sample_unpack(packed: u64) -> (u16, u64) {
603    let sequence_id = (packed >> SAMPLE_SEQUENCE_ID_SHIFT) as u16;
604    let data = packed & SAMPLE_DATA_MASK;
605    (sequence_id, data)
606}
607
/// Creates a new latency histogram with a standardized range and precision.
///
/// # Configuration
/// - **Range:** 1 to 10,000 in the caller's chosen latency unit. (The unit is
///   not fixed here; per [`Metrics::record_latency`], callers only need to be
///   consistent across calls.)
/// - **Precision:** 2 significant figures (guarantees $\le 1\%$ error).
///
/// # Returns
/// A configured [`Histogram<u64>`].
///
/// # Panics
/// Panics if the internal bounds are invalid (though 1, 10000, 2 are verified constants).
#[inline]
pub fn create_latency_histogram() -> Histogram<u64> {
    const MIN_LATENCY: u64 = 1;
    const MAX_LATENCY: u64 = 10_000;
    const PRECISION: u8 = 2;

    Histogram::new_with_bounds(MIN_LATENCY, MAX_LATENCY, PRECISION)
        .expect("Failed to initialize latency histogram with standard bounds")
}
628
#[cfg(test)]
mod tests {
    use super::*;
    // `rand::Rng` is the trait that provides `random_range` (rand 0.9 API, paired
    // with `rand::rng()`); the previous `rand::RngExt` import does not exist.
    use rand::Rng;
    use std::sync::Barrier;
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// Verifies that configuration values for shards and samples are automatically
    /// rounded up to the nearest power of two to support bitmask indexing.
    #[test]
    fn test_config_rounding() {
        let config = MetricsConfig::new(7, 1000);

        assert_eq!(config.shards, 8);
        assert_eq!(config.latency_samples, 1024);
    }

    /// Ensures that the hit counter is thread-safe and accurately aggregates
    /// increments from multiple concurrent producers.
    #[test]
    fn test_multithreaded_hit_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_hit();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.hit_count(), 10000);
    }

    /// Ensures that the miss counter is thread-safe across concurrent updates.
    #[test]
    fn test_multithreaded_miss_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_miss();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.miss_count(), 10000);
    }

    /// Ensures that the eviction counter is thread-safe across concurrent updates.
    #[test]
    fn test_multithreaded_eviction_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_eviction();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.eviction_count(), 10000);
    }

    /// Tests the sharding mechanism under heavy contention.
    /// Uses a Barrier to ensure all threads begin writing simultaneously.
    #[test]
    fn test_high_contention_counters() {
        let metrics = Metrics::new(MetricsConfig::new(4, 1024));
        let num_threads = 8;
        let barrier = Barrier::new(num_threads);

        thread::scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    barrier.wait();
                    for _ in 0..1000 {
                        metrics.record_hit();
                    }
                });
            }
        });

        assert_eq!(metrics.snapshot().hit_count(), 8000);
    }

    /// Verifies that the snapshot correctly maps raw latency samples to
    /// statistical percentiles using the HdrHistogram backend.
    #[test]
    fn test_percentile_logic() {
        let metrics = Metrics::default();
        for i in 1..=100 {
            metrics.record_latency(i);
        }
        let snap = metrics.snapshot();

        // Allow for small rounding errors inherent in histogram bucketing
        assert!(snap.latency(LatencyPercentile::P50) >= 49);
        assert!(snap.latency(LatencyPercentile::P99) >= 98);
    }

    /// Validates the floating-point calculations for hit and miss rates.
    #[test]
    fn test_hit_and_miss_rates() {
        let metrics = Metrics::new(MetricsConfig::new(4, 1024));

        for _ in 0..80 {
            metrics.record_hit();
        }
        for _ in 0..20 {
            metrics.record_miss();
        }

        let snap = metrics.snapshot();

        assert_eq!(snap.hit_count(), 80);
        assert_eq!(snap.miss_count(), 20);
        assert!((snap.hit_rate() - 0.8).abs() < f64::EPSILON);
        assert!((snap.miss_rate() - 0.2).abs() < f64::EPSILON);
    }

    /// Simple validation for the eviction counter recording.
    #[test]
    fn test_eviction_tracking() {
        let metrics = Metrics::default();

        for _ in 0..15 {
            metrics.record_eviction();
        }

        let snap = metrics.snapshot();
        assert_eq!(snap.eviction_count(), 15);
    }

    /// Verifies that the latency distribution accurately reflects the recorded samples.
    #[test]
    fn test_latency_distribution() {
        let metrics = Metrics::new(MetricsConfig::new(1, 1024));

        for i in 1..=10 {
            metrics.record_latency(i * 10);
        }

        let snap = metrics.snapshot();

        let p50 = snap.latency(LatencyPercentile::P50);
        let p99 = snap.latency(LatencyPercentile::P99);

        assert!((49..=51).contains(&p50));
        assert!((99..=101).contains(&p99));
    }

    /// Ensures that taking a snapshot when no data has been recorded
    /// does not result in panics or invalid math (NaN).
    #[test]
    fn test_empty_snapshot_safety() {
        let metrics = Metrics::default();
        let snap = metrics.snapshot();

        assert_eq!(snap.hit_count(), 0);
        assert_eq!(snap.hit_rate(), 0.0);
        assert_eq!(snap.miss_rate(), 0.0);
        assert_eq!(snap.latency(LatencyPercentile::P50), 0);
    }

    /// Confirms that snapshots are independent "point-in-time" views
    /// and do not clear or mutate the underlying metric state.
    #[test]
    fn test_snapshot_independence() {
        let metrics = Metrics::default();

        metrics.record_hit();
        let snap1 = metrics.snapshot();

        metrics.record_hit();
        let snap2 = metrics.snapshot();

        assert_eq!(snap1.hit_count(), 1);
        assert_eq!(snap2.hit_count(), 2);
    }

    /// Integration test to verify concurrent latency accumulation, snapshot isolation,
    /// and the efficacy of the ghost data prevention algorithm.
    ///
    /// This test confirms that:
    /// 1. **Snapshot Independence:** Snapshots are point-in-time views that do not
    ///    clear or mutate the underlying metric state, yet remain isolated by generation.
    /// 2. **Ghost Prevention:** The 16-bit generation tag correctly identifies and
    ///    excludes stale data from previous buffer rotations.
    /// 3. **Statistical Integrity:** Percentiles (P50, P90, P99, P999) calculated from
    ///    sharded, lock-free storage match a Mutex-protected ground truth.
    /// 4. **Wait-Free Progress:** High-contention writes via a `Barrier` do not result
    ///    in data corruption or lost updates due to bit-packing race conditions.
    #[test]
    fn test_metrics_collect_latency_metrics() {
        let config = MetricsConfig::new(4, 1024);
        let metrics = Arc::new(Metrics::new(config));

        let num_threads = 10;
        let samples_per_thread = 200;
        let barrier = Arc::new(Barrier::new(num_threads));

        for _ in 0..10 {
            let histogram: Mutex<Histogram<u64>> = Mutex::new(create_latency_histogram());

            thread::scope(|s| {
                for _ in 0..num_threads {
                    s.spawn({
                        let metrics = metrics.clone();
                        let barrier = barrier.clone();
                        let histogram = &histogram;

                        move || {
                            let mut rng = rand::rng();
                            let mut written_values = Vec::with_capacity(samples_per_thread);
                            barrier.wait();

                            for _ in 0..samples_per_thread {
                                // Simulate latency between 100us and 1000us
                                let latency = rng.random_range(100..1000);
                                metrics.record_latency(latency);
                                written_values.push(latency);
                            }

                            let mut guard = histogram.lock().expect("cannot acquire lock");

                            for value in written_values {
                                guard.record(value).expect("cannot write value");
                            }

                            drop(guard)
                        }
                    });
                }
            });

            let snapshot = metrics.snapshot();

            let percentiles = [
                LatencyPercentile::P50,
                LatencyPercentile::P90,
                LatencyPercentile::P99,
                LatencyPercentile::P999,
            ];

            let guard = histogram.lock().expect("cannot acquire the lock");

            for p in percentiles {
                let actual = snapshot.latency(p);
                let expected = guard.value_at_quantile(p.as_quantile());

                // Assert that the lock-free sampler matches the controlled histogram
                assert!(
                    (actual as i64 - expected as i64).abs() <= 1,
                    "Percentile {:?} mismatch: actual {}, expected {}",
                    p,
                    actual,
                    expected
                );
            }
        }
    }
}