1#![allow(clippy::doc_markdown)]
5
6use alloc::collections::BTreeMap;
28use alloc::string::{String, ToString};
29use alloc::vec::Vec;
30
/// Statistics recorded for a single column; stored in [`Statistics`] under a
/// `(table, column)` key.
///
/// NOTE(review): the field meanings below are inferred from the field names and
/// from the helper functions in this file — confirm against the consumer of
/// these statistics.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnStats {
    /// Presumably the fraction of rows whose value is NULL. Serialized as a
    /// 4-byte little-endian `f32`; no range check is applied in this file.
    pub null_frac: f32,
    /// Number of distinct values (see `estimate_n_distinct`). Serialized as a
    /// little-endian `u64`.
    pub n_distinct: u64,
    /// Histogram boundary values in string form (see `build_histogram`).
    /// `Statistics::serialize` panics if there are more than 65,535 bounds or
    /// if any bound is longer than 65,535 bytes.
    pub histogram_bounds: Vec<String>,
}
50
/// In-memory catalog of per-column statistics plus per-table modification
/// counters.
///
/// `inner` and `modified_since` are the parts written by `serialize` and
/// restored by `deserialize`. `version` is not part of the serialized form, and
/// `deserialize` always produces a value with `version == 0`.
///
/// The derived `PartialEq` compares all three fields, `version` included.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Statistics {
    /// Column rows keyed by `(table, column)`. Being a `BTreeMap`, iteration
    /// (and therefore serialization) happens in sorted key order.
    inner: BTreeMap<(String, String), ColumnStats>,
    /// Per-table counter that `record_modifications` adds to (saturating) and
    /// `reset_modified` sets back to zero.
    modified_since: BTreeMap<String, u64>,
    /// Counter advanced by `bump_version` (saturating at `u64::MAX`).
    version: u64,
}
70
/// Error type returned by `Statistics::deserialize`.
#[derive(Debug, PartialEq, Eq)]
pub enum StatisticsError {
    /// The payload could not be decoded. The string is a human-readable
    /// description of the problem (for example a short read, a non-UTF-8
    /// string, a duplicate key, or trailing bytes).
    Corrupt(String),
}
79
80impl Statistics {
81 pub fn new() -> Self {
82 Self::default()
83 }
84
85 pub fn len(&self) -> usize {
86 self.inner.len()
87 }
88
89 pub fn is_empty(&self) -> bool {
90 self.inner.is_empty()
91 }
92
93 pub fn get(&self, table: &str, column: &str) -> Option<&ColumnStats> {
94 self.inner.get(&(table.to_string(), column.to_string()))
95 }
96
97 pub fn iter(&self) -> impl Iterator<Item = (&(String, String), &ColumnStats)> {
101 self.inner.iter()
102 }
103
104 pub fn set(&mut self, table: String, column: String, stats: ColumnStats) {
107 self.inner.insert((table, column), stats);
108 }
109
110 pub fn clear_table(&mut self, table: &str) {
114 self.inner.retain(|(t, _), _| t != table);
115 }
116
117 pub fn reset_modified(&mut self, table: &str) {
121 self.modified_since.insert(table.to_string(), 0);
122 }
123
124 pub fn record_modifications(&mut self, table: &str, n: u64) {
128 let entry = self.modified_since.entry(table.to_string()).or_default();
129 *entry = entry.saturating_add(n);
130 }
131
132 pub fn modified_since_last_analyze(&self, table: &str) -> u64 {
133 self.modified_since.get(table).copied().unwrap_or(0)
134 }
135
136 pub fn version(&self) -> u64 {
139 self.version
140 }
141
142 pub fn bump_version(&mut self) {
145 self.version = self.version.saturating_add(1);
146 }
147
148 pub fn serialize(&self) -> Vec<u8> {
162 let mut out = Vec::with_capacity(2 + self.inner.len() * 32);
163 let n = u16::try_from(self.inner.len()).expect("≤ 65,535 column-stats rows");
164 out.extend_from_slice(&n.to_le_bytes());
165 for ((table, col), stats) in &self.inner {
166 write_str(&mut out, table);
167 write_str(&mut out, col);
168 out.extend_from_slice(&stats.null_frac.to_le_bytes());
169 out.extend_from_slice(&stats.n_distinct.to_le_bytes());
170 let nb =
171 u16::try_from(stats.histogram_bounds.len()).expect("≤ 65,535 histogram bounds");
172 out.extend_from_slice(&nb.to_le_bytes());
173 for b in &stats.histogram_bounds {
174 write_str(&mut out, b);
175 }
176 }
177 let m = u16::try_from(self.modified_since.len()).expect("≤ 65,535 modified-row counters");
178 out.extend_from_slice(&m.to_le_bytes());
179 for (table, count) in &self.modified_since {
180 write_str(&mut out, table);
181 out.extend_from_slice(&count.to_le_bytes());
182 }
183 out
184 }
185
186 pub fn deserialize(buf: &[u8]) -> Result<Self, StatisticsError> {
187 let mut p = 0usize;
188 let n = read_u16(buf, &mut p)? as usize;
189 let mut inner = BTreeMap::new();
190 for _ in 0..n {
191 let table = read_str(buf, &mut p)?;
192 let col = read_str(buf, &mut p)?;
193 let null_frac_bytes = read_bytes(buf, &mut p, 4)?;
194 let null_frac = f32::from_le_bytes(
195 null_frac_bytes
196 .try_into()
197 .map_err(|_| StatisticsError::Corrupt("null_frac slice".to_string()))?,
198 );
199 let n_distinct = read_u64(buf, &mut p)?;
200 let nb = read_u16(buf, &mut p)? as usize;
201 let mut bounds = Vec::with_capacity(nb);
202 for _ in 0..nb {
203 bounds.push(read_str(buf, &mut p)?);
204 }
205 if inner
206 .insert(
207 (table.clone(), col.clone()),
208 ColumnStats {
209 null_frac,
210 n_distinct,
211 histogram_bounds: bounds,
212 },
213 )
214 .is_some()
215 {
216 return Err(StatisticsError::Corrupt(alloc::format!(
217 "duplicate spg_statistic key ({table:?}, {col:?})"
218 )));
219 }
220 }
221 let m = read_u16(buf, &mut p)? as usize;
222 let mut modified_since = BTreeMap::new();
223 for _ in 0..m {
224 let table = read_str(buf, &mut p)?;
225 let count = read_u64(buf, &mut p)?;
226 modified_since.insert(table, count);
227 }
228 if p != buf.len() {
229 return Err(StatisticsError::Corrupt(alloc::format!(
230 "trailing bytes in statistics payload: read {p}, len {}",
231 buf.len()
232 )));
233 }
234 Ok(Self {
235 inner,
236 modified_since,
237 version: 0,
238 })
239 }
240}
241
/// Number of buckets a full histogram is divided into; a full histogram has
/// `NUM_BUCKETS + 1` bounds.
pub const NUM_BUCKETS: usize = 100;

