// alimentar/quality/checks.rs
1//! Quality Issues and Checker
2//!
3//! Types for representing data quality issues and the checker that detects
4//! them.
5
6use std::collections::{HashMap, HashSet};
7
8use crate::{
9    dataset::{ArrowDataset, Dataset},
10    error::Result,
11};
12
13// ═══════════════════════════════════════════════════════════════════════════════
14// Quality Issues
15// ═══════════════════════════════════════════════════════════════════════════════
16
/// Kinds of data-quality problems that a [`QualityChecker`] can detect.
#[derive(Debug, Clone, PartialEq)]
pub enum QualityIssue {
    /// A column whose null/missing ratio exceeds the configured threshold
    HighNullRatio {
        /// Column name
        column: String,
        /// Observed null ratio (0.0–1.0)
        null_ratio: f64,
        /// Threshold that was exceeded
        threshold: f64,
    },
    /// A column whose duplicate-value ratio exceeds the configured threshold
    HighDuplicateRatio {
        /// Column name
        column: String,
        /// Observed duplicate ratio (0.0–1.0)
        duplicate_ratio: f64,
        /// Threshold that was exceeded
        threshold: f64,
    },
    /// A column with fewer distinct values than the configured minimum
    LowCardinality {
        /// Column name
        column: String,
        /// Distinct values observed
        unique_count: usize,
        /// Total number of rows
        total_count: usize,
    },
    /// A numeric column containing IQR-based outliers
    OutliersDetected {
        /// Column name
        column: String,
        /// Number of outlying values
        outlier_count: usize,
        /// Fraction of values that are outliers
        outlier_ratio: f64,
    },
    /// The dataset contains rows that are exact duplicates of earlier rows
    DuplicateRows {
        /// Number of duplicated rows
        duplicate_count: usize,
        /// Fraction of rows that are duplicates
        duplicate_ratio: f64,
    },
    /// A column holding a single repeated value (zero variance)
    ConstantColumn {
        /// Column name
        column: String,
        /// The repeated value
        value: String,
    },
    /// The schema declares no columns at all
    EmptySchema,
    /// The dataset contains no rows
    EmptyDataset,
}

impl QualityIssue {
    /// Severity on a 1–5 scale; 5 is the most critical.
    ///
    /// Empty schema/dataset is fatal (5); constant columns and very null
    /// columns (> 50% null) rate 4; remaining null problems and heavy outlier
    /// contamination (> 10%) rate 3; other duplicate/outlier findings rate 2;
    /// low cardinality is informational (1).
    pub fn severity(&self) -> u8 {
        match self {
            Self::EmptySchema | Self::EmptyDataset => 5,
            Self::ConstantColumn { .. } => 4,
            // More than half the column missing is a step worse than merely
            // exceeding the configured threshold.
            Self::HighNullRatio { null_ratio, .. } => {
                if *null_ratio > 0.5 {
                    4
                } else {
                    3
                }
            }
            Self::OutliersDetected { outlier_ratio, .. } => {
                if *outlier_ratio > 0.1 {
                    3
                } else {
                    2
                }
            }
            Self::HighDuplicateRatio { .. } | Self::DuplicateRows { .. } => 2,
            Self::LowCardinality { .. } => 1,
        }
    }

