1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Shared counters, suitable for quickly tabulating extant types.
//!
//! The default, [`RelaxedCounter`], is suitable in most circumstances.

use crossbeam_utils::CachePadded;
use num_traits::Num;
use std::sync::atomic::{AtomicIsize, Ordering};

/// Behaviour required of a counter that is shared between threads to keep a
/// running census.
///
/// Every operation takes `&self`, so implementors supply their own interior
/// mutability. Reads and updates are only required to agree *eventually*.
pub trait Counter: 'static {
    /// A counter whose value starts out as `0`.
    const ZERO: Self;

    /// The numeric type this counter accumulates.
    type Primitive: Num;

    /// Reads the counter; the result may not yet reflect concurrent updates.
    fn fetch(&self) -> Self::Primitive;

    /// Adds `n` to the counter; the change becomes visible eventually.
    fn add_assign(&self, n: Self::Primitive);

    /// Subtracts `n` from the counter; the change becomes visible eventually.
    fn sub_assign(&self, n: Self::Primitive);
}

/// A single [`AtomicIsize`], padded and aligned to the cache line size so that
/// neighbouring data does not cause [false sharing].
///
/// Through its [`Counter`] implementation, [`Counter::add_assign`],
/// [`Counter::sub_assign`] and [`Counter::fetch`] all use
/// [`Ordering::Relaxed`].
///
/// [false sharing]: https://en.wikipedia.org/wiki/False_sharing
#[derive(Debug)]
#[repr(transparent)]
pub struct RelaxedCounter {
    counter: CachePadded<AtomicIsize>,
}

impl Counter for RelaxedCounter {
    type Primitive = isize;
    const ZERO: Self = Self {
        counter: CachePadded::new(AtomicIsize::new(0)),
    };

    #[inline(always)]
    fn add_assign(&self, n: isize) {
        let _ = self.counter.fetch_add(n, Ordering::Relaxed);
    }

    #[inline(always)]
    fn sub_assign(&self, n: isize) {
        let _ = self.counter.fetch_sub(n, Ordering::Relaxed);
    }

    #[inline(always)]
    fn fetch(&self) -> isize {
        self.counter.load(Ordering::Relaxed)
    }
}

/// A counter that minimizes slowdowns from contention at the cost of increased
/// memory usage.
///
/// The count is spread across `BUCKETS` cache-padded [`AtomicIsize`] buckets,
/// and its value is the wrapping sum of all of them. `BUCKETS` must be at
/// least `1`: with no buckets there is nowhere to record an update.
///
/// Modeled on the ["adaptive multi-counter" described by Travis Downs][multi].
/// Use this counter type only if [`RelaxedCounter`] performs poorly. Then,
/// benchmark the performance of your code with [`DistributedCounter`] with
/// a bucket count of `1`. Increase the number of buckets (up to your
/// available parallelism) until performance is satisfactory.
///
/// [multi]: https://travisdowns.github.io/blog/2020/07/06/concurrency-costs.html#adaptive-multi-counter
#[derive(Debug)]
pub struct DistributedCounter<const BUCKETS: usize> {
    counters: [CachePadded<AtomicIsize>; BUCKETS],
}

impl<const BUCKETS: usize> DistributedCounter<BUCKETS> {
    /// Creates a counter with every bucket set to `0`.
    ///
    /// # Panics
    ///
    /// Panics if `BUCKETS` is `0`. When this runs while evaluating a constant
    /// (as [`Counter::ZERO`] does), the panic is reported as a compile-time
    /// error instead.
    const fn new() -> Self {
        // With zero buckets, `add_assign` would panic on `% BUCKETS` at the
        // first update; reject such counters up front instead.
        assert!(
            BUCKETS > 0,
            "DistributedCounter requires at least one bucket"
        );
        // `AtomicIsize` is not `Copy`, so the array repeat expression needs a
        // constant item to copy from.
        const BUCKET: CachePadded<AtomicIsize> = CachePadded::new(AtomicIsize::new(0));
        Self {
            counters: [BUCKET; BUCKETS],
        }
    }

    /// Returns a number that is distinct for each thread that calls it.
    ///
    /// Numbers are assigned in order of each thread's first call. If the
    /// thread-local slot can no longer be accessed (e.g. while the thread's
    /// locals are being destroyed), `0` is returned instead.
    fn thread_id() -> usize {
        use std::sync::atomic::AtomicUsize;
        static THREADS: AtomicUsize = AtomicUsize::new(0);
        thread_local! {
            static ID: usize = THREADS.fetch_add(1, Ordering::SeqCst);
        }
        ID.try_with(|id| *id).unwrap_or(0)
    }

    /// Makes a single attempt to add `n` (wrapping) to `bucket`.
    ///
    /// Returns `Ok` with the previous value on success. Returns `Err` with the
    /// value actually found if another thread changed the bucket between the
    /// load and the compare-exchange.
    #[inline(always)]
    fn try_add_assign(bucket: &AtomicIsize, n: isize) -> Result<isize, isize> {
        let count = bucket.load(Ordering::SeqCst);
        bucket.compare_exchange(
            count,
            count.wrapping_add(n),
            Ordering::SeqCst,
            Ordering::SeqCst,
        )
    }

    /// Adds `n` to one of the buckets.
    ///
    /// Starts at the bucket selected by this thread's id and moves to the next
    /// bucket every time an attempt loses a race with another thread.
    #[inline(always)]
    fn add_assign(&self, n: isize) {
        let mut bucket = Self::thread_id() % BUCKETS;
        // A failed CAS signals contention on this bucket's cache line, so try
        // a different bucket rather than hammering the same one.
        // `bucket < BUCKETS`, so `bucket + 1` cannot overflow.
        while Self::try_add_assign(&self.counters[bucket], n).is_err() {
            bucket = (bucket + 1) % BUCKETS;
        }
    }
}

impl<const BUCKETS: usize> Counter for DistributedCounter<BUCKETS> {
    type Primitive = isize;
    const ZERO: Self = Self::new();

    /// Adds `n` to one of the buckets, moving to another bucket on contention.
    fn add_assign(&self, n: isize) {
        // Method resolution prefers the inherent `add_assign`, so this
        // delegates to the bucket-hopping implementation rather than recursing.
        self.add_assign(n)
    }

    /// Subtracts `n` by adding its two's-complement negation.
    ///
    /// `wrapping_neg` keeps `isize::MIN` from overflowing (a plain `-n` panics
    /// in debug builds). Bucket updates already wrap, so the result matches the
    /// wrapping behavior of [`AtomicIsize::fetch_sub`].
    fn sub_assign(&self, n: isize) {
        self.add_assign(n.wrapping_neg())
    }

    /// Returns the wrapping sum of every bucket.
    ///
    /// Buckets are loaded one at a time, so updates that race with this call
    /// may be only partially reflected in the result.
    fn fetch(&self) -> isize {
        self.counters
            .iter()
            .fold(0, |sum: isize, bucket| {
                sum.wrapping_add(bucket.load(Ordering::SeqCst))
            })
    }
}