1use crate::error::DataProfileError;
2use crate::profile::stats::compute_feature_correlations;
3use crate::profile::types::DataProfile;
4use crate::profile::types::{Distinct, FeatureProfile, Histogram, NumericStats, Quantiles};
5use ndarray::prelude::*;
6use ndarray::{aview1, Axis};
7use ndarray_stats::MaybeNan;
8use ndarray_stats::{interpolate::Nearest, QuantileExt};
9use noisy_float::types::{n64, N64};
10use num_traits::ToPrimitive;
11use num_traits::{Float, FromPrimitive, Num};
12use rayon::prelude::*;
13use std::cmp::Ord;
14use std::collections::{BTreeMap, HashMap, HashSet};
15use tracing::{debug, error, warn};
/// Profiler for 2-D numeric arrays.
///
/// The struct has no fields; all behaviour lives in its methods, which compute
/// per-column statistics (mean, standard deviation, quantiles, min/max,
/// distinct counts and histograms).
pub struct NumProfiler {}
17
18impl NumProfiler {
19 pub fn new() -> Self {
20 NumProfiler {}
21 }
22
23 pub fn compute_quantiles<F>(
33 &self,
34 array: &ArrayView2<F>,
35 ) -> Result<(Option<Array2<N64>>, bool), DataProfileError>
36 where
37 F: Num + ndarray_stats::MaybeNan + std::marker::Send + Sync + Clone + Copy + Float,
38 <F as ndarray_stats::MaybeNan>::NotNan: Clone,
39 <F as ndarray_stats::MaybeNan>::NotNan: Ord,
40 f64: From<F>,
41 {
42 if array.iter().any(|&x| x.is_nan() || x.is_infinite()) {
45 warn!("Array contains NaN or Inf values, skipping quantile computation");
46 return Ok((None, true));
47 }
48
49 let mut n64_array = array.mapv(|x| n64(f64::from(x)));
51
52 let qs = &[n64(0.25), n64(0.5), n64(0.75), n64(0.99)];
53 let quantiles = n64_array.quantiles_axis_mut(Axis(0), &aview1(qs), &Nearest)?;
54
55 Ok((Some(quantiles), false))
56 }
57
58 pub fn compute_mean<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
68 where
69 F: FromPrimitive + Num + Clone,
70 {
71 let mean = array
72 .mean_axis(Axis(0))
73 .ok_or(DataProfileError::MeanError)?;
74
75 Ok(mean)
76 }
77
78 pub fn compute_stddev<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
88 where
89 F: FromPrimitive + Num + Float,
90 {
91 let ddof = F::from(1.0).unwrap();
92 let stddev = array.std_axis(Axis(0), ddof);
93 Ok(stddev)
94 }
95
96 pub fn compute_min<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
106 where
107 F: MaybeNan + Num + Clone,
108 <F as MaybeNan>::NotNan: Ord,
109 F: Into<f64>,
110 {
111 let min = array.map_axis(Axis(0), |a| a.min_skipnan().to_owned());
112 Ok(min)
113 }
114
115 pub fn compute_max<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
125 where
126 F: MaybeNan + Num + Clone,
127 <F as MaybeNan>::NotNan: Ord,
128 F: Into<f64>,
129 {
130 let max = array.map_axis(Axis(0), |a| a.max_skipnan().to_owned());
131 Ok(max)
132 }
133
134 pub fn compute_distinct<F>(
144 &self,
145 array: &ArrayView2<F>,
146 ) -> Result<Vec<Distinct>, DataProfileError>
147 where
148 F: std::fmt::Display + Num,
149 {
150 let unique: Vec<Distinct> = array
151 .axis_iter(Axis(1))
152 .map(|x| {
153 let hash = x.iter().map(|x| x.to_string()).collect::<HashSet<String>>();
154 Distinct {
155 count: hash.len(),
156 percent: hash.len() as f64 / x.len() as f64,
157 }
158 })
159 .collect();
160
161 Ok(unique)
162 }
163
164 pub fn compute_bins<F>(
173 &self,
174 array: &ArrayView1<F>,
175 bin_size: &usize,
176 ) -> Result<Vec<f64>, DataProfileError>
177 where
178 F: Float + Num + core::ops::Sub,
179 f64: From<F>,
180 {
181 let max: f64 = array.max()?.to_owned().into();
184 let min: f64 = array.min()?.to_owned().into();
185
186 let mut bins = Vec::<f64>::with_capacity(*bin_size);
188
189 let bin_width = (max - min) / *bin_size as f64;
191
192 for i in 0..*bin_size {
194 bins.push(min + bin_width * i as f64);
195 }
196
197 Ok(bins)
199 }
200
201 pub fn compute_bin_counts<F>(
202 &self,
203 array: &ArrayView1<F>,
204 bins: &[f64],
205 ) -> Result<Vec<i32>, DataProfileError>
206 where
207 F: Num
208 + ndarray_stats::MaybeNan
209 + std::marker::Send
210 + Sync
211 + Clone
212 + Copy
213 + num_traits::Float,
214 f64: From<F>,
215 {
216 let mut bin_counts = vec![0; bins.len()];
218 let max_bin = bins.last().ok_or(DataProfileError::MaxBinError)?;
219
220 array.for_each(|datum| {
221 let val: f64 = datum.to_owned().into();
222
223 for (i, bin) in bins.iter().enumerate() {
225 if bin != max_bin {
226 if &val >= bin && val < bins[i + 1] {
228 bin_counts[i] += 1;
229 break;
230 }
231 continue;
232 } else if bin == max_bin {
233 if &val > bin {
234 bin_counts[i] += 1;
235 break;
236 }
237 continue;
238 } else {
239 continue;
240 }
241 }
242 });
243
244 Ok(bin_counts)
245 }
246
247 pub fn compute_histogram<F>(
248 &self,
249 array: &ArrayView2<F>,
250 features: &[String],
251 bin_size: &usize,
252 has_unsupported_types: bool,
253 ) -> Result<HashMap<String, Histogram>, DataProfileError>
254 where
255 F: Num
256 + ndarray_stats::MaybeNan
257 + std::marker::Send
258 + Sync
259 + Clone
260 + Copy
261 + num_traits::Float
262 + std::fmt::Debug,
263 f64: From<F>,
264 {
265 array
267 .axis_iter(Axis(1))
268 .into_par_iter()
269 .enumerate()
270 .map(|(idx, column)| {
271 if has_unsupported_types {
274 warn!(
275 "Skipping histogram computation for feature {} due to unsupported types",
276 features.get(idx).unwrap_or(&"Unknown".to_string())
277 );
278 return Ok((features[idx].clone(), Histogram::default()));
279 }
280
281 let bins = self.compute_bins(&column, bin_size).map_err(|e| {
282 error!(
283 error = %e,
284 feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
285 column = ?column,
286 bin_size = bin_size,
287 "Failed to compute bins"
288 );
289 e
290 })?;
291 let bin_counts = self.compute_bin_counts(&column, &bins).map_err(|e| {
292 error!(
293 error = %e,
294 feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
295 "Failed to compute bin counts"
296 );
297 e
298 })?;
299
300 Ok((features[idx].clone(), Histogram { bins, bin_counts }))
302 })
303 .collect()
304 }
305
306 pub fn compute_stats<F>(
316 &self,
317 features: &[String],
318 array: &ArrayView2<F>,
319 bin_size: &usize,
320 ) -> Result<Vec<FeatureProfile>, DataProfileError>
321 where
322 F: Float
323 + MaybeNan
324 + FromPrimitive
325 + std::fmt::Display
326 + Sync
327 + Send
328 + Num
329 + Clone
330 + std::fmt::Debug
331 + 'static,
332 F: Into<f64>,
333 <F as MaybeNan>::NotNan: Ord,
334 f64: From<F>,
335 <F as MaybeNan>::NotNan: Clone,
336 {
337 let means = self.compute_mean(array)?;
338
339 debug!("Computing stddev");
340 let stddevs = self.compute_stddev(array)?;
341
342 debug!("Computing quantiles");
343 let (quantiles, has_unsupported_types) = self.compute_quantiles(array)?;
344
345 debug!("Computing min");
346 let mins = self.compute_min(array)?;
347
348 debug!("Computing max");
349 let maxs = self.compute_max(array)?;
350
351 debug!("Computing distinct values");
352 let distinct = self.compute_distinct(array)?;
353
354 debug!("Computing histogram");
355 let hist = self.compute_histogram(array, features, bin_size, has_unsupported_types)?;
356
357 let mut profiles = Vec::new();
359 for i in 0..features.len() {
360 let mean = &means[i];
361 let stddev = &stddevs[i];
362 let min = &mins[i];
363 let max = &maxs[i];
364 let q25 = quantiles.as_ref().map(|q| q[[0, i]]);
365 let q50 = quantiles.as_ref().map(|q| q[[1, i]]);
366 let q75 = quantiles.as_ref().map(|q| q[[2, i]]);
367 let q99 = quantiles.as_ref().map(|q| q[[3, i]]);
368 let dist = &distinct[i];
369
370 let numeric_stats = NumericStats {
371 mean: f64::from(*mean),
372 stddev: f64::from(*stddev),
373 min: f64::from(*min),
374 max: f64::from(*max),
375
376 distinct: Distinct {
377 count: dist.count,
378 percent: dist.percent,
379 },
380 quantiles: Quantiles {
381 q25: q25.unwrap_or_default().to_f64().unwrap_or_default(),
382 q50: q50.unwrap_or_default().to_f64().unwrap_or_default(),
383 q75: q75.unwrap_or_default().to_f64().unwrap_or_default(),
384 q99: q99.unwrap_or_default().to_f64().unwrap_or_default(),
385 },
386 histogram: hist[&features[i]].clone(),
387 };
388
389 let profile = FeatureProfile {
390 id: features[i].clone(),
391 numeric_stats: Some(numeric_stats),
392 string_stats: None,
393 timestamp: chrono::Utc::now(),
394 correlations: None,
395 };
396
397 profiles.push(profile);
398 }
399
400 Ok(profiles)
401 }
402
403 pub fn process_num_array<F>(
404 &mut self,
405 compute_correlations: bool,
406 numeric_array: &ArrayView2<F>,
407 numeric_features: Vec<String>,
408 bin_size: usize,
409 ) -> Result<DataProfile, DataProfileError>
410 where
411 F: Float
412 + MaybeNan
413 + FromPrimitive
414 + std::fmt::Display
415 + Sync
416 + Send
417 + Num
418 + Clone
419 + std::fmt::Debug
420 + 'static,
421 F: Into<f64>,
422 <F as MaybeNan>::NotNan: Ord,
423 f64: From<F>,
424 <F as MaybeNan>::NotNan: Clone,
425 {
426 let profiles = self.compute_stats(&numeric_features, numeric_array, &bin_size)?;
427 let correlations = if compute_correlations {
428 let feature_names = numeric_features.clone();
429 let feature_correlations = compute_feature_correlations(numeric_array, &feature_names);
430
431 Some(feature_correlations)
434 } else {
435 None
436 };
437
438 let features: BTreeMap<String, FeatureProfile> = profiles
439 .iter()
440 .map(|profile| {
441 let mut profile = profile.clone();
442
443 if let Some(correlations) = correlations.as_ref() {
444 let correlation = correlations.get(&profile.id);
445 if let Some(correlation) = correlation {
446 profile.add_correlations(correlation.clone());
447 }
448 }
449
450 (profile.id.clone(), profile)
451 })
452 .collect();
453
454 Ok(DataProfile { features })
455 }
456}
457
458impl Default for NumProfiler {
459 fn default() -> Self {
460 NumProfiler::new()
461 }
462}
463
464#[cfg(test)]
465mod tests {
466
467 use super::*;
468 use ndarray::Array;
469 use ndarray::{concatenate, Axis};
470 use ndarray_rand::rand_distr::Uniform;
471 use ndarray_rand::RandomExt;
472
473 use approx::relative_eq;
474
475 #[test]
476 fn test_profile_creation_f64() {
477 let array1 = Array::random((1000, 1), Uniform::new(0., 1.));
479 let array2 = Array::random((1000, 1), Uniform::new(1., 2.));
480 let array3 = Array::random((1000, 1), Uniform::new(2., 3.));
481
482 let array = concatenate![Axis(1), array1, array2, array3];
483 let features = vec![
484 "feature_1".to_string(),
485 "feature_2".to_string(),
486 "feature_3".to_string(),
487 ];
488
489 let profiler = NumProfiler::default();
490 let bin_size = 20;
491
492 let profile = profiler
493 .compute_stats(&features, &array.view(), &bin_size)
494 .unwrap();
495
496 assert_eq!(profile.len(), 3);
497 assert_eq!(profile[0].id, "feature_1");
498 assert_eq!(profile[1].id, "feature_2");
499 assert_eq!(profile[2].id, "feature_3");
500
501 assert!(relative_eq!(
503 profile[0].numeric_stats.as_ref().unwrap().mean,
504 0.5,
505 epsilon = 0.1
506 ));
507 assert!(relative_eq!(
508 profile[1].numeric_stats.as_ref().unwrap().mean,
509 1.5,
510 epsilon = 0.1
511 ));
512 assert!(relative_eq!(
513 profile[2].numeric_stats.as_ref().unwrap().mean,
514 2.5,
515 epsilon = 0.1
516 ));
517
518 assert!(relative_eq!(
520 profile[0].numeric_stats.as_ref().unwrap().quantiles.q25,
521 0.25,
522 epsilon = 0.1
523 ));
524
525 assert!(relative_eq!(
526 profile[0].numeric_stats.as_ref().unwrap().quantiles.q50,
527 0.5,
528 epsilon = 0.1
529 ));
530 assert!(relative_eq!(
531 profile[0].numeric_stats.as_ref().unwrap().quantiles.q75,
532 0.75,
533 epsilon = 0.1
534 ));
535 assert!(relative_eq!(
536 profile[0].numeric_stats.as_ref().unwrap().quantiles.q99,
537 0.99,
538 epsilon = 0.1
539 ));
540 }
541
542 #[test]
543 fn test_profile_creation_f32() {
544 let array1 = Array::random((1000, 1), Uniform::new(0., 1.));
546 let array2 = Array::random((1000, 1), Uniform::new(1., 2.));
547 let array3 = Array::random((1000, 1), Uniform::new(2., 3.));
548
549 let array = concatenate![Axis(1), array1, array2, array3];
550 let features = vec![
551 "feature_1".to_string(),
552 "feature_2".to_string(),
553 "feature_3".to_string(),
554 ];
555
556 let array = array.mapv(|x| x as f32);
558 let bin_size = 20;
559
560 let profiler = NumProfiler::default();
561
562 let profile = profiler
563 .compute_stats(&features, &array.view(), &bin_size)
564 .unwrap();
565
566 assert_eq!(profile.len(), 3);
567 assert_eq!(profile[0].id, "feature_1");
568 assert_eq!(profile[1].id, "feature_2");
569 assert_eq!(profile[2].id, "feature_3");
570
571 assert!(relative_eq!(
573 profile[0].numeric_stats.as_ref().unwrap().mean,
574 0.5,
575 epsilon = 0.05
576 ));
577 assert!(relative_eq!(
578 profile[1].numeric_stats.as_ref().unwrap().mean,
579 1.5,
580 epsilon = 0.05
581 ));
582 assert!(relative_eq!(
583 profile[2].numeric_stats.as_ref().unwrap().mean,
584 2.5,
585 epsilon = 0.05
586 ));
587
588 assert!(relative_eq!(
590 profile[0].numeric_stats.as_ref().unwrap().quantiles.q25,
591 0.25,
592 epsilon = 0.05
593 ));
594 assert!(relative_eq!(
595 profile[0].numeric_stats.as_ref().unwrap().quantiles.q50,
596 0.5,
597 epsilon = 0.05
598 ));
599 assert!(relative_eq!(
600 profile[0].numeric_stats.as_ref().unwrap().quantiles.q75,
601 0.75,
602 epsilon = 0.05
603 ));
604 assert!(relative_eq!(
605 profile[0].numeric_stats.as_ref().unwrap().quantiles.q99,
606 0.99,
607 epsilon = 0.05
608 ));
609 }
610}