    /// The affected column's name, or `None` for dataset-level issues.
    pub fn column(&self) -> Option<&str> {
        match self {
            Self::HighNullRatio { column, .. } => Some(column.as_str()),
            Self::HighDuplicateRatio { column, .. } => Some(column.as_str()),
            Self::LowCardinality { column, .. } => Some(column.as_str()),
            Self::OutliersDetected { column, .. } => Some(column.as_str()),
            Self::ConstantColumn { column, .. } => Some(column.as_str()),
            Self::DuplicateRows { .. } | Self::EmptySchema | Self::EmptyDataset => None,
        }
    }
}
104
105/// Quality statistics for a single column
106#[derive(Debug, Clone)]
107pub struct ColumnQuality {
108    /// Column name
109    pub name: String,
110    /// Total row count
111    pub total_count: usize,
112    /// Null/missing count
113    pub null_count: usize,
114    /// Null ratio (0-1)
115    pub null_ratio: f64,
116    /// Number of unique values
117    pub unique_count: usize,
118    /// Unique ratio (unique/total)
119    pub unique_ratio: f64,
120    /// Number of duplicate values (non-unique occurrences)
121    pub duplicate_count: usize,
122    /// Duplicate ratio
123    pub duplicate_ratio: f64,
124    /// Number of outliers (for numeric columns)
125    pub outlier_count: Option<usize>,
126    /// Basic stats for numeric columns
127    pub numeric_stats: Option<NumericStats>,
128}
129
130impl ColumnQuality {
131    /// Check if column is constant (single unique value)
132    pub fn is_constant(&self) -> bool {
133        self.unique_count <= 1 && self.total_count > 0
134    }
135
136    /// Check if column is mostly null
137    pub fn is_mostly_null(&self, threshold: f64) -> bool {
138        self.null_ratio >= threshold
139    }
140}
141
/// Summary statistics for a numeric column.
#[derive(Debug, Clone)]
pub struct NumericStats {
    /// Smallest observed value
    pub min: f64,
    /// Largest observed value
    pub max: f64,
    /// Arithmetic mean
    pub mean: f64,
    /// Standard deviation
    pub std_dev: f64,
    /// First quartile (25th percentile)
    pub q1: f64,
    /// Second quartile (50th percentile / median)
    pub median: f64,
    /// Third quartile (75th percentile)
    pub q3: f64,
}

impl NumericStats {
    /// Interquartile range: `q3 - q1`.
    pub fn iqr(&self) -> f64 {
        self.q3 - self.q1
    }

    /// Tukey lower fence: values below `q1 - 1.5 * IQR` count as outliers.
    pub fn outlier_lower_bound(&self) -> f64 {
        let fence = 1.5 * self.iqr();
        self.q1 - fence
    }

    /// Tukey upper fence: values above `q3 + 1.5 * IQR` count as outliers.
    pub fn outlier_upper_bound(&self) -> f64 {
        let fence = 1.5 * self.iqr();
        self.q3 + fence
    }
}
177
178/// Overall data quality report
179#[derive(Debug, Clone)]
180pub struct QualityReport {
181    /// Total row count
182    pub row_count: usize,
183    /// Total column count
184    pub column_count: usize,
185    /// Per-column quality statistics
186    pub columns: HashMap<String, ColumnQuality>,
187    /// Detected issues
188    pub issues: Vec<QualityIssue>,
189    /// Overall quality score (0-100)
190    pub score: f64,
191    /// Number of duplicate rows
192    pub duplicate_row_count: usize,
193}
194
195impl QualityReport {
196    /// Check if any issues were found
197    pub fn has_issues(&self) -> bool {
198        !self.issues.is_empty()
199    }
200
201    /// Get issues for a specific column
202    pub fn column_issues(&self, column: &str) -> Vec<&QualityIssue> {
203        self.issues
204            .iter()
205            .filter(|i| i.column() == Some(column))
206            .collect()
207    }
208
209    /// Get maximum severity among all issues
210    pub fn max_severity(&self) -> u8 {
211        self.issues.iter().map(|i| i.severity()).max().unwrap_or(0)
212    }
213
214    /// Get columns with issues
215    pub fn problematic_columns(&self) -> Vec<&str> {
216        self.issues
217            .iter()
218            .filter_map(|i| i.column())
219            .collect::<HashSet<_>>()
220            .into_iter()
221            .collect()
222    }
223}
224
/// Tunable thresholds used by the quality checker.
#[derive(Debug, Clone)]
pub struct QualityThresholds {
    /// Null ratio above which a column is flagged (default: 0.1)
    pub max_null_ratio: f64,
    /// Duplicate-value ratio above which a column is flagged (default: 0.5)
    pub max_duplicate_ratio: f64,
    /// Columns with fewer distinct values than this are flagged (default: 2)
    pub min_cardinality: usize,
    /// Outlier ratio above which a column is flagged (default: 0.05)
    pub max_outlier_ratio: f64,
    /// Duplicate-row ratio above which the dataset is flagged (default: 0.01)
    pub max_duplicate_row_ratio: f64,
}

