Skip to main content

metrics_lib/
counter.rs

//! # Ultra-Fast Atomic Counter
//!
//! A counter implementation built for speed - sub-3ns increments.
//!
//! ## Features
//!
//! - **Sub-3ns increments** - Single atomic instruction
//! - **Zero allocations** - No operation allocates on the heap
//! - **Lock-free** - Never blocks, never waits
//! - **Cache optimized** - Aligned to prevent false sharing
//! - **Overflow aware** - Plain operations wrap at `u64::MAX`; the `try_*`
//!   variants report `MetricsError::Overflow` and `saturating_add` clamps
12
13use crate::{MetricsError, Result};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16
/// Lock-free `u64` counter backed by a single atomic word.
///
/// The type is aligned to 64 bytes (the cache-line size on most common CPUs)
/// so that independent counters do not end up sharing a line, which avoids
/// false sharing between threads that update different counters.
#[repr(align(64))]
pub struct Counter {
    /// Current count; every operation goes through this atomic.
    value: AtomicU64,
    /// Instant captured at construction; basis for age and rate calculations.
    created_at: Instant,
}
28
/// Point-in-time snapshot of a counter.
///
/// All fields are plain values copied out at snapshot time; the snapshot does
/// not track later changes to the counter it was taken from.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct CounterStats {
    /// Counter value at the moment of the snapshot.
    pub value: u64,
    /// Elapsed time between counter creation and the snapshot.
    pub age: Duration,
    /// `value` divided by `age` in seconds (`0.0` when the age is zero).
    pub rate_per_second: f64,
    /// Total increments; for the basic counter this equals `value`.
    pub total: u64,
}
42
43impl Counter {
44    /// Create new counter starting at zero
45    #[inline]
46    pub fn new() -> Self {
47        Self {
48            value: AtomicU64::new(0),
49            created_at: Instant::now(),
50        }
51    }
52
53    /// Create counter with initial value
54    #[inline]
55    pub fn with_value(initial: u64) -> Self {
56        Self {
57            value: AtomicU64::new(initial),
58            created_at: Instant::now(),
59        }
60    }
61
62    /// Increment by 1 - THE FASTEST PATH
63    ///
64    /// This is optimized to be as fast as physically possible:
65    /// - Single atomic fetch_add instruction
66    /// - Relaxed memory ordering for maximum speed
67    /// - Inlined for zero function call overhead
68    #[inline(always)]
69    pub fn inc(&self) {
70        self.value.fetch_add(1, Ordering::Relaxed);
71    }
72
73    /// Try to increment by 1 with overflow check
74    ///
75    /// Returns `Ok(())` on success, or `Err(MetricsError::Overflow)` if the
76    /// increment would overflow `u64::MAX`.
77    ///
78    /// Example
79    /// ```
80    /// use metrics_lib::{Counter, MetricsError};
81    /// let c = Counter::with_value(u64::MAX - 1);
82    /// c.try_inc().unwrap();
83    /// assert_eq!(c.get(), u64::MAX);
84    /// assert!(matches!(c.try_inc(), Err(MetricsError::Overflow)));
85    /// ```
86    #[inline(always)]
87    pub fn try_inc(&self) -> Result<()> {
88        // CAS loop: the overflow check and the increment are together atomic,
89        // so this is safe under concurrent access (no TOCTOU race).
90        loop {
91            let current = self.value.load(Ordering::Relaxed);
92            if current == u64::MAX {
93                return Err(MetricsError::Overflow);
94            }
95            match self.value.compare_exchange_weak(
96                current,
97                current + 1,
98                Ordering::Relaxed,
99                Ordering::Relaxed,
100            ) {
101                Ok(_) => return Ok(()),
102                Err(_) => continue,
103            }
104        }
105    }
106
107    /// Add arbitrary amount - also blazingly fast
108    ///
109    /// Zero branch optimization - if amount is 0, still does the atomic
110    /// operation to maintain consistent performance characteristics
111    #[inline(always)]
112    pub fn add(&self, amount: u64) {
113        self.value.fetch_add(amount, Ordering::Relaxed);
114    }
115
116    /// Try to add an arbitrary amount with overflow check
117    ///
118    /// Returns `Ok(())` on success, or `Err(MetricsError::Overflow)` if the
119    /// addition would overflow `u64::MAX`.
120    ///
121    /// Example
122    /// ```
123    /// use metrics_lib::{Counter, MetricsError};
124    /// let c = Counter::with_value(u64::MAX - 5);
125    /// assert!(c.try_add(4).is_ok());
126    /// assert!(matches!(c.try_add(2), Err(MetricsError::Overflow)));
127    /// ```
128    #[inline(always)]
129    pub fn try_add(&self, amount: u64) -> Result<()> {
130        if amount == 0 {
131            return Ok(());
132        }
133        // CAS loop: overflow check and addition are together atomic.
134        loop {
135            let current = self.value.load(Ordering::Relaxed);
136            let new_val = current.checked_add(amount).ok_or(MetricsError::Overflow)?;
137            match self.value.compare_exchange_weak(
138                current,
139                new_val,
140                Ordering::Relaxed,
141                Ordering::Relaxed,
142            ) {
143                Ok(_) => return Ok(()),
144                Err(_) => continue,
145            }
146        }
147    }
148
149    /// Get current value - single atomic load
150    #[must_use]
151    #[inline(always)]
152    pub fn get(&self) -> u64 {
153        self.value.load(Ordering::Relaxed)
154    }
155
156    /// Reset to zero - use sparingly
157    ///
158    /// Note: This uses SeqCst ordering to ensure all threads see the reset
159    #[inline]
160    pub fn reset(&self) {
161        self.value.store(0, Ordering::SeqCst);
162    }
163
164    /// Set to specific value - use sparingly
165    ///
166    /// Note: This uses SeqCst ordering for consistency
167    #[inline]
168    pub fn set(&self, value: u64) {
169        self.value.store(value, Ordering::SeqCst);
170    }
171
172    /// Try to set to a specific value (always succeeds for `u64`)
173    ///
174    /// This method never fails and always returns `Ok(())`.
175    #[inline]
176    pub fn try_set(&self, value: u64) -> Result<()> {
177        self.set(value);
178        Ok(())
179    }
180
181    /// Atomic compare-and-swap
182    ///
183    /// Returns Ok(previous_value) if successful, Err(current_value) if failed
184    #[inline]
185    pub fn compare_and_swap(&self, expected: u64, new: u64) -> core::result::Result<u64, u64> {
186        match self
187            .value
188            .compare_exchange(expected, new, Ordering::SeqCst, Ordering::SeqCst)
189        {
190            Ok(prev) => Ok(prev),
191            Err(current) => Err(current),
192        }
193    }
194
195    /// Add amount and return previous value
196    #[must_use]
197    #[inline]
198    pub fn fetch_add(&self, amount: u64) -> u64 {
199        self.value.fetch_add(amount, Ordering::Relaxed)
200    }
201
202    /// Checked fetch_add that returns the previous value or error on overflow
203    ///
204    /// Returns the previous value on success, or `Err(MetricsError::Overflow)`
205    /// if adding `amount` would overflow `u64::MAX`.
206    ///
207    /// Example
208    /// ```
209    /// use metrics_lib::{Counter, MetricsError};
210    /// let c = Counter::with_value(u64::MAX - 1);
211    /// assert_eq!(c.try_fetch_add(1).unwrap(), u64::MAX - 1);
212    /// assert!(matches!(c.try_fetch_add(1), Err(MetricsError::Overflow)));
213    /// ```
214    #[inline]
215    pub fn try_fetch_add(&self, amount: u64) -> Result<u64> {
216        if amount == 0 {
217            return Ok(self.get());
218        }
219        // CAS loop: overflow check and fetch_add are together atomic.
220        loop {
221            let current = self.value.load(Ordering::Relaxed);
222            let new_val = current.checked_add(amount).ok_or(MetricsError::Overflow)?;
223            match self.value.compare_exchange_weak(
224                current,
225                new_val,
226                Ordering::Relaxed,
227                Ordering::Relaxed,
228            ) {
229                Ok(prev) => return Ok(prev),
230                Err(_) => continue,
231            }
232        }
233    }
234
235    /// Add amount and return new value
236    ///
237    /// On overflow the returned value wraps modulo `u64::MAX + 1`, matching
238    /// the underlying [`AtomicU64::fetch_add`] semantics (no panic in debug or
239    /// release builds). Use [`Counter::try_fetch_add`] for an explicit
240    /// `MetricsError::Overflow` instead.
241    #[must_use]
242    #[inline]
243    pub fn add_and_get(&self, amount: u64) -> u64 {
244        self.value
245            .fetch_add(amount, Ordering::Relaxed)
246            .wrapping_add(amount)
247    }
248
249    /// Increment and return new value
250    ///
251    /// On overflow the returned value wraps to `0`, matching the underlying
252    /// [`AtomicU64::fetch_add`] semantics (no panic in debug or release
253    /// builds). Use [`Counter::try_inc_and_get`] for an explicit
254    /// `MetricsError::Overflow` instead.
255    #[must_use]
256    #[inline]
257    pub fn inc_and_get(&self) -> u64 {
258        self.value.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
259    }
260
261    /// Checked increment that returns new value or error on overflow
262    ///
263    /// Returns the new value, or `Err(MetricsError::Overflow)` if the
264    /// increment would overflow `u64::MAX`.
265    ///
266    /// Example
267    /// ```
268    /// use metrics_lib::{Counter, MetricsError};
269    /// let c = Counter::with_value(u64::MAX - 1);
270    /// assert_eq!(c.try_inc_and_get().unwrap(), u64::MAX);
271    /// assert!(matches!(c.try_inc_and_get(), Err(MetricsError::Overflow)));
272    /// ```
273    #[inline]
274    pub fn try_inc_and_get(&self) -> Result<u64> {
275        // CAS loop: the returned value is the exact new value after the atomic
276        // increment — correct under concurrent access, unlike a load+store pattern.
277        loop {
278            let current = self.value.load(Ordering::Relaxed);
279            let new_val = current.checked_add(1).ok_or(MetricsError::Overflow)?;
280            match self.value.compare_exchange_weak(
281                current,
282                new_val,
283                Ordering::Relaxed,
284                Ordering::Relaxed,
285            ) {
286                Ok(_) => return Ok(new_val),
287                Err(_) => continue,
288            }
289        }
290    }
291
292    /// Get comprehensive statistics
293    #[must_use]
294    pub fn stats(&self) -> CounterStats {
295        let value = self.get();
296        let age = self.created_at.elapsed();
297        let age_seconds = age.as_secs_f64();
298
299        let rate_per_second = if age_seconds > 0.0 {
300            value as f64 / age_seconds
301        } else {
302            0.0
303        };
304
305        CounterStats {
306            value,
307            age,
308            rate_per_second,
309            total: value,
310        }
311    }
312
313    /// Get age since creation
314    #[must_use]
315    #[inline]
316    pub fn age(&self) -> Duration {
317        self.created_at.elapsed()
318    }
319
320    /// Check if counter is zero
321    #[must_use]
322    #[inline]
323    pub fn is_zero(&self) -> bool {
324        self.get() == 0
325    }
326
327    /// Get rate per second since creation
328    #[must_use]
329    #[inline]
330    pub fn rate_per_second(&self) -> f64 {
331        let age_seconds = self.age().as_secs_f64();
332        if age_seconds > 0.0 {
333            self.get() as f64 / age_seconds
334        } else {
335            0.0
336        }
337    }
338
339    /// Saturating add - won't overflow past u64::MAX
340    ///
341    /// Uses a `Relaxed` CAS loop on the hot path; no `SeqCst` cost. The loop
342    /// terminates either when the value is already at `u64::MAX` or when a CAS
343    /// succeeds.
344    #[inline]
345    pub fn saturating_add(&self, amount: u64) {
346        loop {
347            let current = self.value.load(Ordering::Relaxed);
348            let new_value = current.saturating_add(amount);
349
350            // If no change needed (already at max), break
351            if new_value == current {
352                break;
353            }
354
355            match self.value.compare_exchange_weak(
356                current,
357                new_value,
358                Ordering::Relaxed,
359                Ordering::Relaxed,
360            ) {
361                Ok(_) => break,
362                Err(_) => continue,
363            }
364        }
365    }
366}
367
368impl Default for Counter {
369    #[inline]
370    fn default() -> Self {
371        Self::new()
372    }
373}
374
375impl std::fmt::Display for Counter {
376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377        write!(f, "Counter({})", self.get())
378    }
379}
380
381impl std::fmt::Debug for Counter {
382    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383        f.debug_struct("Counter")
384            .field("value", &self.get())
385            .field("age", &self.age())
386            .field("rate_per_second", &self.rate_per_second())
387            .finish()
388    }
389}
390
// Counter is composed solely of `AtomicU64` (Send + Sync) and `Instant`
// (Send + Sync). The compiler derives Send + Sync automatically; no
// explicit unsafe impl is needed.
394
395/// Batch counter operations for even better performance
396impl Counter {
397    /// Batch increment - for very high throughput scenarios
398    ///
399    /// When you have many increments to do, batch them for better performance
400    #[inline]
401    pub fn batch_inc(&self, count: usize) {
402        if count > 0 {
403            self.add(count as u64);
404        }
405    }
406
407    /// Conditional increment - only increment if condition is true
408    #[inline]
409    pub fn inc_if(&self, condition: bool) {
410        if condition {
411            self.inc();
412        }
413    }
414
415    /// Increment with maximum value
416    ///
417    /// Uses a `Relaxed` CAS loop on the hot path; no `SeqCst` cost. Returns
418    /// `true` if the increment was applied (counter was below `max_value`),
419    /// `false` if the counter was already at or above the limit.
420    #[inline]
421    pub fn inc_max(&self, max_value: u64) -> bool {
422        loop {
423            let current = self.value.load(Ordering::Relaxed);
424            if current >= max_value {
425                return false;
426            }
427            // Safe: current < max_value <= u64::MAX implies current < u64::MAX.
428            let new_value = current + 1;
429
430            match self.value.compare_exchange_weak(
431                current,
432                new_value,
433                Ordering::Relaxed,
434                Ordering::Relaxed,
435            ) {
436                Ok(_) => return true,
437                Err(_) => continue,
438            }
439        }
440    }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Covers the unchecked read/write primitives: `inc`, `add`, `reset`, `set`.
    #[test]
    fn test_basic_operations() {
        let counter = Counter::new();

        assert_eq!(counter.get(), 0);
        assert!(counter.is_zero());

        counter.inc();
        assert_eq!(counter.get(), 1);
        assert!(!counter.is_zero());

        counter.add(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);

        counter.set(42);
        assert_eq!(counter.get(), 42);
    }

    /// `fetch_add` returns the old value; the `*_and_get` variants return the new one.
    #[test]
    fn test_fetch_operations() {
        let counter = Counter::new();

        assert_eq!(counter.fetch_add(10), 0);
        assert_eq!(counter.get(), 10);

        assert_eq!(counter.inc_and_get(), 11);
        assert_eq!(counter.add_and_get(5), 16);
    }

    /// A matching `expected` swaps and reports the old value; a mismatch reports
    /// the current value and changes nothing.
    #[test]
    fn test_compare_and_swap() {
        let counter = Counter::new();
        counter.set(10);

        // Successful swap
        assert_eq!(counter.compare_and_swap(10, 20), Ok(10));
        assert_eq!(counter.get(), 20);

        // Failed swap
        assert_eq!(counter.compare_and_swap(10, 30), Err(20));
        assert_eq!(counter.get(), 20);
    }

    /// `saturating_add` clamps at `u64::MAX` instead of wrapping.
    #[test]
    fn test_saturating_add() {
        let counter = Counter::new();
        counter.set(u64::MAX - 5);

        counter.saturating_add(10);
        assert_eq!(counter.get(), u64::MAX);

        // Should not overflow
        counter.saturating_add(100);
        assert_eq!(counter.get(), u64::MAX);
    }

    /// `inc_if` honours its flag and `inc_max` honours its ceiling.
    #[test]
    fn test_conditional_operations() {
        let counter = Counter::new();

        counter.inc_if(true);
        assert_eq!(counter.get(), 1);

        counter.inc_if(false);
        assert_eq!(counter.get(), 1);

        // Test inc_max
        assert!(counter.inc_max(5));
        assert_eq!(counter.get(), 2);

        counter.set(5);
        assert!(!counter.inc_max(5));
        assert_eq!(counter.get(), 5);
    }

    /// The snapshot mirrors the counter value and carries a sane age and rate.
    #[test]
    fn test_statistics() {
        let counter = Counter::new();
        counter.add(100);

        let stats = counter.stats();
        assert_eq!(stats.value, 100);
        assert_eq!(stats.total, 100);
        // `Instant` is monotonic but its resolution can be coarse, so the age
        // measured right after creation may legitimately be zero. Asserting
        // `age > 0` would make this test flaky; instead require that the
        // snapshot's age never exceeds a later reading.
        assert!(stats.age <= counter.age());
        // Rate is 0 when the measured age is zero (test ran too fast)
        assert!(stats.rate_per_second >= 0.0);
    }

    /// 100 threads x 1000 increments must all be observed.
    #[test]
    fn test_high_concurrency() {
        let counter = Arc::new(Counter::new());
        let num_threads = 100;
        let increments_per_thread = 1000;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..increments_per_thread {
                        counter.inc();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), num_threads * increments_per_thread);

        let stats = counter.stats();
        assert!(stats.rate_per_second > 0.0);
    }

    /// `batch_inc` adds the whole count at once and ignores a zero count.
    #[test]
    fn test_batch_operations() {
        let counter = Counter::new();

        counter.batch_inc(1000);
        assert_eq!(counter.get(), 1000);

        counter.batch_inc(0); // Should be no-op
        assert_eq!(counter.get(), 1000);
    }

    /// Both formatting traits include the current value.
    #[test]
    fn test_display_and_debug() {
        let counter = Counter::new();
        counter.set(42);

        let display_str = format!("{counter}");
        assert!(display_str.contains("42"));

        let debug_str = format!("{counter:?}");
        assert!(debug_str.contains("Counter"));
        assert!(debug_str.contains("42"));
    }

    #[test]
    fn test_add_and_get_wraps_on_overflow_without_panic() {
        // 0.9.2 regression: add_and_get and inc_and_get used `+` which would
        // panic in debug builds when crossing u64::MAX. They now wrap, matching
        // the underlying fetch_add semantics.
        let counter = Counter::with_value(u64::MAX);
        let next = counter.inc_and_get();
        assert_eq!(next, 0, "inc_and_get should wrap to 0 past u64::MAX");
        // Subsequent inc lands on 1
        assert_eq!(counter.inc_and_get(), 1);

        let counter = Counter::with_value(u64::MAX - 2);
        let next = counter.add_and_get(5);
        assert_eq!(
            next, 2,
            "add_and_get should wrap: (u64::MAX-2) + 5 == 2 mod 2^64"
        );
    }

    #[test]
    fn test_saturating_add_terminates_at_max() {
        // 0.9.2 regression: saturating_add previously used SeqCst CAS in a
        // tight loop. Now uses Relaxed compare_exchange_weak. Behavior must
        // remain identical.
        let counter = Counter::with_value(u64::MAX - 3);
        counter.saturating_add(100);
        assert_eq!(counter.get(), u64::MAX);

        // Already-at-max no-op short-circuit
        counter.saturating_add(1);
        assert_eq!(counter.get(), u64::MAX);
    }

    #[test]
    fn test_inc_max_relaxed_cas_still_correct() {
        // 0.9.2 regression: inc_max switched from SeqCst CAS to Relaxed.
        // Single-threaded behavior must remain identical.
        let counter = Counter::with_value(5);
        assert!(counter.inc_max(7));
        assert_eq!(counter.get(), 6);
        assert!(counter.inc_max(7));
        assert_eq!(counter.get(), 7);
        assert!(!counter.inc_max(7));
        assert_eq!(counter.get(), 7);
    }

    /// Walks every `try_*` method through its success path and then its
    /// overflow path at `u64::MAX`.
    #[test]
    fn test_checked_operations_and_overflow_paths() {
        let counter = Counter::new();

        counter.try_set(3).unwrap();
        assert_eq!(counter.get(), 3);

        counter.try_inc().unwrap();
        assert_eq!(counter.get(), 4);

        counter.try_add(0).unwrap();
        assert_eq!(counter.get(), 4);

        assert_eq!(counter.try_fetch_add(2).unwrap(), 4);
        assert_eq!(counter.get(), 6);

        assert_eq!(counter.try_fetch_add(0).unwrap(), 6);
        assert_eq!(counter.try_inc_and_get().unwrap(), 7);

        let overflow = Counter::with_value(u64::MAX);
        assert!(matches!(overflow.try_inc(), Err(MetricsError::Overflow)));
        assert!(matches!(overflow.try_add(1), Err(MetricsError::Overflow)));
        assert!(matches!(
            overflow.try_fetch_add(1),
            Err(MetricsError::Overflow)
        ));
        assert!(matches!(
            overflow.try_inc_and_get(),
            Err(MetricsError::Overflow)
        ));
    }
}
669
#[cfg(all(test, feature = "bench-tests", not(tarpaulin), not(coverage)))]
#[allow(unused_imports)]
mod benchmarks {
    use super::*;
    use std::time::Instant;

