scouter_profile/profile/
num_profiler.rs

1use crate::error::DataProfileError;
2use crate::profile::stats::compute_feature_correlations;
3use crate::profile::types::DataProfile;
4use crate::profile::types::{Distinct, FeatureProfile, Histogram, NumericStats, Quantiles};
5use ndarray::prelude::*;
6use ndarray::{aview1, Axis};
7use ndarray_stats::MaybeNan;
8use ndarray_stats::{interpolate::Nearest, QuantileExt};
9use noisy_float::types::{n64, N64};
10use num_traits::ToPrimitive;
11use num_traits::{Float, FromPrimitive, Num};
12use rayon::prelude::*;
13use std::cmp::Ord;
14use std::collections::{BTreeMap, HashMap, HashSet};
15use tracing::{debug, error, warn};
/// Computes descriptive statistics for numeric 2D arrays: per-column mean,
/// standard deviation, min/max, distinct counts, quantiles, histograms and
/// (optionally) feature correlations.
///
/// The profiler holds no state; construct it with [`NumProfiler::new`] or
/// [`Default::default`].
#[derive(Debug, Clone)]
pub struct NumProfiler {}
17
18impl NumProfiler {
19    pub fn new() -> Self {
20        NumProfiler {}
21    }
22
23    /// Compute quantiles for a 2D array.
24    ///
25    /// # Arguments
26    ///
27    /// * `array` - A 1D array of f64 values.
28    ///
29    /// # Returns
30    ///
31    /// A 2D array of noisy floats.
32    pub fn compute_quantiles<F>(
33        &self,
34        array: &ArrayView2<F>,
35    ) -> Result<(Option<Array2<N64>>, bool), DataProfileError>
36    where
37        F: Num + ndarray_stats::MaybeNan + std::marker::Send + Sync + Clone + Copy + Float,
38        <F as ndarray_stats::MaybeNan>::NotNan: Clone,
39        <F as ndarray_stats::MaybeNan>::NotNan: Ord,
40        f64: From<F>,
41    {
42        // First convert to f64, then to n64
43        // Check for NaN or Inf values early to avoid unnecessary computation
44        if array.iter().any(|&x| x.is_nan() || x.is_infinite()) {
45            warn!("Array contains NaN or Inf values, skipping quantile computation");
46            return Ok((None, true));
47        }
48
49        // Convert F values to n64 in one step
50        let mut n64_array = array.mapv(|x| n64(f64::from(x)));
51
52        let qs = &[n64(0.25), n64(0.5), n64(0.75), n64(0.99)];
53        let quantiles = n64_array.quantiles_axis_mut(Axis(0), &aview1(qs), &Nearest)?;
54
55        Ok((Some(quantiles), false))
56    }
57
58    /// Compute the mean for a 2D array.
59    ///
60    /// # Arguments
61    ///
62    /// * `array` - A 2D array of values.
63    ///
64    /// # Returns
65    ///
66    /// A 1D array of f64 values.
67    pub fn compute_mean<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
68    where
69        F: FromPrimitive + Num + Clone,
70    {
71        let mean = array
72            .mean_axis(Axis(0))
73            .ok_or(DataProfileError::MeanError)?;
74
75        Ok(mean)
76    }
77
78    /// Compute the stddev for a 2D array.
79    ///
80    /// # Arguments
81    ///
82    /// * `array` - A 2D array of f64 values.
83    ///
84    /// # Returns
85    ///
86    /// A 1D array of f64 values.
87    pub fn compute_stddev<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
88    where
89        F: FromPrimitive + Num + Float,
90    {
91        let ddof = F::from(1.0).unwrap();
92        let stddev = array.std_axis(Axis(0), ddof);
93        Ok(stddev)
94    }
95
96    /// Compute the min for a 2D array.
97    ///
98    /// # Arguments
99    ///
100    /// * `array` - A 2D array of values.
101    ///
102    /// # Returns
103    ///
104    /// A 1D array of values.
105    pub fn compute_min<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
106    where
107        F: MaybeNan + Num + Clone,
108        <F as MaybeNan>::NotNan: Ord,
109        F: Into<f64>,
110    {
111        let min = array.map_axis(Axis(0), |a| a.min_skipnan().to_owned());
112        Ok(min)
113    }
114
115    /// Compute the max for a 2D array.
116    ///
117    /// # Arguments
118    ///
119    /// * `array` - A 2D array of values.
120    ///
121    /// # Returns
122    ///
123    /// A 1D array of values.
124    pub fn compute_max<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
125    where
126        F: MaybeNan + Num + Clone,
127        <F as MaybeNan>::NotNan: Ord,
128        F: Into<f64>,
129    {
130        let max = array.map_axis(Axis(0), |a| a.max_skipnan().to_owned());
131        Ok(max)
132    }
133
134    /// Compute the distinct numbers in a 2D matrix.
135    ///
136    /// # Arguments
137    ///
138    /// * `array` - A 2D array of values.
139    ///
140    /// # Returns
141    ///
142    /// A 1D array of values.
143    pub fn compute_distinct<F>(
144        &self,
145        array: &ArrayView2<F>,
146    ) -> Result<Vec<Distinct>, DataProfileError>
147    where
148        F: std::fmt::Display + Num,
149    {
150        let unique: Vec<Distinct> = array
151            .axis_iter(Axis(1))
152            .map(|x| {
153                let hash = x.iter().map(|x| x.to_string()).collect::<HashSet<String>>();
154                Distinct {
155                    count: hash.len(),
156                    percent: hash.len() as f64 / x.len() as f64,
157                }
158            })
159            .collect();
160
161        Ok(unique)
162    }
163
164    /// Compute the histogram and bins from a 2D matrix.
165    ///
166    /// # Arguments
167    ///
168    /// * `array` - A 2D array of values.
169    ///
170    /// # Returns
171    ///
172    pub fn compute_bins<F>(
173        &self,
174        array: &ArrayView1<F>,
175        bin_size: &usize,
176    ) -> Result<Vec<f64>, DataProfileError>
177    where
178        F: Float + Num + core::ops::Sub,
179        f64: From<F>,
180    {
181        // find the min and max of the data
182
183        let max: f64 = array.max()?.to_owned().into();
184        let min: f64 = array.min()?.to_owned().into();
185
186        // create a vector of bins
187        let mut bins = Vec::<f64>::with_capacity(*bin_size);
188
189        // compute the bin width
190        let bin_width = (max - min) / *bin_size as f64;
191
192        // create the bins
193        for i in 0..*bin_size {
194            bins.push(min + bin_width * i as f64);
195        }
196
197        // return the bins
198        Ok(bins)
199    }
200
201    pub fn compute_bin_counts<F>(
202        &self,
203        array: &ArrayView1<F>,
204        bins: &[f64],
205    ) -> Result<Vec<i32>, DataProfileError>
206    where
207        F: Num
208            + ndarray_stats::MaybeNan
209            + std::marker::Send
210            + Sync
211            + Clone
212            + Copy
213            + num_traits::Float,
214        f64: From<F>,
215    {
216        // create a vector of size bins
217        let mut bin_counts = vec![0; bins.len()];
218        let max_bin = bins.last().ok_or(DataProfileError::MaxBinError)?;
219
220        array.for_each(|datum| {
221            let val: f64 = datum.to_owned().into();
222
223            // iterate over the bins
224            for (i, bin) in bins.iter().enumerate() {
225                if bin != max_bin {
226                    // check if datum is between bin and next bin
227                    if &val >= bin && val < bins[i + 1] {
228                        bin_counts[i] += 1;
229                        break;
230                    }
231                    continue;
232                } else if bin == max_bin {
233                    if &val > bin {
234                        bin_counts[i] += 1;
235                        break;
236                    }
237                    continue;
238                } else {
239                    continue;
240                }
241            }
242        });
243
244        Ok(bin_counts)
245    }
246
247    pub fn compute_histogram<F>(
248        &self,
249        array: &ArrayView2<F>,
250        features: &[String],
251        bin_size: &usize,
252        has_unsupported_types: bool,
253    ) -> Result<HashMap<String, Histogram>, DataProfileError>
254    where
255        F: Num
256            + ndarray_stats::MaybeNan
257            + std::marker::Send
258            + Sync
259            + Clone
260            + Copy
261            + num_traits::Float
262            + std::fmt::Debug,
263        f64: From<F>,
264    {
265        // Process each column in parallel
266        array
267            .axis_iter(Axis(1))
268            .into_par_iter()
269            .enumerate()
270            .map(|(idx, column)| {
271                // Compute histogram components
272
273                if has_unsupported_types {
274                    warn!(
275                        "Skipping histogram computation for feature {} due to unsupported types",
276                        features.get(idx).unwrap_or(&"Unknown".to_string())
277                    );
278                    return Ok((features[idx].clone(), Histogram::default()));
279                }
280
281                let bins = self.compute_bins(&column, bin_size).map_err(|e| {
282                    error!(
283                        error = %e,
284                        feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
285                        column = ?column,
286                        bin_size = bin_size,
287                        "Failed to compute bins"
288                    );
289                    e
290                })?;
291                let bin_counts = self.compute_bin_counts(&column, &bins).map_err(|e| {
292                    error!(
293                        error = %e,
294                        feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
295                        "Failed to compute bin counts"
296                    );
297                    e
298                })?;
299
300                // Create histogram for this feature
301                Ok((features[idx].clone(), Histogram { bins, bin_counts }))
302            })
303            .collect()
304    }
305
306    /// Compute the base stats for a 1D array of data
307    ///
308    /// # Arguments
309    ///
310    /// * `array` - A 1D array of f64 values
311    ///  
312    /// # Returns
313    ///
314    /// A tuple containing the mean, standard deviation, min, max, distinct, and quantiles
315    pub fn compute_stats<F>(
316        &self,
317        features: &[String],
318        array: &ArrayView2<F>,
319        bin_size: &usize,
320    ) -> Result<Vec<FeatureProfile>, DataProfileError>
321    where
322        F: Float
323            + MaybeNan
324            + FromPrimitive
325            + std::fmt::Display
326            + Sync
327            + Send
328            + Num
329            + Clone
330            + std::fmt::Debug
331            + 'static,
332        F: Into<f64>,
333        <F as MaybeNan>::NotNan: Ord,
334        f64: From<F>,
335        <F as MaybeNan>::NotNan: Clone,
336    {
337        let means = self.compute_mean(array)?;
338
339        debug!("Computing stddev");
340        let stddevs = self.compute_stddev(array)?;
341
342        debug!("Computing quantiles");
343        let (quantiles, has_unsupported_types) = self.compute_quantiles(array)?;
344
345        debug!("Computing min");
346        let mins = self.compute_min(array)?;
347
348        debug!("Computing max");
349        let maxs = self.compute_max(array)?;
350
351        debug!("Computing distinct values");
352        let distinct = self.compute_distinct(array)?;
353
354        debug!("Computing histogram");
355        let hist = self.compute_histogram(array, features, bin_size, has_unsupported_types)?;
356
357        // loop over list
358        let mut profiles = Vec::new();
359        for i in 0..features.len() {
360            let mean = &means[i];
361            let stddev = &stddevs[i];
362            let min = &mins[i];
363            let max = &maxs[i];
364            let q25 = quantiles.as_ref().map(|q| q[[0, i]]);
365            let q50 = quantiles.as_ref().map(|q| q[[1, i]]);
366            let q75 = quantiles.as_ref().map(|q| q[[2, i]]);
367            let q99 = quantiles.as_ref().map(|q| q[[3, i]]);
368            let dist = &distinct[i];
369
370            let numeric_stats = NumericStats {
371                mean: f64::from(*mean),
372                stddev: f64::from(*stddev),
373                min: f64::from(*min),
374                max: f64::from(*max),
375
376                distinct: Distinct {
377                    count: dist.count,
378                    percent: dist.percent,
379                },
380                quantiles: Quantiles {
381                    q25: q25.unwrap_or_default().to_f64().unwrap_or_default(),
382                    q50: q50.unwrap_or_default().to_f64().unwrap_or_default(),
383                    q75: q75.unwrap_or_default().to_f64().unwrap_or_default(),
384                    q99: q99.unwrap_or_default().to_f64().unwrap_or_default(),
385                },
386                histogram: hist[&features[i]].clone(),
387            };
388
389            let profile = FeatureProfile {
390                id: features[i].clone(),
391                numeric_stats: Some(numeric_stats),
392                string_stats: None,
393                timestamp: chrono::Utc::now(),
394                correlations: None,
395            };
396
397            profiles.push(profile);
398        }
399
400        Ok(profiles)
401    }
402
403    pub fn process_num_array<F>(
404        &mut self,
405        compute_correlations: bool,
406        numeric_array: &ArrayView2<F>,
407        numeric_features: Vec<String>,
408        bin_size: usize,
409    ) -> Result<DataProfile, DataProfileError>
410    where
411        F: Float
412            + MaybeNan
413            + FromPrimitive
414            + std::fmt::Display
415            + Sync
416            + Send
417            + Num
418            + Clone
419            + std::fmt::Debug
420            + 'static,
421        F: Into<f64>,
422        <F as MaybeNan>::NotNan: Ord,
423        f64: From<F>,
424        <F as MaybeNan>::NotNan: Clone,
425    {
426        let profiles = self.compute_stats(&numeric_features, numeric_array, &bin_size)?;
427        let correlations = if compute_correlations {
428            let feature_names = numeric_features.clone();
429            let feature_correlations = compute_feature_correlations(numeric_array, &feature_names);
430
431            // convert all values to f64
432
433            Some(feature_correlations)
434        } else {
435            None
436        };
437
438        let features: BTreeMap<String, FeatureProfile> = profiles
439            .iter()
440            .map(|profile| {
441                let mut profile = profile.clone();
442
443                if let Some(correlations) = correlations.as_ref() {
444                    let correlation = correlations.get(&profile.id);
445                    if let Some(correlation) = correlation {
446                        profile.add_correlations(correlation.clone());
447                    }
448                }
449
450                (profile.id.clone(), profile)
451            })
452            .collect();
453
454        Ok(DataProfile { features })
455    }
456}
457
458impl Default for NumProfiler {
459    fn default() -> Self {
460        NumProfiler::new()
461    }
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use ndarray::Array;
    use ndarray::{concatenate, Axis};
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;

    use approx::relative_eq;

    /// Expected feature ids, in column order.
    const FEATURE_IDS: [&str; 3] = ["feature_1", "feature_2", "feature_3"];

    /// Owned feature names matching `FEATURE_IDS`.
    fn feature_names() -> Vec<String> {
        FEATURE_IDS.iter().map(|id| id.to_string()).collect()
    }

    /// A 1000x3 array whose columns are uniform on [0, 1), [1, 2) and [2, 3).
    fn uniform_columns() -> ndarray::Array2<f64> {
        let first = Array::random((1000, 1), Uniform::new(0., 1.));
        let second = Array::random((1000, 1), Uniform::new(1., 2.));
        let third = Array::random((1000, 1), Uniform::new(2., 3.));
        concatenate![Axis(1), first, second, third]
    }

    /// Assert `actual` is within `epsilon` of `expected`.
    fn assert_close(actual: f64, expected: f64, epsilon: f64) {
        assert!(relative_eq!(actual, expected, epsilon = epsilon));
    }

    /// Checks shared by both precisions: ids, per-feature means, and the
    /// quantiles of the first feature.
    fn check_profiles(profiles: &[FeatureProfile], epsilon: f64) {
        assert_eq!(profiles.len(), FEATURE_IDS.len());
        for (profile, id) in profiles.iter().zip(FEATURE_IDS) {
            assert_eq!(profile.id, id);
        }

        for (profile, expected_mean) in profiles.iter().zip([0.5, 1.5, 2.5]) {
            assert_close(
                profile.numeric_stats.as_ref().unwrap().mean,
                expected_mean,
                epsilon,
            );
        }

        let quantiles = &profiles[0].numeric_stats.as_ref().unwrap().quantiles;
        assert_close(quantiles.q25, 0.25, epsilon);
        assert_close(quantiles.q50, 0.5, epsilon);
        assert_close(quantiles.q75, 0.75, epsilon);
        assert_close(quantiles.q99, 0.99, epsilon);
    }

    #[test]
    fn test_profile_creation_f64() {
        let array = uniform_columns();
        let features = feature_names();
        let bin_size = 20;

        let profiles = NumProfiler::default()
            .compute_stats(&features, &array.view(), &bin_size)
            .unwrap();

        check_profiles(&profiles, 0.1);
    }

    #[test]
    fn test_profile_creation_f32() {
        let features = feature_names();
        // Same data as the f64 case, narrowed to f32.
        let array = uniform_columns().mapv(|x| x as f32);
        let bin_size = 20;

        let profiles = NumProfiler::default()
            .compute_stats(&features, &array.view(), &bin_size)
            .unwrap();

        check_profiles(&profiles, 0.05);
    }
}