impl Default for QualityThresholds {
    fn default() -> Self {
        QualityThresholds {
            max_duplicate_row_ratio: 0.01,
            max_outlier_ratio: 0.05,
            min_cardinality: 2,
            max_duplicate_ratio: 0.5,
            max_null_ratio: 0.1,
        }
    }
}
251
/// Data quality checker
///
/// Builder-style configuration: construct with [`QualityChecker::new`], chain
/// the `max_*` / `with_*` setters, then run `check` on a dataset.
pub struct QualityChecker {
    // Thresholds that decide when a measurement becomes a reported issue.
    pub(crate) thresholds: QualityThresholds,
    // When false, numeric outlier detection is skipped entirely.
    pub(crate) check_outliers: bool,
    // When false, duplicate-row counting is skipped (count reported as 0).
    pub(crate) check_duplicates: bool,
}

impl Default for QualityChecker {
    /// Equivalent to [`QualityChecker::new`]: default thresholds, all checks on.
    fn default() -> Self {
        Self::new()
    }
}
264
impl QualityChecker {
    /// Create a new quality checker with default thresholds
    /// (both the outlier check and the duplicate-row check enabled).
    pub fn new() -> Self {
        Self {
            thresholds: QualityThresholds::default(),
            check_outliers: true,
            check_duplicates: true,
        }
    }

    /// Set maximum null ratio threshold
    #[must_use]
    pub fn max_null_ratio(mut self, ratio: f64) -> Self {
        self.thresholds.max_null_ratio = ratio;
        self
    }

    /// Set maximum duplicate ratio threshold
    #[must_use]
    pub fn max_duplicate_ratio(mut self, ratio: f64) -> Self {
        self.thresholds.max_duplicate_ratio = ratio;
        self
    }

    /// Set minimum cardinality threshold
    #[must_use]
    pub fn min_cardinality(mut self, min: usize) -> Self {
        self.thresholds.min_cardinality = min;
        self
    }

    /// Set maximum outlier ratio threshold
    #[must_use]
    pub fn max_outlier_ratio(mut self, ratio: f64) -> Self {
        self.thresholds.max_outlier_ratio = ratio;
        self
    }

    /// Enable/disable outlier checking
    #[must_use]
    pub fn with_outlier_check(mut self, enabled: bool) -> Self {
        self.check_outliers = enabled;
        self
    }

    /// Enable/disable duplicate row checking
    #[must_use]
    pub fn with_duplicate_check(mut self, enabled: bool) -> Self {
        self.check_duplicates = enabled;
        self
    }

