// omega_cache/metrics.rs
1use crossbeam::utils::CachePadded;
2use hdrhistogram::Histogram;
3use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use std::sync::atomic::{AtomicU16, AtomicU64, AtomicUsize};
5
/// Default number of shards if not specified.
pub const DEFAULT_SHARDS: usize = 4;

/// Default capacity for the circular latency buffer.
pub const DEFAULT_LATENCY_SAMPLES: usize = 256;

/// A bitmask used to isolate the data portion of a packed 64-bit sample.
///
/// This mask covers the lower 48 bits (bits 0–47), providing a valid range
/// for values from 0 up to 281,474,976,710,655 ($2^{48} - 1$). Any bits above
/// index 47 are zeroed out by this mask. Used by [`sample_pack`] / [`sample_unpack`].
pub const SAMPLE_DATA_MASK: u64 = (1 << 48) - 1;

/// The number of bits to shift a 16-bit sequence ID to place it in the
/// most significant bits of a 64-bit word.
///
/// This shift positions the `sequence_id` (generation tag) in bits 48–63,
/// effectively separating the metadata from the sample value for atomic
/// updates and ghost data filtering.
pub const SAMPLE_SEQUENCE_ID_SHIFT: usize = 48;
26
/// Monotonic source of per-thread IDs; starts at 1 so an ID of 0 is never issued.
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

thread_local! {
    /// Each thread is assigned a unique ID on its first metrics call to determine
    /// its shard affinity. The value is fetched once and cached for the thread's
    /// lifetime, so a thread always maps to the same shard.
    static THREAD_ID: usize = NEXT_ID.fetch_add(1, Relaxed);
}
34
35/// Computes the shard index for the current thread.
36///
37/// Uses a bitwise AND mask, requiring `shards` to be a power of two.
38#[inline]
39fn get_shard_index(shards: usize) -> usize {
40 let thread_id = THREAD_ID.with(|id| *id);
41 let mask = shards - 1;
42 thread_id & mask
43}
44
/// Configuration parameters for the [`Metrics`] collector.
#[derive(Debug)]
pub struct MetricsConfig {
    /// Total shards for metrics distribution. Must be a power of two.
    shards: usize,
    /// Capacity of the circular buffer per shard. Must be a power of two.
    latency_samples: usize,
}

