Skip to main content

metrics_lib/
counter.rs

//! # Ultra-Fast Atomic Counter
//!
//! A lock-free counter designed for sub-3ns increments.
//!
//! ## Features
//!
//! - **Sub-3ns increments** - A single relaxed atomic `fetch_add`
//! - **Zero allocations** - No operation on the counter allocates
//! - **Lock-free** - Never blocks, never waits
//! - **Cache optimized** - Aligned to 64 bytes to prevent false sharing
//! - **Overflow safe** - The `try_*` methods report overflow and
//!   `saturating_add` clamps at `u64::MAX`; plain `inc`/`add` wrap
12
13use crate::{MetricsError, Result};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16
/// Ultra-fast atomic counter.
///
/// A single `AtomicU64` plus the instant the counter was created. The type is
/// aligned to 64 bytes so that, on targets with 64-byte cache lines, two
/// counters never share a line (which prevents false sharing).
#[repr(align(64))]
pub struct Counter {
    /// Current count; every counter operation goes through this atomic.
    value: AtomicU64,
    /// Moment the counter was constructed; baseline for age and rate calculations.
    created_at: Instant,
}
28
/// Point-in-time snapshot of a counter's value, age and average rate.
#[derive(Debug, Clone)]
pub struct CounterStats {
    /// Counter value at the moment the snapshot was taken
    pub value: u64,
    /// Time elapsed since the counter was created
    pub age: Duration,
    /// Average increments per second since creation (`value / age`)
    pub rate_per_second: f64,
    /// Total increments (same as `value` for a basic counter)
    pub total: u64,
}
41
42impl Counter {
43    /// Create new counter starting at zero
44    #[inline]
45    pub fn new() -> Self {
46        Self {
47            value: AtomicU64::new(0),
48            created_at: Instant::now(),
49        }
50    }
51
52    /// Create counter with initial value
53    #[inline]
54    pub fn with_value(initial: u64) -> Self {
55        Self {
56            value: AtomicU64::new(initial),
57            created_at: Instant::now(),
58        }
59    }
60
61    /// Increment by 1 - THE FASTEST PATH
62    ///
63    /// This is optimized to be as fast as physically possible:
64    /// - Single atomic fetch_add instruction
65    /// - Relaxed memory ordering for maximum speed
66    /// - Inlined for zero function call overhead
67    #[inline(always)]
68    pub fn inc(&self) {
69        self.value.fetch_add(1, Ordering::Relaxed);
70    }
71
72    /// Try to increment by 1 with overflow check
73    ///
74    /// Returns `Ok(())` on success, or `Err(MetricsError::Overflow)` if the
75    /// increment would overflow `u64::MAX`.
76    ///
77    /// Example
78    /// ```
79    /// use metrics_lib::{Counter, MetricsError};
80    /// let c = Counter::with_value(u64::MAX - 1);
81    /// c.try_inc().unwrap();
82    /// assert_eq!(c.get(), u64::MAX);
83    /// assert!(matches!(c.try_inc(), Err(MetricsError::Overflow)));
84    /// ```
85    #[inline(always)]
86    pub fn try_inc(&self) -> Result<()> {
87        // CAS loop: the overflow check and the increment are together atomic,
88        // so this is safe under concurrent access (no TOCTOU race).
89        loop {
90            let current = self.value.load(Ordering::Relaxed);
91            if current == u64::MAX {
92                return Err(MetricsError::Overflow);
93            }
94            match self.value.compare_exchange_weak(
95                current,
96                current + 1,
97                Ordering::Relaxed,
98                Ordering::Relaxed,
99            ) {
100                Ok(_) => return Ok(()),
101                Err(_) => continue,
102            }
103        }
104    }
105
106    /// Add arbitrary amount - also blazingly fast
107    ///
108    /// Zero branch optimization - if amount is 0, still does the atomic
109    /// operation to maintain consistent performance characteristics
110    #[inline(always)]
111    pub fn add(&self, amount: u64) {
112        self.value.fetch_add(amount, Ordering::Relaxed);
113    }
114
115    /// Try to add an arbitrary amount with overflow check
116    ///
117    /// Returns `Ok(())` on success, or `Err(MetricsError::Overflow)` if the
118    /// addition would overflow `u64::MAX`.
119    ///
120    /// Example
121    /// ```
122    /// use metrics_lib::{Counter, MetricsError};
123    /// let c = Counter::with_value(u64::MAX - 5);
124    /// assert!(c.try_add(4).is_ok());
125    /// assert!(matches!(c.try_add(2), Err(MetricsError::Overflow)));
126    /// ```
127    #[inline(always)]
128    pub fn try_add(&self, amount: u64) -> Result<()> {
129        if amount == 0 {
130            return Ok(());
131        }
132        // CAS loop: overflow check and addition are together atomic.
133        loop {
134            let current = self.value.load(Ordering::Relaxed);
135            let new_val = current.checked_add(amount).ok_or(MetricsError::Overflow)?;
136            match self.value.compare_exchange_weak(
137                current,
138                new_val,
139                Ordering::Relaxed,
140                Ordering::Relaxed,
141            ) {
142                Ok(_) => return Ok(()),
143                Err(_) => continue,
144            }
145        }
146    }
147
148    /// Get current value - single atomic load
149    #[must_use]
150    #[inline(always)]
151    pub fn get(&self) -> u64 {
152        self.value.load(Ordering::Relaxed)
153    }
154
155    /// Reset to zero - use sparingly
156    ///
157    /// Note: This uses SeqCst ordering to ensure all threads see the reset
158    #[inline]
159    pub fn reset(&self) {
160        self.value.store(0, Ordering::SeqCst);
161    }
162
163    /// Set to specific value - use sparingly
164    ///
165    /// Note: This uses SeqCst ordering for consistency
166    #[inline]
167    pub fn set(&self, value: u64) {
168        self.value.store(value, Ordering::SeqCst);
169    }
170
171    /// Try to set to a specific value (always succeeds for `u64`)
172    ///
173    /// This method never fails and always returns `Ok(())`.
174    #[inline]
175    pub fn try_set(&self, value: u64) -> Result<()> {
176        self.set(value);
177        Ok(())
178    }
179
180    /// Atomic compare-and-swap
181    ///
182    /// Returns Ok(previous_value) if successful, Err(current_value) if failed
183    #[inline]
184    pub fn compare_and_swap(&self, expected: u64, new: u64) -> core::result::Result<u64, u64> {
185        match self
186            .value
187            .compare_exchange(expected, new, Ordering::SeqCst, Ordering::SeqCst)
188        {
189            Ok(prev) => Ok(prev),
190            Err(current) => Err(current),
191        }
192    }
193
194    /// Add amount and return previous value
195    #[must_use]
196    #[inline]
197    pub fn fetch_add(&self, amount: u64) -> u64 {
198        self.value.fetch_add(amount, Ordering::Relaxed)
199    }
200
201    /// Checked fetch_add that returns the previous value or error on overflow
202    ///
203    /// Returns the previous value on success, or `Err(MetricsError::Overflow)`
204    /// if adding `amount` would overflow `u64::MAX`.
205    ///
206    /// Example
207    /// ```
208    /// use metrics_lib::{Counter, MetricsError};
209    /// let c = Counter::with_value(u64::MAX - 1);
210    /// assert_eq!(c.try_fetch_add(1).unwrap(), u64::MAX - 1);
211    /// assert!(matches!(c.try_fetch_add(1), Err(MetricsError::Overflow)));
212    /// ```
213    #[inline]
214    pub fn try_fetch_add(&self, amount: u64) -> Result<u64> {
215        if amount == 0 {
216            return Ok(self.get());
217        }
218        // CAS loop: overflow check and fetch_add are together atomic.
219        loop {
220            let current = self.value.load(Ordering::Relaxed);
221            let new_val = current.checked_add(amount).ok_or(MetricsError::Overflow)?;
222            match self.value.compare_exchange_weak(
223                current,
224                new_val,
225                Ordering::Relaxed,
226                Ordering::Relaxed,
227            ) {
228                Ok(prev) => return Ok(prev),
229                Err(_) => continue,
230            }
231        }
232    }
233
234    /// Add amount and return new value
235    ///
236    /// On overflow the returned value wraps modulo `u64::MAX + 1`, matching
237    /// the underlying [`AtomicU64::fetch_add`] semantics (no panic in debug or
238    /// release builds). Use [`Counter::try_fetch_add`] for an explicit
239    /// `MetricsError::Overflow` instead.
240    #[must_use]
241    #[inline]
242    pub fn add_and_get(&self, amount: u64) -> u64 {
243        self.value
244            .fetch_add(amount, Ordering::Relaxed)
245            .wrapping_add(amount)
246    }
247
248    /// Increment and return new value
249    ///
250    /// On overflow the returned value wraps to `0`, matching the underlying
251    /// [`AtomicU64::fetch_add`] semantics (no panic in debug or release
252    /// builds). Use [`Counter::try_inc_and_get`] for an explicit
253    /// `MetricsError::Overflow` instead.
254    #[must_use]
255    #[inline]
256    pub fn inc_and_get(&self) -> u64 {
257        self.value.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
258    }
259
260    /// Checked increment that returns new value or error on overflow
261    ///
262    /// Returns the new value, or `Err(MetricsError::Overflow)` if the
263    /// increment would overflow `u64::MAX`.
264    ///
265    /// Example
266    /// ```
267    /// use metrics_lib::{Counter, MetricsError};
268    /// let c = Counter::with_value(u64::MAX - 1);
269    /// assert_eq!(c.try_inc_and_get().unwrap(), u64::MAX);
270    /// assert!(matches!(c.try_inc_and_get(), Err(MetricsError::Overflow)));
271    /// ```
272    #[inline]
273    pub fn try_inc_and_get(&self) -> Result<u64> {
274        // CAS loop: the returned value is the exact new value after the atomic
275        // increment — correct under concurrent access, unlike a load+store pattern.
276        loop {
277            let current = self.value.load(Ordering::Relaxed);
278            let new_val = current.checked_add(1).ok_or(MetricsError::Overflow)?;
279            match self.value.compare_exchange_weak(
280                current,
281                new_val,
282                Ordering::Relaxed,
283                Ordering::Relaxed,
284            ) {
285                Ok(_) => return Ok(new_val),
286                Err(_) => continue,
287            }
288        }
289    }
290
291    /// Get comprehensive statistics
292    #[must_use]
293    pub fn stats(&self) -> CounterStats {
294        let value = self.get();
295        let age = self.created_at.elapsed();
296        let age_seconds = age.as_secs_f64();
297
298        let rate_per_second = if age_seconds > 0.0 {
299            value as f64 / age_seconds
300        } else {
301            0.0
302        };
303
304        CounterStats {
305            value,
306            age,
307            rate_per_second,
308            total: value,
309        }
310    }
311
312    /// Get age since creation
313    #[must_use]
314    #[inline]
315    pub fn age(&self) -> Duration {
316        self.created_at.elapsed()
317    }
318
319    /// Check if counter is zero
320    #[must_use]
321    #[inline]
322    pub fn is_zero(&self) -> bool {
323        self.get() == 0
324    }
325
326    /// Get rate per second since creation
327    #[must_use]
328    #[inline]
329    pub fn rate_per_second(&self) -> f64 {
330        let age_seconds = self.age().as_secs_f64();
331        if age_seconds > 0.0 {
332            self.get() as f64 / age_seconds
333        } else {
334            0.0
335        }
336    }
337
338    /// Saturating add - won't overflow past u64::MAX
339    ///
340    /// Uses a `Relaxed` CAS loop on the hot path; no `SeqCst` cost. The loop
341    /// terminates either when the value is already at `u64::MAX` or when a CAS
342    /// succeeds.
343    #[inline]
344    pub fn saturating_add(&self, amount: u64) {
345        loop {
346            let current = self.value.load(Ordering::Relaxed);
347            let new_value = current.saturating_add(amount);
348
349            // If no change needed (already at max), break
350            if new_value == current {
351                break;
352            }
353
354            match self.value.compare_exchange_weak(
355                current,
356                new_value,
357                Ordering::Relaxed,
358                Ordering::Relaxed,
359            ) {
360                Ok(_) => break,
361                Err(_) => continue,
362            }
363        }
364    }
365}
366
367impl Default for Counter {
368    #[inline]
369    fn default() -> Self {
370        Self::new()
371    }
372}
373
374impl std::fmt::Display for Counter {
375    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376        write!(f, "Counter({})", self.get())
377    }
378}
379
380impl std::fmt::Debug for Counter {
381    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382        f.debug_struct("Counter")
383            .field("value", &self.get())
384            .field("age", &self.age())
385            .field("rate_per_second", &self.rate_per_second())
386            .finish()
387    }
388}
389
// Counter is composed solely of `AtomicU64` (Send + Sync) and `Instant`
// (Send + Sync). The compiler derives Send + Sync automatically; no
// explicit unsafe impl is needed.
393
394/// Batch counter operations for even better performance
395impl Counter {
396    /// Batch increment - for very high throughput scenarios
397    ///
398    /// When you have many increments to do, batch them for better performance
399    #[inline]
400    pub fn batch_inc(&self, count: usize) {
401        if count > 0 {
402            self.add(count as u64);
403        }
404    }
405
406    /// Conditional increment - only increment if condition is true
407    #[inline]
408    pub fn inc_if(&self, condition: bool) {
409        if condition {
410            self.inc();
411        }
412    }
413
414    /// Increment with maximum value
415    ///
416    /// Uses a `Relaxed` CAS loop on the hot path; no `SeqCst` cost. Returns
417    /// `true` if the increment was applied (counter was below `max_value`),
418    /// `false` if the counter was already at or above the limit.
419    #[inline]
420    pub fn inc_max(&self, max_value: u64) -> bool {
421        loop {
422            let current = self.value.load(Ordering::Relaxed);
423            if current >= max_value {
424                return false;
425            }
426            // Safe: current < max_value <= u64::MAX implies current < u64::MAX.
427            let new_value = current + 1;
428
429            match self.value.compare_exchange_weak(
430                current,
431                new_value,
432                Ordering::Relaxed,
433                Ordering::Relaxed,
434            ) {
435                Ok(_) => return true,
436                Err(_) => continue,
437            }
438        }
439    }
440}
441
/// Unit tests for `Counter`: basic arithmetic, checked/wrapping/saturating
/// overflow paths, formatting, and a multi-threaded increment check.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_basic_operations() {
        let counter = Counter::new();

        assert_eq!(counter.get(), 0);
        assert!(counter.is_zero());

        counter.inc();
        assert_eq!(counter.get(), 1);
        assert!(!counter.is_zero());

        counter.add(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);

        counter.set(42);
        assert_eq!(counter.get(), 42);
    }

    #[test]
    fn test_fetch_operations() {
        let counter = Counter::new();

        assert_eq!(counter.fetch_add(10), 0);
        assert_eq!(counter.get(), 10);

        assert_eq!(counter.inc_and_get(), 11);
        assert_eq!(counter.add_and_get(5), 16);
    }

    #[test]
    fn test_compare_and_swap() {
        let counter = Counter::new();
        counter.set(10);

        // Successful swap
        assert_eq!(counter.compare_and_swap(10, 20), Ok(10));
        assert_eq!(counter.get(), 20);

        // Failed swap
        assert_eq!(counter.compare_and_swap(10, 30), Err(20));
        assert_eq!(counter.get(), 20);
    }

    #[test]
    fn test_saturating_add() {
        let counter = Counter::new();
        counter.set(u64::MAX - 5);

        counter.saturating_add(10);
        assert_eq!(counter.get(), u64::MAX);

        // Should not overflow
        counter.saturating_add(100);
        assert_eq!(counter.get(), u64::MAX);
    }

    #[test]
    fn test_conditional_operations() {
        let counter = Counter::new();

        counter.inc_if(true);
        assert_eq!(counter.get(), 1);

        counter.inc_if(false);
        assert_eq!(counter.get(), 1);

        // Test inc_max
        assert!(counter.inc_max(5));
        assert_eq!(counter.get(), 2);

        counter.set(5);
        assert!(!counter.inc_max(5));
        assert_eq!(counter.get(), 5);
    }

    #[test]
    fn test_statistics() {
        let counter = Counter::new();
        counter.add(100);

        let stats = counter.stats();
        assert_eq!(stats.value, 100);
        assert_eq!(stats.total, 100);
        // The elapsed time may legitimately read as zero if the test runs
        // faster than the clock's resolution, so only check that the snapshot
        // age does not exceed an age read afterwards.
        assert!(stats.age <= counter.age());
        // Rate might be 0 if test runs too fast
        assert!(stats.rate_per_second >= 0.0);
    }

    #[test]
    fn test_high_concurrency() {
        let counter = Arc::new(Counter::new());
        let num_threads = 100;
        let increments_per_thread = 1000;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..increments_per_thread {
                        counter.inc();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), num_threads * increments_per_thread);

        // Spawning and joining 100 threads takes measurable time, so the
        // average rate is expected to be strictly positive here.
        let stats = counter.stats();
        assert!(stats.rate_per_second > 0.0);
    }

    #[test]
    fn test_batch_operations() {
        let counter = Counter::new();

        counter.batch_inc(1000);
        assert_eq!(counter.get(), 1000);

        counter.batch_inc(0); // Should be no-op
        assert_eq!(counter.get(), 1000);
    }

    #[test]
    fn test_display_and_debug() {
        let counter = Counter::new();
        counter.set(42);

        let display_str = format!("{counter}");
        assert!(display_str.contains("42"));

        let debug_str = format!("{counter:?}");
        assert!(debug_str.contains("Counter"));
        assert!(debug_str.contains("42"));
    }

    #[test]
    fn test_add_and_get_wraps_on_overflow_without_panic() {
        // 0.9.2 regression: add_and_get and inc_and_get used `+` which would
        // panic in debug builds when crossing u64::MAX. They now wrap, matching
        // the underlying fetch_add semantics.
        let counter = Counter::with_value(u64::MAX);
        let next = counter.inc_and_get();
        assert_eq!(next, 0, "inc_and_get should wrap to 0 past u64::MAX");
        // Subsequent inc lands on 1
        assert_eq!(counter.inc_and_get(), 1);

        let counter = Counter::with_value(u64::MAX - 2);
        let next = counter.add_and_get(5);
        assert_eq!(
            next, 2,
            "add_and_get should wrap: (u64::MAX-2) + 5 == 2 mod 2^64"
        );
    }

    #[test]
    fn test_saturating_add_terminates_at_max() {
        // 0.9.2 regression: saturating_add previously used SeqCst CAS in a
        // tight loop. Now uses Relaxed compare_exchange_weak. Behavior must
        // remain identical.
        let counter = Counter::with_value(u64::MAX - 3);
        counter.saturating_add(100);
        assert_eq!(counter.get(), u64::MAX);

        // Already-at-max no-op short-circuit
        counter.saturating_add(1);
        assert_eq!(counter.get(), u64::MAX);
    }

    #[test]
    fn test_inc_max_relaxed_cas_still_correct() {
        // 0.9.2 regression: inc_max switched from SeqCst CAS to Relaxed.
        // Single-threaded behavior must remain identical.
        let counter = Counter::with_value(5);
        assert!(counter.inc_max(7));
        assert_eq!(counter.get(), 6);
        assert!(counter.inc_max(7));
        assert_eq!(counter.get(), 7);
        assert!(!counter.inc_max(7));
        assert_eq!(counter.get(), 7);
    }

    #[test]
    fn test_checked_operations_and_overflow_paths() {
        let counter = Counter::new();

        counter.try_set(3).unwrap();
        assert_eq!(counter.get(), 3);

        counter.try_inc().unwrap();
        assert_eq!(counter.get(), 4);

        counter.try_add(0).unwrap();
        assert_eq!(counter.get(), 4);

        assert_eq!(counter.try_fetch_add(2).unwrap(), 4);
        assert_eq!(counter.get(), 6);

        assert_eq!(counter.try_fetch_add(0).unwrap(), 6);
        assert_eq!(counter.try_inc_and_get().unwrap(), 7);

        let overflow = Counter::with_value(u64::MAX);
        assert!(matches!(overflow.try_inc(), Err(MetricsError::Overflow)));
        assert!(matches!(overflow.try_add(1), Err(MetricsError::Overflow)));
        assert!(matches!(
            overflow.try_fetch_add(1),
            Err(MetricsError::Overflow)
        ));
        assert!(matches!(
            overflow.try_inc_and_get(),
            Err(MetricsError::Overflow)
        ));
    }
}
668
/// Timing smoke tests, compiled only with the `bench-tests` feature (and not
/// under tarpaulin). Each test prints the measured ns/op and asserts a loose
/// upper bound.
// NOTE(review): the reason for `allow(unused_imports)` is not visible in this
// file - confirm whether it is still needed.
#[cfg(all(test, feature = "bench-tests", not(tarpaulin)))]
#[allow(unused_imports)]
mod benchmarks {
    use super::*;
    use std::time::Instant;