    /// Times a tight loop of `inc` calls and prints the average cost per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_counter_increment() {
        const ITERATIONS: u64 = 10_000_000;
        let counter = Counter::new();

        let started = Instant::now();
        for _ in 0..ITERATIONS {
            counter.inc();
        }
        let elapsed = started.elapsed();

        let ns_per_op = elapsed.as_nanos() as f64 / ITERATIONS as f64;
        println!("Counter increment: {:.2} ns/op", ns_per_op);

        // Throughput-only smoke check. Criterion's `metrics_bench` is the
        // authoritative regression detector — see CONTRIBUTING.md.
        assert_eq!(counter.get(), ITERATIONS);
    }

    /// Times a tight loop of `add` calls with a varying amount and prints the
    /// average cost per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_counter_add() {
        const ITERATIONS: u64 = 1_000_000;
        let counter = Counter::new();

        let started = Instant::now();
        for step in 0..ITERATIONS {
            counter.add(step + 1);
        }
        let elapsed = started.elapsed();

        let ns_per_op = elapsed.as_nanos() as f64 / ITERATIONS as f64;
        println!("Counter add: {:.2} ns/op", ns_per_op);

        // Throughput-only smoke check. Criterion catches regressions.
    }

    /// Times a tight loop of `get` calls and prints the average cost per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_counter_get() {
        const ITERATIONS: u64 = 100_000_000;
        let counter = Counter::new();
        counter.set(42);

        let started = Instant::now();
        let mut checksum: u64 = 0;
        for _ in 0..ITERATIONS {
            checksum += counter.get();
        }
        let elapsed = started.elapsed();

        let ns_per_op = elapsed.as_nanos() as f64 / ITERATIONS as f64;
        println!("Counter get: {:.2} ns/op", ns_per_op);

        // Consuming the sum keeps the loads from being optimized away;
        // Criterion is the regression detector.
        assert_eq!(checksum, 42 * ITERATIONS);
    }
}