inference_lab/metrics/
quantile.rs

1/// Streaming quantile estimation using P-Squared algorithm
2/// Maintains approximate quantiles (p50, p90, p99) in O(1) time and space
3pub struct StreamingQuantiles {
4    // Marker positions and heights for P² algorithm
5    // We track 5 markers for p50, p90, p99
6    markers: [f64; 11],           // Marker heights (actual values)
7    positions: [f64; 11],         // Marker positions (count-based)
8    desired_positions: [f64; 11], // Desired positions based on quantiles
9    count: usize,
10}
11
12impl StreamingQuantiles {
13    pub fn new() -> Self {
14        Self {
15            markers: [0.0; 11],
16            positions: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
17            desired_positions: [0.0; 11],
18            count: 0,
19        }
20    }
21
22    pub fn add(&mut self, value: f64) {
23        if self.count < 11 {
24            // Initial phase: collect first 11 samples
25            self.markers[self.count] = value;
26            self.count += 1;
27
28            if self.count == 11 {
29                // Sort initial markers
30                self.markers
31                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
32                // Initialize desired positions for p50, p90, p99
33                self.update_desired_positions();
34            }
35            return;
36        }
37
38        self.count += 1;
39
40        // Find cell k such that markers[k-1] < value <= markers[k]
41        let mut k = 0;
42        if value < self.markers[0] {
43            self.markers[0] = value;
44            k = 1;
45        } else if value >= self.markers[10] {
46            self.markers[10] = value;
47            k = 10;
48        } else {
49            for i in 1..11 {
50                if value < self.markers[i] {
51                    k = i;
52                    break;
53                }
54            }
55        }
56
57        // Increment positions of markers k+1 through 11
58        for i in k..11 {
59            self.positions[i] += 1.0;
60        }
61
62        // Update desired positions
63        self.update_desired_positions();
64
65        // Adjust marker heights
66        for i in 1..10 {
67            let d = self.desired_positions[i] - self.positions[i];
68
69            if (d >= 1.0 && self.positions[i + 1] - self.positions[i] > 1.0)
70                || (d <= -1.0 && self.positions[i - 1] - self.positions[i] < -1.0)
71            {
72                let d_sign = if d > 0.0 { 1.0 } else { -1.0 };
73
74                // Try parabolic formula
75                let q_new = self.parabolic(i, d_sign);
76
77                if self.markers[i - 1] < q_new && q_new < self.markers[i + 1] {
78                    self.markers[i] = q_new;
79                } else {
80                    // Use linear formula
81                    self.markers[i] = self.linear(i, d_sign);
82                }
83
84                self.positions[i] += d_sign;
85            }
86        }
87    }
88
89    fn update_desired_positions(&mut self) {
90        let n = self.count as f64;
91        // Markers at indices: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
92        // Target quantiles: min, p1, p10, p25, p50, p75, p90, p95, p99, p99.9, max
93        self.desired_positions[0] = 1.0;
94        self.desired_positions[1] = 1.0 + 0.01 * (n - 1.0); // p1
95        self.desired_positions[2] = 1.0 + 0.10 * (n - 1.0); // p10
96        self.desired_positions[3] = 1.0 + 0.25 * (n - 1.0); // p25
97        self.desired_positions[4] = 1.0 + 0.50 * (n - 1.0); // p50
98        self.desired_positions[5] = 1.0 + 0.75 * (n - 1.0); // p75
99        self.desired_positions[6] = 1.0 + 0.90 * (n - 1.0); // p90
100        self.desired_positions[7] = 1.0 + 0.95 * (n - 1.0); // p95
101        self.desired_positions[8] = 1.0 + 0.99 * (n - 1.0); // p99
102        self.desired_positions[9] = 1.0 + 0.999 * (n - 1.0); // p99.9
103        self.desired_positions[10] = n;
104    }
105
106    fn parabolic(&self, i: usize, d: f64) -> f64 {
107        let q_i = self.markers[i];
108        let q_i1 = self.markers[i + 1];
109        let q_i_1 = self.markers[i - 1];
110        let n_i = self.positions[i];
111        let n_i1 = self.positions[i + 1];
112        let n_i_1 = self.positions[i - 1];
113
114        q_i + d / (n_i1 - n_i_1)
115            * ((n_i - n_i_1 + d) * (q_i1 - q_i) / (n_i1 - n_i)
116                + (n_i1 - n_i - d) * (q_i - q_i_1) / (n_i - n_i_1))
117    }
118
119    fn linear(&self, i: usize, d: f64) -> f64 {
120        let d_i = if d > 0.0 { 1 } else { -1 };
121        let q_i = self.markers[i];
122        let q_id = self.markers[(i as i32 + d_i) as usize];
123        let n_i = self.positions[i];
124        let n_id = self.positions[(i as i32 + d_i) as usize];
125
126        q_i + d * (q_id - q_i) / (n_id - n_i)
127    }
128
129    pub fn min(&self) -> f64 {
130        if self.count == 0 {
131            0.0
132        } else if self.count < 11 {
133            self.markers[..self.count]
134                .iter()
135                .fold(f64::INFINITY, |a, &b| a.min(b))
136        } else {
137            self.markers[0]
138        }
139    }
140
141    pub fn p50(&self) -> f64 {
142        if self.count < 11 {
143            self.fallback_quantile(0.50)
144        } else {
145            self.markers[4]
146        }
147    }
148
149    pub fn p90(&self) -> f64 {
150        if self.count < 11 {
151            self.fallback_quantile(0.90)
152        } else {
153            self.markers[6]
154        }
155    }
156
157    pub fn p99(&self) -> f64 {
158        if self.count < 11 {
159            self.fallback_quantile(0.99)
160        } else {
161            self.markers[8]
162        }
163    }
164
165    fn fallback_quantile(&self, p: f64) -> f64 {
166        if self.count == 0 {
167            return 0.0;
168        }
169        let mut sorted: Vec<f64> = self.markers[..self.count].to_vec();
170        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
171        let index = ((self.count as f64 - 1.0) * p) as usize;
172        sorted[index.min(self.count - 1)]
173    }
174
175    pub fn mean(&self) -> f64 {
176        if self.count == 0 {
177            return 0.0;
178        }
179        if self.count < 11 {
180            self.markers[..self.count].iter().sum::<f64>() / self.count as f64
181        } else {
182            // Approximate mean from markers
183            self.markers.iter().sum::<f64>() / 11.0
184        }
185    }
186}