Skip to main content

dataprof_runtime/
streaming_stats.rs

1use rand::rngs::SmallRng;
2use rand::{Rng, SeedableRng};
3use std::collections::HashMap;
4use std::hash::{BuildHasher, Hasher};
5
6use crate::profile_builder::infer_data_type_streaming;
7use dataprof_metrics::analysis::inference::is_null_like_token;
8
// Incremental statistics computation for streaming data processing.
//
// This module provides bounded-memory statistical computation using:
// - **Welford's algorithm** for numerically stable variance/stddev (O(1) memory)
// - **HyperLogLog** for approximate distinct counts (~16 KB fixed registers)
// - **Reservoir sampling** for unbiased samples (fixed capacity; total memory
//   depends on the capacity and the length of sampled strings)
// - **Streaming text-length tracking** with min/max/mean/histogram (O(1) memory)
17
/// Running mean/variance accumulator based on Welford's online algorithm.
///
/// Only three scalars are kept (the number of observations, the running mean
/// and `m2`, the running sum of squared deviations from the mean), so memory
/// use does not grow with the number of values fed in.
#[derive(Debug, Clone)]
pub struct WelfordAccumulator {
    count: u64,
    mean: f64,
    m2: f64,
}

impl WelfordAccumulator {
    /// Creates an accumulator that has not seen any value yet.
    pub fn new() -> Self {
        Self {
            count: 0,
            mean: 0.0,
            m2: 0.0,
        }
    }

    /// Folds one observation into the running statistics.
    #[inline]
    pub fn update(&mut self, value: f64) {
        // Deviation from the mean *before* this value is taken into account.
        let before = value - self.mean;
        self.count += 1;
        self.mean += before / self.count as f64;
        // Deviation from the mean *after* the update; the product of the two
        // is the increment of the sum of squared deviations.
        let after = value - self.mean;
        self.m2 += before * after;
    }

    /// Returns the mean of the observed values, or `0.0` when there are none.
    #[inline]
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 0.0,
            _ => self.mean,
        }
    }

    /// Returns the population variance (`m2 / count`).
    ///
    /// With fewer than two observations the result is `0.0`.
    pub fn variance(&self) -> f64 {
        if self.count >= 2 {
            self.m2 / self.count as f64
        } else {
            0.0
        }
    }

    /// Returns the population standard deviation (square root of
    /// [`variance`](Self::variance)).
    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Combines `other` into `self` as if every value seen by `other` had
    /// been fed to `self` as well.
    pub fn merge(&mut self, other: &WelfordAccumulator) {
        match (self.count, other.count) {
            // Nothing to add.
            (_, 0) => {}
            // Nothing accumulated locally: adopt the other side wholesale.
            (0, _) => *self = other.clone(),
            (own, incoming) => {
                let total = own + incoming;
                let gap = other.mean - self.mean;

                let merged_mean = self.mean + gap * (incoming as f64 / total as f64);
                let merged_m2 = self.m2
                    + other.m2
                    + gap * gap * (own as f64 * incoming as f64 / total as f64);

                self.count = total;
                self.mean = merged_mean;
                self.m2 = merged_m2;
            }
        }
    }
}

