Skip to main content

feldera_storage/
histogram.rs

1use std::{
2    collections::VecDeque,
3    ops::RangeInclusive,
4    sync::atomic::{AtomicU64, Ordering},
5    time::{Duration, Instant},
6};
7
8use itertools::Itertools;
9
/// Number of histogram buckets: one for zero, nine (one per leading digit)
/// for each of the ten decades from 1 through 9_999_999_999, and one
/// overflow bucket for everything larger.
const N_BUCKETS: usize = 1 + 9 * 10 + 1;
11
12/// A histogram with exponential buckets.
13///
14/// This histogram maintains 92 buckets, one for each value or range below,
15/// listing ranges by their lower endpoints:
16///
17/// - 0
18/// - 1, 2, 3, ... 9
19/// - 10, 20, 30, ... 90
20/// - 100, 200, 300, ... 900
21/// - 1000, 2000, 3000, ... 9000
22/// - 10_000, 20_000, 30_000, ... 90_000
23/// - 100_000, 200_000, 300_000, ... 900_000
24/// - 1_000_000, 2_000_000, 3_000_000, ... 9_000_000
25/// - 10_000_000, 20_000_000, 30_000_000, ... 90_000_000
26/// - 100_000_000, 200_000_000, 300_000_000, ... 900_000_000
27/// - 1_000_000_000, 2_000_000_000, 3_000_000_000, ... 9_000_000_000
28/// - 10_000_000_000 through [u64::MAX].
29#[derive(Debug)]
30pub struct ExponentialHistogram {
31    buckets: [AtomicU64; N_BUCKETS],
32    sum: AtomicU64,
33}
34
35impl ExponentialHistogram {
36    /// Constructs a new exponential histogram.
37    pub const fn new() -> Self {
38        Self {
39            buckets: [const { AtomicU64::new(0) }; N_BUCKETS],
40            sum: AtomicU64::new(0),
41        }
42    }
43
44    /// Records `value` in the histogram.
45    pub fn record(&self, value: impl TryInto<u64>) {
46        if let Ok(value) = value.try_into() {
47            self.buckets[number_to_bucket(value)].fetch_add(1, Ordering::Relaxed);
48            self.sum.fetch_add(value, Ordering::Relaxed);
49        }
50    }
51
52    /// Records the time elapsed since `start` in the histogram, as a count of
53    /// microseconds.
54    pub fn record_elapsed(&self, start: Instant) {
55        self.record_duration(start.elapsed());
56    }
57
58    /// Records `duration` as a count of microseconds.
59    pub fn record_duration(&self, duration: Duration) {
60        self.record(duration.as_micros());
61    }
62
63    /// Returns a snapshot of the histogram.
64    pub fn snapshot(&self) -> ExponentialHistogramSnapshot {
65        ExponentialHistogramSnapshot {
66            buckets: self
67                .buckets
68                .iter()
69                .map(|bucket| bucket.load(Ordering::Relaxed))
70                .collect_array()
71                .unwrap(),
72            sum: self.sum.load(Ordering::Relaxed),
73        }
74    }
75
76    /// Returns the sum of the values in the histogram
77    pub fn sum(&self) -> u64 {
78        self.sum.load(Ordering::Relaxed)
79    }
80
81    /// Calls `f` and records the amount of time that it takes to run, in
82    /// microseconds, in the histogram.
83    pub fn record_callback<F, T>(&self, f: F) -> T
84    where
85        F: FnOnce() -> T,
86    {
87        let start = Instant::now();
88        let retval = f();
89        self.record_elapsed(start);
90        retval
91    }
92}
93
94impl Default for ExponentialHistogram {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100pub struct ExponentialHistogramSnapshot {
101    buckets: [u64; N_BUCKETS],
102    sum: u64,
103}
104
105impl ExponentialHistogramSnapshot {
106    pub fn iter_buckets(&self) -> impl Iterator<Item = Bucket> + use<'_> {
107        self.buckets
108            .iter()
109            .enumerate()
110            .map(|(index, count)| Bucket {
111                range: bucket_to_range(index),
112                count: *count,
113            })
114    }
115    pub fn sum(&self) -> u64 {
116        self.sum
117    }
118}
119
/// One bucket of a histogram snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Bucket {
    /// Inclusive range of values that fall into this bucket.
    pub range: RangeInclusive<u64>,
    /// Number of recorded values that fell into `range`.
    pub count: u64,
}
124
/// Maps `number` to the index of the histogram bucket that holds it.
///
/// The values 0 through 9 each have their own bucket (indices 0..=9). Each
/// decade `[10^e, 10^(e+1))` for `e` in `1..=9` is split into nine
/// equal-width buckets selected by the number's leading digit (decade `e`
/// occupies indices `9 * e + 1 ..= 9 * e + 9`). Everything from
/// 10_000_000_000 upward lands in the overflow bucket, 91.
fn number_to_bucket(number: u64) -> usize {
    match number {
        0..=9 => number as usize,
        10..=9_999_999_999 => {
            // Here `exponent` is in 1..=9 and `leading_digit` in 1..=9.
            // Decade `exponent` starts at bucket 10 + 9 * (exponent - 1),
            // which simplifies to `9 * exponent + leading_digit`.
            let exponent = number.ilog10();
            let leading_digit = number / 10u64.pow(exponent);
            (9 * exponent as u64 + leading_digit) as usize
        }
        _ => 91,
    }
}
152
/// Returns the inclusive range of values stored in histogram bucket `bucket`.
///
/// This is the inverse of [number_to_bucket]. The ranges for buckets 0..=91
/// are contiguous and non-overlapping, and together they cover all of `u64`.
///
/// # Panics
///
/// Panics if `bucket` is greater than 91.
fn bucket_to_range(bucket: usize) -> RangeInclusive<u64> {
    // Range of the bucket at position `index` (0-based) within a decade
    // whose buckets are each `width` values wide.
    fn bucket_range(index: u64, width: u64) -> RangeInclusive<u64> {
        let start = (index + 1) * width;
        let end = start + (width - 1);
        start..=end
    }

    let bucket = bucket as u64;
    match bucket {
        0..=9 => bucket..=bucket,
        10..=18 => bucket_range(bucket - 10, 10),
        19..=27 => bucket_range(bucket - 19, 100),
        28..=36 => bucket_range(bucket - 28, 1000),
        37..=45 => bucket_range(bucket - 37, 10_000),
        46..=54 => bucket_range(bucket - 46, 100_000),
        55..=63 => bucket_range(bucket - 55, 1_000_000),
        64..=72 => bucket_range(bucket - 64, 10_000_000),
        73..=81 => bucket_range(bucket - 73, 100_000_000),
        82..=90 => bucket_range(bucket - 82, 1_000_000_000),
        // Overflow bucket: starts immediately after bucket 90's upper bound
        // of 9_999_999_999, matching `number_to_bucket`.
        91 => 10_000_000_000..=u64::MAX,
        _ => unreachable!("histogram bucket index {bucket} out of range"),
    }
}
175
176/// A sliding histogram with exponential buckets.
177///
178/// This histogram records up to a specified number of samples across a
179/// specified maximum amount of time.  Within that range, it maintains an
180/// exponential histogram with the same form as [ExponentialHistogram].
181#[derive(Debug)]
182pub struct SlidingHistogram {
183    buckets: [u64; N_BUCKETS],
184    samples: VecDeque<Sample>,
185    sum: u64,
186    max_samples: usize,
187    max_elapsed: Duration,
188}
189
190#[derive(Debug)]
191struct Sample {
192    time: Instant,
193    value: u64,
194}
195
196impl SlidingHistogram {
197    /// Constructs a new sliding histogram.  The histogram will keep at most the
198    /// most recent `max_samples` samples that have been recorded over at most
199    /// the most recent `max_elapsed` amount of time.
200    pub const fn new(max_samples: usize, max_elapsed: Duration) -> Self {
201        Self {
202            buckets: [0; N_BUCKETS],
203            samples: VecDeque::new(),
204            sum: 0,
205            max_samples,
206            max_elapsed,
207        }
208    }
209
210    /// Records `value` in the histogram.
211    pub fn record(&mut self, value: impl TryInto<u64>) {
212        if let Ok(value) = value.try_into() {
213            if self.samples.len() >= self.max_samples {
214                self.drop_sample();
215            }
216            self.samples.push_back(Sample {
217                time: Instant::now(),
218                value,
219            });
220            self.buckets[number_to_bucket(value)] += 1;
221            self.sum += value;
222        }
223    }
224
225    /// Records the time elapsed since `start` in the histogram, as a count of
226    /// microseconds.
227    pub fn record_elapsed(&mut self, start: Instant) {
228        self.record(start.elapsed().as_micros());
229    }
230
231    /// Returns a snapshot of the histogram.
232    pub fn snapshot(&mut self) -> ExponentialHistogramSnapshot {
233        // Drop samples that are more than `max_elapsed` older than the most
234        // recent sample.
235        //
236        // (We use the most recent sample instead of the current time to avoid
237        // dropping all the samples if nothing has been recorded recently.)
238        if let Some(most_recent) = self.samples.back() {
239            let cutoff = most_recent.time - self.max_elapsed;
240            while self
241                .samples
242                .front()
243                .is_some_and(|sample| sample.time < cutoff)
244            {
245                self.drop_sample();
246            }
247        }
248
249        ExponentialHistogramSnapshot {
250            buckets: self.buckets,
251            sum: self.sum,
252        }
253    }
254
255    /// Returns the sum of the values in the histogram
256    pub fn sum(&self) -> u64 {
257        self.sum
258    }
259
260    /// Calls `f` and records the amount of time that it takes to run, in
261    /// microseconds, in the histogram.
262    pub fn record_callback<F, T>(&mut self, f: F) -> T
263    where
264        F: FnOnce() -> T,
265    {
266        let start = Instant::now();
267        let retval = f();
268        self.record_elapsed(start);
269        retval
270    }
271
272    fn drop_sample(&mut self) {
273        let sample = self.samples.pop_front().unwrap();
274        self.buckets[number_to_bucket(sample.value)] -= 1;
275        self.sum -= sample.value;
276    }
277}
278
#[cfg(test)]
mod test {
    use crate::histogram::{N_BUCKETS, bucket_to_range, number_to_bucket};

    /// Checks every value just below, at, and just above `digit * 10^k`, for
    /// each leading digit 1..=9 and every power of ten up to `u64::MAX / 10`.
    /// Each value must map to a valid bucket whose range contains it.
    #[test]
    fn buckets() {
        let powers_of_ten = std::iter::successors(Some(1u64), |&power| power.checked_mul(10))
            .take_while(|&power| power <= u64::MAX / 10);
        for power in powers_of_ten {
            for digit in 1..10 {
                let center = power * digit;
                for number in center - 1..=center + 1 {
                    let bucket = number_to_bucket(number);
                    assert!(bucket < N_BUCKETS);
                    assert!(bucket_to_range(bucket).contains(&number));
                }
            }
        }
    }
}