inference_lab/metrics/
quantile.rs

1/// Streaming quantile estimation using P-Squared algorithm
2/// Maintains approximate quantiles (p50, p90, p99) in O(1) time and space
3pub struct StreamingQuantiles {
4    // Marker positions and heights for P² algorithm
5    // We track 5 markers for p50, p90, p99
6    markers: [f64; 11],  // Marker heights (actual values)
7    positions: [f64; 11], // Marker positions (count-based)
8    desired_positions: [f64; 11], // Desired positions based on quantiles
9    count: usize,
10}
11
12impl StreamingQuantiles {
13    pub fn new() -> Self {
14        Self {
15            markers: [0.0; 11],
16            positions: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
17            desired_positions: [0.0; 11],
18            count: 0,
19        }
20    }
21
22    pub fn add(&mut self, value: f64) {
23        if self.count < 11 {
24            // Initial phase: collect first 11 samples
25            self.markers[self.count] = value;
26            self.count += 1;
27
28            if self.count == 11 {
29                // Sort initial markers
30                self.markers.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
31                // Initialize desired positions for p50, p90, p99
32                self.update_desired_positions();
33            }
34            return;
35        }
36
37        self.count += 1;
38
39        // Find cell k such that markers[k-1] < value <= markers[k]
40        let mut k = 0;
41        if value < self.markers[0] {
42            self.markers[0] = value;
43            k = 1;
44        } else if value >= self.markers[10] {
45            self.markers[10] = value;
46            k = 10;
47        } else {
48            for i in 1..11 {
49                if value < self.markers[i] {
50                    k = i;
51                    break;
52                }
53            }
54        }
55
56        // Increment positions of markers k+1 through 11
57        for i in k..11 {
58            self.positions[i] += 1.0;
59        }
60
61        // Update desired positions
62        self.update_desired_positions();
63
64        // Adjust marker heights
65        for i in 1..10 {
66            let d = self.desired_positions[i] - self.positions[i];
67
68            if (d >= 1.0 && self.positions[i + 1] - self.positions[i] > 1.0)
69                || (d <= -1.0 && self.positions[i - 1] - self.positions[i] < -1.0)
70            {
71                let d_sign = if d > 0.0 { 1.0 } else { -1.0 };
72
73                // Try parabolic formula
74                let q_new = self.parabolic(i, d_sign);
75
76                if self.markers[i - 1] < q_new && q_new < self.markers[i + 1] {
77                    self.markers[i] = q_new;
78                } else {
79                    // Use linear formula
80                    self.markers[i] = self.linear(i, d_sign);
81                }
82
83                self.positions[i] += d_sign;
84            }
85        }
86    }
87
88    fn update_desired_positions(&mut self) {
89        let n = self.count as f64;
90        // Markers at indices: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
91        // Target quantiles: min, p1, p10, p25, p50, p75, p90, p95, p99, p99.9, max
92        self.desired_positions[0] = 1.0;
93        self.desired_positions[1] = 1.0 + 0.01 * (n - 1.0);   // p1
94        self.desired_positions[2] = 1.0 + 0.10 * (n - 1.0);   // p10
95        self.desired_positions[3] = 1.0 + 0.25 * (n - 1.0);   // p25
96        self.desired_positions[4] = 1.0 + 0.50 * (n - 1.0);   // p50
97        self.desired_positions[5] = 1.0 + 0.75 * (n - 1.0);   // p75
98        self.desired_positions[6] = 1.0 + 0.90 * (n - 1.0);   // p90
99        self.desired_positions[7] = 1.0 + 0.95 * (n - 1.0);   // p95
100        self.desired_positions[8] = 1.0 + 0.99 * (n - 1.0);   // p99
101        self.desired_positions[9] = 1.0 + 0.999 * (n - 1.0);  // p99.9
102        self.desired_positions[10] = n;
103    }
104
105    fn parabolic(&self, i: usize, d: f64) -> f64 {
106        let q_i = self.markers[i];
107        let q_i1 = self.markers[i + 1];
108        let q_i_1 = self.markers[i - 1];
109        let n_i = self.positions[i];
110        let n_i1 = self.positions[i + 1];
111        let n_i_1 = self.positions[i - 1];
112
113        q_i + d / (n_i1 - n_i_1) * (
114            (n_i - n_i_1 + d) * (q_i1 - q_i) / (n_i1 - n_i)
115            + (n_i1 - n_i - d) * (q_i - q_i_1) / (n_i - n_i_1)
116        )
117    }
118
119    fn linear(&self, i: usize, d: f64) -> f64 {
120        let d_i = if d > 0.0 { 1 } else { -1 };
121        let q_i = self.markers[i];
122        let q_id = self.markers[(i as i32 + d_i) as usize];
123        let n_i = self.positions[i];
124        let n_id = self.positions[(i as i32 + d_i) as usize];
125
126        q_i + d * (q_id - q_i) / (n_id - n_i)
127    }
128
129    pub fn p50(&self) -> f64 {
130        if self.count < 11 {
131            self.fallback_quantile(0.50)
132        } else {
133            self.markers[4]
134        }
135    }
136
137    pub fn p90(&self) -> f64 {
138        if self.count < 11 {
139            self.fallback_quantile(0.90)
140        } else {
141            self.markers[6]
142        }
143    }
144
145    pub fn p99(&self) -> f64 {
146        if self.count < 11 {
147            self.fallback_quantile(0.99)
148        } else {
149            self.markers[8]
150        }
151    }
152
153    fn fallback_quantile(&self, p: f64) -> f64 {
154        if self.count == 0 {
155            return 0.0;
156        }
157        let mut sorted: Vec<f64> = self.markers[..self.count].to_vec();
158        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
159        let index = ((self.count as f64 - 1.0) * p) as usize;
160        sorted[index.min(self.count - 1)]
161    }
162
163    pub fn mean(&self) -> f64 {
164        if self.count == 0 {
165            return 0.0;
166        }
167        if self.count < 11 {
168            self.markers[..self.count].iter().sum::<f64>() / self.count as f64
169        } else {
170            // Approximate mean from markers
171            self.markers.iter().sum::<f64>() / 11.0
172        }
173    }
174}