reddb_server/storage/primitives/
tdigest.rs1const DEFAULT_COMPRESSION: f64 = 100.0;
17
18#[derive(Debug, Clone, Copy, PartialEq)]
19struct Centroid {
20 mean: f64,
21 weight: f64,
22}
23
24#[derive(Debug, Clone)]
28pub struct TDigest {
29 compression: f64,
30 centroids: Vec<Centroid>,
31 buffer: Vec<Centroid>,
32 buffer_limit: usize,
33 total_weight: f64,
34 min: f64,
35 max: f64,
36}
37
38impl Default for TDigest {
39 fn default() -> Self {
40 Self::with_compression(DEFAULT_COMPRESSION)
41 }
42}
43
44impl TDigest {
45 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn with_compression(compression: f64) -> Self {
50 let c = compression.max(10.0);
51 Self {
52 compression: c,
53 centroids: Vec::new(),
54 buffer: Vec::new(),
55 buffer_limit: (6.0 * c) as usize + 10,
56 total_weight: 0.0,
57 min: f64::INFINITY,
58 max: f64::NEG_INFINITY,
59 }
60 }
61
62 pub fn is_empty(&self) -> bool {
63 self.total_weight == 0.0
64 }
65
66 pub fn count(&self) -> f64 {
67 self.total_weight
68 }
69
70 pub fn min(&self) -> f64 {
71 self.min
72 }
73
74 pub fn max(&self) -> f64 {
75 self.max
76 }
77
78 pub fn add(&mut self, value: f64) {
80 self.add_weighted(value, 1.0);
81 }
82
83 pub fn add_weighted(&mut self, value: f64, weight: f64) {
85 if !value.is_finite() || weight <= 0.0 {
86 return;
87 }
88 self.buffer.push(Centroid {
89 mean: value,
90 weight,
91 });
92 self.total_weight += weight;
93 if value < self.min {
94 self.min = value;
95 }
96 if value > self.max {
97 self.max = value;
98 }
99 if self.buffer.len() >= self.buffer_limit {
100 self.flush();
101 }
102 }
103
104 fn flush(&mut self) {
105 if self.buffer.is_empty() {
106 return;
107 }
108 let mut merged = Vec::with_capacity(self.centroids.len() + self.buffer.len());
109 merged.extend_from_slice(&self.centroids);
110 merged.extend_from_slice(&self.buffer);
111 merged.sort_by(|a, b| {
112 a.mean
113 .partial_cmp(&b.mean)
114 .unwrap_or(std::cmp::Ordering::Equal)
115 });
116 self.centroids = compact(&merged, self.compression, self.total_weight);
117 self.buffer.clear();
118 }
119
120 pub fn merge(&mut self, other: &TDigest) {
124 if other.is_empty() {
125 return;
126 }
127 self.total_weight += other.total_weight;
128 if other.min < self.min {
129 self.min = other.min;
130 }
131 if other.max > self.max {
132 self.max = other.max;
133 }
134 self.buffer.extend_from_slice(&other.centroids);
135 self.buffer.extend_from_slice(&other.buffer);
136 self.flush();
137 }
138
139 pub fn quantile(&self, q: f64) -> f64 {
141 if self.is_empty() {
142 return f64::NAN;
143 }
144 let mut snapshot = self.clone();
145 snapshot.flush();
146 if snapshot.centroids.is_empty() {
147 return snapshot.min;
148 }
149 if q <= 0.0 {
150 return snapshot.min;
151 }
152 if q >= 1.0 {
153 return snapshot.max;
154 }
155 let target = q * snapshot.total_weight;
156 let mut cumulative = 0.0;
157 let mut prev: Option<(f64, f64)> = None; for c in &snapshot.centroids {
159 let next = cumulative + c.weight;
160 if target <= next {
161 if let Some((pm, pc)) = prev {
162 let span = cumulative - pc;
164 if span <= 0.0 {
165 return c.mean;
166 }
167 let frac = (target - pc) / span;
168 let clamped = frac.clamp(0.0, 1.0);
169 return pm + (c.mean - pm) * clamped;
170 }
171 return c.mean;
172 }
173 cumulative = next;
174 prev = Some((c.mean, cumulative));
175 }
176 snapshot.max
177 }
178}
179
180fn compact(sorted: &[Centroid], compression: f64, total: f64) -> Vec<Centroid> {
181 let mut out: Vec<Centroid> = Vec::new();
182 if sorted.is_empty() || total <= 0.0 {
183 return out;
184 }
185 let mut cumulative = 0.0;
186 let mut current = sorted[0];
187 cumulative += current.weight;
188 for next in &sorted[1..] {
189 let q0 = (cumulative - current.weight) / total;
190 let q1 = (cumulative + next.weight) / total;
191 let max_weight = total
192 * (4.0 * q0.min(1.0 - q0).max(0.0).min(q1.min(1.0 - q1).max(0.0)))
193 .max(1.0 / compression);
194 let combined_weight = current.weight + next.weight;
195 if combined_weight <= max_weight {
196 let new_mean =
198 current.mean + (next.mean - current.mean) * (next.weight / combined_weight);
199 current.mean = new_mean;
200 current.weight = combined_weight;
201 } else {
202 out.push(current);
203 current = *next;
204 }
205 cumulative += next.weight;
206 }
207 out.push(current);
208 out
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214
215 #[test]
216 fn empty_digest_returns_nan() {
217 let d = TDigest::new();
218 assert!(d.quantile(0.5).is_nan());
219 }
220
221 #[test]
222 fn single_value_is_every_quantile() {
223 let mut d = TDigest::new();
224 d.add(42.0);
225 assert_eq!(d.quantile(0.0), 42.0);
226 assert_eq!(d.quantile(0.5), 42.0);
227 assert_eq!(d.quantile(1.0), 42.0);
228 }
229
230 #[test]
231 fn median_of_uniform_is_near_half() {
232 let mut d = TDigest::new();
240 for i in 0..10_000 {
241 d.add(i as f64);
242 }
243 let m = d.quantile(0.5);
244 assert!(m > 2000.0 && m < 8000.0, "median was {m}");
245 }
246
247 #[test]
248 fn tail_quantiles_are_better_than_20pct_error() {
249 let mut d = TDigest::new();
250 for i in 0..100_000 {
251 d.add(i as f64);
252 }
253 let p99 = d.quantile(0.99);
254 let expected = 99_000.0;
255 let err = ((p99 - expected).abs() / expected) * 100.0;
256 assert!(err < 20.0, "p99 error {err}% (got {p99})");
257 }
258
259 #[test]
260 fn min_max_are_preserved_exactly() {
261 let mut d = TDigest::new();
262 for x in [3.14, 2.71, 1.41, 10.0, -5.0] {
263 d.add(x);
264 }
265 assert_eq!(d.min(), -5.0);
266 assert_eq!(d.max(), 10.0);
267 }
268
269 #[test]
270 fn merge_is_associative_enough_for_parallel_agg() {
271 let mut left = TDigest::new();
272 let mut right = TDigest::new();
273 for i in 0..5_000 {
274 left.add(i as f64);
275 }
276 for i in 5_000..10_000 {
277 right.add(i as f64);
278 }
279 let mut combined = TDigest::new();
280 for i in 0..10_000 {
281 combined.add(i as f64);
282 }
283 left.merge(&right);
284 let m_combined = combined.quantile(0.5);
285 let m_merged = left.quantile(0.5);
286 assert!(
287 (m_combined - m_merged).abs() < 200.0,
288 "merge median drift: {m_combined} vs {m_merged}"
289 );
290 assert_eq!(left.count(), 10_000.0);
291 }
292
293 #[test]
294 fn non_finite_and_non_positive_weight_are_ignored() {
295 let mut d = TDigest::new();
296 d.add(f64::NAN);
297 d.add(f64::INFINITY);
298 d.add_weighted(1.0, 0.0);
299 d.add_weighted(1.0, -3.0);
300 assert!(d.is_empty());
301 }
302}