impl MetricsConfig {
    /// Creates a new configuration, enforcing power-of-two constraints.
    ///
    /// Both inputs are rounded **up** to the nearest power of two so that
    /// shard selection and sampler indexing can use cheap bitwise masking.
    ///
    /// # Parameters
    /// - `shards`: Target shard count (rounded up to the nearest $2^n$).
    /// - `latency_samples`: Target buffer size (rounded up to the nearest $2^n$).
    #[inline]
    pub fn new(shards: usize, latency_samples: usize) -> Self {
        let shards = shards.next_power_of_two();
        let latency_samples = latency_samples.next_power_of_two();
        Self { shards, latency_samples }
    }
}
68
impl Default for MetricsConfig {
    /// Returns the default configuration: [`DEFAULT_SHARDS`] shards and
    /// [`DEFAULT_LATENCY_SAMPLES`] sampler slots per shard (both powers of two).
    fn default() -> Self {
        Self {
            shards: DEFAULT_SHARDS,
            latency_samples: DEFAULT_LATENCY_SAMPLES,
        }
    }
}
77
/// The telemetry engine for the cache, providing high-concurrency event tracking.
///
/// `Metrics` is designed to be owned by the cache and serves as a thread-safe
/// sink for all operational telemetry. It supports high-frequency invocation
/// across multiple threads by utilizing a sharded, lock-free architecture.
///
/// # Concurrency & Multi-Threaded Ownership
/// Although owned by a single cache instance, `Metrics` is designed to be
/// accessed through shared references (`&self`) across all threads interacting
/// with the cache. It leverages internal mutability via atomics to allow
/// concurrent updates without requiring a `Mutex` or `RwLock` at the cache level.
///
/// # Throughput Scalability
/// To prevent metrics from becoming a point of contention in multi-core
/// environments, writes are distributed across independent [`MetricsStorage`]
/// shards. A thread-local affinity mechanism maps worker threads to specific
/// shards, localizing atomic increments and minimizing cross-core
/// synchronization overhead.
///
/// # Cache-Line Isolation
/// Each shard is explicitly wrapped in [`CachePadded`] to ensure it occupies
/// unique cache lines. This physical isolation prevents "false sharing,"
/// where unrelated updates to different shards would otherwise trigger
/// expensive CPU cache-coherency protocols and degrade cache performance.
///
/// # Operational Profile
/// - **Wait-Free Writes:** All recording operations are wait-free, ensuring
///   telemetry collection never blocks cache lookups or insertions.
/// - **Eventually Consistent Snapshots:** The [`Metrics::snapshot`] method
///   provides a point-in-time aggregation of all shards. While snapshots are
///   linear in complexity relative to shard count, recording remains $O(1)$.
#[derive(Debug)]
pub struct Metrics {
    /// One padded storage cell per shard, indexed by thread affinity.
    shards: Vec<CachePadded<MetricsStorage>>,
    /// The (power-of-two-normalized) configuration this engine was built with.
    config: MetricsConfig,
}
116
117impl Metrics {
118 /// Initializes the metrics engine with sharded storage.
119 #[inline]
120 pub fn new(config: MetricsConfig) -> Self {
121 let shards = (0..config.shards)
122 .map(|_| CachePadded::new(MetricsStorage::new(&config)))
123 .collect::<Vec<_>>();
124
125 Self { shards, config }
126 }
127
128 /// Increments the cache hit counter for the current thread's assigned shard.
129 ///
130 /// # Concurrency
131 /// This is a **wait-free** $O(1)$ operation. It uses `Relaxed` atomic ordering,
132 /// providing high throughput at the cost of strict sequential consistency.
133 #[inline]
134 pub fn record_hit(&self) {
135 let shard_index = get_shard_index(self.config.shards);
136 self.shards[shard_index].record_hit();
137 }
138
139 /// Increments the cache miss counter for the current thread's assigned shard.
140 ///
141 /// # Concurrency
142 /// This is a **wait-free** $O(1)$ operation. It uses `Relaxed` atomic ordering,
143 /// ensuring that telemetry collection does not stall cache lookups.
144 #[inline]
145 pub fn record_miss(&self) {
146 let shard_index = get_shard_index(self.config.shards);
147 self.shards[shard_index].record_miss();
148 }
149
150 /// Records a cache entry eviction for the current thread's assigned shard.
151 ///
152 /// This should be invoked whenever an item is removed from the cache to
153 /// satisfy capacity constraints.
154 #[inline]
155 pub fn record_eviction(&self) {
156 let shard_index = get_shard_index(self.config.shards);
157 self.shards[shard_index].record_eviction();
158 }
159
160 /// Records a latency measurement into the sampler assigned to the current thread's shard.
161 ///
162 /// This method captures execution timing (e.g., lookup duration or insertion time)
163 /// without blocking the calling thread or incurring the overhead of a global lock.
164 ///
165 /// # Concurrency & Progress
166 /// This is a **wait-free** operation. It utilizes a thread-local shard lookup followed
167 /// by an atomic fetch-and-add on the shard's write cursor. This ensures that even
168 /// under extreme write contention, every thread makes independent progress.
169 ///
170 /// # Sampling Behavior
171 /// Latency is recorded into a lossy, circular buffer ([`Sampler`]). If the buffer
172 /// for the current shard is full, the oldest sample is overwritten. This "lossy"
173 /// property is a deliberate design choice to bound memory usage and prioritize
174 /// write throughput over absolute data retention.
175 ///
176 ///
177 ///
178 /// # Arguments
179 /// * `latency` - The raw timing value to record. Units (ns, us, ms) should remain
180 /// consistent across all calls. A value of `0` is ignored during the [`snapshot()`]
181 /// aggregation to prevent uninitialized data from skewing percentiles.
182 #[inline]
183 pub fn record_latency(&self, latency: u64) {
184 let shard_index = get_shard_index(self.config.shards);
185 self.shards[shard_index].record_latency(latency);
186 }
187
188 /// Performs a non-destructive aggregation of all sharded metrics into a
189 /// consistent [`MetricsSnapshot`].
190 ///
191 /// # Performance
192 /// This method is $O(S + N)$, where $S$ is the number of shards and $N$ is
193 /// the total capacity of the latency samplers. Because it iterates over
194 /// all shards and populates an [`HdrHistogram`], it is significantly more
195 /// expensive than recording methods and should typically be called from
196 /// a background reporting thread.
197 ///
198 /// # Consistency
199 /// The snapshot provides a **point-in-time** view. However, because shards
200 /// are read sequentially without a global lock, the snapshot may not represent
201 /// a single atomic instant across the entire cache. This is a standard
202 /// trade-off in high-performance telemetry systems.
203 #[inline]
204 pub fn snapshot(&self) -> MetricsSnapshot {
205 let mut hit_count: u64 = 0;
206 let mut miss_count: u64 = 0;
207 let mut eviction_count: u64 = 0;
208 let mut latency_histogram = create_latency_histogram();
209
210 for metrics_storage in &self.shards {
211 hit_count = hit_count.saturating_add(metrics_storage.hit_count());
212 miss_count = miss_count.saturating_add(metrics_storage.miss_count());
213 eviction_count = eviction_count.saturating_add(metrics_storage.eviction_count());
214
215 metrics_storage.write_latency_samples(&mut latency_histogram);
216 }
217
218 MetricsSnapshot {
219 hit_count,
220 miss_count,
221 eviction_count,
222 latency_histogram,
223 }
224 }
225}
226
227impl Default for Metrics {
228 fn default() -> Self {
229 let config = MetricsConfig {
230 shards: DEFAULT_SHARDS,
231 latency_samples: DEFAULT_LATENCY_SAMPLES,
232 };
233
234 Metrics::new(config)
235 }
236}
237
/// Common latency percentiles used for performance analysis.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LatencyPercentile {
    /// The 50th percentile, or the median value.
    P50,
    /// The 90th percentile.
    P90,
    /// The 99th percentile (standard tail latency metric).
    P99,
    /// The 99.9th percentile (extreme tail latency).
    P999,
}