    /// Prints `<label>: <x.xx> ns/op` and returns the whole nanoseconds per
    /// operation (integer division, so the fractional part is truncated).
    fn report(label: &str, elapsed: std::time::Duration, iterations: u64) -> u128 {
        println!(
            "{}: {:.2} ns/op",
            label,
            elapsed.as_nanos() as f64 / iterations as f64
        );
        elapsed.as_nanos() / u128::from(iterations)
    }

    #[test]
    fn bench_counter_increment() {
        let counter = Counter::new();
        let iterations: u64 = 10_000_000;

        let start = Instant::now();
        for _ in 0..iterations {
            counter.inc();
        }
        let elapsed = start.elapsed();

        let ns_per_op = report("Counter increment", elapsed, iterations);

        // Should be under 100ns per increment (relaxed from 50ns)
        assert!(ns_per_op < 100);
        assert_eq!(counter.get(), iterations);
    }

    #[test]
    fn bench_counter_add() {
        let counter = Counter::new();
        let iterations: u64 = 1_000_000;

        let start = Instant::now();
        for i in 0..iterations {
            counter.add(i + 1);
        }
        let elapsed = start.elapsed();

        let ns_per_op = report("Counter add", elapsed, iterations);

        // Should be similar to increment performance (relaxed from 100ns to 200ns)
        assert!(ns_per_op < 200);
    }

    #[test]
    fn bench_counter_get() {
        let counter = Counter::new();
        counter.set(42);
        let iterations: u64 = 100_000_000;

        let start = Instant::now();
        let mut sum = 0;
        for _ in 0..iterations {
            sum += counter.get();
        }
        let elapsed = start.elapsed();

        let ns_per_op = report("Counter get", elapsed, iterations);

        // Prevent optimization
        assert_eq!(sum, 42 * iterations);

        // Should be under 50ns per get (relaxed from 20ns)
        assert!(ns_per_op < 50);
    }
}