Skip to main content

dataprof_runtime/
profile_builder.rs

1//! Shared conversion from [`StreamingColumnCollection`] / [`StreamingStatistics`]
2//! into [`ColumnProfile`] and quality-check sample maps.
3//!
4//! All engines that need to produce a [`ColumnProfile`] should call
5//! [`build_column_profile`] instead of constructing one manually.
6//! This ensures consistent stats calculation and pattern detection.
7
8use std::collections::HashMap;
9
10use dataprof_core::{
11    BooleanStats, ColumnProfile, ColumnStats, DataType, DateTimeStats, SemanticHints, TextStats,
12};
13use dataprof_metrics::{
14    analysis::inference::{is_null_like_token, parse_strict_boolean_token},
15    analysis::patterns::looks_like_date,
16    calculate_datetime_stats, calculate_numeric_stats, calculate_text_stats, detect_patterns,
17};
18
19use crate::streaming_stats::{StreamingColumnCollection, StreamingStatistics};
20
21/// Inputs that every engine can provide for centralized profile construction.
22pub struct ColumnProfileInput<'a> {
23    pub name: String,
24    pub data_type: DataType,
25    pub total_count: usize,
26    pub null_count: usize,
27    pub unique_count: Option<usize>,
28    pub sample_values: &'a [String],
29    /// Pre-computed text lengths for engines that track them incrementally.
30    /// When `Some`, text stats are built from these instead of re-scanning samples.
31    pub text_lengths: Option<TextLengths>,
32    /// Pre-computed boolean counts (true_count, false_count) for boolean columns.
33    pub boolean_counts: Option<(usize, usize)>,
34    /// When true, skip statistics computation (produce `ColumnStats::None`).
35    pub skip_statistics: bool,
36    /// When true, skip pattern detection (produce empty patterns vec).
37    pub skip_patterns: bool,
38    /// Optional locale for pattern detection (e.g. "IT", "US").
39    pub locale: Option<&'a str>,
40}
41
/// Text length summary pre-computed by streaming/columnar engines.
///
/// This is plain data (two `usize` bounds and an `f64` mean), so it derives
/// the usual value-type traits: it can be printed with `{:?}`, copied freely
/// and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TextLengths {
    /// Shortest observed text length.
    pub min_length: usize,
    /// Longest observed text length.
    pub max_length: usize,
    /// Mean observed text length.
    pub avg_length: f64,
}
48
49/// Build a [`ColumnProfile`] from engine-agnostic inputs.
50///
51/// This is the single canonical construction path. Engines provide
52/// pre-inferred `DataType`, counters, sample values, and optionally
53/// pre-computed text lengths; this function handles stats calculation
54/// and pattern detection.
55pub fn build_column_profile(input: ColumnProfileInput<'_>) -> ColumnProfile {
56    let stats = if input.skip_statistics {
57        ColumnStats::None
58    } else {
59        match input.data_type {
60            DataType::Integer | DataType::Float => calculate_numeric_stats(input.sample_values),
61            DataType::Date => {
62                if !input.sample_values.is_empty() {
63                    calculate_datetime_stats(input.sample_values)
64                } else if let Some(tl) = &input.text_lengths {
65                    ColumnStats::Text(TextStats::from_lengths(
66                        tl.min_length,
67                        tl.max_length,
68                        tl.avg_length,
69                    ))
70                } else {
71                    ColumnStats::DateTime(DateTimeStats::empty())
72                }
73            }
74            DataType::Boolean => {
75                let (true_count, false_count) = input.boolean_counts.unwrap_or_else(|| {
76                    let tc = input
77                        .sample_values
78                        .iter()
79                        .filter(|v| parse_strict_boolean_token(v.trim()) == Some(true))
80                        .count();
81                    let fc = input
82                        .sample_values
83                        .iter()
84                        .filter(|v| parse_strict_boolean_token(v.trim()) == Some(false))
85                        .count();
86                    (tc, fc)
87                });
88                let total = true_count + false_count;
89                let true_ratio = if total > 0 {
90                    true_count as f64 / total as f64
91                } else {
92                    0.0
93                };
94                ColumnStats::Boolean(BooleanStats {
95                    true_count,
96                    false_count,
97                    true_ratio,
98                })
99            }
100            DataType::String | DataType::Identifier => {
101                if let Some(tl) = &input.text_lengths {
102                    ColumnStats::Text(TextStats::from_lengths(
103                        tl.min_length,
104                        tl.max_length,
105                        tl.avg_length,
106                    ))
107                } else {
108                    calculate_text_stats(input.sample_values)
109                }
110            }
111        }
112    };
113
114    let patterns = if input.skip_patterns {
115        Vec::new()
116    } else {
117        detect_patterns(input.sample_values, input.locale)
118    };
119
120    ColumnProfile {
121        name: input.name,
122        data_type: input.data_type,
123        null_count: input.null_count,
124        total_count: input.total_count,
125        unique_count: input.unique_count,
126        stats,
127        patterns,
128    }
129}
130
131/// Convert all columns in a [`StreamingColumnCollection`] into [`ColumnProfile`]s.
132pub fn profiles_from_streaming(
133    column_stats: &StreamingColumnCollection,
134    skip_statistics: bool,
135    skip_patterns: bool,
136    locale: Option<&str>,
137) -> Vec<ColumnProfile> {
138    profiles_from_streaming_with_hints(
139        column_stats,
140        skip_statistics,
141        skip_patterns,
142        locale,
143        &SemanticHints::default(),
144    )
145}
146
147/// Convert all columns into [`ColumnProfile`]s while applying semantic hints.
148pub fn profiles_from_streaming_with_hints(
149    column_stats: &StreamingColumnCollection,
150    skip_statistics: bool,
151    skip_patterns: bool,
152    locale: Option<&str>,
153    semantic_hints: &SemanticHints,
154) -> Vec<ColumnProfile> {
155    let mut profiles = Vec::new();
156
157    for column_name in column_stats.column_names() {
158        if let Some(stats) = column_stats.get_column_stats(&column_name) {
159            let profile = profile_from_stats_with_hints(
160                &column_name,
161                stats,
162                skip_statistics,
163                skip_patterns,
164                locale,
165                semantic_hints,
166            );
167            profiles.push(profile);
168        }
169    }
170
171    profiles
172}
173
174/// Convert a single column's [`StreamingStatistics`] into a [`ColumnProfile`].
175pub fn profile_from_stats(
176    name: &str,
177    stats: &StreamingStatistics,
178    skip_statistics: bool,
179    skip_patterns: bool,
180    locale: Option<&str>,
181) -> ColumnProfile {
182    profile_from_stats_with_hints(
183        name,
184        stats,
185        skip_statistics,
186        skip_patterns,
187        locale,
188        &SemanticHints::default(),
189    )
190}
191
192/// Convert a single column into a [`ColumnProfile`] while applying semantic hints.
193pub fn profile_from_stats_with_hints(
194    name: &str,
195    stats: &StreamingStatistics,
196    skip_statistics: bool,
197    skip_patterns: bool,
198    locale: Option<&str>,
199    semantic_hints: &SemanticHints,
200) -> ColumnProfile {
201    let data_type = if semantic_hints.is_identifier_column(name) {
202        DataType::Identifier
203    } else {
204        infer_data_type_streaming(stats)
205    };
206    let text_stats = stats.text_length_stats();
207
208    build_column_profile(ColumnProfileInput {
209        name: name.to_string(),
210        data_type,
211        total_count: stats.count,
212        null_count: stats.null_count,
213        unique_count: Some(stats.unique_count()),
214        sample_values: stats.sample_values(),
215        text_lengths: Some(TextLengths {
216            min_length: text_stats.min_length,
217            max_length: text_stats.max_length,
218            avg_length: text_stats.avg_length,
219        }),
220        boolean_counts: None,
221        skip_statistics,
222        skip_patterns,
223        locale,
224    })
225}
226
227/// Infer [`DataType`] from [`StreamingStatistics`] sample values.
228pub fn infer_data_type_streaming(stats: &StreamingStatistics) -> DataType {
229    if stats.min.is_finite() && stats.max.is_finite() {
230        let sample_values = stats.sample_values();
231        let non_empty: Vec<&String> = sample_values
232            .iter()
233            .filter(|s| !is_null_like_token(s.trim()))
234            .collect();
235
236        if !non_empty.is_empty() {
237            let all_integers = non_empty.iter().all(|s| s.parse::<i64>().is_ok());
238            if all_integers {
239                return DataType::Integer;
240            }
241
242            let numeric_count = non_empty
243                .iter()
244                .filter(|s| s.parse::<f64>().is_ok())
245                .count();
246            if numeric_count as f64 / non_empty.len() as f64 > 0.8 {
247                return DataType::Float;
248            }
249        }
250    }
251
252    let sample_values = stats.sample_values();
253    let non_empty: Vec<&String> = sample_values
254        .iter()
255        .filter(|s| !is_null_like_token(s.trim()))
256        .collect();
257
258    if !non_empty.is_empty() {
259        let date_like_count = non_empty
260            .iter()
261            .take(100)
262            .filter(|s| looks_like_date(s))
263            .count();
264
265        if date_like_count as f64 / non_empty.len().min(100) as f64 > 0.7 {
266            return DataType::Date;
267        }
268
269        let bool_count = non_empty
270            .iter()
271            .filter(|s| parse_strict_boolean_token(s.trim()).is_some())
272            .count();
273
274        if bool_count as f64 / non_empty.len() as f64 >= 0.9 {
275            return DataType::Boolean;
276        }
277    }
278
279    DataType::String
280}
281
282/// Build a sample `HashMap` from a [`StreamingColumnCollection`] suitable for
283/// `QualityMetrics::calculate_from_data()`.
284pub fn quality_check_samples(
285    column_stats: &StreamingColumnCollection,
286) -> HashMap<String, Vec<String>> {
287    let mut samples = HashMap::new();
288
289    for column_name in column_stats.column_names() {
290        if let Some(stats) = column_stats.get_column_stats(&column_name) {
291            let sample_values: Vec<String> = stats.sample_values().to_vec();
292            samples.insert(column_name, sample_values);
293        }
294    }
295
296    samples
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming_stats::StreamingColumnCollection;

    /// Turn string literals into the owned values the APIs under test expect.
    fn strings(values: &[&str]) -> Vec<String> {
        values.iter().map(|&v| v.to_owned()).collect()
    }

    /// Build a profile through `build_column_profile` with the settings all
    /// direct-construction tests share: no nulls, no pre-computed text
    /// lengths, no locale, and a total count equal to the sample count.
    fn profile_of(
        name: &str,
        data_type: DataType,
        samples: &[String],
        unique_count: usize,
        boolean_counts: Option<(usize, usize)>,
        skip_statistics: bool,
        skip_patterns: bool,
    ) -> ColumnProfile {
        build_column_profile(ColumnProfileInput {
            name: name.to_string(),
            data_type,
            total_count: samples.len(),
            null_count: 0,
            unique_count: Some(unique_count),
            sample_values: samples,
            text_lengths: None,
            boolean_counts,
            skip_statistics,
            skip_patterns,
            locale: None,
        })
    }

    /// Assert that the profile carries boolean stats of two `true` values
    /// and one `false` value.
    fn assert_two_true_one_false(profile: &ColumnProfile) {
        match &profile.stats {
            ColumnStats::Boolean(b) => {
                assert_eq!(b.true_count, 2);
                assert_eq!(b.false_count, 1);
                assert!((b.true_ratio - 2.0 / 3.0).abs() < 0.001);
            }
            other => panic!("expected Boolean stats, got {:?}", other),
        }
    }

    #[test]
    fn test_profiles_from_streaming() {
        let mut collection = StreamingColumnCollection::new();
        let headers = strings(&["name", "age"]);

        for row in [["Alice", "30"], ["Bob", "25"], ["Charlie", "35"]] {
            collection.process_record(&headers, strings(&row));
        }

        let profiles = profiles_from_streaming(&collection, false, false, None);
        assert_eq!(profiles.len(), 2);

        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.data_type, DataType::Integer);
        assert_eq!(age.total_count, 3);
    }

    #[test]
    fn test_quality_check_samples() {
        let mut collection = StreamingColumnCollection::new();
        let headers = strings(&["col"]);

        for value in ["val1", "val2"] {
            collection.process_record(&headers, strings(&[value]));
        }

        let samples = quality_check_samples(&collection);
        assert!(samples.contains_key("col"));
        assert_eq!(samples["col"].len(), 2);
    }

    #[test]
    fn test_boolean_stats_with_counts() {
        // Engine-provided counts are used as given.
        let samples = strings(&["True", "False", "True"]);
        let profile = profile_of(
            "flag",
            DataType::Boolean,
            &samples,
            2,
            Some((2, 1)),
            false,
            false,
        );

        assert_two_true_one_false(&profile);
    }

    #[test]
    fn test_boolean_stats_fallback_case_insensitive() {
        // Without pre-computed counts the samples are tallied, ignoring case
        // and surrounding whitespace.
        let samples = strings(&["true", "FALSE", " True "]);
        let profile = profile_of("flag", DataType::Boolean, &samples, 2, None, false, false);

        assert_two_true_one_false(&profile);
    }

    #[test]
    fn test_skip_statistics() {
        let samples = strings(&["10", "20", "30"]);
        let profile = profile_of("num", DataType::Integer, &samples, 3, None, true, false);

        assert!(matches!(profile.stats, ColumnStats::None));
        assert_eq!(profile.data_type, DataType::Integer);
    }

    #[test]
    fn test_skip_patterns() {
        let samples = strings(&["hello", "world"]);
        let profile = profile_of("text", DataType::String, &samples, 2, None, false, true);

        assert!(profile.patterns.is_empty());
        assert!(matches!(profile.stats, ColumnStats::Text(_)));
    }

    #[test]
    fn test_all_packs_default() {
        let samples = strings(&["42", "99"]);
        let profile = profile_of("val", DataType::Integer, &samples, 2, None, false, false);

        assert!(matches!(profile.stats, ColumnStats::Numeric(_)));
        assert_eq!(profile.data_type, DataType::Integer);
    }
}