Skip to main content

inference_lab/metrics/
quantile.rs

1/// Streaming quantile estimation using P-Squared algorithm
2/// Maintains approximate quantiles (p50, p90, p99) in O(1) time and space
3pub struct StreamingQuantiles {
4    // Marker positions and heights for P² algorithm
5    // We track 5 markers for p50, p90, p99
6    markers: [f64; 11],           // Marker heights (actual values)
7    positions: [f64; 11],         // Marker positions (count-based)
8    desired_positions: [f64; 11], // Desired positions based on quantiles
9    count: usize,
10}
11
12impl Default for StreamingQuantiles {
13    fn default() -> Self {
14        Self::new()
15    }
16}
17
18impl StreamingQuantiles {
19    pub fn new() -> Self {
20        Self {
21            markers: [0.0; 11],
22            positions: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
23            desired_positions: [0.0; 11],
24            count: 0,
25        }
26    }
27
28    pub fn add(&mut self, value: f64) {
29        if self.count < 11 {
30            // Initial phase: collect first 11 samples
31            self.markers[self.count] = value;
32            self.count += 1;
33
34            if self.count == 11 {
35                // Sort initial markers
36                self.markers
37                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
38                // Initialize desired positions for p50, p90, p99
39                self.update_desired_positions();
40            }
41            return;
42        }
43
44        self.count += 1;
45
46        // Find cell k such that markers[k-1] < value <= markers[k]
47        let mut k = 0;
48        if value < self.markers[0] {
49            self.markers[0] = value;
50            k = 1;
51        } else if value >= self.markers[10] {
52            self.markers[10] = value;
53            k = 10;
54        } else {
55            for i in 1..11 {
56                if value < self.markers[i] {
57                    k = i;
58                    break;
59                }
60            }
61        }
62
63        // Increment positions of markers k+1 through 11
64        for i in k..11 {
65            self.positions[i] += 1.0;
66        }
67
68        // Update desired positions
69        self.update_desired_positions();
70
71        // Adjust marker heights
72        for i in 1..10 {
73            let d = self.desired_positions[i] - self.positions[i];
74
75            if (d >= 1.0 && self.positions[i + 1] - self.positions[i] > 1.0)
76                || (d <= -1.0 && self.positions[i - 1] - self.positions[i] < -1.0)
77            {
78                let d_sign = if d > 0.0 { 1.0 } else { -1.0 };
79
80                // Try parabolic formula
81                let q_new = self.parabolic(i, d_sign);
82
83                if self.markers[i - 1] < q_new && q_new < self.markers[i + 1] {
84                    self.markers[i] = q_new;
85                } else {
86                    // Use linear formula
87                    self.markers[i] = self.linear(i, d_sign);
88                }
89
90                self.positions[i] += d_sign;
91            }
92        }
93    }
94
95    fn update_desired_positions(&mut self) {
96        let n = self.count as f64;
97        // Markers at indices: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
98        // Target quantiles: min, p1, p10, p25, p50, p75, p90, p95, p99, p99.9, max
99        self.desired_positions[0] = 1.0;
100        self.desired_positions[1] = 1.0 + 0.01 * (n - 1.0); // p1
101        self.desired_positions[2] = 1.0 + 0.10 * (n - 1.0); // p10
102        self.desired_positions[3] = 1.0 + 0.25 * (n - 1.0); // p25
103        self.desired_positions[4] = 1.0 + 0.50 * (n - 1.0); // p50
104        self.desired_positions[5] = 1.0 + 0.75 * (n - 1.0); // p75
105        self.desired_positions[6] = 1.0 + 0.90 * (n - 1.0); // p90
106        self.desired_positions[7] = 1.0 + 0.95 * (n - 1.0); // p95
107        self.desired_positions[8] = 1.0 + 0.99 * (n - 1.0); // p99
108        self.desired_positions[9] = 1.0 + 0.999 * (n - 1.0); // p99.9
109        self.desired_positions[10] = n;
110    }
111
112    fn parabolic(&self, i: usize, d: f64) -> f64 {
113        let q_i = self.markers[i];
114        let q_i1 = self.markers[i + 1];
115        let q_i_1 = self.markers[i - 1];
116        let n_i = self.positions[i];
117        let n_i1 = self.positions[i + 1];
118        let n_i_1 = self.positions[i - 1];
119
120        q_i + d / (n_i1 - n_i_1)
121            * ((n_i - n_i_1 + d) * (q_i1 - q_i) / (n_i1 - n_i)
122                + (n_i1 - n_i - d) * (q_i - q_i_1) / (n_i - n_i_1))
123    }
124
125    fn linear(&self, i: usize, d: f64) -> f64 {
126        let d_i = if d > 0.0 { 1 } else { -1 };
127        let q_i = self.markers[i];
128        let q_id = self.markers[(i as i32 + d_i) as usize];
129        let n_i = self.positions[i];
130        let n_id = self.positions[(i as i32 + d_i) as usize];
131
132        q_i + d * (q_id - q_i) / (n_id - n_i)
133    }
134
135    pub fn min(&self) -> f64 {
136        if self.count == 0 {
137            0.0
138        } else if self.count < 11 {
139            self.markers[..self.count]
140                .iter()
141                .fold(f64::INFINITY, |a, &b| a.min(b))
142        } else {
143            self.markers[0]
144        }
145    }
146
147    pub fn p50(&self) -> f64 {
148        if self.count < 11 {
149            self.fallback_quantile(0.50)
150        } else {
151            self.markers[4]
152        }
153    }
154
155    pub fn p90(&self) -> f64 {
156        if self.count < 11 {
157            self.fallback_quantile(0.90)
158        } else {
159            self.markers[6]
160        }
161    }
162
163    pub fn p99(&self) -> f64 {
164        if self.count < 11 {
165            self.fallback_quantile(0.99)
166        } else {
167            self.markers[8]
168        }
169    }
170
171    fn fallback_quantile(&self, p: f64) -> f64 {
172        if self.count == 0 {
173            return 0.0;
174        }
175        let mut sorted: Vec<f64> = self.markers[..self.count].to_vec();
176        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
177        let index = ((self.count as f64 - 1.0) * p) as usize;
178        sorted[index.min(self.count - 1)]
179    }
180
181    pub fn mean(&self) -> f64 {
182        if self.count == 0 {
183            return 0.0;
184        }
185        if self.count < 11 {
186            self.markers[..self.count].iter().sum::<f64>() / self.count as f64
187        } else {
188            // Approximate mean from markers
189            self.markers.iter().sum::<f64>() / 11.0
190        }
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197
198    #[test]
199    fn test_empty_quantiles() {
200        let quantiles = StreamingQuantiles::new();
201        assert_eq!(quantiles.min(), 0.0);
202        assert_eq!(quantiles.p50(), 0.0);
203        assert_eq!(quantiles.p90(), 0.0);
204        assert_eq!(quantiles.p99(), 0.0);
205        assert_eq!(quantiles.mean(), 0.0);
206    }
207
208    #[test]
209    fn test_single_value() {
210        let mut quantiles = StreamingQuantiles::new();
211        quantiles.add(42.0);
212        assert_eq!(quantiles.min(), 42.0);
213        assert_eq!(quantiles.p50(), 42.0);
214        assert_eq!(quantiles.p90(), 42.0);
215        assert_eq!(quantiles.p99(), 42.0);
216        assert_eq!(quantiles.mean(), 42.0);
217    }
218
219    #[test]
220    fn test_few_values_sorted() {
221        let mut quantiles = StreamingQuantiles::new();
222        for i in 1..=5 {
223            quantiles.add(i as f64);
224        }
225        // For fixed input, we should get fixed output
226        assert_eq!(quantiles.min(), 1.0);
227        assert_eq!(quantiles.p50(), 3.0);
228        assert_eq!(quantiles.p90(), 4.0);
229        assert_eq!(quantiles.p99(), 4.0);
230        assert_eq!(quantiles.mean(), 3.0);
231    }
232
233    #[test]
234    fn test_uniform_distribution() {
235        let mut quantiles = StreamingQuantiles::new();
236        // Add values 1 to 1000 - deterministic dataset
237        for i in 1..=1000 {
238            quantiles.add(i as f64);
239        }
240
241        // For this exact dataset, algorithm should produce exact values
242        assert_eq!(quantiles.min(), 1.0);
243        assert_eq!(quantiles.p50(), 500.0);
244        assert_eq!(quantiles.p90(), 900.0);
245        assert_eq!(quantiles.p99(), 990.0);
246    }
247
248    #[test]
249    fn test_quantile_ordering() {
250        let mut quantiles = StreamingQuantiles::new();
251        // Add 1000 values
252        for i in 1..=1000 {
253            quantiles.add(i as f64);
254        }
255
256        // Quantiles should be ordered: min <= p50 <= p90 <= p99
257        assert!(quantiles.min() <= quantiles.p50());
258        assert!(quantiles.p50() <= quantiles.p90());
259        assert!(quantiles.p90() <= quantiles.p99());
260    }
261
262    #[test]
263    fn test_skewed_distribution() {
264        let mut quantiles = StreamingQuantiles::new();
265        // Heavy tail: mostly small values, few large values
266        // 900 values of 1.0, 90 values of 10.0, 10 values of 100.0
267        for _ in 0..900 {
268            quantiles.add(1.0);
269        }
270        for _ in 0..90 {
271            quantiles.add(10.0);
272        }
273        for _ in 0..10 {
274            quantiles.add(100.0);
275        }
276
277        // For this fixed dataset, should get fixed values
278        assert_eq!(quantiles.min(), 1.0);
279        assert_eq!(quantiles.p50(), 1.010241318586147);
280        assert_eq!(quantiles.p90(), 5.014807349038084);
281        assert_eq!(quantiles.p99(), 58.33620606034887);
282    }
283
284    #[test]
285    fn test_duplicate_values() {
286        let mut quantiles = StreamingQuantiles::new();
287        // All same value
288        for _ in 0..100 {
289            quantiles.add(5.0);
290        }
291
292        assert_eq!(quantiles.min(), 5.0);
293        assert_eq!(quantiles.p50(), 5.0);
294        assert_eq!(quantiles.p90(), 5.0);
295        assert_eq!(quantiles.p99(), 5.0);
296        assert_eq!(quantiles.mean(), 5.0);
297    }
298
299    #[test]
300    fn test_reverse_order() {
301        let mut quantiles = StreamingQuantiles::new();
302        // Add values in reverse order: 1000 down to 1
303        for i in (1..=1000).rev() {
304            quantiles.add(i as f64);
305        }
306
307        // Insertion order affects streaming algorithm results
308        assert_eq!(quantiles.min(), 1.0);
309        assert_eq!(quantiles.p50(), 501.0);
310        assert_eq!(quantiles.p90(), 901.0);
311        assert_eq!(quantiles.p99(), 991.0);
312    }
313
314    #[test]
315    fn test_boundary_11_values() {
316        let mut quantiles = StreamingQuantiles::new();
317        // Exactly 11 values (boundary where algorithm switches from exact to streaming)
318        for i in 1..=11 {
319            quantiles.add(i as f64 * 10.0); // [10, 20, 30, ..., 110]
320        }
321
322        // After sorting: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110]
323        // With 11 values, this is still in the initial phase
324        assert_eq!(quantiles.min(), 10.0);
325        assert_eq!(quantiles.p50(), 50.0);
326        assert_eq!(quantiles.p90(), 70.0);
327        assert_eq!(quantiles.p99(), 90.0);
328    }
329
330    #[test]
331    fn test_values_beyond_initial_11() {
332        let mut quantiles = StreamingQuantiles::new();
333        // Add 20 values to test streaming phase
334        for i in 1..=20 {
335            quantiles.add(i as f64);
336        }
337
338        // Fixed dataset should produce fixed results
339        assert_eq!(quantiles.min(), 1.0);
340        assert_eq!(quantiles.p50(), 9.0);
341        assert_eq!(quantiles.p90(), 13.0);
342        assert_eq!(quantiles.p99(), 17.0);
343    }
344
345    #[test]
346    fn test_extreme_values() {
347        let mut quantiles = StreamingQuantiles::new();
348        quantiles.add(0.001);
349        quantiles.add(1000000.0);
350        quantiles.add(0.002);
351        quantiles.add(0.003);
352        quantiles.add(0.004);
353
354        // Fixed dataset should produce fixed results
355        assert_eq!(quantiles.min(), 0.001);
356        assert_eq!(quantiles.p50(), 0.003);
357        assert_eq!(quantiles.p90(), 0.004);
358        assert_eq!(quantiles.p99(), 0.004);
359    }
360}