Skip to main content

exarrow_rs/types/
csv_infer.rs

1//! Schema inference for CSV files.
2//!
3//! This module provides functionality to infer Exasol table schemas from CSV files,
4//! mirroring the Parquet inference API with additional CSV-specific options.
5
6use std::io::BufReader;
7use std::path::{Path, PathBuf};
8
9use arrow::csv::reader::Format;
10use arrow::datatypes::Schema;
11
12use super::infer::{format_column_name, widen_type, InferredColumn, InferredTableSchema};
13use super::mapping::{ColumnNameMode, ExasolType, TypeMapper};
14use crate::import::ImportError;
15
16/// Options for CSV schema inference.
17///
18/// Controls how CSV files are parsed and how column types are inferred.
19/// Use the builder pattern to customize options from the defaults.
20///
21/// # Example
22///
23/// ```ignore
24/// let options = CsvInferenceOptions::new()
25///     .with_delimiter(b'\t')
26///     .with_has_header(false)
27///     .with_column_name_mode(ColumnNameMode::Sanitize);
28/// ```
29#[derive(Debug, Clone)]
30pub struct CsvInferenceOptions {
31    /// Field delimiter byte (default: `,`).
32    pub delimiter: u8,
33    /// Whether the first row is a header (default: `true`).
34    pub has_header: bool,
35    /// Quote character (default: `Some(b'"')`).
36    pub quote: Option<u8>,
37    /// Escape character (default: `None`).
38    pub escape: Option<u8>,
39    /// Regex pattern for values treated as NULL (default: `"^$"` — empty string).
40    pub null_regex: Option<String>,
41    /// Maximum number of records to sample for type inference (default: `None` — all rows).
42    pub max_sample_records: Option<usize>,
43    /// How to format column names in DDL output.
44    pub column_name_mode: ColumnNameMode,
45}
46
47impl Default for CsvInferenceOptions {
48    fn default() -> Self {
49        Self {
50            delimiter: b',',
51            has_header: true,
52            quote: Some(b'"'),
53            escape: None,
54            null_regex: Some("^$".to_string()),
55            max_sample_records: None,
56            column_name_mode: ColumnNameMode::Quoted,
57        }
58    }
59}
60
61impl CsvInferenceOptions {
62    /// Create a new `CsvInferenceOptions` with default values.
63    #[must_use]
64    pub fn new() -> Self {
65        Self::default()
66    }
67
68    /// Set the field delimiter.
69    #[must_use]
70    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
71        self.delimiter = delimiter;
72        self
73    }
74
75    /// Set whether the CSV has a header row.
76    #[must_use]
77    pub fn with_has_header(mut self, has_header: bool) -> Self {
78        self.has_header = has_header;
79        self
80    }
81
82    /// Set the quote character.
83    #[must_use]
84    pub fn with_quote(mut self, quote: Option<u8>) -> Self {
85        self.quote = quote;
86        self
87    }
88
89    /// Set the escape character.
90    #[must_use]
91    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
92        self.escape = escape;
93        self
94    }
95
96    /// Set the null regex pattern.
97    #[must_use]
98    pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
99        self.null_regex = null_regex;
100        self
101    }
102
103    /// Set the maximum number of records to sample.
104    #[must_use]
105    pub fn with_max_sample_records(mut self, max_sample_records: Option<usize>) -> Self {
106        self.max_sample_records = max_sample_records;
107        self
108    }
109
110    /// Set the column name mode.
111    #[must_use]
112    pub fn with_column_name_mode(mut self, mode: ColumnNameMode) -> Self {
113        self.column_name_mode = mode;
114        self
115    }
116}
117
118/// Build an arrow-csv `Format` from `CsvInferenceOptions`.
119fn build_csv_format(options: &CsvInferenceOptions) -> Format {
120    let mut format = Format::default()
121        .with_header(options.has_header)
122        .with_delimiter(options.delimiter);
123
124    if let Some(quote) = options.quote {
125        format = format.with_quote(quote);
126    }
127    if let Some(escape) = options.escape {
128        format = format.with_escape(escape);
129    }
130
131    format
132}
133
134/// Convert a CSV-inferred Arrow schema to inferred columns.
135///
136/// When `has_header` is false, generates column names as `col_1`, `col_2`, etc.
137/// Unrecognized Arrow types fall back to `VARCHAR(2000000)`.
138fn csv_schema_to_columns(schema: &Schema, options: &CsvInferenceOptions) -> Vec<InferredColumn> {
139    schema
140        .fields()
141        .iter()
142        .enumerate()
143        .map(|(i, field)| {
144            let original_name = if options.has_header {
145                field.name().clone()
146            } else {
147                format!("col_{}", i + 1)
148            };
149
150            let exasol_type = TypeMapper::arrow_to_exasol(field.data_type())
151                .unwrap_or(ExasolType::Varchar { size: 2_000_000 });
152
153            InferredColumn {
154                ddl_name: format_column_name(&original_name, options.column_name_mode),
155                original_name,
156                exasol_type,
157                nullable: field.is_nullable(),
158            }
159        })
160        .collect()
161}
162
163/// Infer an Exasol table schema from a single CSV file.
164///
165/// Uses arrow-csv's type inference to detect column types from sampled rows,
166/// then maps Arrow types to Exasol types.
167///
168/// # Arguments
169///
170/// * `file_path` - Path to the CSV file
171/// * `options` - CSV parsing and inference options
172///
173/// # Errors
174///
175/// Returns `ImportError::SchemaInferenceError` if:
176/// - The file cannot be opened
177/// - The CSV cannot be parsed
178/// - The file contains no data rows (header only)
179pub fn infer_schema_from_csv(
180    file_path: &Path,
181    options: &CsvInferenceOptions,
182) -> Result<InferredTableSchema, ImportError> {
183    let file = std::fs::File::open(file_path).map_err(|e| {
184        ImportError::SchemaInferenceError(format!(
185            "Failed to open file '{}': {}",
186            file_path.display(),
187            e
188        ))
189    })?;
190
191    let reader = BufReader::new(file);
192    let format = build_csv_format(options);
193
194    let (schema, records_read) = format
195        .infer_schema(reader, options.max_sample_records)
196        .map_err(|e| {
197            ImportError::SchemaInferenceError(format!(
198                "Failed to infer CSV schema from '{}': {}",
199                file_path.display(),
200                e
201            ))
202        })?;
203
204    if records_read == 0 {
205        return Err(ImportError::SchemaInferenceError(format!(
206            "CSV file '{}' contains no data rows",
207            file_path.display()
208        )));
209    }
210
211    let columns = csv_schema_to_columns(&schema, options);
212
213    Ok(InferredTableSchema {
214        columns,
215        source_files: vec![file_path.to_path_buf()],
216    })
217}
218
219/// Infer a union schema from multiple CSV files.
220///
221/// Reads and infers schemas from all files, then merges them using type widening
222/// to produce a schema that can accommodate data from all files.
223///
224/// # Arguments
225///
226/// * `file_paths` - Paths to the CSV files
227/// * `options` - CSV parsing and inference options
228///
229/// # Errors
230///
231/// Returns an error if:
232/// - No files are provided
233/// - Any file cannot be read or parsed
234/// - Any file contains no data rows
235/// - Files have different numbers of columns
236pub fn infer_schema_from_csv_files(
237    file_paths: &[PathBuf],
238    options: &CsvInferenceOptions,
239) -> Result<InferredTableSchema, ImportError> {
240    if file_paths.is_empty() {
241        return Err(ImportError::SchemaInferenceError(
242            "No files provided for schema inference".to_string(),
243        ));
244    }
245
246    if file_paths.len() == 1 {
247        return infer_schema_from_csv(&file_paths[0], options);
248    }
249
250    let format = build_csv_format(options);
251    let first_path = &file_paths[0];
252    let mut merged_columns: Option<Vec<InferredColumn>> = None;
253
254    for path in file_paths {
255        let file = std::fs::File::open(path).map_err(|e| {
256            ImportError::SchemaInferenceError(format!(
257                "Failed to open file '{}': {}",
258                path.display(),
259                e
260            ))
261        })?;
262
263        let reader = BufReader::new(file);
264
265        let (schema, records_read) = format
266            .infer_schema(reader, options.max_sample_records)
267            .map_err(|e| {
268                ImportError::SchemaInferenceError(format!(
269                    "Failed to infer CSV schema from '{}': {}",
270                    path.display(),
271                    e
272                ))
273            })?;
274
275        if records_read == 0 {
276            return Err(ImportError::SchemaInferenceError(format!(
277                "CSV file '{}' contains no data rows",
278                path.display()
279            )));
280        }
281
282        let file_columns = csv_schema_to_columns(&schema, options);
283
284        match &mut merged_columns {
285            None => {
286                merged_columns = Some(file_columns);
287            }
288            Some(columns) => {
289                if file_columns.len() != columns.len() {
290                    return Err(ImportError::SchemaMismatchError(format!(
291                        "Schema mismatch: '{}' has {} columns, but '{}' has {} columns",
292                        first_path.display(),
293                        columns.len(),
294                        path.display(),
295                        file_columns.len()
296                    )));
297                }
298
299                for (i, other_col) in file_columns.iter().enumerate() {
300                    columns[i].exasol_type =
301                        widen_type(&columns[i].exasol_type, &other_col.exasol_type);
302                    columns[i].nullable = columns[i].nullable || other_col.nullable;
303                }
304            }
305        }
306    }
307
308    Ok(InferredTableSchema {
309        // Safe to unwrap: we checked file_paths is non-empty above
310        columns: merged_columns.unwrap(),
311        source_files: file_paths.to_vec(),
312    })
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write `content` into a new temporary file and return its handle.
    fn write_csv(content: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(content.as_bytes()).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Collect the paths of the given temporary files, preserving order.
    fn paths_of(files: &[&NamedTempFile]) -> Vec<PathBuf> {
        files.iter().map(|f| f.path().to_path_buf()).collect()
    }

    /// Assert that inference failed and return the rendered error text.
    fn failure_message(outcome: Result<InferredTableSchema, ImportError>) -> String {
        assert!(outcome.is_err());
        outcome.unwrap_err().to_string()
    }

    #[test]
    fn test_csv_inference_options_default() {
        let CsvInferenceOptions {
            delimiter,
            has_header,
            quote,
            escape,
            null_regex,
            max_sample_records,
            column_name_mode,
        } = CsvInferenceOptions::default();

        assert_eq!(delimiter, b',');
        assert!(has_header);
        assert_eq!(quote, Some(b'"'));
        assert_eq!(escape, None);
        assert_eq!(null_regex, Some("^$".to_string()));
        assert_eq!(max_sample_records, None);
        assert_eq!(column_name_mode, ColumnNameMode::Quoted);
    }

    #[test]
    fn test_csv_inference_options_builder() {
        let CsvInferenceOptions {
            delimiter,
            has_header,
            quote,
            escape,
            null_regex,
            max_sample_records,
            column_name_mode,
        } = CsvInferenceOptions::new()
            .with_delimiter(b'\t')
            .with_has_header(false)
            .with_quote(None)
            .with_escape(Some(b'\\'))
            .with_null_regex(None)
            .with_max_sample_records(Some(100))
            .with_column_name_mode(ColumnNameMode::Sanitize);

        assert_eq!(delimiter, b'\t');
        assert!(!has_header);
        assert_eq!(quote, None);
        assert_eq!(escape, Some(b'\\'));
        assert_eq!(null_regex, None);
        assert_eq!(max_sample_records, Some(100));
        assert_eq!(column_name_mode, ColumnNameMode::Sanitize);
    }

    #[test]
    fn test_infer_mixed_types() {
        let csv = write_csv("id,value,name,flag\n1,3.14,hello,true\n2,2.71,world,false\n");
        let inferred = infer_schema_from_csv(csv.path(), &CsvInferenceOptions::default()).unwrap();

        assert_eq!(inferred.columns.len(), 4);

        let id = &inferred.columns[0];
        assert_eq!(id.original_name, "id");
        assert!(matches!(id.exasol_type, ExasolType::Decimal { .. }));

        let value = &inferred.columns[1];
        assert_eq!(value.original_name, "value");
        assert_eq!(value.exasol_type, ExasolType::Double);

        let name = &inferred.columns[2];
        assert_eq!(name.original_name, "name");
        assert_eq!(name.exasol_type, ExasolType::Varchar { size: 2_000_000 });

        let flag = &inferred.columns[3];
        assert_eq!(flag.original_name, "flag");
        assert_eq!(flag.exasol_type, ExasolType::Boolean);
    }

    #[test]
    fn test_infer_tab_delimiter() {
        let csv = write_csv("id\tname\n1\thello\n2\tworld\n");
        let tab_options = CsvInferenceOptions::new().with_delimiter(b'\t');
        let inferred = infer_schema_from_csv(csv.path(), &tab_options).unwrap();

        assert_eq!(inferred.columns.len(), 2);
        let names: Vec<&str> = inferred
            .columns
            .iter()
            .map(|c| c.original_name.as_str())
            .collect();
        assert_eq!(names, ["id", "name"]);
    }

    #[test]
    fn test_infer_no_header() {
        let csv = write_csv("1,hello,true\n2,world,false\n");
        let headerless = CsvInferenceOptions::new().with_has_header(false);
        let inferred = infer_schema_from_csv(csv.path(), &headerless).unwrap();

        assert_eq!(inferred.columns.len(), 3);
        let names: Vec<&str> = inferred
            .columns
            .iter()
            .map(|c| c.original_name.as_str())
            .collect();
        assert_eq!(names, ["col_1", "col_2", "col_3"]);
    }

    #[test]
    fn test_infer_no_header_ddl_names() {
        let csv = write_csv("1,hello\n2,world\n");
        let headerless_sanitized = CsvInferenceOptions::new()
            .with_has_header(false)
            .with_column_name_mode(ColumnNameMode::Sanitize);
        let inferred = infer_schema_from_csv(csv.path(), &headerless_sanitized).unwrap();

        assert_eq!(inferred.columns[0].ddl_name, "COL_1");
        assert_eq!(inferred.columns[1].ddl_name, "COL_2");
    }

    #[test]
    fn test_infer_multi_file_widening() {
        // First file: `id` holds integers only.
        let integers = write_csv("id,value\n1,hello\n2,world\n");
        // Second file: `id` holds floats, so the merged type must widen to Double.
        let floats = write_csv("id,value\n1.5,foo\n2.5,bar\n");

        let paths = paths_of(&[&integers, &floats]);
        let merged = infer_schema_from_csv_files(&paths, &CsvInferenceOptions::default()).unwrap();

        assert_eq!(merged.columns.len(), 2);
        // Decimal (from Int64) widened with Double (from Float64) gives Double.
        assert_eq!(merged.columns[0].exasol_type, ExasolType::Double);
        assert_eq!(merged.source_files.len(), 2);
    }

    #[test]
    fn test_infer_empty_csv_header_only() {
        let csv = write_csv("id,name\n");
        let outcome = infer_schema_from_csv(csv.path(), &CsvInferenceOptions::default());

        let message = failure_message(outcome);
        assert!(message.contains("no data rows"));
    }

    #[test]
    fn test_infer_nullable_columns() {
        let csv = write_csv("id,name\n1,hello\n2,\n3,world\n");
        let inferred = infer_schema_from_csv(csv.path(), &CsvInferenceOptions::default()).unwrap();

        assert_eq!(inferred.columns.len(), 2);
        let name = &inferred.columns[1];
        assert_eq!(name.exasol_type, ExasolType::Varchar { size: 2_000_000 });
    }

    #[test]
    fn test_infer_no_files() {
        let outcome = infer_schema_from_csv_files(&[], &CsvInferenceOptions::default());

        let message = failure_message(outcome);
        assert!(message.contains("No files provided"));
    }

    #[test]
    fn test_infer_single_file_via_multi() {
        let csv = write_csv("a,b\n1,hello\n");
        let paths = paths_of(&[&csv]);
        let inferred =
            infer_schema_from_csv_files(&paths, &CsvInferenceOptions::default()).unwrap();

        assert_eq!(inferred.columns.len(), 2);
        assert_eq!(inferred.source_files.len(), 1);
    }

    #[test]
    fn test_infer_multi_file_column_count_mismatch() {
        let two_columns = write_csv("a,b\n1,2\n");
        let three_columns = write_csv("a,b,c\n1,2,3\n");

        let paths = paths_of(&[&two_columns, &three_columns]);
        let outcome = infer_schema_from_csv_files(&paths, &CsvInferenceOptions::default());

        let message = failure_message(outcome);
        assert!(message.contains("Schema mismatch"));
    }

    #[test]
    fn test_infer_source_files_tracked() {
        let csv = write_csv("a\n1\n");
        let inferred = infer_schema_from_csv(csv.path(), &CsvInferenceOptions::default()).unwrap();

        assert_eq!(inferred.source_files.len(), 1);
        assert_eq!(inferred.source_files[0], csv.path());
    }

    #[test]
    fn test_infer_ddl_generation() {
        let csv = write_csv("id,name,active\n1,hello,true\n");
        let inferred = infer_schema_from_csv(csv.path(), &CsvInferenceOptions::default()).unwrap();

        let ddl = inferred.to_ddl("test_table", None);
        for fragment in ["CREATE TABLE test_table", "\"id\"", "\"name\"", "\"active\""] {
            assert!(ddl.contains(fragment));
        }
    }

    #[test]
    fn test_infer_file_not_found() {
        let missing = Path::new("/nonexistent/file.csv");
        let outcome = infer_schema_from_csv(missing, &CsvInferenceOptions::default());

        let message = failure_message(outcome);
        assert!(message.contains("Failed to open file"));
    }

    #[test]
    fn test_infer_multi_file_nullable_merge() {
        // First file: every `name` value is present.
        let complete = write_csv("id,name\n1,hello\n2,world\n");
        // Second file: one `name` value is empty.
        let with_gap = write_csv("id,name\n3,foo\n4,\n");

        let paths = paths_of(&[&complete, &with_gap]);
        let merged = infer_schema_from_csv_files(&paths, &CsvInferenceOptions::default()).unwrap();

        // Nullability is OR-ed across files. Whether arrow-csv reports a column
        // as nullable because of empty values is not asserted here; only the
        // merged column count is checked.
        assert_eq!(merged.columns.len(), 2);
    }

    #[test]
    fn test_infer_max_sample_records() {
        // The first three data rows are integers; the remaining rows are text.
        let csv = write_csv("value\n1\n2\n3\nhello\nworld\n");
        let first_three = CsvInferenceOptions::new().with_max_sample_records(Some(3));
        let inferred = infer_schema_from_csv(csv.path(), &first_three).unwrap();

        // Only the integer rows are sampled, so the column should map from Int64.
        assert!(matches!(
            inferred.columns[0].exasol_type,
            ExasolType::Decimal { .. }
        ));
    }
}