Skip to main content

inference_lab/metrics/
quantile.rs

1/// Streaming quantile estimation using P-Squared algorithm
2/// Maintains approximate quantiles (p50, p90, p99) in O(1) time and space
3pub struct StreamingQuantiles {
4    // Marker positions and heights for P² algorithm
5    // We track 5 markers for p50, p90, p99
6    markers: [f64; 11],           // Marker heights (actual values)
7    positions: [f64; 11],         // Marker positions (count-based)
8    desired_positions: [f64; 11], // Desired positions based on quantiles
9    count: usize,
10}
11
12impl StreamingQuantiles {
13    pub fn new() -> Self {
14        Self {
15            markers: [0.0; 11],
16            positions: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
17            desired_positions: [0.0; 11],
18            count: 0,
19        }
20    }
21
22    pub fn add(&mut self, value: f64) {
23        if self.count < 11 {
24            // Initial phase: collect first 11 samples
25            self.markers[self.count] = value;
26            self.count += 1;
27
28            if self.count == 11 {
29                // Sort initial markers
30                self.markers
31                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
32                // Initialize desired positions for p50, p90, p99
33                self.update_desired_positions();
34            }
35            return;
36        }
37
38        self.count += 1;
39
40        // Find cell k such that markers[k-1] < value <= markers[k]
41        let mut k = 0;
42        if value < self.markers[0] {
43            self.markers[0] = value;
44            k = 1;
45        } else if value >= self.markers[10] {
46            self.markers[10] = value;
47            k = 10;
48        } else {
49            for i in 1..11 {
50                if value < self.markers[i] {
51                    k = i;
52                    break;
53                }
54            }
55        }
56
57        // Increment positions of markers k+1 through 11
58        for i in k..11 {
59            self.positions[i] += 1.0;
60        }
61
62        // Update desired positions
63        self.update_desired_positions();
64
65        // Adjust marker heights
66        for i in 1..10 {
67            let d = self.desired_positions[i] - self.positions[i];
68
69            if (d >= 1.0 && self.positions[i + 1] - self.positions[i] > 1.0)
70                || (d <= -1.0 && self.positions[i - 1] - self.positions[i] < -1.0)
71            {
72                let d_sign = if d > 0.0 { 1.0 } else { -1.0 };
73
74                // Try parabolic formula
75                let q_new = self.parabolic(i, d_sign);
76
77                if self.markers[i - 1] < q_new && q_new < self.markers[i + 1] {
78                    self.markers[i] = q_new;
79                } else {
80                    // Use linear formula
81                    self.markers[i] = self.linear(i, d_sign);
82                }
83
84                self.positions[i] += d_sign;
85            }
86        }
87    }
88
89    fn update_desired_positions(&mut self) {
90        let n = self.count as f64;
91        // Markers at indices: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
92        // Target quantiles: min, p1, p10, p25, p50, p75, p90, p95, p99, p99.9, max
93        self.desired_positions[0] = 1.0;
94        self.desired_positions[1] = 1.0 + 0.01 * (n - 1.0); // p1
95        self.desired_positions[2] = 1.0 + 0.10 * (n - 1.0); // p10
96        self.desired_positions[3] = 1.0 + 0.25 * (n - 1.0); // p25
97        self.desired_positions[4] = 1.0 + 0.50 * (n - 1.0); // p50
98        self.desired_positions[5] = 1.0 + 0.75 * (n - 1.0); // p75
99        self.desired_positions[6] = 1.0 + 0.90 * (n - 1.0); // p90
100        self.desired_positions[7] = 1.0 + 0.95 * (n - 1.0); // p95
101        self.desired_positions[8] = 1.0 + 0.99 * (n - 1.0); // p99
102        self.desired_positions[9] = 1.0 + 0.999 * (n - 1.0); // p99.9
103        self.desired_positions[10] = n;
104    }
105
106    fn parabolic(&self, i: usize, d: f64) -> f64 {
107        let q_i = self.markers[i];
108        let q_i1 = self.markers[i + 1];
109        let q_i_1 = self.markers[i - 1];
110        let n_i = self.positions[i];
111        let n_i1 = self.positions[i + 1];
112        let n_i_1 = self.positions[i - 1];
113
114        q_i + d / (n_i1 - n_i_1)
115            * ((n_i - n_i_1 + d) * (q_i1 - q_i) / (n_i1 - n_i)
116                + (n_i1 - n_i - d) * (q_i - q_i_1) / (n_i - n_i_1))
117    }
118
119    fn linear(&self, i: usize, d: f64) -> f64 {
120        let d_i = if d > 0.0 { 1 } else { -1 };
121        let q_i = self.markers[i];
122        let q_id = self.markers[(i as i32 + d_i) as usize];
123        let n_i = self.positions[i];
124        let n_id = self.positions[(i as i32 + d_i) as usize];
125
126        q_i + d * (q_id - q_i) / (n_id - n_i)
127    }
128
129    pub fn min(&self) -> f64 {
130        if self.count == 0 {
131            0.0
132        } else if self.count < 11 {
133            self.markers[..self.count]
134                .iter()
135                .fold(f64::INFINITY, |a, &b| a.min(b))
136        } else {
137            self.markers[0]
138        }
139    }
140
141    pub fn p50(&self) -> f64 {
142        if self.count < 11 {
143            self.fallback_quantile(0.50)
144        } else {
145            self.markers[4]
146        }
147    }
148
149    pub fn p90(&self) -> f64 {
150        if self.count < 11 {
151            self.fallback_quantile(0.90)
152        } else {
153            self.markers[6]
154        }
155    }
156
157    pub fn p99(&self) -> f64 {
158        if self.count < 11 {
159            self.fallback_quantile(0.99)
160        } else {
161            self.markers[8]
162        }
163    }
164
165    fn fallback_quantile(&self, p: f64) -> f64 {
166        if self.count == 0 {
167            return 0.0;
168        }
169        let mut sorted: Vec<f64> = self.markers[..self.count].to_vec();
170        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
171        let index = ((self.count as f64 - 1.0) * p) as usize;
172        sorted[index.min(self.count - 1)]
173    }
174
175    pub fn mean(&self) -> f64 {
176        if self.count == 0 {
177            return 0.0;
178        }
179        if self.count < 11 {
180            self.markers[..self.count].iter().sum::<f64>() / self.count as f64
181        } else {
182            // Approximate mean from markers
183            self.markers.iter().sum::<f64>() / 11.0
184        }
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191
192    #[test]
193    fn test_empty_quantiles() {
194        let quantiles = StreamingQuantiles::new();
195        assert_eq!(quantiles.min(), 0.0);
196        assert_eq!(quantiles.p50(), 0.0);
197        assert_eq!(quantiles.p90(), 0.0);
198        assert_eq!(quantiles.p99(), 0.0);
199        assert_eq!(quantiles.mean(), 0.0);
200    }
201
202    #[test]
203    fn test_single_value() {
204        let mut quantiles = StreamingQuantiles::new();
205        quantiles.add(42.0);
206        assert_eq!(quantiles.min(), 42.0);
207        assert_eq!(quantiles.p50(), 42.0);
208        assert_eq!(quantiles.p90(), 42.0);
209        assert_eq!(quantiles.p99(), 42.0);
210        assert_eq!(quantiles.mean(), 42.0);
211    }
212
213    #[test]
214    fn test_few_values_sorted() {
215        let mut quantiles = StreamingQuantiles::new();
216        for i in 1..=5 {
217            quantiles.add(i as f64);
218        }
219        // For fixed input, we should get fixed output
220        assert_eq!(quantiles.min(), 1.0);
221        assert_eq!(quantiles.p50(), 3.0);
222        assert_eq!(quantiles.p90(), 4.0);
223        assert_eq!(quantiles.p99(), 4.0);
224        assert_eq!(quantiles.mean(), 3.0);
225    }
226
227    #[test]
228    fn test_uniform_distribution() {
229        let mut quantiles = StreamingQuantiles::new();
230        // Add values 1 to 1000 - deterministic dataset
231        for i in 1..=1000 {
232            quantiles.add(i as f64);
233        }
234
235        // For this exact dataset, algorithm should produce exact values
236        assert_eq!(quantiles.min(), 1.0);
237        assert_eq!(quantiles.p50(), 500.0);
238        assert_eq!(quantiles.p90(), 900.0);
239        assert_eq!(quantiles.p99(), 990.0);
240    }
241
242    #[test]
243    fn test_quantile_ordering() {
244        let mut quantiles = StreamingQuantiles::new();
245        // Add 1000 values
246        for i in 1..=1000 {
247            quantiles.add(i as f64);
248        }
249
250        // Quantiles should be ordered: min <= p50 <= p90 <= p99
251        assert!(quantiles.min() <= quantiles.p50());
252        assert!(quantiles.p50() <= quantiles.p90());
253        assert!(quantiles.p90() <= quantiles.p99());
254    }
255
256    #[test]
257    fn test_skewed_distribution() {
258        let mut quantiles = StreamingQuantiles::new();
259        // Heavy tail: mostly small values, few large values
260        // 900 values of 1.0, 90 values of 10.0, 10 values of 100.0
261        for _ in 0..900 {
262            quantiles.add(1.0);
263        }
264        for _ in 0..90 {
265            quantiles.add(10.0);
266        }
267        for _ in 0..10 {
268            quantiles.add(100.0);
269        }
270
271        // For this fixed dataset, should get fixed values
272        assert_eq!(quantiles.min(), 1.0);
273        assert_eq!(quantiles.p50(), 1.010241318586147);
274        assert_eq!(quantiles.p90(), 5.014807349038084);
275        assert_eq!(quantiles.p99(), 58.33620606034887);
276    }
277
278    #[test]
279    fn test_duplicate_values() {
280        let mut quantiles = StreamingQuantiles::new();
281        // All same value
282        for _ in 0..100 {
283            quantiles.add(5.0);
284        }
285
286        assert_eq!(quantiles.min(), 5.0);
287        assert_eq!(quantiles.p50(), 5.0);
288        assert_eq!(quantiles.p90(), 5.0);
289        assert_eq!(quantiles.p99(), 5.0);
290        assert_eq!(quantiles.mean(), 5.0);
291    }
292
293    #[test]
294    fn test_reverse_order() {
295        let mut quantiles = StreamingQuantiles::new();
296        // Add values in reverse order: 1000 down to 1
297        for i in (1..=1000).rev() {
298            quantiles.add(i as f64);
299        }
300
301        // Insertion order affects streaming algorithm results
302        assert_eq!(quantiles.min(), 1.0);
303        assert_eq!(quantiles.p50(), 501.0);
304        assert_eq!(quantiles.p90(), 901.0);
305        assert_eq!(quantiles.p99(), 991.0);
306    }
307
308    #[test]
309    fn test_boundary_11_values() {
310        let mut quantiles = StreamingQuantiles::new();
311        // Exactly 11 values (boundary where algorithm switches from exact to streaming)
312        for i in 1..=11 {
313            quantiles.add(i as f64 * 10.0); // [10, 20, 30, ..., 110]
314        }
315
316        // After sorting: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110]
317        // With 11 values, this is still in the initial phase
318        assert_eq!(quantiles.min(), 10.0);
319        assert_eq!(quantiles.p50(), 50.0);
320        assert_eq!(quantiles.p90(), 70.0);
321        assert_eq!(quantiles.p99(), 90.0);
322    }
323
324    #[test]
325    fn test_values_beyond_initial_11() {
326        let mut quantiles = StreamingQuantiles::new();
327        // Add 20 values to test streaming phase
328        for i in 1..=20 {
329            quantiles.add(i as f64);
330        }
331
332        // Fixed dataset should produce fixed results
333        assert_eq!(quantiles.min(), 1.0);
334        assert_eq!(quantiles.p50(), 9.0);
335        assert_eq!(quantiles.p90(), 13.0);
336        assert_eq!(quantiles.p99(), 17.0);
337    }
338
339    #[test]
340    fn test_extreme_values() {
341        let mut quantiles = StreamingQuantiles::new();
342        quantiles.add(0.001);
343        quantiles.add(1000000.0);
344        quantiles.add(0.002);
345        quantiles.add(0.003);
346        quantiles.add(0.004);
347
348        // Fixed dataset should produce fixed results
349        assert_eq!(quantiles.min(), 0.001);
350        assert_eq!(quantiles.p50(), 0.003);
351        assert_eq!(quantiles.p90(), 0.004);
352        assert_eq!(quantiles.p99(), 0.004);
353    }
354}