impl LatencyPercentile {
    /// Maps the percentile to the quantile (0.0–1.0) expected by
    /// HdrHistogram's `value_at_quantile`.
    fn as_quantile(&self) -> f64 {
        match *self {
            LatencyPercentile::P50 => 0.50,
            LatencyPercentile::P90 => 0.90,
            LatencyPercentile::P99 => 0.99,
            LatencyPercentile::P999 => 0.999,
        }
    }
}
262
/// A read-only, point-in-time representation of [`Metrics`].
///
/// Created via [`Metrics::snapshot`] or the `From<&Metrics>` conversion.
#[derive(Debug)]
pub struct MetricsSnapshot {
    /// Total hits aggregated across all shards at snapshot time.
    hit_count: u64,
    /// Total misses aggregated across all shards at snapshot time.
    miss_count: u64,
    /// Total evictions aggregated across all shards at snapshot time.
    eviction_count: u64,
    /// Latency distribution built from each shard's current-generation samples.
    latency_histogram: Histogram<u64>,
}
273
274impl MetricsSnapshot {
275 /// Returns the total number of hits recorded at the time of the snapshot.
276 #[inline]
277 pub fn hit_count(&self) -> u64 {
278 self.hit_count
279 }
280
281 /// Returns the total number of misses recorded at the time of the snapshot.
282 #[inline]
283 pub fn miss_count(&self) -> u64 {
284 self.miss_count
285 }
286
287 /// Returns the total number of evictions recorded at the time of the snapshot.
288 #[inline]
289 pub fn eviction_count(&self) -> u64 {
290 self.eviction_count
291 }
292
293 /// Calculates the miss rate ($misses / (hits + misses)$).
294 ///
295 /// Returns `0.0` if no activity was recorded to prevent division-by-zero errors.
296 pub fn miss_rate(&self) -> f64 {
297 let total = self.hit_count + self.miss_count;
298 if total == 0 {
299 0.0
300 } else {
301 self.miss_count as f64 / total as f64
302 }
303 }
304
305 /// Calculates the hit rate ($hits / (hits + misses)$).
306 ///
307 /// Returns `0.0` if no activity was recorded.
308 pub fn hit_rate(&self) -> f64 {
309 let total = self.hit_count + self.miss_count;
310 if total == 0 {
311 0.0
312 } else {
313 self.hit_count as f64 / total as f64
314 }
315 }
316
317 /// Returns the latency value at a specific percentile using a type-safe enum.
318 ///
319 /// This is the preferred method for monitoring cache performance, as it
320 /// explicitly categorizes measurements into median, tail, and extreme tail latencies.
321 ///
322 /// # Performance
323 /// The lookup is $O(1)$ relative to the number of samples in the histogram.
324 #[inline]
325 pub fn latency(&self, percentile: LatencyPercentile) -> u64 {
326 self.latency_histogram
327 .value_at_quantile(percentile.as_quantile())
328 }
329}
330
impl From<&Metrics> for MetricsSnapshot {
    /// Delegates to [`Metrics::snapshot`], aggregating all shards.
    fn from(metrics: &Metrics) -> Self {
        metrics.snapshot()
    }
}
336
/// Internal thread-safe container for a single shard's metric data.
///
/// `MetricsStorage` manages the primary counters (hits, misses, evictions) and
/// the latency sampler for a subset of the cache's traffic. It is designed to
/// be held within a [`CachePadded`] wrapper to ensure physical isolation
/// from other shards.
///
/// # Field-Level Isolation
/// To prevent internal contention between different types of events (e.g.,
/// a hit and an eviction happening on the same shard), each `AtomicU64`
/// counter is individually wrapped in [`CachePadded`]. This ensures that
/// `hit_count`, `miss_count`, and `eviction_count` reside on distinct
/// cache lines.
///
/// # Memory Ordering
/// All counter operations use [`Relaxed`] memory ordering. This is appropriate
/// for metrics where the absolute global order of increments across shards
/// is less important than the total cumulative throughput.
#[derive(Debug)]
struct MetricsStorage {
    /// Atomic counter for cache hits. Isolated to its own cache line.
    hit_count: CachePadded<AtomicU64>,
    /// Atomic counter for cache misses. Isolated to its own cache line.
    miss_count: CachePadded<AtomicU64>,
    /// Atomic counter for entry evictions. Isolated to its own cache line.
    eviction_count: CachePadded<AtomicU64>,
    /// Circular buffer for wait-free latency sampling.
    latency_sampler: Sampler,
}
368
369impl MetricsStorage {
370 /// Initializes a new shard of metric counters and a latency sampler.
371 ///
372 /// Each atomic counter is initialized to zero and isolated within
373 /// its own [`CachePadded`] block to ensure that concurrent updates to
374 /// hits, misses, and evictions do not interfere with each other.
375 fn new(config: &MetricsConfig) -> Self {
376 Self {
377 hit_count: CachePadded::new(AtomicU64::default()),
378 miss_count: CachePadded::new(AtomicU64::default()),
379 eviction_count: CachePadded::new(AtomicU64::default()),
380 latency_sampler: Sampler::new(config.latency_samples),
381 }
382 }
383
384 /// Loads the current hit count from this shard.
385 ///
386 /// # Memory Ordering
387 /// Uses [`Relaxed`] ordering, providing an eventually consistent
388 /// view of the counter without memory fence overhead.
389 #[inline]
390 fn hit_count(&self) -> u64 {
391 self.hit_count.load(Relaxed)
392 }
393
394 /// Atomically increments the hit counter.
395 ///
396 ///
397 #[inline]
398 fn record_hit(&self) {
399 self.hit_count.fetch_add(1, Relaxed);
400 }
401
402 /// Loads the current miss count from this shard.
403 #[inline]
404 fn miss_count(&self) -> u64 {
405 self.miss_count.load(Relaxed)
406 }
407
408 /// Atomically increments the miss counter.
409 #[inline]
410 fn record_miss(&self) {
411 self.miss_count.fetch_add(1, Relaxed);
412 }
413
414 /// Loads the current eviction count from this shard.
415 #[inline]
416 fn eviction_count(&self) -> u64 {
417 self.eviction_count.load(Relaxed)
418 }
419
420 /// Atomically increments the eviction counter.
421 #[inline]
422 fn record_eviction(&self) {
423 self.eviction_count.fetch_add(1, Relaxed);
424 }
425
426 /// Forwards a latency measurement to the shard's internal [`Sampler`].
427 ///
428 /// This is a wait-free operation that records timing data into
429 /// a circular buffer.
430 #[inline]
431 fn record_latency(&self, latency: u64) {
432 self.latency_sampler.record(latency);
433 }
434
435 /// Aggregates all non-zero latency samples from this shard into the
436 /// provided [`Histogram`].
437 ///
438 /// This is a non-destructive read; the sampler's state is preserved
439 /// for potential subsequent snapshots.
440 #[inline]
441 fn write_latency_samples(&self, histogram: &mut Histogram<u64>) {
442 self.latency_sampler.write_samples(histogram);
443 }
444}
445
/// A wait-free, lossy circular buffer for capturing telemetry samples.
///
/// `Sampler` provides a high-throughput mechanism for recording numeric data
/// in environments with massive write contention. It prioritizes progress
/// for producers (threads recording data) over absolute data retention.
///
/// # Concurrency Design
/// The buffer is **wait-free**. It uses a single atomic `head` cursor
/// to reserve slots in the `samples` vector. Multiple producers can
/// concurrently claim indices and store values without blocking or
/// needing to coordinate beyond a single `fetch_add`.
///
/// # Lossy Buffer Mechanics
/// Once the buffer reaches its capacity ($2^n$), the `head` index wraps
/// around, and new samples overwrite the oldest data. This ensures
/// memory usage remains constant regardless of the volume of events.
///
/// # Significant Implementation Details
/// - **Masking:** Indexing uses bitwise `& mask` instead of the remainder
///   operator `%`. This requires the internal capacity to be a power of two,
///   a constraint enforced during initialization.
/// - **Zero-Value Sentinels:** During aggregation ([`Sampler::write_samples`]),
///   values of `0` are ignored. This identifies uninitialized slots or
///   intentionally skipped samples.
/// - **Read Consistency:** Snapshotting iterates over the buffer with
///   [`Relaxed`] loads. While a snapshot is being taken, producers may
///   be overwriting values, making the snapshot an "eventually consistent"
///   view of the buffer's state.
#[derive(Debug)]
pub struct Sampler {
    /// The underlying storage for samples.
    /// Note: The values themselves are atomic to allow concurrent reads/writes.
    samples: Vec<AtomicU64>,
    /// The monotonic write cursor. Padded to prevent the "Hot Head"
    /// contention from affecting adjacent memory.
    head: CachePadded<AtomicUsize>,
    /// Generation (lap) tag packed into the top 16 bits of every sample;
    /// bumped by `write_samples` so stale laps can be filtered out.
    sequence_id: CachePadded<AtomicU16>,
    /// Bitmask for fast index calculation ($capacity - 1$).
    mask: usize,
}
488
489impl Sampler {
490 /// Creates a new `Sampler` with a capacity rounded to the nearest power of two.
491 ///
492 /// The actual capacity is $2^n$, ensuring that bitwise masking can be used
493 /// for fast index calculation.
494 pub fn new(capacity: usize) -> Self {
495 let len = capacity.next_power_of_two();
496 let mut values = Vec::with_capacity(len);
497 for _ in 0..len {
498 values.push(AtomicU64::new(0));
499 }
500
501 Self {
502 samples: values,
503 head: CachePadded::new(AtomicUsize::new(0)),
504 sequence_id: CachePadded::new(AtomicU16::new(1)),
505 mask: len - 1,
506 }
507 }
508
509 /// Records a value into the circular buffer using a bit-packed generation tag.
510 ///
511 /// This operation is **wait-free**, ensuring that high-frequency writers are never
512 /// blocked by concurrent readers or other writers. It uniquely tags each sample
513 /// with a `sequence_id` (lap count) to prevent "ghost reads"—where stale data
514 /// from a previous rotation of the buffer is incorrectly included in a new snapshot.
515 ///
516 /// # Bit-Packing Layout
517 /// The 64-bit atomic slot is partitioned to allow single-word updates:
518 /// * **Bits 63–48 (16 bits):** `sequence_id`. Acts as a filter to validate data "freshness."
519 /// * **Bits 47–0 (48 bits):** `data`. Supports measurements up to $2^{48} - 1$ (e.g., $\approx 281$ TB or 281 trillion nanoseconds).
520 ///
521 ///
522 ///
523 /// # Performance & Safety
524 /// * **Efficiency:** Uses a bitwise mask (`head & mask`) for indexing, avoiding
525 /// expensive integer division. This requires the buffer size to be a power of two.
526 /// * **Memory Ordering:** Uses `Relaxed` for the index increment to minimize
527 /// cache-line contention, while `Acquire` on the `sequence_id` ensures the writer
528 /// is synchronized with the current global lap.
529 ///
530 /// # Examples
531 /// Since the value is masked by `SAMPLER_VALUE_MASK`, any bits higher than 47
532 /// provided in the `value` argument will be truncated to ensure the `sequence_id`
533 /// remains uncorrupted.
534 #[inline]
535 pub fn record(&self, data: u64) {
536 let head = self.head.fetch_add(1, Relaxed);
537 let sequence_id = self.sequence_id.load(Acquire);
538
539 let index = head & self.mask;
540 let packed = sample_pack(sequence_id, data);
541 self.samples[index].store(packed, Relaxed);
542 }
543
544 /// Returns the power-of-two capacity of the sampler.
545 #[inline]
546 pub fn capacity(&self) -> usize {
547 self.samples.len()
548 }
549
550 /// Aggregates samples from the buffer that match the generation active at the start of the call.
551 ///
552 /// # Ghost Data Prevention
553 /// This method implements a **generation-flip snapshot**. By atomically incrementing
554 /// the `sequence_id` at the entry point, it captures the ID of the just-completed
555 /// generation. During iteration, it filters the buffer to prevent:
556 /// * **Stale Data (Ghosts):** Samples with a tag smaller than `sequency_id` are ignored.
557 /// * **Future Data:** Samples with a tag larger than `sequency_id` (from writers that
558 /// started after this read began) trigger an early `break`.
559 ///
560 /// # Concurrency & Performance
561 /// * **Wait-Free:** Writers are never blocked. They simply begin tagging new
562 /// samples with the next generation ID while this reader processes the previous one.
563 /// * **Early Exit:** The `break` condition optimizes for cases where writers
564 /// rapidly overtake the reader, preventing unnecessary iteration over "future" slots.
565 /// * **Memory Ordering:** Uses `Release` on the increment to ensure subsequent
566 /// writes in the next generation are ordered after this snapshot's boundary.
567 ///
568 /// # Panics
569 /// Does not panic, though it assumes the buffer is not so large that the reader
570 /// cannot complete a pass before the 16-bit `sequence_id` wraps around.
571 #[inline]
572 pub fn write_samples(&self, histogram: &mut Histogram<u64>) {
573 let global_sequence_id = self.sequence_id.fetch_add(1, Release);
574
575 for sample in &self.samples {
576 let (sample_sequence_id, data) = sample_unpack(sample.load(Relaxed));
577
578 if sample_sequence_id > global_sequence_id {
579 break;
580 }
581
582 if data > 0 && sample_sequence_id == global_sequence_id {
583 let _ = histogram.record(data);
584 }
585 }
586 }
587}
588
589/// Packs a 16-bit sequence ID and a 48-bit data value into a single 64-bit word.
590///
591/// # Invariants
592/// If the provided `data` exceeds the 48-bit range ($> 2^{48}-1$), the high bits
593/// are truncated via `SAMPLE_DATA_MASK` to prevent corruption of the `sequence_id`.
594fn sample_pack(sequence_id: u16, data: u64) -> u64 {
595 (data & SAMPLE_DATA_MASK) | ((sequence_id as u64) << SAMPLE_SEQUENCE_ID_SHIFT)
596}
597
598/// Unpacks a 64-bit word into its constituent 16-bit sequence ID and 48-bit data value.
599///
600/// This is the inverse of [`sample_pack`]. It extracts the generation tag used
601/// for ghost data prevention and the actual metric value.
602fn sample_unpack(packed: u64) -> (u16, u64) {
603 let sequence_id = (packed >> SAMPLE_SEQUENCE_ID_SHIFT) as u16;
604 let data = packed & SAMPLE_DATA_MASK;
605 (sequence_id, data)
606}
607
/// Creates a new latency histogram with a standardized range and precision.
///
/// # Configuration
/// - **Range:** 1 to 10,000 in whatever unit callers pass to `record_latency`.
///   NOTE(review): the recording docs only require a *consistent* unit, and
///   `Histogram::record` errors on values above the upper bound — those errors
///   are silently discarded during snapshot aggregation. Confirm the intended
///   unit and ceiling against real call sites.
/// - **Precision:** 2 significant figures (guarantees $\le 1\%$ error).
///
/// # Returns
/// A configured [`Histogram<u64>`].
///
/// # Panics
/// Panics if the internal bounds are invalid (though 1, 10000, 2 are verified constants).
#[inline]
pub fn create_latency_histogram() -> Histogram<u64> {
    const MIN_LATENCY: u64 = 1;
    const MAX_LATENCY: u64 = 10_000;
    const PRECISION: u8 = 2;

    Histogram::new_with_bounds(MIN_LATENCY, MAX_LATENCY, PRECISION)
        .expect("Failed to initialize latency histogram with standard bounds")
}
628
#[cfg(test)]
mod tests {
    use super::*;
    // `random_range` is provided by the `Rng` trait in rand 0.9; the previous
    // `use rand::RngExt;` referenced an item that does not exist in the crate.
    use rand::Rng;
    use std::sync::{Arc, Barrier, Mutex};
    use std::thread;