    /// Check dataset quality
    ///
    /// Runs every configured check and returns a [`QualityReport`]. An empty
    /// schema or an empty dataset short-circuits with a single fatal issue
    /// and a score of 0.
    ///
    /// # Errors
    /// Never fails in the current implementation; `Result` is kept so future
    /// checks can surface decode/I/O errors without breaking callers.
    pub fn check(&self, dataset: &ArrowDataset) -> Result<QualityReport> {
        let schema = dataset.schema();
        let mut issues = Vec::new();

        // Check for empty schema
        if schema.fields().is_empty() {
            issues.push(QualityIssue::EmptySchema);
            return Ok(QualityReport {
                row_count: 0,
                column_count: 0,
                columns: HashMap::new(),
                issues,
                score: 0.0,
                duplicate_row_count: 0,
            });
        }

        // Collect all data (stringified per column) and the total row count.
        let (column_data, row_count) = self.collect_data(dataset);

        // Check for empty dataset
        if row_count == 0 {
            issues.push(QualityIssue::EmptyDataset);
            return Ok(QualityReport {
                row_count: 0,
                column_count: schema.fields().len(),
                columns: HashMap::new(),
                issues,
                score: 0.0,
                duplicate_row_count: 0,
            });
        }

        // Analyze each column
        let mut columns = HashMap::new();
        for (col_name, values) in &column_data {
            let quality = self.analyze_column(col_name, values, row_count);

            // Check for issues
            if quality.null_ratio > self.thresholds.max_null_ratio {
                issues.push(QualityIssue::HighNullRatio {
                    column: col_name.clone(),
                    null_ratio: quality.null_ratio,
                    threshold: self.thresholds.max_null_ratio,
                });
            }

            if quality.duplicate_ratio > self.thresholds.max_duplicate_ratio {
                issues.push(QualityIssue::HighDuplicateRatio {
                    column: col_name.clone(),
                    duplicate_ratio: quality.duplicate_ratio,
                    threshold: self.thresholds.max_duplicate_ratio,
                });
            }

            // Single-row datasets are exempt: one row can only ever hold one
            // distinct value, so low cardinality would be a false positive.
            if quality.unique_count < self.thresholds.min_cardinality && row_count > 1 {
                issues.push(QualityIssue::LowCardinality {
                    column: col_name.clone(),
                    unique_count: quality.unique_count,
                    total_count: row_count,
                });
            }

            if quality.is_constant() {
                // Report the first non-null value; an all-null column reports
                // the empty string.
                let value = values
                    .iter()
                    .find(|v| v.is_some())
                    .map(|v| v.clone().unwrap_or_default())
                    .unwrap_or_default();
                issues.push(QualityIssue::ConstantColumn {
                    column: col_name.clone(),
                    value,
                });
            }

            if let Some(outlier_count) = quality.outlier_count {
                // Ratio is over all rows, not just the parseable numeric ones.
                let outlier_ratio = outlier_count as f64 / row_count as f64;
                if outlier_ratio > self.thresholds.max_outlier_ratio {
                    issues.push(QualityIssue::OutliersDetected {
                        column: col_name.clone(),
                        outlier_count,
                        outlier_ratio,
                    });
                }
            }

            columns.insert(col_name.clone(), quality);
        }

        // Check for duplicate rows (a disabled check reports zero, which can
        // never exceed the threshold below).
        let duplicate_row_count = if self.check_duplicates {
            self.count_duplicate_rows(&column_data, row_count)
        } else {
            0
        };

        // row_count > 0 is guaranteed here: the empty-dataset case returned early.
        let duplicate_row_ratio = duplicate_row_count as f64 / row_count as f64;
        if duplicate_row_ratio > self.thresholds.max_duplicate_row_ratio {
            issues.push(QualityIssue::DuplicateRows {
                duplicate_count: duplicate_row_count,
                duplicate_ratio: duplicate_row_ratio,
            });
        }

        // Calculate quality score
        let score = self.calculate_score(&columns, &issues, row_count);

        Ok(QualityReport {
            row_count,
            column_count: schema.fields().len(),
            columns,
            issues,
            score,
            duplicate_row_count,
        })
    }

    /// Collect data from dataset as strings for analysis
    ///
    /// Every value is rendered to a `String` (nulls become `None`) so that
    /// the downstream checks can treat all column types uniformly. Returns
    /// the per-column value vectors plus the total row count.
    pub(crate) fn collect_data(
        &self,
        dataset: &ArrowDataset,
    ) -> (HashMap<String, Vec<Option<String>>>, usize) {
        use arrow::array::{
            Array, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, ListArray,
            StringArray, StructArray,
        };

        let schema = dataset.schema();
        let mut data: HashMap<String, Vec<Option<String>>> = HashMap::new();
        let mut row_count = 0;

        // Pre-create one value vector per schema column.
        for field in schema.fields() {
            data.insert(field.name().clone(), Vec::new());
        }

        for batch in dataset.iter() {
            row_count += batch.num_rows();

            for (col_idx, field) in schema.fields().iter().enumerate() {
                if let Some(col_data) = data.get_mut(field.name()) {
                    let array = batch.column(col_idx);

                    // Downcast chain covers the supported Arrow array types;
                    // order matters only for readability (types are disjoint).
                    for i in 0..array.len() {
                        if array.is_null(i) {
                            col_data.push(None);
                        } else if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
                            // GH-013: Support Float32
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
                            // GH-013: Support Boolean
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<ListArray>() {
                            // GH-013: Serialize list to comparable string representation
                            col_data.push(Some(Self::serialize_list_value(arr, i)));
                        } else if let Some(arr) = array.as_any().downcast_ref::<StructArray>() {
                            // GH-013: Serialize struct to comparable string representation
                            col_data.push(Some(Self::serialize_struct_value(arr, i)));
                        } else {
                            // Unsupported array type: fixed placeholder. Note this
                            // makes every value of such a column identical, so the
                            // column will read as constant/low-cardinality.
                            col_data.push(Some("?".to_string()));
                        }
                    }
                }
            }
        }

        (data, row_count)
    }