impl Default for WelfordAccumulator {
    /// Equivalent to [`WelfordAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
87
88#[derive(Debug, Clone)]
89pub(crate) struct StreamReservoirSampler {
90    reservoir: Vec<String>,
91    capacity: usize,
92    count: u64,
93    rng: SmallRng,
94}
95
96impl StreamReservoirSampler {
97    pub fn new(capacity: usize) -> Self {
98        Self {
99            reservoir: Vec::with_capacity(capacity.min(1024)),
100            capacity,
101            count: 0,
102            rng: SmallRng::from_os_rng(),
103        }
104    }
105
106    #[cfg(test)]
107    pub fn seed(capacity: usize, seed: u64) -> Self {
108        Self {
109            reservoir: Vec::with_capacity(capacity.min(1024)),
110            capacity,
111            count: 0,
112            rng: SmallRng::seed_from_u64(seed),
113        }
114    }
115
116    #[inline]
117    pub fn offer(&mut self, value: String) {
118        self.count += 1;
119        if self.reservoir.len() < self.capacity {
120            self.reservoir.push(value);
121        } else {
122            let index = self.rng.random_range(0..self.count as usize);
123            if index < self.capacity {
124                self.reservoir[index] = value;
125            }
126        }
127    }
128
129    pub fn shrink_to(&mut self, new_capacity: usize) {
130        let new_capacity = new_capacity.max(1);
131        self.capacity = new_capacity;
132        self.reservoir.truncate(new_capacity);
133        self.reservoir.shrink_to_fit();
134    }
135
136    pub fn samples(&self) -> &[String] {
137        &self.reservoir
138    }
139
140    pub fn memory_usage_bytes(&self) -> usize {
141        self.reservoir
142            .iter()
143            .map(|value| std::mem::size_of::<String>() + value.capacity())
144            .sum()
145    }
146
147    pub fn merge(&mut self, other: &StreamReservoirSampler) {
148        if other.count == 0 {
149            return;
150        }
151
152        let mut combined: Vec<String> = self.reservoir.drain(..).collect();
153        combined.extend(other.reservoir.iter().cloned());
154
155        let total = combined.len();
156        if total <= self.capacity {
157            self.reservoir = combined;
158        } else {
159            for index in 0..self.capacity {
160                let swap_with = self.rng.random_range(index..total);
161                combined.swap(index, swap_with);
162            }
163            combined.truncate(self.capacity);
164            self.reservoir = combined;
165        }
166
167        self.count += other.count;
168    }
169}
170
171#[derive(Debug, Clone)]
172pub struct TextLengthStats {
173    pub min_length: usize,
174    pub max_length: usize,
175    pub avg_length: f64,
176    welford: WelfordAccumulator,
177    histogram: [u64; 32],
178}
179
180impl TextLengthStats {
181    pub fn new() -> Self {
182        Self {
183            min_length: usize::MAX,
184            max_length: 0,
185            avg_length: 0.0,
186            welford: WelfordAccumulator::new(),
187            histogram: [0u64; 32],
188        }
189    }
190
191    pub fn update(&mut self, length: usize) {
192        self.min_length = self.min_length.min(length);
193        self.max_length = self.max_length.max(length);
194        self.welford.update(length as f64);
195        self.avg_length = self.welford.mean();
196
197        let bucket = if length == 0 {
198            0
199        } else {
200            (usize::BITS - length.leading_zeros()).min(31) as usize
201        };
202        self.histogram[bucket] += 1;
203    }
204
205    pub fn merge(&mut self, other: &TextLengthStats) {
206        if other.welford.count == 0 {
207            return;
208        }
209        if self.welford.count == 0 {
210            *self = other.clone();
211            return;
212        }
213
214        self.min_length = self.min_length.min(other.min_length);
215        self.max_length = self.max_length.max(other.max_length);
216        self.welford.merge(&other.welford);
217        self.avg_length = self.welford.mean();
218
219        for (left, right) in self.histogram.iter_mut().zip(other.histogram.iter()) {
220            *left += *right;
221        }
222    }
223
224    pub fn empty() -> Self {
225        Self {
226            min_length: 0,
227            max_length: 0,
228            avg_length: 0.0,
229            welford: WelfordAccumulator::new(),
230            histogram: [0u64; 32],
231        }
232    }
233}
234
235impl Default for TextLengthStats {
236    fn default() -> Self {
237        Self::new()
238    }
239}
240
/// Builds the hasher used by [`HllCounter`].
///
/// Every hasher comes from `DefaultHasher::new()`, so all counters hash a
/// given string the same way, which is what makes merging register arrays
/// meaningful.
struct HllBuildHasher;

impl BuildHasher for HllBuildHasher {
    type Hasher = std::collections::hash_map::DefaultHasher;

    fn build_hasher(&self) -> Self::Hasher {
        std::collections::hash_map::DefaultHasher::new()
    }
}

/// HyperLogLog cardinality estimator with `2^14` one-byte registers.
#[derive(Clone)]
struct HllCounter {
    registers: Vec<u8>,
}

impl std::fmt::Debug for HllCounter {
    /// Prints the precision and register count instead of dumping all
    /// register values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HllCounter")
            .field("precision", &Self::PRECISION)
            .field("registers_len", &self.registers.len())
            .finish()
    }
}

impl HllCounter {
    /// Number of hash bits used to select a register.
    const PRECISION: usize = 14;
    /// Number of registers (`2^PRECISION`).
    const NUM_REGISTERS: usize = 1 << Self::PRECISION;

    /// Creates a counter with all registers cleared.
    fn new() -> Self {
        Self {
            registers: vec![0u8; Self::NUM_REGISTERS],
        }
    }

