1use std::io::BufReader;
7use std::path::{Path, PathBuf};
8
9use arrow::csv::reader::Format;
10use arrow::datatypes::Schema;
11
12use super::infer::{format_column_name, widen_type, InferredColumn, InferredTableSchema};
13use super::mapping::{ColumnNameMode, ExasolType, TypeMapper};
14use crate::import::ImportError;
15
/// Options controlling how a CSV file is parsed while its schema is inferred.
///
/// Build one with [`CsvInferenceOptions::new`] (or [`Default::default`]) and
/// adjust individual settings with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct CsvInferenceOptions {
    /// Field delimiter byte, forwarded to the Arrow CSV `Format`.
    pub delimiter: u8,
    /// Whether the first record is a header row. When `false`, inferred
    /// columns are named `col_1`, `col_2`, ... instead of using names from
    /// the file.
    pub has_header: bool,
    /// Quote byte. Only forwarded to the Arrow `Format` when `Some`.
    // NOTE(review): `None` leaves the Arrow `Format` default untouched rather
    // than explicitly disabling quoting -- confirm that is the intent.
    pub quote: Option<u8>,
    /// Escape byte. Only forwarded to the Arrow `Format` when `Some`.
    pub escape: Option<u8>,
    /// Pattern describing values that should be treated as NULL.
    // NOTE(review): no function in this file reads this field, so it
    // currently has no effect on inference -- confirm whether it is consumed
    // elsewhere or should be wired into the Arrow `Format`.
    pub null_regex: Option<String>,
    /// Record limit handed to Arrow's `Format::infer_schema` as `max_records`.
    pub max_sample_records: Option<usize>,
    /// How the original column name is turned into the name used in DDL.
    pub column_name_mode: ColumnNameMode,
}
46
47impl Default for CsvInferenceOptions {
48 fn default() -> Self {
49 Self {
50 delimiter: b',',
51 has_header: true,
52 quote: Some(b'"'),
53 escape: None,
54 null_regex: Some("^$".to_string()),
55 max_sample_records: None,
56 column_name_mode: ColumnNameMode::Quoted,
57 }
58 }
59}
60
61impl CsvInferenceOptions {
62 #[must_use]
64 pub fn new() -> Self {
65 Self::default()
66 }
67
68 #[must_use]
70 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
71 self.delimiter = delimiter;
72 self
73 }
74
75 #[must_use]
77 pub fn with_has_header(mut self, has_header: bool) -> Self {
78 self.has_header = has_header;
79 self
80 }
81
82 #[must_use]
84 pub fn with_quote(mut self, quote: Option<u8>) -> Self {
85 self.quote = quote;
86 self
87 }
88
89 #[must_use]
91 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
92 self.escape = escape;
93 self
94 }
95
96 #[must_use]
98 pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
99 self.null_regex = null_regex;
100 self
101 }
102
103 #[must_use]
105 pub fn with_max_sample_records(mut self, max_sample_records: Option<usize>) -> Self {
106 self.max_sample_records = max_sample_records;
107 self
108 }
109
110 #[must_use]
112 pub fn with_column_name_mode(mut self, mode: ColumnNameMode) -> Self {
113 self.column_name_mode = mode;
114 self
115 }
116}
117
118fn build_csv_format(options: &CsvInferenceOptions) -> Format {
120 let mut format = Format::default()
121 .with_header(options.has_header)
122 .with_delimiter(options.delimiter);
123
124 if let Some(quote) = options.quote {
125 format = format.with_quote(quote);
126 }
127 if let Some(escape) = options.escape {
128 format = format.with_escape(escape);
129 }
130
131 format
132}
133
134fn csv_schema_to_columns(schema: &Schema, options: &CsvInferenceOptions) -> Vec<InferredColumn> {
139 schema
140 .fields()
141 .iter()
142 .enumerate()
143 .map(|(i, field)| {
144 let original_name = if options.has_header {
145 field.name().clone()
146 } else {
147 format!("col_{}", i + 1)
148 };
149
150 let exasol_type = TypeMapper::arrow_to_exasol(field.data_type())
151 .unwrap_or(ExasolType::Varchar { size: 2_000_000 });
152
153 InferredColumn {
154 ddl_name: format_column_name(&original_name, options.column_name_mode),
155 original_name,
156 exasol_type,
157 nullable: field.is_nullable(),
158 }
159 })
160 .collect()
161}
162
163pub fn infer_schema_from_csv(
180 file_path: &Path,
181 options: &CsvInferenceOptions,
182) -> Result<InferredTableSchema, ImportError> {
183 let file = std::fs::File::open(file_path).map_err(|e| {
184 ImportError::SchemaInferenceError(format!(
185 "Failed to open file '{}': {}",
186 file_path.display(),
187 e
188 ))
189 })?;
190
191 let reader = BufReader::new(file);
192 let format = build_csv_format(options);
193
194 let (schema, records_read) = format
195 .infer_schema(reader, options.max_sample_records)
196 .map_err(|e| {
197 ImportError::SchemaInferenceError(format!(
198 "Failed to infer CSV schema from '{}': {}",
199 file_path.display(),
200 e
201 ))
202 })?;
203
204 if records_read == 0 {
205 return Err(ImportError::SchemaInferenceError(format!(
206 "CSV file '{}' contains no data rows",
207 file_path.display()
208 )));
209 }
210
211 let columns = csv_schema_to_columns(&schema, options);
212
213 Ok(InferredTableSchema {
214 columns,
215 source_files: vec![file_path.to_path_buf()],
216 })
217}
218
219pub fn infer_schema_from_csv_files(
237 file_paths: &[PathBuf],
238 options: &CsvInferenceOptions,
239) -> Result<InferredTableSchema, ImportError> {
240 if file_paths.is_empty() {
241 return Err(ImportError::SchemaInferenceError(
242 "No files provided for schema inference".to_string(),
243 ));
244 }
245
246 if file_paths.len() == 1 {
247 return infer_schema_from_csv(&file_paths[0], options);
248 }
249
250 let format = build_csv_format(options);
251 let first_path = &file_paths[0];
252 let mut merged_columns: Option<Vec<InferredColumn>> = None;
253
254 for path in file_paths {
255 let file = std::fs::File::open(path).map_err(|e| {
256 ImportError::SchemaInferenceError(format!(
257 "Failed to open file '{}': {}",
258 path.display(),
259 e
260 ))
261 })?;
262
263 let reader = BufReader::new(file);
264
265 let (schema, records_read) = format
266 .infer_schema(reader, options.max_sample_records)
267 .map_err(|e| {
268 ImportError::SchemaInferenceError(format!(
269 "Failed to infer CSV schema from '{}': {}",
270 path.display(),
271 e
272 ))
273 })?;
274
275 if records_read == 0 {
276 return Err(ImportError::SchemaInferenceError(format!(
277 "CSV file '{}' contains no data rows",
278 path.display()
279 )));
280 }
281
282 let file_columns = csv_schema_to_columns(&schema, options);
283
284 match &mut merged_columns {
285 None => {
286 merged_columns = Some(file_columns);
287 }
288 Some(columns) => {
289 if file_columns.len() != columns.len() {
290 return Err(ImportError::SchemaMismatchError(format!(
291 "Schema mismatch: '{}' has {} columns, but '{}' has {} columns",
292 first_path.display(),
293 columns.len(),
294 path.display(),
295 file_columns.len()
296 )));
297 }
298
299 for (i, other_col) in file_columns.iter().enumerate() {
300 columns[i].exasol_type =
301 widen_type(&columns[i].exasol_type, &other_col.exasol_type);
302 columns[i].nullable = columns[i].nullable || other_col.nullable;
303 }
304 }
305 }
306 }
307
308 Ok(InferredTableSchema {
309 columns: merged_columns.unwrap(),
311 source_files: file_paths.to_vec(),
312 })
313}
314
/// Unit tests for the CSV inference options and the single- and multi-file
/// schema inference entry points. Every test works on temporary files.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `content` to a fresh temporary file and returns its handle.
    /// The caller must keep the handle alive for as long as the path is used.
    fn write_csv(content: &str) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        file
    }

    /// `Default` yields the documented baseline for every field.
    #[test]
    fn test_csv_inference_options_default() {
        let options = CsvInferenceOptions::default();
        assert_eq!(options.delimiter, b',');
        assert!(options.has_header);
        assert_eq!(options.quote, Some(b'"'));
        assert_eq!(options.escape, None);
        assert_eq!(options.null_regex, Some("^$".to_string()));
        assert_eq!(options.max_sample_records, None);
        assert_eq!(options.column_name_mode, ColumnNameMode::Quoted);
    }

    /// Each `with_*` builder overrides exactly its own field.
    #[test]
    fn test_csv_inference_options_builder() {
        let options = CsvInferenceOptions::new()
            .with_delimiter(b'\t')
            .with_has_header(false)
            .with_quote(None)
            .with_escape(Some(b'\\'))
            .with_null_regex(None)
            .with_max_sample_records(Some(100))
            .with_column_name_mode(ColumnNameMode::Sanitize);

        assert_eq!(options.delimiter, b'\t');
        assert!(!options.has_header);
        assert_eq!(options.quote, None);
        assert_eq!(options.escape, Some(b'\\'));
        assert_eq!(options.null_regex, None);
        assert_eq!(options.max_sample_records, Some(100));
        assert_eq!(options.column_name_mode, ColumnNameMode::Sanitize);
    }

    /// Integer, float, text and boolean columns map to Decimal, Double,
    /// Varchar and Boolean respectively.
    #[test]
    fn test_infer_mixed_types() {
        let csv = write_csv("id,value,name,flag\n1,3.14,hello,true\n2,2.71,world,false\n");
        let options = CsvInferenceOptions::default();
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.columns.len(), 4);

        assert_eq!(schema.columns[0].original_name, "id");
        assert!(matches!(
            schema.columns[0].exasol_type,
            ExasolType::Decimal { .. }
        ));

        assert_eq!(schema.columns[1].original_name, "value");
        assert_eq!(schema.columns[1].exasol_type, ExasolType::Double);

        assert_eq!(schema.columns[2].original_name, "name");
        assert_eq!(
            schema.columns[2].exasol_type,
            ExasolType::Varchar { size: 2_000_000 }
        );

        assert_eq!(schema.columns[3].original_name, "flag");
        assert_eq!(schema.columns[3].exasol_type, ExasolType::Boolean);
    }

    /// A custom delimiter (tab) splits the header into the expected columns.
    #[test]
    fn test_infer_tab_delimiter() {
        let csv = write_csv("id\tname\n1\thello\n2\tworld\n");
        let options = CsvInferenceOptions::new().with_delimiter(b'\t');
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.columns.len(), 2);
        assert_eq!(schema.columns[0].original_name, "id");
        assert_eq!(schema.columns[1].original_name, "name");
    }

    /// Without a header row, columns get 1-based synthetic `col_<n>` names.
    #[test]
    fn test_infer_no_header() {
        let csv = write_csv("1,hello,true\n2,world,false\n");
        let options = CsvInferenceOptions::new().with_has_header(false);
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.columns.len(), 3);
        assert_eq!(schema.columns[0].original_name, "col_1");
        assert_eq!(schema.columns[1].original_name, "col_2");
        assert_eq!(schema.columns[2].original_name, "col_3");
    }

    /// In `Sanitize` mode the synthetic names are expected as `COL_<n>` in DDL.
    #[test]
    fn test_infer_no_header_ddl_names() {
        let csv = write_csv("1,hello\n2,world\n");
        let options = CsvInferenceOptions::new()
            .with_has_header(false)
            .with_column_name_mode(ColumnNameMode::Sanitize);
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.columns[0].ddl_name, "COL_1");
        assert_eq!(schema.columns[1].ddl_name, "COL_2");
    }

    /// `id` is integer-valued in the first file and fractional in the second;
    /// the merged column is expected to widen to `Double`.
    #[test]
    fn test_infer_multi_file_widening() {
        let csv_a = write_csv("id,value\n1,hello\n2,world\n");
        let csv_b = write_csv("id,value\n1.5,foo\n2.5,bar\n");

        let options = CsvInferenceOptions::default();
        let paths = vec![csv_a.path().to_path_buf(), csv_b.path().to_path_buf()];
        let schema = infer_schema_from_csv_files(&paths, &options).unwrap();

        assert_eq!(schema.columns.len(), 2);
        assert_eq!(schema.columns[0].exasol_type, ExasolType::Double);
        assert_eq!(schema.source_files.len(), 2);
    }

    /// A file with only a header row is rejected as having no data rows.
    #[test]
    fn test_infer_empty_csv_header_only() {
        let csv = write_csv("id,name\n");
        let options = CsvInferenceOptions::default();
        let result = infer_schema_from_csv(csv.path(), &options);

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("no data rows"));
    }

    /// An empty value in a text column does not change its inferred type.
    /// Only the type is asserted here, not the `nullable` flag.
    #[test]
    fn test_infer_nullable_columns() {
        let csv = write_csv("id,name\n1,hello\n2,\n3,world\n");
        let options = CsvInferenceOptions::default();
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.columns.len(), 2);
        assert_eq!(
            schema.columns[1].exasol_type,
            ExasolType::Varchar { size: 2_000_000 }
        );
    }

    /// An empty path list is rejected with a dedicated message.
    #[test]
    fn test_infer_no_files() {
        let options = CsvInferenceOptions::default();
        let result = infer_schema_from_csv_files(&[], &options);

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("No files provided"));
    }

    /// The multi-file entry point also accepts a single path.
    #[test]
    fn test_infer_single_file_via_multi() {
        let csv = write_csv("a,b\n1,hello\n");
        let options = CsvInferenceOptions::default();
        let paths = vec![csv.path().to_path_buf()];
        let schema = infer_schema_from_csv_files(&paths, &options).unwrap();

        assert_eq!(schema.columns.len(), 2);
        assert_eq!(schema.source_files.len(), 1);
    }

    /// Files with different column counts cannot be merged.
    #[test]
    fn test_infer_multi_file_column_count_mismatch() {
        let csv_a = write_csv("a,b\n1,2\n");
        let csv_b = write_csv("a,b,c\n1,2,3\n");

        let options = CsvInferenceOptions::default();
        let paths = vec![csv_a.path().to_path_buf(), csv_b.path().to_path_buf()];
        let result = infer_schema_from_csv_files(&paths, &options);

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Schema mismatch"));
    }

    /// The resulting schema records the path it was inferred from.
    #[test]
    fn test_infer_source_files_tracked() {
        let csv = write_csv("a\n1\n");
        let options = CsvInferenceOptions::default();
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert_eq!(schema.source_files.len(), 1);
        assert_eq!(schema.source_files[0], csv.path());
    }

    /// With the default `Quoted` mode, generated DDL contains the column
    /// names wrapped in double quotes.
    #[test]
    fn test_infer_ddl_generation() {
        let csv = write_csv("id,name,active\n1,hello,true\n");
        let options = CsvInferenceOptions::default();
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        let ddl = schema.to_ddl("test_table", None);
        assert!(ddl.contains("CREATE TABLE test_table"));
        assert!(ddl.contains("\"id\""));
        assert!(ddl.contains("\"name\""));
        assert!(ddl.contains("\"active\""));
    }

    /// A missing file surfaces as an error mentioning the failed open.
    #[test]
    fn test_infer_file_not_found() {
        let options = CsvInferenceOptions::default();
        let result = infer_schema_from_csv(Path::new("/nonexistent/file.csv"), &options);

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Failed to open file"));
    }

    /// Merging a file without empty values with one that has an empty `name`
    /// value succeeds and keeps two columns.
    /// Only the column count is asserted here, not the `nullable` flags.
    #[test]
    fn test_infer_multi_file_nullable_merge() {
        let csv_a = write_csv("id,name\n1,hello\n2,world\n");
        let csv_b = write_csv("id,name\n3,foo\n4,\n");

        let options = CsvInferenceOptions::default();
        let paths = vec![csv_a.path().to_path_buf(), csv_b.path().to_path_buf()];
        let schema = infer_schema_from_csv_files(&paths, &options).unwrap();

        assert_eq!(schema.columns.len(), 2);
    }

    /// With a sample limit of 3 only the numeric rows are expected to be
    /// seen, so the trailing text rows must not turn the column into text.
    #[test]
    fn test_infer_max_sample_records() {
        let csv = write_csv("value\n1\n2\n3\nhello\nworld\n");
        let options = CsvInferenceOptions::new().with_max_sample_records(Some(3));
        let schema = infer_schema_from_csv(csv.path(), &options).unwrap();

        assert!(matches!(
            schema.columns[0].exasol_type,
            ExasolType::Decimal { .. }
        ));
    }
}