1use rand::rngs::SmallRng;
2use rand::{Rng, SeedableRng};
3use std::collections::HashMap;
4use std::hash::{BuildHasher, Hasher};
5
6use crate::profile_builder::infer_data_type_streaming;
7use dataprof_metrics::analysis::inference::is_null_like_token;
8
9#[derive(Debug, Clone)]
19pub struct WelfordAccumulator {
20 count: u64,
21 mean: f64,
22 m2: f64,
23}
24
25impl WelfordAccumulator {
26 pub fn new() -> Self {
27 Self {
28 count: 0,
29 mean: 0.0,
30 m2: 0.0,
31 }
32 }
33
34 #[inline]
35 pub fn update(&mut self, value: f64) {
36 self.count += 1;
37 let delta = value - self.mean;
38 self.mean += delta / self.count as f64;
39 let delta2 = value - self.mean;
40 self.m2 += delta * delta2;
41 }
42
43 #[inline]
44 pub fn mean(&self) -> f64 {
45 if self.count == 0 { 0.0 } else { self.mean }
46 }
47
48 pub fn variance(&self) -> f64 {
49 if self.count < 2 {
50 0.0
51 } else {
52 self.m2 / self.count as f64
53 }
54 }
55
56 pub fn std_dev(&self) -> f64 {
57 self.variance().sqrt()
58 }
59
60 pub fn merge(&mut self, other: &WelfordAccumulator) {
61 if other.count == 0 {
62 return;
63 }
64 if self.count == 0 {
65 *self = other.clone();
66 return;
67 }
68
69 let combined_count = self.count + other.count;
70 let delta = other.mean - self.mean;
71 let new_mean = self.mean + delta * (other.count as f64 / combined_count as f64);
72 let new_m2 = self.m2
73 + other.m2
74 + delta * delta * (self.count as f64 * other.count as f64 / combined_count as f64);
75
76 self.count = combined_count;
77 self.mean = new_mean;
78 self.m2 = new_m2;
79 }
80}
81
82impl Default for WelfordAccumulator {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct StreamReservoirSampler {
90 reservoir: Vec<String>,
91 capacity: usize,
92 count: u64,
93 rng: SmallRng,
94}
95
96impl StreamReservoirSampler {
97 pub fn new(capacity: usize) -> Self {
98 Self {
99 reservoir: Vec::with_capacity(capacity.min(1024)),
100 capacity,
101 count: 0,
102 rng: SmallRng::from_os_rng(),
103 }
104 }
105
106 #[cfg(test)]
107 pub fn seed(capacity: usize, seed: u64) -> Self {
108 Self {
109 reservoir: Vec::with_capacity(capacity.min(1024)),
110 capacity,
111 count: 0,
112 rng: SmallRng::seed_from_u64(seed),
113 }
114 }
115
116 #[inline]
117 pub fn offer(&mut self, value: String) {
118 self.count += 1;
119 if self.reservoir.len() < self.capacity {
120 self.reservoir.push(value);
121 } else {
122 let index = self.rng.random_range(0..self.count as usize);
123 if index < self.capacity {
124 self.reservoir[index] = value;
125 }
126 }
127 }
128
129 pub fn shrink_to(&mut self, new_capacity: usize) {
130 let new_capacity = new_capacity.max(1);
131 self.capacity = new_capacity;
132 self.reservoir.truncate(new_capacity);
133 self.reservoir.shrink_to_fit();
134 }
135
136 pub fn samples(&self) -> &[String] {
137 &self.reservoir
138 }
139
140 pub fn memory_usage_bytes(&self) -> usize {
141 self.reservoir
142 .iter()
143 .map(|value| std::mem::size_of::<String>() + value.capacity())
144 .sum()
145 }
146
147 pub fn merge(&mut self, other: &StreamReservoirSampler) {
148 if other.count == 0 {
149 return;
150 }
151
152 let mut combined: Vec<String> = self.reservoir.drain(..).collect();
153 combined.extend(other.reservoir.iter().cloned());
154
155 let total = combined.len();
156 if total <= self.capacity {
157 self.reservoir = combined;
158 } else {
159 for index in 0..self.capacity {
160 let swap_with = self.rng.random_range(index..total);
161 combined.swap(index, swap_with);
162 }
163 combined.truncate(self.capacity);
164 self.reservoir = combined;
165 }
166
167 self.count += other.count;
168 }
169}
170
171#[derive(Debug, Clone)]
172pub struct TextLengthStats {
173 pub min_length: usize,
174 pub max_length: usize,
175 pub avg_length: f64,
176 welford: WelfordAccumulator,
177 histogram: [u64; 32],
178}
179
180impl TextLengthStats {
181 pub fn new() -> Self {
182 Self {
183 min_length: usize::MAX,
184 max_length: 0,
185 avg_length: 0.0,
186 welford: WelfordAccumulator::new(),
187 histogram: [0u64; 32],
188 }
189 }
190
191 pub fn update(&mut self, length: usize) {
192 self.min_length = self.min_length.min(length);
193 self.max_length = self.max_length.max(length);
194 self.welford.update(length as f64);
195 self.avg_length = self.welford.mean();
196
197 let bucket = if length == 0 {
198 0
199 } else {
200 (usize::BITS - length.leading_zeros()).min(31) as usize
201 };
202 self.histogram[bucket] += 1;
203 }
204
205 pub fn merge(&mut self, other: &TextLengthStats) {
206 if other.welford.count == 0 {
207 return;
208 }
209 if self.welford.count == 0 {
210 *self = other.clone();
211 return;
212 }
213
214 self.min_length = self.min_length.min(other.min_length);
215 self.max_length = self.max_length.max(other.max_length);
216 self.welford.merge(&other.welford);
217 self.avg_length = self.welford.mean();
218
219 for (left, right) in self.histogram.iter_mut().zip(other.histogram.iter()) {
220 *left += *right;
221 }
222 }
223
224 pub fn empty() -> Self {
225 Self {
226 min_length: 0,
227 max_length: 0,
228 avg_length: 0.0,
229 welford: WelfordAccumulator::new(),
230 histogram: [0u64; 32],
231 }
232 }
233}
234
235impl Default for TextLengthStats {
236 fn default() -> Self {
237 Self::new()
238 }
239}
240
241struct HllBuildHasher;
242
243impl BuildHasher for HllBuildHasher {
244 type Hasher = std::collections::hash_map::DefaultHasher;
245
246 fn build_hasher(&self) -> Self::Hasher {
247 std::collections::hash_map::DefaultHasher::new()
248 }
249}
250
251#[derive(Clone)]
252struct HllCounter {
253 registers: Vec<u8>,
254}
255
256impl std::fmt::Debug for HllCounter {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("HllCounter")
259 .field("precision", &14u8)
260 .field("registers_len", &self.registers.len())
261 .finish()
262 }
263}
264
265impl HllCounter {
266 const PRECISION: usize = 14;
267 const NUM_REGISTERS: usize = 1 << Self::PRECISION;
268
269 fn new() -> Self {
270 Self {
271 registers: vec![0u8; Self::NUM_REGISTERS],
272 }
273 }
274
275 #[inline]
276 fn insert(&mut self, value: &str) {
277 let mut hasher = HllBuildHasher.build_hasher();
278 hasher.write(value.as_bytes());
279 let hash = hasher.finish();
280
281 let index = (hash as usize) & (Self::NUM_REGISTERS - 1);
282 let window = hash >> Self::PRECISION;
283 let rank = (window.leading_zeros() - Self::PRECISION as u32 + 1) as u8;
284
285 if rank > self.registers[index] {
286 self.registers[index] = rank;
287 }
288 }
289
290 fn count(&self) -> u64 {
291 let register_count = Self::NUM_REGISTERS as f64;
292 let alpha = 0.7213 / (1.0 + 1.079 / register_count);
293
294 let raw_estimate: f64 = alpha * register_count * register_count
295 / self
296 .registers
297 .iter()
298 .map(|®ister| 2.0_f64.powi(-(register as i32)))
299 .sum::<f64>();
300
301 if raw_estimate <= 2.5 * register_count {
302 let zeros = self
303 .registers
304 .iter()
305 .filter(|&®ister| register == 0)
306 .count() as f64;
307 if zeros > 0.0 {
308 (register_count * (register_count / zeros).ln()) as u64
309 } else {
310 raw_estimate as u64
311 }
312 } else if raw_estimate <= (1u64 << 32) as f64 / 30.0 {
313 raw_estimate as u64
314 } else {
315 let two32 = (1u64 << 32) as f64;
316 (-two32 * (1.0 - raw_estimate / two32).ln()) as u64
317 }
318 }
319
320 fn merge(&mut self, other: &HllCounter) {
321 for (left, right) in self.registers.iter_mut().zip(other.registers.iter()) {
322 *left = (*left).max(*right);
323 }
324 }
325}
326
327#[derive(Debug, Clone)]
328pub struct StreamingStatistics {
329 pub count: usize,
330 pub null_count: usize,
331 pub min: f64,
332 pub max: f64,
333 welford: WelfordAccumulator,
334 hll: HllCounter,
335 sampler: StreamReservoirSampler,
336 text_length_tracker: TextLengthStats,
337}
338
339impl StreamingStatistics {
340 pub fn new() -> Self {
341 Self {
342 count: 0,
343 null_count: 0,
344 min: f64::INFINITY,
345 max: f64::NEG_INFINITY,
346 welford: WelfordAccumulator::new(),
347 hll: HllCounter::new(),
348 sampler: StreamReservoirSampler::new(10_000),
349 text_length_tracker: TextLengthStats::new(),
350 }
351 }
352
353 pub fn with_sample_capacity(max_sample: usize) -> Self {
354 Self {
355 sampler: StreamReservoirSampler::new(max_sample),
356 ..Self::new()
357 }
358 }
359
360 pub fn update(&mut self, value: &str) {
361 self.count += 1;
362
363 if is_null_like_token(value) {
364 self.null_count += 1;
365 return;
366 }
367
368 self.hll.insert(value);
369 self.sampler.offer(value.to_string());
370 self.text_length_tracker.update(value.len());
371
372 if let Some(number) = value.parse::<f64>().ok().filter(|num| num.is_finite()) {
373 self.welford.update(number);
374 self.min = self.min.min(number);
375 self.max = self.max.max(number);
376 }
377 }
378
379 pub fn merge(&mut self, other: &StreamingStatistics) {
380 self.count += other.count;
381 self.null_count += other.null_count;
382
383 if other.min < self.min {
384 self.min = other.min;
385 }
386 if other.max > self.max {
387 self.max = other.max;
388 }
389
390 self.welford.merge(&other.welford);
391 self.hll.merge(&other.hll);
392 self.sampler.merge(&other.sampler);
393 self.text_length_tracker.merge(&other.text_length_tracker);
394 }
395
396 pub fn mean(&self) -> f64 {
397 self.welford.mean()
398 }
399
400 pub fn variance(&self) -> f64 {
401 self.welford.variance()
402 }
403
404 pub fn std_dev(&self) -> f64 {
405 self.welford.std_dev()
406 }
407
408 pub fn unique_count(&self) -> usize {
409 self.hll.count() as usize
410 }
411
412 pub fn unique_count_is_approximate(&self) -> bool {
413 self.hll.count() > 100
414 }
415
416 pub fn sample_values(&self) -> &[String] {
417 self.sampler.samples()
418 }
419
420 pub fn text_length_stats(&self) -> TextLengthStats {
421 if self.text_length_tracker.welford.count == 0 {
422 return TextLengthStats::empty();
423 }
424 self.text_length_tracker.clone()
425 }
426
427 pub fn reduce_sample_capacity(&mut self) {
428 self.sampler.shrink_to(self.sampler.capacity / 2);
429 }
430
431 pub fn memory_usage_bytes(&self) -> usize {
432 let struct_size = std::mem::size_of::<Self>();
433 let hll_size = self.hll.registers.len();
434 let reservoir_size = self.sampler.memory_usage_bytes();
435
436 struct_size + hll_size + reservoir_size
437 }
438}
439
440impl Default for StreamingStatistics {
441 fn default() -> Self {
442 Self::new()
443 }
444}
445
446pub struct StreamingColumnCollection {
447 columns: HashMap<String, StreamingStatistics>,
448 ordered_names: Vec<String>,
449 memory_limit_bytes: usize,
450}
451
452impl StreamingColumnCollection {
453 pub fn new() -> Self {
454 Self {
455 columns: HashMap::new(),
456 ordered_names: Vec::new(),
457 memory_limit_bytes: 100 * 1024 * 1024,
458 }
459 }
460
461 pub fn memory_limit(limit_mb: usize) -> Self {
462 Self {
463 columns: HashMap::new(),
464 ordered_names: Vec::new(),
465 memory_limit_bytes: limit_mb * 1024 * 1024,
466 }
467 }
468
469 pub fn init_columns(&mut self, headers: &[String]) {
470 for header in headers {
471 if !self.columns.contains_key(header) {
472 self.columns
473 .insert(header.clone(), StreamingStatistics::default());
474 self.ordered_names.push(header.clone());
475 }
476 }
477 }
478
479 pub fn process_record<I>(&mut self, headers: &[String], values: I)
480 where
481 I: IntoIterator<Item = String>,
482 {
483 for (header, value) in headers.iter().zip(values) {
484 if !self.columns.contains_key(header) {
485 self.ordered_names.push(header.clone());
486 }
487 let stats = self.columns.entry(header.to_string()).or_default();
488 stats.update(&value);
489 }
490 }
491
492 pub fn get_column_stats(&self, column_name: &str) -> Option<&StreamingStatistics> {
493 self.columns.get(column_name)
494 }
495
496 pub fn column_names(&self) -> Vec<String> {
497 self.ordered_names.clone()
498 }
499
500 pub fn memory_usage_bytes(&self) -> usize {
501 self.columns
502 .values()
503 .map(|stats| stats.memory_usage_bytes())
504 .sum()
505 }
506
507 pub fn is_memory_pressure(&self) -> bool {
508 self.memory_usage_bytes() > (self.memory_limit_bytes * 80 / 100)
509 }
510
511 pub fn reduce_memory_usage(&mut self) {
512 for stats in self.columns.values_mut() {
513 stats.reduce_sample_capacity();
514 }
515 }
516
517 pub fn column_type_fingerprint(&self) -> u64 {
522 use std::collections::hash_map::DefaultHasher;
523 use std::hash::{Hash, Hasher};
524
525 let mut hasher = DefaultHasher::new();
526 let mut names: Vec<&String> = self.columns.keys().collect();
527 names.sort();
528 for name in names {
529 let stats = &self.columns[name];
530 let data_type = infer_data_type_streaming(stats);
531 name.hash(&mut hasher);
532 std::mem::discriminant(&data_type).hash(&mut hasher);
533 }
534 hasher.finish()
535 }
536
537 pub fn merge(&mut self, other: StreamingColumnCollection) {
538 for (column_name, other_stats) in other.columns {
539 match self.columns.get_mut(&column_name) {
540 Some(existing_stats) => existing_stats.merge(&other_stats),
541 None => {
542 self.columns.insert(column_name, other_stats);
543 }
544 }
545 }
546 }
547}
548
549impl Default for StreamingColumnCollection {
550 fn default() -> Self {
551 Self::new()
552 }
553}
554
555#[cfg(test)]
556mod tests {
557 use super::*;
558
559 #[test]
560 fn test_streaming_statistics() {
561 let mut stats = StreamingStatistics::new();
562
563 stats.update("10.5");
564 stats.update("20.0");
565 stats.update("15.5");
566 stats.update("");
567
568 assert_eq!(stats.count, 4);
569 assert_eq!(stats.null_count, 1);
570 let unique_count = stats.unique_count();
571 assert!((2..=5).contains(&unique_count));
572 assert!((stats.mean() - 15.333333333333334).abs() < 1e-10);
573 assert_eq!(stats.min, 10.5);
574 assert_eq!(stats.max, 20.0);
575 }
576
577 #[test]
578 fn test_streaming_statistics_merge() {
579 let mut stats1 = StreamingStatistics::new();
580 stats1.update("10");
581 stats1.update("20");
582
583 let mut stats2 = StreamingStatistics::new();
584 stats2.update("30");
585 stats2.update("40");
586
587 stats1.merge(&stats2);
588
589 assert_eq!(stats1.count, 4);
590 let unique_count = stats1.unique_count();
591 assert!((3..=6).contains(&unique_count));
592 assert!((stats1.mean() - 25.0).abs() < 1e-10);
593 assert_eq!(stats1.min, 10.0);
594 assert_eq!(stats1.max, 40.0);
595 }
596
597 #[test]
598 fn test_column_collection() {
599 let mut collection = StreamingColumnCollection::new();
600 let headers = vec!["name".to_string(), "age".to_string()];
601
602 collection.process_record(&headers, vec!["Alice".to_string(), "25".to_string()]);
603 collection.process_record(&headers, vec!["Bob".to_string(), "30".to_string()]);
604
605 let age_stats = collection.get_column_stats("age").unwrap();
606 assert_eq!(age_stats.count, 2);
607 assert!((age_stats.mean() - 27.5).abs() < 1e-10);
608 }
609
610 #[test]
611 fn test_welford_accuracy() {
612 let mut accumulator = WelfordAccumulator::new();
613 for value in 1..=1000 {
614 accumulator.update(value as f64);
615 }
616 let expected_mean = 500.5;
617 let expected_variance = (1000.0 * 1000.0 - 1.0) / 12.0;
618 assert!((accumulator.mean() - expected_mean).abs() < 1e-6);
619 assert!((accumulator.variance() - expected_variance).abs() < 1.0);
620 }
621
622 #[test]
623 fn test_welford_merge() {
624 let mut left = WelfordAccumulator::new();
625 let mut right = WelfordAccumulator::new();
626 let mut full = WelfordAccumulator::new();
627
628 for value in 1..=500 {
629 left.update(value as f64);
630 full.update(value as f64);
631 }
632 for value in 501..=1000 {
633 right.update(value as f64);
634 full.update(value as f64);
635 }
636
637 left.merge(&right);
638 assert!((left.mean() - full.mean()).abs() < 1e-10);
639 assert!((left.variance() - full.variance()).abs() < 1e-6);
640 }
641
642 #[test]
643 fn test_hll_cardinality() {
644 let mut counter = HllCounter::new();
645 let total = 100_000;
646 for index in 0..total {
647 counter.insert(&format!("item_{index}"));
648 }
649 let estimate = counter.count();
650 let error = (estimate as f64 - total as f64).abs() / total as f64;
651 assert!(error < 0.05);
652 }
653
654 #[test]
655 fn test_reservoir_uniformity() {
656 let mut sampler = StreamReservoirSampler::seed(1000, 42);
657 let total = 100_000;
658 for index in 0..total {
659 sampler.offer(index.to_string());
660 }
661
662 assert_eq!(sampler.samples().len(), 1000);
663 let values: Vec<usize> = sampler
664 .samples()
665 .iter()
666 .map(|value| value.parse().unwrap())
667 .collect();
668 let max_value = *values.iter().max().unwrap();
669 assert!(max_value > total / 2);
670 }
671
672 #[test]
673 fn test_text_length_stats_streaming() {
674 let mut stats = TextLengthStats::new();
675 for &length in &[3, 5, 10, 1, 7] {
676 stats.update(length);
677 }
678 assert_eq!(stats.min_length, 1);
679 assert_eq!(stats.max_length, 10);
680 assert!((stats.avg_length - 5.2).abs() < 1e-10);
681 }
682
683 #[test]
684 fn test_memory_usage_bounded() {
685 let mut stats = StreamingStatistics::new();
686 for index in 0..50_000 {
687 stats.update(&format!("value_{index}"));
688 }
689 let usage = stats.memory_usage_bytes();
690 assert!(usage < 1_000_000);
691 }
692}