/// Picks histogram bounds out of an already sorted sample.
///
/// * With at most `NUM_BUCKETS + 1` values (including none) the sample itself
///   is returned unchanged.
/// * Otherwise exactly `NUM_BUCKETS + 1` values are returned, taken at the
///   evenly spaced positions `i * (len - 1) / NUM_BUCKETS` for
///   `i = 0..=NUM_BUCKETS`, so the first and last sample values are always
///   included.
///
/// The input is not checked for being sorted.
pub fn build_histogram(sorted_values: &[String]) -> Vec<String> {
    let len = sorted_values.len();
    if len <= NUM_BUCKETS + 1 {
        // Covers the empty sample too: an empty slice copies to an empty Vec.
        return sorted_values.to_vec();
    }

    // `len > NUM_BUCKETS + 1` here, so `len - 1` cannot underflow.
    let last_index = len as u64 - 1;
    let buckets = NUM_BUCKETS as u64;
    (0..=buckets)
        .map(|step| {
            let position = (step * last_index) / buckets;
            sorted_values[position as usize].clone()
        })
        .collect()
}
277
/// Counts the distinct values in a sorted sample.
///
/// The count is the number of runs of equal adjacent values: one for the
/// first value plus one for every position where a value differs from its
/// predecessor. For sorted input this is the exact number of distinct values;
/// the input is not checked for being sorted. An empty sample yields 0.
pub fn estimate_n_distinct(sorted_values: &[String]) -> u64 {
    if sorted_values.is_empty() {
        return 0;
    }
    let boundaries = sorted_values
        .windows(2)
        .filter(|pair| pair[0] != pair[1])
        .count();
    1 + boundaries as u64
}
296
/// Appends `s` to `out` as a little-endian `u16` byte length followed by the
/// UTF-8 bytes of the string.
///
/// # Panics
///
/// Panics if `s` is longer than 65,535 bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let len_prefix =
        u16::try_from(payload.len()).expect("table / column / bound names ≤ 65,535 bytes");
    out.extend_from_slice(&len_prefix.to_le_bytes());
    out.extend_from_slice(payload);
}
304
305fn read_bytes<'a>(buf: &'a [u8], p: &mut usize, n: usize) -> Result<&'a [u8], StatisticsError> {
306 let slice = buf
307 .get(*p..*p + n)
308 .ok_or_else(|| StatisticsError::Corrupt(alloc::format!("short read ({n} bytes)")))?;
309 *p += n;
310 Ok(slice)
311}
312
313fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, StatisticsError> {
314 let bytes = read_bytes(buf, p, 2)?;
315 Ok(u16::from_le_bytes(bytes.try_into().map_err(|_| {
316 StatisticsError::Corrupt("u16 slice".to_string())
317 })?))
318}
319
320fn read_u64(buf: &[u8], p: &mut usize) -> Result<u64, StatisticsError> {
321 let bytes = read_bytes(buf, p, 8)?;
322 Ok(u64::from_le_bytes(bytes.try_into().map_err(|_| {
323 StatisticsError::Corrupt("u64 slice".to_string())
324 })?))
325}
326
327fn read_str(buf: &[u8], p: &mut usize) -> Result<String, StatisticsError> {
328 let n = read_u16(buf, p)? as usize;
329 let slice = read_bytes(buf, p, n)?;
330 core::str::from_utf8(slice)
331 .map(ToString::to_string)
332 .map_err(|e| StatisticsError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ColumnStats` from borrowed bound literals.
    fn mk_cs(null_frac: f32, n_distinct: u64, bounds: &[&str]) -> ColumnStats {
        let histogram_bounds = bounds.iter().map(|b| (*b).to_string()).collect();
        ColumnStats {
            null_frac,
            n_distinct,
            histogram_bounds,
        }
    }

    /// An empty catalog survives a serialize / deserialize round trip.
    #[test]
    fn empty_roundtrips() {
        let original = Statistics::new();
        let encoded = original.serialize();
        let decoded = Statistics::deserialize(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// A single column row round-trips with its fields intact.
    #[test]
    fn single_column_roundtrips() {
        let mut original = Statistics::new();
        let id_column = mk_cs(0.0, 1000, &["1", "500", "1000"]);
        original.set("users".into(), "id".into(), id_column);

        let decoded = Statistics::deserialize(&original.serialize()).unwrap();
        assert_eq!(original, decoded);

        let row = decoded.get("users", "id").unwrap();
        assert_eq!(row.n_distinct, 1000);
        assert_eq!(row.histogram_bounds.len(), 3);
    }

    /// Several rows plus a modification counter round-trip together.
    #[test]
    fn multi_column_roundtrips_with_modified_counter() {
        let mut original = Statistics::new();
        let columns = [
            ("id", mk_cs(0.0, 100, &["1", "50", "100"])),
            ("name", mk_cs(0.1, 99, &["alice", "bob", "zoe"])),
        ];
        for (column, stats) in columns {
            original.set("users".into(), column.into(), stats);
        }
        original.record_modifications("users", 17);

        let decoded = Statistics::deserialize(&original.serialize()).unwrap();
        assert_eq!(original, decoded);
        assert_eq!(decoded.modified_since_last_analyze("users"), 17);
    }

    /// A large sample yields `NUM_BUCKETS + 1` bounds spanning first to last.
    #[test]
    fn histogram_bounds_count_is_101_for_100_buckets() {
        let sample: Vec<String> = (0..1000).map(|i| alloc::format!("{i:04}")).collect();
        let bounds = build_histogram(&sample);
        assert_eq!(bounds.len(), 101);
        assert_eq!(bounds.first().map(String::as_str), Some("0000"));
        assert_eq!(bounds.last().map(String::as_str), Some("0999"));
    }

    /// The encoded bytes do not depend on the order rows were inserted in.
    #[test]
    fn deterministic_serialise_independent_of_insert_order() {
        let encode = |order: [(&str, &str, &str); 2]| {
            let mut stats = Statistics::new();
            for (table, column, bound) in order {
                stats.set(table.into(), column.into(), mk_cs(0.0, 1, &[bound]));
            }
            stats.serialize()
        };
        let z_first = encode([("z", "c1", "x"), ("a", "c2", "y")]);
        let a_first = encode([("a", "c2", "y"), ("z", "c1", "x")]);
        assert_eq!(z_first, a_first);
    }

    /// 10,000 values drawn from 100 distinct strings count as exactly 100.
    #[test]
    fn n_distinct_estimator_within_5pct_on_uniform_corpus() {
        let mut sample: Vec<String> = (0..10000)
            .map(|i| alloc::format!("v{}", i % 100))
            .collect();
        sample.sort();
        assert_eq!(estimate_n_distinct(&sample), 100);
    }

    /// `clear_table` removes the rows of the named table and nothing else.
    #[test]
    fn clear_table_drops_only_target_rows() {
        let mut stats = Statistics::new();
        for (table, column, bound) in [("a", "c1", "x"), ("a", "c2", "y"), ("b", "c1", "z")] {
            stats.set(table.into(), column.into(), mk_cs(0.0, 1, &[bound]));
        }
        stats.clear_table("a");
        assert_eq!(stats.len(), 1);
        assert!(stats.get("a", "c1").is_none());
        assert!(stats.get("b", "c1").is_some());
    }

    /// A payload that announces one row but carries no row data is rejected.
    #[test]
    fn corrupt_short_read_errors() {
        let truncated = 1u16.to_le_bytes();
        let outcome = Statistics::deserialize(&truncated);
        assert!(matches!(outcome, Err(StatisticsError::Corrupt(_))));
    }

    /// Samples no larger than `NUM_BUCKETS + 1` come back unchanged.
    #[test]
    fn build_histogram_passthrough_when_sample_is_small() {
        let sample: Vec<String> = (0..5).map(|i| alloc::format!("v{i}")).collect();
        let bounds = build_histogram(&sample);
        assert_eq!(bounds.len(), 5);
        assert_eq!(bounds.first().map(String::as_str), Some("v0"));
        assert_eq!(bounds.last().map(String::as_str), Some("v4"));
    }
}