1use crate::error::DataProfileError;
2use crate::profile::stats::compute_feature_correlations;
3use crate::profile::types::DataProfile;
4use crate::profile::types::{Distinct, FeatureProfile, Histogram, NumericStats, Quantiles};
5use ndarray::prelude::*;
6use ndarray::{aview1, Axis};
7use ndarray_stats::MaybeNan;
8use ndarray_stats::{interpolate::Nearest, QuantileExt};
9use noisy_float::types::{n64, N64};
10use num_traits::ToPrimitive;
11use num_traits::{Float, FromPrimitive, Num};
12use rayon::prelude::*;
13use std::cmp::Ord;
14use std::collections::{BTreeMap, HashMap, HashSet};
15use tracing::{debug, error, warn};
16
17pub fn compute_bins<F>(
26 array: &ArrayView1<F>,
27 bin_size: &usize,
28) -> Result<Vec<f64>, DataProfileError>
29where
30 F: Float + Num + core::ops::Sub,
31 f64: From<F>,
32{
33 let max: f64 = array.max()?.to_owned().into();
36 let min: f64 = array.min()?.to_owned().into();
37
38 let mut bins = Vec::<f64>::with_capacity(*bin_size);
40
41 let bin_width = (max - min) / *bin_size as f64;
43
44 for i in 0..*bin_size {
46 bins.push(min + bin_width * i as f64);
47 }
48
49 Ok(bins)
51}
52
53pub fn compute_bin_counts<F>(
54 array: &ArrayView1<F>,
55 bins: &[f64],
56) -> Result<Vec<i32>, DataProfileError>
57where
58 F: Num + ndarray_stats::MaybeNan + std::marker::Send + Sync + Clone + Copy + num_traits::Float,
59 f64: From<F>,
60{
61 let mut bin_counts = vec![0; bins.len()];
63 let max_bin = bins.last().ok_or(DataProfileError::MaxBinError)?;
64
65 array.for_each(|datum| {
66 let val: f64 = datum.to_owned().into();
67
68 for (i, bin) in bins.iter().enumerate() {
70 if bin != max_bin {
71 if &val >= bin && val < bins[i + 1] {
73 bin_counts[i] += 1;
74 break;
75 }
76 continue;
77 } else if bin == max_bin {
78 if &val > bin {
79 bin_counts[i] += 1;
80 break;
81 }
82 continue;
83 } else {
84 continue;
85 }
86 }
87 });
88
89 Ok(bin_counts)
90}
91
/// Profiler for numeric feature arrays.
///
/// The struct has no fields; it only groups the profiling methods.
#[derive(Debug, Clone)]
pub struct NumProfiler {}
93
94impl NumProfiler {
95 pub fn new() -> Self {
96 NumProfiler {}
97 }
98
99 pub fn compute_quantiles<F>(
109 &self,
110 array: &ArrayView2<F>,
111 ) -> Result<(Option<Array2<N64>>, bool), DataProfileError>
112 where
113 F: Num + ndarray_stats::MaybeNan + std::marker::Send + Sync + Clone + Copy + Float,
114 <F as ndarray_stats::MaybeNan>::NotNan: Clone,
115 <F as ndarray_stats::MaybeNan>::NotNan: Ord,
116 f64: From<F>,
117 {
118 if array.iter().any(|&x| x.is_nan() || x.is_infinite()) {
121 warn!("Array contains NaN or Inf values, skipping quantile computation");
122 return Ok((None, true));
123 }
124
125 let mut n64_array = array.mapv(|x| n64(f64::from(x)));
127
128 let qs = &[n64(0.25), n64(0.5), n64(0.75), n64(0.99)];
129 let quantiles = n64_array.quantiles_axis_mut(Axis(0), &aview1(qs), &Nearest)?;
130
131 Ok((Some(quantiles), false))
132 }
133
134 pub fn compute_mean<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
144 where
145 F: FromPrimitive + Num + Clone,
146 {
147 let mean = array
148 .mean_axis(Axis(0))
149 .ok_or(DataProfileError::MeanError)?;
150
151 Ok(mean)
152 }
153
154 pub fn compute_stddev<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
164 where
165 F: FromPrimitive + Num + Float,
166 {
167 let ddof = F::from(1.0).unwrap();
168 let stddev = array.std_axis(Axis(0), ddof);
169 Ok(stddev)
170 }
171
172 pub fn compute_min<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
182 where
183 F: MaybeNan + Num + Clone,
184 <F as MaybeNan>::NotNan: Ord,
185 F: Into<f64>,
186 {
187 let min = array.map_axis(Axis(0), |a| a.min_skipnan().to_owned());
188 Ok(min)
189 }
190
191 pub fn compute_max<F>(&self, array: &ArrayView2<F>) -> Result<Array1<F>, DataProfileError>
201 where
202 F: MaybeNan + Num + Clone,
203 <F as MaybeNan>::NotNan: Ord,
204 F: Into<f64>,
205 {
206 let max = array.map_axis(Axis(0), |a| a.max_skipnan().to_owned());
207 Ok(max)
208 }
209
210 pub fn compute_distinct<F>(
220 &self,
221 array: &ArrayView2<F>,
222 ) -> Result<Vec<Distinct>, DataProfileError>
223 where
224 F: std::fmt::Display + Num,
225 {
226 let unique: Vec<Distinct> = array
227 .axis_iter(Axis(1))
228 .map(|x| {
229 let hash = x.iter().map(|x| x.to_string()).collect::<HashSet<String>>();
230 Distinct {
231 count: hash.len(),
232 percent: hash.len() as f64 / x.len() as f64,
233 }
234 })
235 .collect();
236
237 Ok(unique)
238 }
239
240 pub fn compute_histogram<F>(
241 &self,
242 array: &ArrayView2<F>,
243 features: &[String],
244 bin_size: &usize,
245 has_unsupported_types: bool,
246 ) -> Result<HashMap<String, Histogram>, DataProfileError>
247 where
248 F: Num
249 + ndarray_stats::MaybeNan
250 + std::marker::Send
251 + Sync
252 + Clone
253 + Copy
254 + num_traits::Float
255 + std::fmt::Debug,
256 f64: From<F>,
257 {
258 array
260 .axis_iter(Axis(1))
261 .into_par_iter()
262 .enumerate()
263 .map(|(idx, column)| {
264 if has_unsupported_types {
267 warn!(
268 "Skipping histogram computation for feature {} due to unsupported types",
269 features.get(idx).unwrap_or(&"Unknown".to_string())
270 );
271 return Ok((features[idx].clone(), Histogram::default()));
272 }
273
274 let bins = compute_bins(&column, bin_size).inspect_err(|e| {
275 error!(
276 error = %e,
277 feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
278 column = ?column,
279 bin_size = bin_size,
280 "Failed to compute bins"
281 );
282 })?;
283 let bin_counts = compute_bin_counts(&column, &bins).inspect_err(|e| {
284 error!(
285 error = %e,
286 feature = %features.get(idx).unwrap_or(&"Unknown".to_string()),
287 "Failed to compute bin counts"
288 );
289 })?;
290
291 Ok((features[idx].clone(), Histogram { bins, bin_counts }))
293 })
294 .collect()
295 }
296
297 pub fn compute_stats<F>(
307 &self,
308 features: &[String],
309 array: &ArrayView2<F>,
310 bin_size: &usize,
311 ) -> Result<Vec<FeatureProfile>, DataProfileError>
312 where
313 F: Float
314 + MaybeNan
315 + FromPrimitive
316 + std::fmt::Display
317 + Sync
318 + Send
319 + Num
320 + Clone
321 + std::fmt::Debug
322 + 'static,
323 F: Into<f64>,
324 <F as MaybeNan>::NotNan: Ord,
325 f64: From<F>,
326 <F as MaybeNan>::NotNan: Clone,
327 {
328 let means = self.compute_mean(array)?;
329
330 debug!("Computing stddev");
331 let stddevs = self.compute_stddev(array)?;
332
333 debug!("Computing quantiles");
334 let (quantiles, has_unsupported_types) = self.compute_quantiles(array)?;
335
336 debug!("Computing min");
337 let mins = self.compute_min(array)?;
338
339 debug!("Computing max");
340 let maxs = self.compute_max(array)?;
341
342 debug!("Computing distinct values");
343 let distinct = self.compute_distinct(array)?;
344
345 debug!("Computing histogram");
346 let hist = self.compute_histogram(array, features, bin_size, has_unsupported_types)?;
347
348 let mut profiles = Vec::new();
350 for i in 0..features.len() {
351 let mean = &means[i];
352 let stddev = &stddevs[i];
353 let min = &mins[i];
354 let max = &maxs[i];
355 let q25 = quantiles.as_ref().map(|q| q[[0, i]]);
356 let q50 = quantiles.as_ref().map(|q| q[[1, i]]);
357 let q75 = quantiles.as_ref().map(|q| q[[2, i]]);
358 let q99 = quantiles.as_ref().map(|q| q[[3, i]]);
359 let dist = &distinct[i];
360
361 let numeric_stats = NumericStats {
362 mean: f64::from(*mean),
363 stddev: f64::from(*stddev),
364 min: f64::from(*min),
365 max: f64::from(*max),
366
367 distinct: Distinct {
368 count: dist.count,
369 percent: dist.percent,
370 },
371 quantiles: Quantiles {
372 q25: q25.unwrap_or_default().to_f64().unwrap_or_default(),
373 q50: q50.unwrap_or_default().to_f64().unwrap_or_default(),
374 q75: q75.unwrap_or_default().to_f64().unwrap_or_default(),
375 q99: q99.unwrap_or_default().to_f64().unwrap_or_default(),
376 },
377 histogram: hist[&features[i]].clone(),
378 };
379
380 let profile = FeatureProfile {
381 id: features[i].clone(),
382 numeric_stats: Some(numeric_stats),
383 string_stats: None,
384 timestamp: chrono::Utc::now(),
385 correlations: None,
386 };
387
388 profiles.push(profile);
389 }
390
391 Ok(profiles)
392 }
393
394 pub fn process_num_array<F>(
395 &mut self,
396 compute_correlations: bool,
397 numeric_array: &ArrayView2<F>,
398 numeric_features: Vec<String>,
399 bin_size: usize,
400 ) -> Result<DataProfile, DataProfileError>
401 where
402 F: Float
403 + MaybeNan
404 + FromPrimitive
405 + std::fmt::Display
406 + Sync
407 + Send
408 + Num
409 + Clone
410 + std::fmt::Debug
411 + 'static,
412 F: Into<f64>,
413 <F as MaybeNan>::NotNan: Ord,
414 f64: From<F>,
415 <F as MaybeNan>::NotNan: Clone,
416 {
417 let profiles = self.compute_stats(&numeric_features, numeric_array, &bin_size)?;
418 let correlations = if compute_correlations {
419 let feature_names = numeric_features.clone();
420 let feature_correlations = compute_feature_correlations(numeric_array, &feature_names);
421
422 Some(feature_correlations)
425 } else {
426 None
427 };
428
429 let features: BTreeMap<String, FeatureProfile> = profiles
430 .iter()
431 .map(|profile| {
432 let mut profile = profile.clone();
433
434 if let Some(correlations) = correlations.as_ref() {
435 let correlation = correlations.get(&profile.id);
436 if let Some(correlation) = correlation {
437 profile.add_correlations(correlation.clone());
438 }
439 }
440
441 (profile.id.clone(), profile)
442 })
443 .collect();
444
445 Ok(DataProfile { features })
446 }
447}
448
449impl Default for NumProfiler {
450 fn default() -> Self {
451 NumProfiler::new()
452 }
453}
454
455#[cfg(test)]
456mod tests {
457
458 use super::*;
459 use ndarray::Array;
460 use ndarray::{concatenate, Axis};
461 use ndarray_rand::rand_distr::Uniform;
462 use ndarray_rand::RandomExt;
463
464 use approx::relative_eq;
465
#[test]
fn test_profile_creation_f64() {
    // Three columns of 1000 uniform samples drawn from [0, 1), [1, 2) and [2, 3).
    let col_a = Array::random((1000, 1), Uniform::new(0., 1.));
    let col_b = Array::random((1000, 1), Uniform::new(1., 2.));
    let col_c = Array::random((1000, 1), Uniform::new(2., 3.));
    let data = concatenate![Axis(1), col_a, col_b, col_c];

    let names: Vec<String> = (1..=3).map(|i| format!("feature_{i}")).collect();

    let profiler = NumProfiler::default();
    let bin_size = 20;

    let profiles = profiler
        .compute_stats(&names, &data.view(), &bin_size)
        .unwrap();

    // One profile per feature, in input order.
    assert_eq!(profiles.len(), 3);
    for (profile, expected_id) in profiles.iter().zip(["feature_1", "feature_2", "feature_3"]) {
        assert_eq!(profile.id, expected_id);
    }

    // Each mean sits near the midpoint of its sampling interval.
    for (profile, expected_mean) in profiles.iter().zip([0.5, 1.5, 2.5]) {
        let stats = profile.numeric_stats.as_ref().unwrap();
        assert!(relative_eq!(stats.mean, expected_mean, epsilon = 0.1));
    }

    // Quantiles of the first feature follow the uniform distribution on [0, 1).
    let first = &profiles[0].numeric_stats.as_ref().unwrap().quantiles;
    assert!(relative_eq!(first.q25, 0.25, epsilon = 0.1));
    assert!(relative_eq!(first.q50, 0.5, epsilon = 0.1));
    assert!(relative_eq!(first.q75, 0.75, epsilon = 0.1));
    assert!(relative_eq!(first.q99, 0.99, epsilon = 0.1));
}
532
#[test]
fn test_profile_creation_f32() {
    // Three columns of 1000 uniform samples drawn from [0, 1), [1, 2) and [2, 3).
    let col_a = Array::random((1000, 1), Uniform::new(0., 1.));
    let col_b = Array::random((1000, 1), Uniform::new(1., 2.));
    let col_c = Array::random((1000, 1), Uniform::new(2., 3.));
    let data = concatenate![Axis(1), col_a, col_b, col_c];

    let names: Vec<String> = (1..=3).map(|i| format!("feature_{i}")).collect();

    // Exercise the profiler with single-precision input.
    let data = data.mapv(|x| x as f32);
    let bin_size = 20;

    let profiler = NumProfiler::default();

    let profiles = profiler
        .compute_stats(&names, &data.view(), &bin_size)
        .unwrap();

    // One profile per feature, in input order.
    assert_eq!(profiles.len(), 3);
    for (profile, expected_id) in profiles.iter().zip(["feature_1", "feature_2", "feature_3"]) {
        assert_eq!(profile.id, expected_id);
    }

    // Each mean sits near the midpoint of its sampling interval.
    for (profile, expected_mean) in profiles.iter().zip([0.5, 1.5, 2.5]) {
        let stats = profile.numeric_stats.as_ref().unwrap();
        assert!(relative_eq!(stats.mean, expected_mean, epsilon = 0.05));
    }

    // Quantiles of the first feature follow the uniform distribution on [0, 1).
    let first = &profiles[0].numeric_stats.as_ref().unwrap().quantiles;
    assert!(relative_eq!(first.q25, 0.25, epsilon = 0.05));
    assert!(relative_eq!(first.q50, 0.5, epsilon = 0.05));
    assert!(relative_eq!(first.q75, 0.75, epsilon = 0.05));
    assert!(relative_eq!(first.q99, 0.99, epsilon = 0.05));
}
601}