Skip to main content

sochdb_storage/sketches/
exponential_histogram.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Exponential Histogram
19//!
20//! A histogram with exponentially-spaced bucket boundaries for natural
21//! latency distributions. Used by OpenTelemetry Metrics for efficient
22//! aggregation with O(1) merge operations.
23//!
24//! Reference: OpenTelemetry Exponential Histogram
25//! https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
26
27/// Exponential histogram with configurable scale
28///
29/// Bucket boundaries follow exponential growth:
30///     boundary[i] = base^i  where base = 2^(2^(-scale))
31#[derive(Debug, Clone)]
32#[allow(dead_code)]
33pub struct ExponentialHistogram {
34    /// Scale parameter (higher = more precision, more buckets)
35    /// scale=0: base=2, buckets at [1,2), [2,4), [4,8), ...
36    /// scale=3: base≈1.09, ~8 buckets per doubling
37    scale: i32,
38    /// Base = 2^(2^(-scale))
39    base: f64,
40    /// Log of base for index calculation
41    log_base: f64,
42    /// Positive bucket counts (index 0 = [1, base), index 1 = [base, base²), ...)
43    positive_buckets: Vec<u64>,
44    /// Negative bucket counts (mirrored)
45    negative_buckets: Vec<u64>,
46    /// Offset for positive buckets (allows representing smaller values)
47    positive_offset: i32,
48    /// Offset for negative buckets
49    negative_offset: i32,
50    /// Zero bucket count
51    zero_count: u64,
52    /// Zero bucket threshold (values in [-threshold, threshold] go to zero bucket)
53    zero_threshold: f64,
54    /// Total count
55    count: u64,
56    /// Sum of all values (for mean calculation)
57    sum: f64,
58    /// Minimum value seen
59    min: f64,
60    /// Maximum value seen
61    max: f64,
62}
63
64impl ExponentialHistogram {
65    /// Create a new exponential histogram
66    ///
67    /// # Arguments
68    /// * `scale` - Scale parameter (-10 to 20 typical)
69    ///   - scale=0: base=2.0, coarse buckets
70    ///   - scale=3: base≈1.09, ~8 buckets per doubling (good default)
71    ///   - scale=5: base≈1.02, ~32 buckets per doubling (high precision)
72    pub fn new(scale: i32) -> Self {
73        let base = 2.0_f64.powf(2.0_f64.powi(-scale));
74        Self {
75            scale,
76            base,
77            log_base: base.ln(),
78            positive_buckets: Vec::new(),
79            negative_buckets: Vec::new(),
80            positive_offset: 0,
81            negative_offset: 0,
82            zero_count: 0,
83            zero_threshold: 0.0,
84            count: 0,
85            sum: 0.0,
86            min: f64::MAX,
87            max: f64::MIN,
88        }
89    }
90
91    /// Create with default scale (3 = good balance of precision and memory)
92    pub fn default_scale() -> Self {
93        Self::new(3)
94    }
95
96    /// Calculate bucket index for a positive value
97    #[inline]
98    fn bucket_index(&self, value: f64) -> i32 {
99        debug_assert!(value > 0.0);
100        // index = floor(log_base(value)) = floor(log2(value) × 2^scale)
101        (value.log2() * (1 << self.scale) as f64).floor() as i32
102    }
103
104    /// Get the lower boundary of a bucket
105    #[inline]
106    fn bucket_lower_bound(&self, index: i32) -> f64 {
107        self.base.powi(index)
108    }
109
110    /// Record a value
111    pub fn record(&mut self, value: f64) {
112        self.count += 1;
113        self.sum += value;
114        self.min = self.min.min(value);
115        self.max = self.max.max(value);
116
117        if value.abs() <= self.zero_threshold {
118            self.zero_count += 1;
119        } else if value > 0.0 {
120            let idx = self.bucket_index(value);
121            self.increment_positive(idx);
122        } else {
123            let idx = self.bucket_index(-value);
124            self.increment_negative(idx);
125        }
126    }
127
128    /// Increment positive bucket at index
129    fn increment_positive(&mut self, idx: i32) {
130        if self.positive_buckets.is_empty() {
131            self.positive_offset = idx;
132            self.positive_buckets.push(1);
133        } else {
134            let relative_idx = idx - self.positive_offset;
135            if relative_idx < 0 {
136                // Need to prepend buckets
137                let prepend_count = (-relative_idx) as usize;
138                let mut new_buckets = vec![0; prepend_count];
139                new_buckets.append(&mut self.positive_buckets);
140                self.positive_buckets = new_buckets;
141                self.positive_offset = idx;
142                self.positive_buckets[0] = 1;
143            } else if relative_idx as usize >= self.positive_buckets.len() {
144                // Need to append buckets
145                self.positive_buckets.resize(relative_idx as usize + 1, 0);
146                self.positive_buckets[relative_idx as usize] = 1;
147            } else {
148                self.positive_buckets[relative_idx as usize] += 1;
149            }
150        }
151    }
152
153    /// Increment negative bucket at index
154    fn increment_negative(&mut self, idx: i32) {
155        if self.negative_buckets.is_empty() {
156            self.negative_offset = idx;
157            self.negative_buckets.push(1);
158        } else {
159            let relative_idx = idx - self.negative_offset;
160            if relative_idx < 0 {
161                let prepend_count = (-relative_idx) as usize;
162                let mut new_buckets = vec![0; prepend_count];
163                new_buckets.append(&mut self.negative_buckets);
164                self.negative_buckets = new_buckets;
165                self.negative_offset = idx;
166                self.negative_buckets[0] = 1;
167            } else if relative_idx as usize >= self.negative_buckets.len() {
168                self.negative_buckets.resize(relative_idx as usize + 1, 0);
169                self.negative_buckets[relative_idx as usize] = 1;
170            } else {
171                self.negative_buckets[relative_idx as usize] += 1;
172            }
173        }
174    }
175
176    /// Merge another histogram into this one
177    ///
178    /// O(buckets) merge - critical for rollups
179    pub fn merge(&mut self, other: &ExponentialHistogram) {
180        if other.count == 0 {
181            return;
182        }
183
184        // Must have same scale (or implement scale downgrade)
185        assert_eq!(self.scale, other.scale, "Scale mismatch in merge");
186
187        // Merge positive buckets
188        for (i, &count) in other.positive_buckets.iter().enumerate() {
189            if count > 0 {
190                let idx = other.positive_offset + i as i32;
191                self.add_positive_count(idx, count);
192            }
193        }
194
195        // Merge negative buckets
196        for (i, &count) in other.negative_buckets.iter().enumerate() {
197            if count > 0 {
198                let idx = other.negative_offset + i as i32;
199                self.add_negative_count(idx, count);
200            }
201        }
202
203        self.zero_count += other.zero_count;
204        self.count += other.count;
205        self.sum += other.sum;
206
207        if other.count > 0 {
208            self.min = self.min.min(other.min);
209            self.max = self.max.max(other.max);
210        }
211    }
212
213    /// Add count to positive bucket
214    fn add_positive_count(&mut self, idx: i32, count: u64) {
215        if self.positive_buckets.is_empty() {
216            self.positive_offset = idx;
217            self.positive_buckets.push(count);
218        } else {
219            let relative_idx = idx - self.positive_offset;
220            if relative_idx < 0 {
221                let prepend_count = (-relative_idx) as usize;
222                let mut new_buckets = vec![0; prepend_count];
223                new_buckets.append(&mut self.positive_buckets);
224                self.positive_buckets = new_buckets;
225                self.positive_offset = idx;
226                self.positive_buckets[0] += count;
227            } else if relative_idx as usize >= self.positive_buckets.len() {
228                self.positive_buckets.resize(relative_idx as usize + 1, 0);
229                self.positive_buckets[relative_idx as usize] += count;
230            } else {
231                self.positive_buckets[relative_idx as usize] += count;
232            }
233        }
234    }
235
236    /// Add count to negative bucket
237    fn add_negative_count(&mut self, idx: i32, count: u64) {
238        if self.negative_buckets.is_empty() {
239            self.negative_offset = idx;
240            self.negative_buckets.push(count);
241        } else {
242            let relative_idx = idx - self.negative_offset;
243            if relative_idx < 0 {
244                let prepend_count = (-relative_idx) as usize;
245                let mut new_buckets = vec![0; prepend_count];
246                new_buckets.append(&mut self.negative_buckets);
247                self.negative_buckets = new_buckets;
248                self.negative_offset = idx;
249                self.negative_buckets[0] += count;
250            } else if relative_idx as usize >= self.negative_buckets.len() {
251                self.negative_buckets.resize(relative_idx as usize + 1, 0);
252                self.negative_buckets[relative_idx as usize] += count;
253            } else {
254                self.negative_buckets[relative_idx as usize] += count;
255            }
256        }
257    }
258
259    /// Get count
260    pub fn count(&self) -> u64 {
261        self.count
262    }
263
264    /// Get sum
265    pub fn sum(&self) -> f64 {
266        self.sum
267    }
268
269    /// Get mean
270    pub fn mean(&self) -> f64 {
271        if self.count == 0 {
272            0.0
273        } else {
274            self.sum / self.count as f64
275        }
276    }
277
278    /// Get min
279    pub fn min(&self) -> f64 {
280        if self.count > 0 { self.min } else { 0.0 }
281    }
282
283    /// Get max
284    pub fn max(&self) -> f64 {
285        if self.count > 0 { self.max } else { 0.0 }
286    }
287
288    /// Estimate a quantile
289    pub fn quantile(&self, q: f64) -> f64 {
290        if self.count == 0 {
291            return 0.0;
292        }
293
294        let q = q.clamp(0.0, 1.0);
295        let target_rank = (q * self.count as f64).ceil() as u64;
296        let mut cumulative = 0u64;
297
298        // Check negative buckets first (from most negative)
299        for (i, &count) in self.negative_buckets.iter().enumerate().rev() {
300            if count > 0 {
301                cumulative += count;
302                if cumulative >= target_rank {
303                    let idx = self.negative_offset + i as i32;
304                    return -self.bucket_lower_bound(idx);
305                }
306            }
307        }
308
309        // Check zero bucket
310        cumulative += self.zero_count;
311        if cumulative >= target_rank {
312            return 0.0;
313        }
314
315        // Check positive buckets
316        for (i, &count) in self.positive_buckets.iter().enumerate() {
317            if count > 0 {
318                cumulative += count;
319                if cumulative >= target_rank {
320                    let idx = self.positive_offset + i as i32;
321                    // Return geometric mean of bucket (proper interpolation for log-spaced buckets)
322                    // Geometric mean: sqrt(lower * upper) = lower * sqrt(base)
323                    // This provides accurate quantile estimates for exponential histograms
324                    let lower = self.bucket_lower_bound(idx);
325                    let upper = self.bucket_lower_bound(idx + 1);
326                    return (lower * upper).sqrt();
327                }
328            }
329        }
330
331        self.max
332    }
333
334    /// Check if empty
335    pub fn is_empty(&self) -> bool {
336        self.count == 0
337    }
338
339    /// Clear all data
340    pub fn clear(&mut self) {
341        self.positive_buckets.clear();
342        self.negative_buckets.clear();
343        self.positive_offset = 0;
344        self.negative_offset = 0;
345        self.zero_count = 0;
346        self.count = 0;
347        self.sum = 0.0;
348        self.min = f64::MAX;
349        self.max = f64::MIN;
350    }
351
352    /// Get number of buckets used
353    pub fn bucket_count(&self) -> usize {
354        self.positive_buckets.len() + self.negative_buckets.len()
355    }
356
357    /// Get memory usage in bytes
358    pub fn memory_usage(&self) -> usize {
359        std::mem::size_of::<Self>()
360            + self.positive_buckets.len() * std::mem::size_of::<u64>()
361            + self.negative_buckets.len() * std::mem::size_of::<u64>()
362    }
363}
364
365impl Default for ExponentialHistogram {
366    fn default() -> Self {
367        Self::default_scale()
368    }
369}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374
375    #[test]
376    fn test_basic_recording() {
377        let mut hist = ExponentialHistogram::new(3);
378
379        for i in 1..=100 {
380            hist.record(i as f64);
381        }
382
383        assert_eq!(hist.count(), 100);
384        assert!((hist.mean() - 50.5).abs() < 0.01);
385        assert_eq!(hist.min(), 1.0);
386        assert_eq!(hist.max(), 100.0);
387    }
388
389    #[test]
390    fn test_merge() {
391        let mut hist1 = ExponentialHistogram::new(3);
392        let mut hist2 = ExponentialHistogram::new(3);
393
394        for i in 1..=50 {
395            hist1.record(i as f64);
396        }
397        for i in 51..=100 {
398            hist2.record(i as f64);
399        }
400
401        hist1.merge(&hist2);
402
403        assert_eq!(hist1.count(), 100);
404        assert!((hist1.mean() - 50.5).abs() < 0.01);
405    }
406
407    #[test]
408    fn test_quantiles() {
409        let mut hist = ExponentialHistogram::new(3);
410
411        // Uniform distribution 1-100
412        for i in 1..=100 {
413            hist.record(i as f64);
414        }
415
416        let p50 = hist.quantile(0.50);
417        let p99 = hist.quantile(0.99);
418
419        // Should be approximately correct (histogram bins introduce some error)
420        // Exponential histograms have logarithmic buckets, so precision varies
421        assert!(p50 > 30.0 && p50 < 70.0, "P50 was {}", p50);
422        assert!(p99 > 80.0, "P99 was {}", p99);
423    }
424
425    #[test]
426    fn test_latency_distribution() {
427        let mut hist = ExponentialHistogram::new(3);
428
429        // Simulate typical latency distribution
430        for _ in 0..900 {
431            hist.record(5.0); // 5ms - most requests
432        }
433        for _ in 0..90 {
434            hist.record(50.0); // 50ms - some slower
435        }
436        for _ in 0..10 {
437            hist.record(500.0); // 500ms - tail latency
438        }
439
440        assert_eq!(hist.count(), 1000);
441
442        let p50 = hist.quantile(0.50);
443        let p99 = hist.quantile(0.99);
444
445        // P50 should be around 5ms (most data is there)
446        assert!(p50 < 20.0, "P50 was {}", p50);
447
448        // P99 should be above 5ms (captures slower requests)
449        assert!(p99 > 5.0, "P99 was {}", p99);
450    }
451}