    /// Records one value.
    #[inline]
    fn insert(&mut self, value: &str) {
        let mut hasher = HllBuildHasher.build_hasher();
        hasher.write(value.as_bytes());
        let hash = hasher.finish();

        // The low PRECISION bits pick the register; the remaining high bits
        // form the window whose leading-zero run determines the rank.
        let index = (hash as usize) & (Self::NUM_REGISTERS - 1);
        let window = hash >> Self::PRECISION;
        // The shift leaves PRECISION guaranteed leading zeros in `window`;
        // subtract them so the rank counts zeros of the window itself, plus
        // one. The result lies in 1..=(64 - PRECISION + 1).
        let rank = (window.leading_zeros() - Self::PRECISION as u32 + 1) as u8;

        if rank > self.registers[index] {
            self.registers[index] = rank;
        }
    }

    /// Returns the estimated number of distinct values inserted.
    ///
    /// Small cardinalities (raw estimate up to `2.5 * registers`) use linear
    /// counting on the number of empty registers. No large-range correction
    /// is applied: that correction compensates for collisions of 32-bit
    /// hashes, whereas this counter hashes to 64 bits. Applying it inflated
    /// estimates above `2^32 / 30` and produced `ln` of a non-positive number
    /// once the raw estimate reached `2^32`.
    fn count(&self) -> u64 {
        let register_count = Self::NUM_REGISTERS as f64;
        let alpha = 0.7213 / (1.0 + 1.079 / register_count);

        let harmonic_sum: f64 = self
            .registers
            .iter()
            .map(|&register| 2.0_f64.powi(-i32::from(register)))
            .sum();
        let raw_estimate = alpha * register_count * register_count / harmonic_sum;

        if raw_estimate <= 2.5 * register_count {
            let zeros = self
                .registers
                .iter()
                .filter(|&&register| register == 0)
                .count();
            if zeros > 0 {
                return (register_count * (register_count / zeros as f64).ln()) as u64;
            }
        }

        raw_estimate as u64
    }

