sochdb_storage/sketches/
exponential_histogram.rs1#[derive(Debug, Clone)]
32#[allow(dead_code)]
33pub struct ExponentialHistogram {
34 scale: i32,
38 base: f64,
40 log_base: f64,
42 positive_buckets: Vec<u64>,
44 negative_buckets: Vec<u64>,
46 positive_offset: i32,
48 negative_offset: i32,
50 zero_count: u64,
52 zero_threshold: f64,
54 count: u64,
56 sum: f64,
58 min: f64,
60 max: f64,
62}
63
64impl ExponentialHistogram {
65 pub fn new(scale: i32) -> Self {
73 let base = 2.0_f64.powf(2.0_f64.powi(-scale));
74 Self {
75 scale,
76 base,
77 log_base: base.ln(),
78 positive_buckets: Vec::new(),
79 negative_buckets: Vec::new(),
80 positive_offset: 0,
81 negative_offset: 0,
82 zero_count: 0,
83 zero_threshold: 0.0,
84 count: 0,
85 sum: 0.0,
86 min: f64::MAX,
87 max: f64::MIN,
88 }
89 }
90
91 pub fn default_scale() -> Self {
93 Self::new(3)
94 }
95
96 #[inline]
98 fn bucket_index(&self, value: f64) -> i32 {
99 debug_assert!(value > 0.0);
100 (value.log2() * (1 << self.scale) as f64).floor() as i32
102 }
103
104 #[inline]
106 fn bucket_lower_bound(&self, index: i32) -> f64 {
107 self.base.powi(index)
108 }
109
110 pub fn record(&mut self, value: f64) {
112 self.count += 1;
113 self.sum += value;
114 self.min = self.min.min(value);
115 self.max = self.max.max(value);
116
117 if value.abs() <= self.zero_threshold {
118 self.zero_count += 1;
119 } else if value > 0.0 {
120 let idx = self.bucket_index(value);
121 self.increment_positive(idx);
122 } else {
123 let idx = self.bucket_index(-value);
124 self.increment_negative(idx);
125 }
126 }
127
128 fn increment_positive(&mut self, idx: i32) {
130 if self.positive_buckets.is_empty() {
131 self.positive_offset = idx;
132 self.positive_buckets.push(1);
133 } else {
134 let relative_idx = idx - self.positive_offset;
135 if relative_idx < 0 {
136 let prepend_count = (-relative_idx) as usize;
138 let mut new_buckets = vec![0; prepend_count];
139 new_buckets.append(&mut self.positive_buckets);
140 self.positive_buckets = new_buckets;
141 self.positive_offset = idx;
142 self.positive_buckets[0] = 1;
143 } else if relative_idx as usize >= self.positive_buckets.len() {
144 self.positive_buckets.resize(relative_idx as usize + 1, 0);
146 self.positive_buckets[relative_idx as usize] = 1;
147 } else {
148 self.positive_buckets[relative_idx as usize] += 1;
149 }
150 }
151 }
152
153 fn increment_negative(&mut self, idx: i32) {
155 if self.negative_buckets.is_empty() {
156 self.negative_offset = idx;
157 self.negative_buckets.push(1);
158 } else {
159 let relative_idx = idx - self.negative_offset;
160 if relative_idx < 0 {
161 let prepend_count = (-relative_idx) as usize;
162 let mut new_buckets = vec![0; prepend_count];
163 new_buckets.append(&mut self.negative_buckets);
164 self.negative_buckets = new_buckets;
165 self.negative_offset = idx;
166 self.negative_buckets[0] = 1;
167 } else if relative_idx as usize >= self.negative_buckets.len() {
168 self.negative_buckets.resize(relative_idx as usize + 1, 0);
169 self.negative_buckets[relative_idx as usize] = 1;
170 } else {
171 self.negative_buckets[relative_idx as usize] += 1;
172 }
173 }
174 }
175
176 pub fn merge(&mut self, other: &ExponentialHistogram) {
180 if other.count == 0 {
181 return;
182 }
183
184 assert_eq!(self.scale, other.scale, "Scale mismatch in merge");
186
187 for (i, &count) in other.positive_buckets.iter().enumerate() {
189 if count > 0 {
190 let idx = other.positive_offset + i as i32;
191 self.add_positive_count(idx, count);
192 }
193 }
194
195 for (i, &count) in other.negative_buckets.iter().enumerate() {
197 if count > 0 {
198 let idx = other.negative_offset + i as i32;
199 self.add_negative_count(idx, count);
200 }
201 }
202
203 self.zero_count += other.zero_count;
204 self.count += other.count;
205 self.sum += other.sum;
206
207 if other.count > 0 {
208 self.min = self.min.min(other.min);
209 self.max = self.max.max(other.max);
210 }
211 }
212
213 fn add_positive_count(&mut self, idx: i32, count: u64) {
215 if self.positive_buckets.is_empty() {
216 self.positive_offset = idx;
217 self.positive_buckets.push(count);
218 } else {
219 let relative_idx = idx - self.positive_offset;
220 if relative_idx < 0 {
221 let prepend_count = (-relative_idx) as usize;
222 let mut new_buckets = vec![0; prepend_count];
223 new_buckets.append(&mut self.positive_buckets);
224 self.positive_buckets = new_buckets;
225 self.positive_offset = idx;
226 self.positive_buckets[0] += count;
227 } else if relative_idx as usize >= self.positive_buckets.len() {
228 self.positive_buckets.resize(relative_idx as usize + 1, 0);
229 self.positive_buckets[relative_idx as usize] += count;
230 } else {
231 self.positive_buckets[relative_idx as usize] += count;
232 }
233 }
234 }
235
236 fn add_negative_count(&mut self, idx: i32, count: u64) {
238 if self.negative_buckets.is_empty() {
239 self.negative_offset = idx;
240 self.negative_buckets.push(count);
241 } else {
242 let relative_idx = idx - self.negative_offset;
243 if relative_idx < 0 {
244 let prepend_count = (-relative_idx) as usize;
245 let mut new_buckets = vec![0; prepend_count];
246 new_buckets.append(&mut self.negative_buckets);
247 self.negative_buckets = new_buckets;
248 self.negative_offset = idx;
249 self.negative_buckets[0] += count;
250 } else if relative_idx as usize >= self.negative_buckets.len() {
251 self.negative_buckets.resize(relative_idx as usize + 1, 0);
252 self.negative_buckets[relative_idx as usize] += count;
253 } else {
254 self.negative_buckets[relative_idx as usize] += count;
255 }
256 }
257 }
258
259 pub fn count(&self) -> u64 {
261 self.count
262 }
263
264 pub fn sum(&self) -> f64 {
266 self.sum
267 }
268
269 pub fn mean(&self) -> f64 {
271 if self.count == 0 {
272 0.0
273 } else {
274 self.sum / self.count as f64
275 }
276 }
277
278 pub fn min(&self) -> f64 {
280 if self.count > 0 { self.min } else { 0.0 }
281 }
282
283 pub fn max(&self) -> f64 {
285 if self.count > 0 { self.max } else { 0.0 }
286 }
287
288 pub fn quantile(&self, q: f64) -> f64 {
290 if self.count == 0 {
291 return 0.0;
292 }
293
294 let q = q.clamp(0.0, 1.0);
295 let target_rank = (q * self.count as f64).ceil() as u64;
296 let mut cumulative = 0u64;
297
298 for (i, &count) in self.negative_buckets.iter().enumerate().rev() {
300 if count > 0 {
301 cumulative += count;
302 if cumulative >= target_rank {
303 let idx = self.negative_offset + i as i32;
304 return -self.bucket_lower_bound(idx);
305 }
306 }
307 }
308
309 cumulative += self.zero_count;
311 if cumulative >= target_rank {
312 return 0.0;
313 }
314
315 for (i, &count) in self.positive_buckets.iter().enumerate() {
317 if count > 0 {
318 cumulative += count;
319 if cumulative >= target_rank {
320 let idx = self.positive_offset + i as i32;
321 let lower = self.bucket_lower_bound(idx);
325 let upper = self.bucket_lower_bound(idx + 1);
326 return (lower * upper).sqrt();
327 }
328 }
329 }
330
331 self.max
332 }
333
334 pub fn is_empty(&self) -> bool {
336 self.count == 0
337 }
338
339 pub fn clear(&mut self) {
341 self.positive_buckets.clear();
342 self.negative_buckets.clear();
343 self.positive_offset = 0;
344 self.negative_offset = 0;
345 self.zero_count = 0;
346 self.count = 0;
347 self.sum = 0.0;
348 self.min = f64::MAX;
349 self.max = f64::MIN;
350 }
351
352 pub fn bucket_count(&self) -> usize {
354 self.positive_buckets.len() + self.negative_buckets.len()
355 }
356
357 pub fn memory_usage(&self) -> usize {
359 std::mem::size_of::<Self>()
360 + self.positive_buckets.len() * std::mem::size_of::<u64>()
361 + self.negative_buckets.len() * std::mem::size_of::<u64>()
362 }
363}
364
365impl Default for ExponentialHistogram {
366 fn default() -> Self {
367 Self::default_scale()
368 }
369}
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374
375 #[test]
376 fn test_basic_recording() {
377 let mut hist = ExponentialHistogram::new(3);
378
379 for i in 1..=100 {
380 hist.record(i as f64);
381 }
382
383 assert_eq!(hist.count(), 100);
384 assert!((hist.mean() - 50.5).abs() < 0.01);
385 assert_eq!(hist.min(), 1.0);
386 assert_eq!(hist.max(), 100.0);
387 }
388
389 #[test]
390 fn test_merge() {
391 let mut hist1 = ExponentialHistogram::new(3);
392 let mut hist2 = ExponentialHistogram::new(3);
393
394 for i in 1..=50 {
395 hist1.record(i as f64);
396 }
397 for i in 51..=100 {
398 hist2.record(i as f64);
399 }
400
401 hist1.merge(&hist2);
402
403 assert_eq!(hist1.count(), 100);
404 assert!((hist1.mean() - 50.5).abs() < 0.01);
405 }
406
407 #[test]
408 fn test_quantiles() {
409 let mut hist = ExponentialHistogram::new(3);
410
411 for i in 1..=100 {
413 hist.record(i as f64);
414 }
415
416 let p50 = hist.quantile(0.50);
417 let p99 = hist.quantile(0.99);
418
419 assert!(p50 > 30.0 && p50 < 70.0, "P50 was {}", p50);
422 assert!(p99 > 80.0, "P99 was {}", p99);
423 }
424
425 #[test]
426 fn test_latency_distribution() {
427 let mut hist = ExponentialHistogram::new(3);
428
429 for _ in 0..900 {
431 hist.record(5.0); }
433 for _ in 0..90 {
434 hist.record(50.0); }
436 for _ in 0..10 {
437 hist.record(500.0); }
439
440 assert_eq!(hist.count(), 1000);
441
442 let p50 = hist.quantile(0.50);
443 let p99 = hist.quantile(0.99);
444
445 assert!(p50 < 20.0, "P50 was {}", p50);
447
448 assert!(p99 > 5.0, "P99 was {}", p99);
450 }
451}