    /// Serialize a list value at index to a comparable string (GH-013)
    ///
    /// Produces `[e1,e2,...]`; null elements render as `null`.
    pub(crate) fn serialize_list_value(arr: &arrow::array::ListArray, idx: usize) -> String {
        use arrow::array::Array;

        let values = arr.value(idx);
        let mut parts = Vec::new();

        for i in 0..values.len() {
            if values.is_null(i) {
                parts.push("null".to_string());
            } else {
                // Recursively serialize the element
                parts.push(Self::serialize_array_value(&values, i));
            }
        }

        format!("[{}]", parts.join(","))
    }

    /// Serialize a struct value at index to a comparable string (GH-013)
    ///
    /// Produces `{name:value,...}` in field-declaration order.
    pub(crate) fn serialize_struct_value(arr: &arrow::array::StructArray, idx: usize) -> String {
        use arrow::array::Array;

        let mut parts = Vec::new();

        for (field_idx, field) in arr.fields().iter().enumerate() {
            let col = arr.column(field_idx);
            let value = if col.is_null(idx) {
                "null".to_string()
            } else {
                Self::serialize_array_value(col, idx)
            };
            parts.push(format!("{}:{}", field.name(), value));
        }

        format!("{{{}}}", parts.join(","))
    }

    /// Serialize any array value at index to a string (GH-013)
    ///
    /// Strings are quoted so the string "1" and the integer 1 serialize
    /// differently; unsupported element types fall back to "?".
    pub(crate) fn serialize_array_value(arr: &dyn arrow::array::Array, idx: usize) -> String {
        use arrow::array::{
            BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, ListArray,
            StringArray, StructArray,
        };

        if arr.is_null(idx) {
            return "null".to_string();
        }

        if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
            format!("\"{}\"", a.value(idx))
        } else if let Some(a) = arr.as_any().downcast_ref::<Int32Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Int64Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Float64Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Float32Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<ListArray>() {
            // Nested lists/structs recurse through the typed serializers.
            Self::serialize_list_value(a, idx)
        } else if let Some(a) = arr.as_any().downcast_ref::<StructArray>() {
            Self::serialize_struct_value(a, idx)
        } else {
            "?".to_string()
        }
    }

    /// Analyze a single column
    ///
    /// Computes null/unique/duplicate counts and ratios. `null_ratio` is taken
    /// over all rows; `unique_ratio` and `duplicate_ratio` are taken over the
    /// non-null values only.
    pub(crate) fn analyze_column(
        &self,
        name: &str,
        values: &[Option<String>],
        total_count: usize,
    ) -> ColumnQuality {
        let null_count = values.iter().filter(|v| v.is_none()).count();
        let null_ratio = if total_count > 0 {
            null_count as f64 / total_count as f64
        } else {
            0.0
        };

        // Count unique values
        let non_null_values: Vec<&str> = values.iter().filter_map(|v| v.as_deref()).collect();
        let unique_set: HashSet<&str> = non_null_values.iter().copied().collect();
        let unique_count = unique_set.len();
        let unique_ratio = if !non_null_values.is_empty() {
            unique_count as f64 / non_null_values.len() as f64
        } else {
            0.0
        };

        // Calculate duplicates (every non-null value beyond the first
        // occurrence of each distinct value).
        let duplicate_count = non_null_values.len().saturating_sub(unique_count);
        let duplicate_ratio = if !non_null_values.is_empty() {
            duplicate_count as f64 / non_null_values.len() as f64
        } else {
            0.0
        };

        // Try to parse as numeric for outlier detection
        let (outlier_count, numeric_stats) = if self.check_outliers {
            self.analyze_numeric(&non_null_values)
        } else {
            (None, None)
        };

        ColumnQuality {
            name: name.to_string(),
            total_count,
            null_count,
            null_ratio,
            unique_count,
            unique_ratio,
            duplicate_count,
            duplicate_ratio,
            outlier_count,
            numeric_stats,
        }
    }

    /// Analyze numeric column for outliers and stats
    ///
    /// Values that do not parse as finite `f64` are ignored. Returns
    /// `(None, None)` when fewer than 4 numeric values remain (quartiles need
    /// at least that many). Quartiles use simple index selection on the
    /// sorted data, variance is the population variance (divides by n), and
    /// outliers are counted against Tukey's 1.5×IQR fences.
    pub(crate) fn analyze_numeric(&self, values: &[&str]) -> (Option<usize>, Option<NumericStats>) {
        let numeric_values: Vec<f64> = values
            .iter()
            .filter_map(|v| v.parse::<f64>().ok())
            .filter(|v| v.is_finite())
            .collect();

        if numeric_values.len() < 4 {
            return (None, None);
        }

        let mut sorted = numeric_values.clone();
        // All values are finite here, so partial_cmp cannot actually return
        // None; Equal is a harmless fallback.
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let n = sorted.len();
        let min = sorted[0];
        let max = sorted[n - 1];
        let mean = numeric_values.iter().sum::<f64>() / n as f64;

        let variance = numeric_values
            .iter()
            .map(|v| (v - mean).powi(2))
            .sum::<f64>()
            / n as f64;
        let std_dev = variance.sqrt();

        // n >= 4 guarantees 3 * n / 4 <= n - 1, so all three indices are in
        // bounds.
        let q1 = sorted[n / 4];
        let median = sorted[n / 2];
        let q3 = sorted[3 * n / 4];

        let stats = NumericStats {
            min,
            max,
            mean,
            std_dev,
            q1,
            median,
            q3,
        };

        // Count outliers using IQR method
        let lower = stats.outlier_lower_bound();
        let upper = stats.outlier_upper_bound();
        let outlier_count = numeric_values
            .iter()
            .filter(|&&v| v < lower || v > upper)
            .count();

        (Some(outlier_count), Some(stats))
    }

    /// Count duplicate rows
    ///
    /// A row counts as a duplicate when an identical row appeared earlier.
    /// Rows are compared via a '|'-joined string key over all columns, with
    /// nulls rendered as "NULL".
    /// NOTE(review): values that themselves contain '|' (or the literal
    /// "NULL") could make distinct rows collide — acceptable for a heuristic
    /// count, but worth confirming for adversarial data.
    pub(crate) fn count_duplicate_rows(
        &self,
        data: &HashMap<String, Vec<Option<String>>>,
        row_count: usize,
    ) -> usize {
        if data.is_empty() || row_count == 0 {
            return 0;
        }

        // Build row hashes
        let mut row_set: HashSet<String> = HashSet::new();
        let mut duplicates = 0;

        // Column order comes from HashMap iteration; it is arbitrary but
        // consistent within this call, which is all equality needs.
        let columns: Vec<&String> = data.keys().collect();

        for i in 0..row_count {
            let row_key: String = columns
                .iter()
                .map(|col| {
                    data.get(*col)
                        .and_then(|v| v.get(i))
                        .map(|v| v.clone().unwrap_or_else(|| "NULL".to_string()))
                        .unwrap_or_else(|| "NULL".to_string())
                })
                .collect::<Vec<_>>()
                .join("|");

            // insert() returns false when the key was already present.
            if !row_set.insert(row_key) {
                duplicates += 1;
            }
        }

        duplicates
    }

    /// Calculate quality score (0-100)
    ///
    /// Starts at 100, subtracts up to 30 points proportional to the average
    /// null ratio, then a flat per-issue penalty weighted by severity
    /// (5 → 25, 4 → 15, 3 → 10, 2 → 5, 1 → 2), clamped to [0, 100].
    pub(crate) fn calculate_score(
        &self,
        columns: &HashMap<String, ColumnQuality>,
        issues: &[QualityIssue],
        row_count: usize,
    ) -> f64 {
        if row_count == 0 || columns.is_empty() {
            return 0.0;
        }

        let mut score = 100.0;

        // Deduct for null ratios
        let avg_null_ratio: f64 =
            columns.values().map(|c| c.null_ratio).sum::<f64>() / columns.len() as f64;
        score -= avg_null_ratio * 30.0;

        // Deduct for issues
        for issue in issues {
            score -= match issue.severity() {
                5 => 25.0,
                4 => 15.0,
                3 => 10.0,
                2 => 5.0,
                1 => 2.0,
                _ => 0.0,
            };
        }

        score.clamp(0.0, 100.0)
    }
}
738
/// Statistics for a text (string) column — useful for ML classification audits.
///
/// All lengths are byte lengths (`str::len`), not Unicode character counts;
/// for ASCII-only text the two coincide.
#[derive(Debug, Clone)]
pub struct TextColumnStats {
    /// Minimum length in bytes
    pub min_len: usize,
    /// Maximum length in bytes
    pub max_len: usize,
    /// Mean length in bytes
    pub mean_len: f64,
    /// Median length in bytes (P50)
    pub p50_len: usize,
    /// 95th percentile length in bytes
    pub p95_len: usize,
    /// 99th percentile length in bytes
    pub p99_len: usize,
    /// Number of empty or whitespace-only strings
    pub empty_count: usize,
    /// Number of strings matching a preamble pattern (e.g., "#!/")
    pub preamble_count: usize,
    /// Total valid (non-null) strings
    pub total: usize,
}
761
762impl TextColumnStats {
    /// Compute text column statistics from an ArrowDataset.
    ///
    /// Lengths are measured with `str::len`, i.e. in bytes, not Unicode
    /// characters. Percentiles are read by nearest-rank (floor) indexing on
    /// the sorted lengths. Null entries are skipped entirely: they count
    /// toward neither `total` nor `empty_count`.
    ///
    /// # Arguments
    /// * `dataset` - Source dataset
    /// * `column` - Name of the string column
    /// * `preamble_prefix` - Optional prefix to count as "preamble" (e.g.,
    ///   "#!/")
    ///
    /// # Errors
    /// Returns error if column not found or not a string type.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    pub fn from_dataset(
        dataset: &crate::ArrowDataset,
        column: &str,
        preamble_prefix: Option<&str>,
    ) -> crate::Result<Self> {
        use arrow::array::{Array, StringArray};

        use crate::Dataset;

        // Locate the column index by name.
        let schema = dataset.schema();
        let col_idx = schema
            .fields()
            .iter()
            .position(|f| f.name() == column)
            .ok_or_else(|| crate::Error::invalid_config(format!("Column '{column}' not found")))?;

        // NOTE(review): assumes dataset.len() approximates the row count —
        // used only as a capacity hint, so an inexact value is harmless.
        let mut lengths: Vec<usize> = Vec::with_capacity(dataset.len());
        let mut empty_count = 0usize;
        let mut preamble_count = 0usize;

        for batch in dataset.iter() {
            // Every batch must expose the column as a StringArray.
            let array = batch.column(col_idx);
            let str_arr = array
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| {
                    crate::Error::invalid_config(format!("Column '{column}' is not a string type"))
                })?;

            for i in 0..str_arr.len() {
                if str_arr.is_null(i) {
                    continue;
                }
                let val = str_arr.value(i);
                // Byte length (see doc comment above).
                let len = val.len();
                lengths.push(len);

                if val.trim().is_empty() {
                    empty_count += 1;
                }
                if let Some(prefix) = preamble_prefix {
                    if val.starts_with(prefix) {
                        preamble_count += 1;
                    }
                }
            }
        }

        // No non-null strings at all: report all-zero statistics.
        if lengths.is_empty() {
            return Ok(Self {
                min_len: 0,
                max_len: 0,
                mean_len: 0.0,
                p50_len: 0,
                p95_len: 0,
                p99_len: 0,
                empty_count: 0,
                preamble_count: 0,
                total: 0,
            });
        }

        // Sort once, then read order statistics by index (nearest-rank).
        lengths.sort_unstable();
        let total = lengths.len();
        let min_len = lengths[0];
        let max_len = lengths[total - 1];
        let mean_len = lengths.iter().sum::<usize>() as f64 / total as f64;
        let p50_len = lengths[total / 2];
        // floor(0.95 * total) <= total - 1 for any total >= 1, so this index
        // is always in bounds; the extra .min() on the p99 line below is a
        // redundant belt-and-braces guard.
        let p95_len = lengths[(total as f64 * 0.95) as usize];
        let p99_len = lengths[(total as f64 * 0.99).min((total - 1) as f64) as usize];

        Ok(Self {
            min_len,
            max_len,
            mean_len,
            p50_len,
            p95_len,
            p99_len,
            empty_count,
            preamble_count,
            total,
        })
    }
857}