1use std::collections::HashMap;
9
10use dataprof_core::{
11 BooleanStats, ColumnProfile, ColumnStats, DataType, DateTimeStats, SemanticHints, TextStats,
12};
13use dataprof_metrics::{
14 analysis::inference::{is_null_like_token, parse_strict_boolean_token},
15 analysis::patterns::looks_like_date,
16 calculate_datetime_stats, calculate_numeric_stats, calculate_text_stats, detect_patterns,
17};
18
19use crate::streaming_stats::{StreamingColumnCollection, StreamingStatistics};
20
/// Everything [`build_column_profile`] needs to assemble one `ColumnProfile`.
///
/// The counts and the name are copied into the resulting profile unchanged; the
/// remaining fields steer which statistics and patterns are computed.
pub struct ColumnProfileInput<'a> {
    /// Column name, moved into the resulting profile.
    pub name: String,
    /// Column type; selects which statistics branch is computed and is copied
    /// into the resulting profile.
    pub data_type: DataType,
    /// Total value count, copied into the resulting profile.
    pub total_count: usize,
    /// Null value count, copied into the resulting profile.
    pub null_count: usize,
    /// Distinct value count if known, copied into the resulting profile.
    pub unique_count: Option<usize>,
    /// Sample values that statistics and pattern detection are computed from.
    pub sample_values: &'a [String],
    /// Pre-computed text lengths. When present they are used for `String` /
    /// `Identifier` columns instead of computing text statistics from the samples,
    /// and for `Date` columns only when `sample_values` is empty.
    pub text_lengths: Option<TextLengths>,
    /// Pre-computed `(true_count, false_count)` for `Boolean` columns. When `None`,
    /// the counts are derived from `sample_values`.
    pub boolean_counts: Option<(usize, usize)>,
    /// When `true`, the profile's stats are `ColumnStats::None`.
    pub skip_statistics: bool,
    /// When `true`, the profile's pattern list is left empty.
    pub skip_patterns: bool,
    /// Optional locale forwarded to pattern detection.
    pub locale: Option<&'a str>,
}
41
/// Pre-computed minimum, maximum and average text length for a column.
///
/// Passed to [`build_column_profile`] so text statistics can be built from these
/// values rather than recomputed from sample values.
// NOTE(review): whether lengths are counted in bytes or characters is decided by
// the producer of these values — confirm against the caller.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TextLengths {
    /// Shortest observed length.
    pub min_length: usize,
    /// Longest observed length.
    pub max_length: usize,
    /// Average observed length.
    pub avg_length: f64,
}
48
49pub fn build_column_profile(input: ColumnProfileInput<'_>) -> ColumnProfile {
56 let stats = if input.skip_statistics {
57 ColumnStats::None
58 } else {
59 match input.data_type {
60 DataType::Integer | DataType::Float => calculate_numeric_stats(input.sample_values),
61 DataType::Date => {
62 if !input.sample_values.is_empty() {
63 calculate_datetime_stats(input.sample_values)
64 } else if let Some(tl) = &input.text_lengths {
65 ColumnStats::Text(TextStats::from_lengths(
66 tl.min_length,
67 tl.max_length,
68 tl.avg_length,
69 ))
70 } else {
71 ColumnStats::DateTime(DateTimeStats::empty())
72 }
73 }
74 DataType::Boolean => {
75 let (true_count, false_count) = input.boolean_counts.unwrap_or_else(|| {
76 let tc = input
77 .sample_values
78 .iter()
79 .filter(|v| parse_strict_boolean_token(v.trim()) == Some(true))
80 .count();
81 let fc = input
82 .sample_values
83 .iter()
84 .filter(|v| parse_strict_boolean_token(v.trim()) == Some(false))
85 .count();
86 (tc, fc)
87 });
88 let total = true_count + false_count;
89 let true_ratio = if total > 0 {
90 true_count as f64 / total as f64
91 } else {
92 0.0
93 };
94 ColumnStats::Boolean(BooleanStats {
95 true_count,
96 false_count,
97 true_ratio,
98 })
99 }
100 DataType::String | DataType::Identifier => {
101 if let Some(tl) = &input.text_lengths {
102 ColumnStats::Text(TextStats::from_lengths(
103 tl.min_length,
104 tl.max_length,
105 tl.avg_length,
106 ))
107 } else {
108 calculate_text_stats(input.sample_values)
109 }
110 }
111 }
112 };
113
114 let patterns = if input.skip_patterns {
115 Vec::new()
116 } else {
117 detect_patterns(input.sample_values, input.locale)
118 };
119
120 ColumnProfile {
121 name: input.name,
122 data_type: input.data_type,
123 null_count: input.null_count,
124 total_count: input.total_count,
125 unique_count: input.unique_count,
126 stats,
127 patterns,
128 }
129}
130
131pub fn profiles_from_streaming(
133 column_stats: &StreamingColumnCollection,
134 skip_statistics: bool,
135 skip_patterns: bool,
136 locale: Option<&str>,
137) -> Vec<ColumnProfile> {
138 profiles_from_streaming_with_hints(
139 column_stats,
140 skip_statistics,
141 skip_patterns,
142 locale,
143 &SemanticHints::default(),
144 )
145}
146
147pub fn profiles_from_streaming_with_hints(
149 column_stats: &StreamingColumnCollection,
150 skip_statistics: bool,
151 skip_patterns: bool,
152 locale: Option<&str>,
153 semantic_hints: &SemanticHints,
154) -> Vec<ColumnProfile> {
155 let mut profiles = Vec::new();
156
157 for column_name in column_stats.column_names() {
158 if let Some(stats) = column_stats.get_column_stats(&column_name) {
159 let profile = profile_from_stats_with_hints(
160 &column_name,
161 stats,
162 skip_statistics,
163 skip_patterns,
164 locale,
165 semantic_hints,
166 );
167 profiles.push(profile);
168 }
169 }
170
171 profiles
172}
173
174pub fn profile_from_stats(
176 name: &str,
177 stats: &StreamingStatistics,
178 skip_statistics: bool,
179 skip_patterns: bool,
180 locale: Option<&str>,
181) -> ColumnProfile {
182 profile_from_stats_with_hints(
183 name,
184 stats,
185 skip_statistics,
186 skip_patterns,
187 locale,
188 &SemanticHints::default(),
189 )
190}
191
192pub fn profile_from_stats_with_hints(
194 name: &str,
195 stats: &StreamingStatistics,
196 skip_statistics: bool,
197 skip_patterns: bool,
198 locale: Option<&str>,
199 semantic_hints: &SemanticHints,
200) -> ColumnProfile {
201 let data_type = if semantic_hints.is_identifier_column(name) {
202 DataType::Identifier
203 } else {
204 infer_data_type_streaming(stats)
205 };
206 let text_stats = stats.text_length_stats();
207
208 build_column_profile(ColumnProfileInput {
209 name: name.to_string(),
210 data_type,
211 total_count: stats.count,
212 null_count: stats.null_count,
213 unique_count: Some(stats.unique_count()),
214 sample_values: stats.sample_values(),
215 text_lengths: Some(TextLengths {
216 min_length: text_stats.min_length,
217 max_length: text_stats.max_length,
218 avg_length: text_stats.avg_length,
219 }),
220 boolean_counts: None,
221 skip_statistics,
222 skip_patterns,
223 locale,
224 })
225}
226
227pub fn infer_data_type_streaming(stats: &StreamingStatistics) -> DataType {
229 if stats.min.is_finite() && stats.max.is_finite() {
230 let sample_values = stats.sample_values();
231 let non_empty: Vec<&String> = sample_values
232 .iter()
233 .filter(|s| !is_null_like_token(s.trim()))
234 .collect();
235
236 if !non_empty.is_empty() {
237 let all_integers = non_empty.iter().all(|s| s.parse::<i64>().is_ok());
238 if all_integers {
239 return DataType::Integer;
240 }
241
242 let numeric_count = non_empty
243 .iter()
244 .filter(|s| s.parse::<f64>().is_ok())
245 .count();
246 if numeric_count as f64 / non_empty.len() as f64 > 0.8 {
247 return DataType::Float;
248 }
249 }
250 }
251
252 let sample_values = stats.sample_values();
253 let non_empty: Vec<&String> = sample_values
254 .iter()
255 .filter(|s| !is_null_like_token(s.trim()))
256 .collect();
257
258 if !non_empty.is_empty() {
259 let date_like_count = non_empty
260 .iter()
261 .take(100)
262 .filter(|s| looks_like_date(s))
263 .count();
264
265 if date_like_count as f64 / non_empty.len().min(100) as f64 > 0.7 {
266 return DataType::Date;
267 }
268
269 let bool_count = non_empty
270 .iter()
271 .filter(|s| parse_strict_boolean_token(s.trim()).is_some())
272 .count();
273
274 if bool_count as f64 / non_empty.len() as f64 >= 0.9 {
275 return DataType::Boolean;
276 }
277 }
278
279 DataType::String
280}
281
282pub fn quality_check_samples(
285 column_stats: &StreamingColumnCollection,
286) -> HashMap<String, Vec<String>> {
287 let mut samples = HashMap::new();
288
289 for column_name in column_stats.column_names() {
290 if let Some(stats) = column_stats.get_column_stats(&column_name) {
291 let sample_values: Vec<String> = stats.sample_values().to_vec();
292 samples.insert(column_name, sample_values);
293 }
294 }
295
296 samples
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming_stats::StreamingColumnCollection;

    /// Turns string literals into the owned `String`s the APIs under test expect.
    fn owned(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| String::from(*value)).collect()
    }

    /// Baseline input: `total_count` equals the sample size, there are no nulls, no
    /// pre-computed lengths or boolean counts, nothing is skipped and no locale is set.
    /// Individual tests override fields with struct-update syntax.
    fn input_for<'a>(
        name: &str,
        data_type: DataType,
        unique_count: usize,
        sample_values: &'a [String],
    ) -> ColumnProfileInput<'a> {
        ColumnProfileInput {
            name: name.to_string(),
            data_type,
            total_count: sample_values.len(),
            null_count: 0,
            unique_count: Some(unique_count),
            sample_values,
            text_lengths: None,
            boolean_counts: None,
            skip_statistics: false,
            skip_patterns: false,
            locale: None,
        }
    }

    /// Asserts that `stats` is the Boolean variant with two `true`s and one `false`.
    fn assert_two_true_one_false(stats: &ColumnStats) {
        match stats {
            ColumnStats::Boolean(b) => {
                assert_eq!(b.true_count, 2);
                assert_eq!(b.false_count, 1);
                assert!((b.true_ratio - 2.0 / 3.0).abs() < 0.001);
            }
            other => panic!("expected Boolean stats, got {:?}", other),
        }
    }

    #[test]
    fn test_profiles_from_streaming() {
        let headers = owned(&["name", "age"]);
        let mut collection = StreamingColumnCollection::new();
        for row in [["Alice", "30"], ["Bob", "25"], ["Charlie", "35"]] {
            collection.process_record(&headers, owned(&row));
        }

        let profiles = profiles_from_streaming(&collection, false, false, None);
        assert_eq!(profiles.len(), 2);

        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.data_type, DataType::Integer);
        assert_eq!(age.total_count, 3);
    }

    #[test]
    fn test_quality_check_samples() {
        let headers = owned(&["col"]);
        let mut collection = StreamingColumnCollection::new();
        for value in ["val1", "val2"] {
            collection.process_record(&headers, owned(&[value]));
        }

        let samples = quality_check_samples(&collection);
        assert!(samples.contains_key("col"));
        assert_eq!(samples["col"].len(), 2);
    }

    #[test]
    fn test_boolean_stats_with_counts() {
        let samples = owned(&["True", "False", "True"]);
        let profile = build_column_profile(ColumnProfileInput {
            boolean_counts: Some((2, 1)),
            ..input_for("flag", DataType::Boolean, 2, &samples)
        });

        assert_two_true_one_false(&profile.stats);
    }

    #[test]
    fn test_boolean_stats_fallback_case_insensitive() {
        // No pre-computed counts: they must be derived from the mixed-case,
        // whitespace-padded samples.
        let samples = owned(&["true", "FALSE", " True "]);
        let profile = build_column_profile(input_for("flag", DataType::Boolean, 2, &samples));

        assert_two_true_one_false(&profile.stats);
    }

    #[test]
    fn test_skip_statistics() {
        let samples = owned(&["10", "20", "30"]);
        let profile = build_column_profile(ColumnProfileInput {
            skip_statistics: true,
            ..input_for("num", DataType::Integer, 3, &samples)
        });

        assert!(matches!(profile.stats, ColumnStats::None));
        assert_eq!(profile.data_type, DataType::Integer);
    }

    #[test]
    fn test_skip_patterns() {
        let samples = owned(&["hello", "world"]);
        let profile = build_column_profile(ColumnProfileInput {
            skip_patterns: true,
            ..input_for("text", DataType::String, 2, &samples)
        });

        assert!(profile.patterns.is_empty());
        assert!(matches!(profile.stats, ColumnStats::Text(_)));
    }

    #[test]
    fn test_all_packs_default() {
        let samples = owned(&["42", "99"]);
        let profile = build_column_profile(input_for("val", DataType::Integer, 2, &samples));

        assert!(matches!(profile.stats, ColumnStats::Numeric(_)));
        assert_eq!(profile.data_type, DataType::Integer);
    }
}