    /// Merges `other` into `self` by keeping the larger rank per register.
    fn merge(&mut self, other: &HllCounter) {
        for (left, right) in self.registers.iter_mut().zip(other.registers.iter()) {
            *left = (*left).max(*right);
        }
    }
}
326
327#[derive(Debug, Clone)]
328pub struct StreamingStatistics {
329    pub count: usize,
330    pub null_count: usize,
331    pub min: f64,
332    pub max: f64,
333    welford: WelfordAccumulator,
334    hll: HllCounter,
335    sampler: StreamReservoirSampler,
336    text_length_tracker: TextLengthStats,
337}
338
339impl StreamingStatistics {
340    pub fn new() -> Self {
341        Self {
342            count: 0,
343            null_count: 0,
344            min: f64::INFINITY,
345            max: f64::NEG_INFINITY,
346            welford: WelfordAccumulator::new(),
347            hll: HllCounter::new(),
348            sampler: StreamReservoirSampler::new(10_000),
349            text_length_tracker: TextLengthStats::new(),
350        }
351    }
352
353    pub fn with_sample_capacity(max_sample: usize) -> Self {
354        Self {
355            sampler: StreamReservoirSampler::new(max_sample),
356            ..Self::new()
357        }
358    }
359
360    pub fn update(&mut self, value: &str) {
361        self.count += 1;
362
363        if is_null_like_token(value) {
364            self.null_count += 1;
365            return;
366        }
367
368        self.hll.insert(value);
369        self.sampler.offer(value.to_string());
370        self.text_length_tracker.update(value.len());
371
372        if let Some(number) = value.parse::<f64>().ok().filter(|num| num.is_finite()) {
373            self.welford.update(number);
374            self.min = self.min.min(number);
375            self.max = self.max.max(number);
376        }
377    }
378
379    pub fn merge(&mut self, other: &StreamingStatistics) {
380        self.count += other.count;
381        self.null_count += other.null_count;
382
383        if other.min < self.min {
384            self.min = other.min;
385        }
386        if other.max > self.max {
387            self.max = other.max;
388        }
389
390        self.welford.merge(&other.welford);
391        self.hll.merge(&other.hll);
392        self.sampler.merge(&other.sampler);
393        self.text_length_tracker.merge(&other.text_length_tracker);
394    }
395
396    pub fn mean(&self) -> f64 {
397        self.welford.mean()
398    }
399
400    pub fn variance(&self) -> f64 {
401        self.welford.variance()
402    }
403
404    pub fn std_dev(&self) -> f64 {
405        self.welford.std_dev()
406    }
407
408    pub fn unique_count(&self) -> usize {
409        self.hll.count() as usize
410    }
411
412    pub fn unique_count_is_approximate(&self) -> bool {
413        self.hll.count() > 100
414    }
415
416    pub fn sample_values(&self) -> &[String] {
417        self.sampler.samples()
418    }
419
420    pub fn text_length_stats(&self) -> TextLengthStats {
421        if self.text_length_tracker.welford.count == 0 {
422            return TextLengthStats::empty();
423        }
424        self.text_length_tracker.clone()
425    }
426
427    pub fn reduce_sample_capacity(&mut self) {
428        self.sampler.shrink_to(self.sampler.capacity / 2);
429    }
430
431    pub fn memory_usage_bytes(&self) -> usize {
432        let struct_size = std::mem::size_of::<Self>();
433        let hll_size = self.hll.registers.len();
434        let reservoir_size = self.sampler.memory_usage_bytes();
435
436        struct_size + hll_size + reservoir_size
437    }
438}
439
440impl Default for StreamingStatistics {
441    fn default() -> Self {
442        Self::new()
443    }
444}
445
446pub struct StreamingColumnCollection {
447    columns: HashMap<String, StreamingStatistics>,
448    ordered_names: Vec<String>,
449    memory_limit_bytes: usize,
450}
451
452impl StreamingColumnCollection {
453    pub fn new() -> Self {
454        Self {
455            columns: HashMap::new(),
456            ordered_names: Vec::new(),
457            memory_limit_bytes: 100 * 1024 * 1024,
458        }
459    }
460
461    pub fn memory_limit(limit_mb: usize) -> Self {
462        Self {
463            columns: HashMap::new(),
464            ordered_names: Vec::new(),
465            memory_limit_bytes: limit_mb * 1024 * 1024,
466        }
467    }
468
469    pub fn init_columns(&mut self, headers: &[String]) {
470        for header in headers {
471            if !self.columns.contains_key(header) {
472                self.columns
473                    .insert(header.clone(), StreamingStatistics::default());
474                self.ordered_names.push(header.clone());
475            }
476        }
477    }
478
479    pub fn process_record<I>(&mut self, headers: &[String], values: I)
480    where
481        I: IntoIterator<Item = String>,
482    {
483        for (header, value) in headers.iter().zip(values) {
484            if !self.columns.contains_key(header) {
485                self.ordered_names.push(header.clone());
486            }
487            let stats = self.columns.entry(header.to_string()).or_default();
488            stats.update(&value);
489        }
490    }
491
492    pub fn get_column_stats(&self, column_name: &str) -> Option<&StreamingStatistics> {
493        self.columns.get(column_name)
494    }
495
496    pub fn column_names(&self) -> Vec<String> {
497        self.ordered_names.clone()
498    }
499
500    pub fn memory_usage_bytes(&self) -> usize {
501        self.columns
502            .values()
503            .map(|stats| stats.memory_usage_bytes())
504            .sum()
505    }
506
507    pub fn is_memory_pressure(&self) -> bool {
508        self.memory_usage_bytes() > (self.memory_limit_bytes * 80 / 100)
509    }
510
511    pub fn reduce_memory_usage(&mut self) {
512        for stats in self.columns.values_mut() {
513            stats.reduce_sample_capacity();
514        }
515    }
516
517    /// Fingerprint of each column's currently inferred data type.
518    ///
519    /// Returns a `u64` hash suitable for cheap comparison in a schema
520    /// stability tracker.
521    pub fn column_type_fingerprint(&self) -> u64 {
522        use std::collections::hash_map::DefaultHasher;
523        use std::hash::{Hash, Hasher};
524
525        let mut hasher = DefaultHasher::new();
526        let mut names: Vec<&String> = self.columns.keys().collect();
527        names.sort();
528        for name in names {
529            let stats = &self.columns[name];
530            let data_type = infer_data_type_streaming(stats);
531            name.hash(&mut hasher);
532            std::mem::discriminant(&data_type).hash(&mut hasher);
533        }
534        hasher.finish()
535    }
536
537    pub fn merge(&mut self, other: StreamingColumnCollection) {
538        for (column_name, other_stats) in other.columns {
539            match self.columns.get_mut(&column_name) {
540                Some(existing_stats) => existing_stats.merge(&other_stats),
541                None => {
542                    self.columns.insert(column_name, other_stats);
543                }
544            }
545        }
546    }
547}
548
549impl Default for StreamingColumnCollection {
550    fn default() -> Self {
551        Self::new()
552    }
553}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `actual` lies strictly within `tolerance` of `expected`.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!((actual - expected).abs() < tolerance);
    }

    /// Builds statistics that have been fed `values` in order.
    fn stats_from(values: &[&str]) -> StreamingStatistics {
        let mut stats = StreamingStatistics::new();
        for value in values {
            stats.update(value);
        }
        stats
    }

    /// Builds an accumulator fed with every integer in `range`.
    fn welford_over(range: std::ops::RangeInclusive<u32>) -> WelfordAccumulator {
        let mut accumulator = WelfordAccumulator::new();
        for value in range {
            accumulator.update(value as f64);
        }
        accumulator
    }

    // Counters, numeric statistics and null handling on a tiny input.
    #[test]
    fn test_streaming_statistics() {
        let stats = stats_from(&["10.5", "20.0", "15.5", ""]);

        assert_eq!(stats.count, 4);
        assert_eq!(stats.null_count, 1);
        assert!((2..=5).contains(&stats.unique_count()));
        assert_close(stats.mean(), 15.333333333333334, 1e-10);
        assert_eq!(stats.min, 10.5);
        assert_eq!(stats.max, 20.0);
    }

    // Merging two statistics objects behaves like one object fed everything.
    #[test]
    fn test_streaming_statistics_merge() {
        let mut merged = stats_from(&["10", "20"]);
        let addition = stats_from(&["30", "40"]);

        merged.merge(&addition);

        assert_eq!(merged.count, 4);
        assert!((3..=6).contains(&merged.unique_count()));
        assert_close(merged.mean(), 25.0, 1e-10);
        assert_eq!(merged.min, 10.0);
        assert_eq!(merged.max, 40.0);
    }

    // Records are routed to the right column by header position.
    #[test]
    fn test_column_collection() {
        let headers = vec!["name".to_string(), "age".to_string()];
        let mut collection = StreamingColumnCollection::new();

        for (name, age) in [("Alice", "25"), ("Bob", "30")] {
            collection.process_record(&headers, vec![name.to_string(), age.to_string()]);
        }

        let age_stats = collection.get_column_stats("age").unwrap();
        assert_eq!(age_stats.count, 2);
        assert_close(age_stats.mean(), 27.5, 1e-10);
    }

    // Mean and population variance of 1..=1000 against the closed forms.
    #[test]
    fn test_welford_accuracy() {
        let accumulator = welford_over(1..=1000);

        let expected_mean = 500.5;
        let expected_variance = (1000.0 * 1000.0 - 1.0) / 12.0;
        assert_close(accumulator.mean(), expected_mean, 1e-6);
        assert_close(accumulator.variance(), expected_variance, 1.0);
    }

    // Merging two halves matches a single pass over the whole range.
    #[test]
    fn test_welford_merge() {
        let mut left = welford_over(1..=500);
        let right = welford_over(501..=1000);
        let full = welford_over(1..=1000);

        left.merge(&right);

        assert_close(left.mean(), full.mean(), 1e-10);
        assert_close(left.variance(), full.variance(), 1e-6);
    }

    // The HyperLogLog estimate stays within 5% on 100k distinct values.
    #[test]
    fn test_hll_cardinality() {
        let total = 100_000;
        let mut counter = HllCounter::new();
        (0..total).for_each(|index| counter.insert(&format!("item_{index}")));

        let estimate = counter.count();
        let error = (estimate as f64 - total as f64).abs() / total as f64;
        assert!(error < 0.05);
    }

    // A seeded reservoir is full and contains values from the later half.
    #[test]
    fn test_reservoir_uniformity() {
        let total: usize = 100_000;
        let mut sampler = StreamReservoirSampler::seed(1000, 42);
        for index in 0..total {
            sampler.offer(index.to_string());
        }

        assert_eq!(sampler.samples().len(), 1000);
        let max_value = sampler
            .samples()
            .iter()
            .map(|value| value.parse::<usize>().unwrap())
            .max()
            .unwrap();
        assert!(max_value > total / 2);
    }

    // Min, max and average over a handful of lengths.
    #[test]
    fn test_text_length_stats_streaming() {
        let mut stats = TextLengthStats::new();
        [3usize, 5, 10, 1, 7]
            .iter()
            .for_each(|&length| stats.update(length));

        assert_eq!(stats.min_length, 1);
        assert_eq!(stats.max_length, 10);
        assert_close(stats.avg_length, 5.2, 1e-10);
    }

    // Memory stays under 1 MB after 50k distinct values.
    #[test]
    fn test_memory_usage_bounded() {
        let mut stats = StreamingStatistics::new();
        (0..50_000).for_each(|index| stats.update(&format!("value_{index}")));

        assert!(stats.memory_usage_bytes() < 1_000_000);
    }
}