    /// Verifies that configuration values for shards and samples are automatically
    /// rounded up to the nearest power of two to support bitmask indexing.
    #[test]
    fn test_config_rounding() {
        let config = MetricsConfig::new(7, 1000);

        assert_eq!(config.shards, 8);
        assert_eq!(config.latency_samples, 1024);
    }

    /// Ensures that the hit counter is thread-safe and accurately aggregates
    /// increments from multiple concurrent producers.
    #[test]
    fn test_multithreaded_hit_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_hit();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.hit_count(), 10000);
    }

    /// Ensures that the miss counter is thread-safe across concurrent updates.
    #[test]
    fn test_multithreaded_miss_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_miss();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.miss_count(), 10000);
    }

    /// Ensures that the eviction counter is thread-safe across concurrent updates.
    #[test]
    fn test_multithreaded_eviction_counter() {
        let metrics = Metrics::default();

        thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for _ in 0..1000 {
                        metrics.record_eviction();
                    }
                });
            }
        });

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.eviction_count(), 10000);
    }

    /// Tests the sharding mechanism under heavy contention.
    /// Uses a Barrier to ensure all threads begin writing simultaneously.
    #[test]
    fn test_high_contention_counters() {
        let metrics = Metrics::new(MetricsConfig::new(4, 1024));
        let num_threads = 8;
        let barrier = Barrier::new(num_threads);

        thread::scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    barrier.wait();
                    for _ in 0..1000 {
                        metrics.record_hit();
                    }
                });
            }
        });

        assert_eq!(metrics.snapshot().hit_count(), 8000);
    }

    /// Verifies that the snapshot correctly maps raw latency samples to
    /// statistical percentiles using the HdrHistogram backend.
    #[test]
    fn test_percentile_logic() {
        let metrics = Metrics::default();
        for i in 1..=100 {
            metrics.record_latency(i);
        }
        let snap = metrics.snapshot();

        // Allow for small rounding errors inherent in histogram bucketing
        assert!(snap.latency(LatencyPercentile::P50) >= 49);
        assert!(snap.latency(LatencyPercentile::P99) >= 98);
    }

    /// Validates the floating-point calculations for hit and miss rates.
    #[test]
    fn test_hit_and_miss_rates() {
        let metrics = Metrics::new(MetricsConfig::new(4, 1024));

        for _ in 0..80 {
            metrics.record_hit();
        }
        for _ in 0..20 {
            metrics.record_miss();
        }

        let snap = metrics.snapshot();

        assert_eq!(snap.hit_count(), 80);
        assert_eq!(snap.miss_count(), 20);
        assert!((snap.hit_rate() - 0.8).abs() < f64::EPSILON);
        assert!((snap.miss_rate() - 0.2).abs() < f64::EPSILON);
    }

    /// Simple validation for the eviction counter recording.
    #[test]
    fn test_eviction_tracking() {
        let metrics = Metrics::default();

        for _ in 0..15 {
            metrics.record_eviction();
        }

        let snap = metrics.snapshot();
        assert_eq!(snap.eviction_count(), 15);
    }

    /// Verifies that the latency distribution accurately reflects the recorded samples.
    #[test]
    fn test_latency_distribution() {
        let metrics = Metrics::new(MetricsConfig::new(1, 1024));

        for i in 1..=10 {
            metrics.record_latency(i * 10);
        }

        let snap = metrics.snapshot();

        let p50 = snap.latency(LatencyPercentile::P50);
        let p99 = snap.latency(LatencyPercentile::P99);

        assert!((49..=51).contains(&p50));
        assert!((99..=101).contains(&p99));
    }

    /// Ensures that taking a snapshot when no data has been recorded
    /// does not result in panics or invalid math (NaN).
    #[test]
    fn test_empty_snapshot_safety() {
        let metrics = Metrics::default();
        let snap = metrics.snapshot();

        assert_eq!(snap.hit_count(), 0);
        assert_eq!(snap.hit_rate(), 0.0);
        assert_eq!(snap.miss_rate(), 0.0);
        assert_eq!(snap.latency(LatencyPercentile::P50), 0);
    }

    /// Confirms that snapshots are independent "point-in-time" views
    /// and do not clear or mutate the underlying metric state.
    #[test]
    fn test_snapshot_independence() {
        let metrics = Metrics::default();

        metrics.record_hit();
        let snap1 = metrics.snapshot();

        metrics.record_hit();
        let snap2 = metrics.snapshot();

        assert_eq!(snap1.hit_count(), 1);
        assert_eq!(snap2.hit_count(), 2);
    }

    /// Integration test to verify concurrent latency accumulation, snapshot isolation,
    /// and the efficacy of the ghost data prevention algorithm.
    ///
    /// This test confirms that:
    /// 1. **Snapshot Independence:** Snapshots are point-in-time views that do not
    ///    clear or mutate the underlying metric state, yet remain isolated by generation.
    /// 2. **Ghost Prevention:** The 16-bit generation tag correctly identifies and
    ///    excludes stale data from previous buffer rotations.
    /// 3. **Statistical Integrity:** Percentiles (P50, P90, P99, P999) calculated from
    ///    sharded, lock-free storage match a Mutex-protected ground truth.
    /// 4. **Wait-Free Progress:** High-contention writes via a `Barrier` do not result
    ///    in data corruption or lost updates due to bit-packing race conditions.
    #[test]
    fn test_metrics_collect_latency_metrics() {
        let config = MetricsConfig::new(4, 1024);
        let metrics = Arc::new(Metrics::new(config));

        let num_threads = 10;
        let samples_per_thread = 200;
        let barrier = Arc::new(Barrier::new(num_threads));

        for _ in 0..10 {
            let histogram: Mutex<Histogram<u64>> = Mutex::new(create_latency_histogram());

            thread::scope(|s| {
                for _ in 0..num_threads {
                    s.spawn({
                        let metrics = metrics.clone();
                        let barrier = barrier.clone();
                        let histogram = &histogram;

                        move || {
                            let mut rng = rand::rng();
                            let mut written_values = Vec::with_capacity(samples_per_thread);
                            barrier.wait();

                            for _ in 0..samples_per_thread {
                                // Simulate latency between 100us and 1000us
                                let latency = rng.random_range(100..1000);
                                metrics.record_latency(latency);
                                written_values.push(latency);
                            }

                            let mut guard = histogram.lock().expect("cannot acquire lock");

                            for value in written_values {
                                guard.record(value).expect("cannot write value");
                            }

                            drop(guard)
                        }
                    });
                }
            });

            let snapshot = metrics.snapshot();

            let percentiles = [
                LatencyPercentile::P50,
                LatencyPercentile::P90,
                LatencyPercentile::P99,
                LatencyPercentile::P999,
            ];

            let guard = histogram.lock().expect("cannot acquire the lock");

            for p in percentiles {
                let actual = snapshot.latency(p);
                let expected = guard.value_at_quantile(p.as_quantile());

                // Assert that the lock-free sampler matches the controlled histogram
                assert!(
                    (actual as i64 - expected as i64).abs() <= 1,
                    "Percentile {:?} mismatch: actual {}, expected {}",
                    p,
                    actual,
                    expected
                );
            }
        }
    }
}