1use std::path::{Path, PathBuf};
7
8use arrow::datatypes::Schema;
9use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
10
11use super::mapping::{ColumnNameMode, ExasolType, TypeMapper};
12use crate::import::ImportError;
13
/// A single column whose Exasol type was inferred from a Parquet/Arrow field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InferredColumn {
    // Column name exactly as it appears in the source Parquet file.
    pub original_name: String,
    // Name as it will appear in generated DDL (quoted or sanitized,
    // depending on the ColumnNameMode used during inference).
    pub ddl_name: String,
    // Mapped Exasol column type.
    pub exasol_type: ExasolType,
    // True if the Arrow field (in any contributing file) allows NULLs.
    pub nullable: bool,
}
26
/// A table schema inferred from one or more Parquet files.
#[derive(Debug, Clone)]
pub struct InferredTableSchema {
    // Inferred columns, in source field order (taken from the first file).
    pub columns: Vec<InferredColumn>,
    // Paths of the files the schema was inferred from.
    pub source_files: Vec<PathBuf>,
}
35
36impl InferredTableSchema {
37 #[must_use]
64 pub fn to_ddl(&self, table_name: &str, schema_name: Option<&str>) -> String {
65 let table_ref = if let Some(schema) = schema_name {
68 format!("{schema}.{table_name}")
69 } else {
70 table_name.to_string()
71 };
72
73 let column_defs: Vec<String> = self
74 .columns
75 .iter()
76 .map(|col| format!(" {} {}", col.ddl_name, col.exasol_type.to_ddl_type()))
77 .collect();
78
79 format!(
80 "CREATE TABLE {} (\n{}\n);",
81 table_ref,
82 column_defs.join(",\n")
83 )
84 }
85}
86
/// Sanitizes a raw column name into a valid unquoted Exasol identifier.
///
/// Each ASCII alphanumeric character (and `_`) is kept and upper-cased;
/// every other character becomes `_`. A leading digit gets an `_` prefix
/// (unquoted SQL identifiers may not start with a digit), and an empty
/// input yields `"_"`.
#[must_use]
pub fn sanitize_column_name(name: &str) -> String {
    // Map every character up front; the leading-digit fixup happens after.
    // (The previous version carried an empty `if i == 0 && is_digit` block
    // inside the loop — dead code — which also forced an unneeded
    // `enumerate()`.)
    let mut result: String = name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();

    // Unquoted SQL identifiers may not begin with a digit.
    if result.chars().next().is_some_and(|c| c.is_ascii_digit()) {
        result.insert(0, '_');
    }

    // An empty source name still needs some identifier to emit.
    if result.is_empty() {
        return "_".to_string();
    }

    result
}
136
/// Wraps `name` in double quotes, doubling any embedded `"` characters so
/// the result is a valid quoted SQL identifier.
#[must_use]
pub fn quote_column_name(name: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for c in name.chars() {
        if c == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(c);
        }
    }
    quoted.push('"');
    quoted
}
161
/// Quotes an arbitrary SQL identifier (table, schema, column), doubling
/// embedded `"` characters; behaves identically to `quote_column_name`.
#[must_use]
pub fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
169
170#[must_use]
172pub fn format_column_name(name: &str, mode: ColumnNameMode) -> String {
173 match mode {
174 ColumnNameMode::Quoted => quote_column_name(name),
175 ColumnNameMode::Sanitize => sanitize_column_name(name),
176 }
177}
178
179pub fn infer_schema_from_parquet(
200 file_path: &Path,
201 column_name_mode: ColumnNameMode,
202) -> Result<InferredTableSchema, ImportError> {
203 let file = std::fs::File::open(file_path).map_err(|e| {
204 ImportError::SchemaInferenceError(format!(
205 "Failed to open file '{}': {}",
206 file_path.display(),
207 e
208 ))
209 })?;
210
211 let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
212 ImportError::SchemaInferenceError(format!(
213 "Failed to read Parquet metadata from '{}': {}",
214 file_path.display(),
215 e
216 ))
217 })?;
218
219 let arrow_schema = builder.schema();
220
221 let columns = arrow_schema_to_columns(arrow_schema, column_name_mode)?;
222
223 Ok(InferredTableSchema {
224 columns,
225 source_files: vec![file_path.to_path_buf()],
226 })
227}
228
229pub fn infer_schema_from_parquet_files(
250 file_paths: &[PathBuf],
251 column_name_mode: ColumnNameMode,
252) -> Result<InferredTableSchema, ImportError> {
253 if file_paths.is_empty() {
254 return Err(ImportError::SchemaInferenceError(
255 "No files provided for schema inference".to_string(),
256 ));
257 }
258
259 if file_paths.len() == 1 {
260 return infer_schema_from_parquet(&file_paths[0], column_name_mode);
261 }
262
263 let mut schemas: Vec<(PathBuf, Schema)> = Vec::with_capacity(file_paths.len());
265
266 for path in file_paths {
267 let file = std::fs::File::open(path).map_err(|e| {
268 ImportError::SchemaInferenceError(format!(
269 "Failed to open file '{}': {}",
270 path.display(),
271 e
272 ))
273 })?;
274
275 let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
276 ImportError::SchemaInferenceError(format!(
277 "Failed to read Parquet metadata from '{}': {}",
278 path.display(),
279 e
280 ))
281 })?;
282
283 schemas.push((path.clone(), builder.schema().as_ref().clone()));
284 }
285
286 let (first_path, first_schema) = &schemas[0];
288 let mut columns = arrow_schema_to_columns(first_schema, column_name_mode)?;
289
290 for (path, schema) in schemas.iter().skip(1) {
292 if schema.fields().len() != columns.len() {
293 return Err(ImportError::SchemaMismatchError(format!(
294 "Schema mismatch: '{}' has {} columns, but '{}' has {} columns",
295 first_path.display(),
296 columns.len(),
297 path.display(),
298 schema.fields().len()
299 )));
300 }
301
302 for (i, field) in schema.fields().iter().enumerate() {
303 let other_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
304 ImportError::SchemaInferenceError(format!(
305 "Failed to map type for column '{}' in '{}': {}",
306 field.name(),
307 path.display(),
308 e
309 ))
310 })?;
311
312 columns[i].exasol_type = widen_type(&columns[i].exasol_type, &other_type);
313 columns[i].nullable = columns[i].nullable || field.is_nullable();
314 }
315 }
316
317 Ok(InferredTableSchema {
318 columns,
319 source_files: file_paths.to_vec(),
320 })
321}
322
323fn arrow_schema_to_columns(
325 schema: &Schema,
326 column_name_mode: ColumnNameMode,
327) -> Result<Vec<InferredColumn>, ImportError> {
328 let mut columns = Vec::with_capacity(schema.fields().len());
329
330 for field in schema.fields() {
331 let exasol_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
332 ImportError::SchemaInferenceError(format!(
333 "Failed to map type for column '{}': {}",
334 field.name(),
335 e
336 ))
337 })?;
338
339 columns.push(InferredColumn {
340 original_name: field.name().clone(),
341 ddl_name: format_column_name(field.name(), column_name_mode),
342 exasol_type,
343 nullable: field.is_nullable(),
344 });
345 }
346
347 Ok(columns)
348}
349
350#[must_use]
368pub fn widen_type(a: &ExasolType, b: &ExasolType) -> ExasolType {
369 if a == b {
370 return a.clone();
371 }
372
373 match (a, b) {
374 (
376 ExasolType::Decimal {
377 precision: p1,
378 scale: s1,
379 },
380 ExasolType::Decimal {
381 precision: p2,
382 scale: s2,
383 },
384 ) => {
385 let max_precision = (*p1).max(*p2).min(36); let max_scale = (*s1).max(*s2);
387 ExasolType::Decimal {
388 precision: max_precision,
389 scale: max_scale,
390 }
391 }
392
393 (ExasolType::Varchar { size: s1 }, ExasolType::Varchar { size: s2 }) => {
395 ExasolType::Varchar {
396 size: (*s1).max(*s2).min(2_000_000),
397 }
398 }
399
400 (ExasolType::Char { size: s1 }, ExasolType::Char { size: s2 }) => ExasolType::Char {
402 size: (*s1).max(*s2).min(2_000),
403 },
404
405 (ExasolType::Char { size: s1 }, ExasolType::Varchar { size: s2 })
407 | (ExasolType::Varchar { size: s2 }, ExasolType::Char { size: s1 }) => {
408 ExasolType::Varchar {
409 size: (*s1).max(*s2).min(2_000_000),
410 }
411 }
412
413 (ExasolType::Decimal { .. }, ExasolType::Double)
415 | (ExasolType::Double, ExasolType::Decimal { .. }) => ExasolType::Double,
416
417 (
419 ExasolType::Timestamp {
420 with_local_time_zone: tz1,
421 },
422 ExasolType::Timestamp {
423 with_local_time_zone: tz2,
424 },
425 ) => ExasolType::Timestamp {
426 with_local_time_zone: *tz1 || *tz2,
427 },
428
429 (
431 ExasolType::IntervalDayToSecond { precision: p1 },
432 ExasolType::IntervalDayToSecond { precision: p2 },
433 ) => ExasolType::IntervalDayToSecond {
434 precision: (*p1).max(*p2),
435 },
436
437 _ => ExasolType::Varchar { size: 2_000_000 },
439 }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    // --- sanitize_column_name ---------------------------------------------

    #[test]
    fn test_sanitize_column_name_simple() {
        // Plain alphanumeric names are simply upper-cased.
        assert_eq!(sanitize_column_name("name"), "NAME");
        assert_eq!(sanitize_column_name("MyColumn"), "MYCOLUMN");
    }

    #[test]
    fn test_sanitize_column_name_with_spaces() {
        // Spaces become underscores.
        assert_eq!(sanitize_column_name("my column"), "MY_COLUMN");
        assert_eq!(sanitize_column_name("first name"), "FIRST_NAME");
    }

    #[test]
    fn test_sanitize_column_name_special_chars() {
        // Every non-alphanumeric character maps to a single underscore.
        assert_eq!(sanitize_column_name("col@#$%"), "COL____");
        assert_eq!(sanitize_column_name("a-b-c"), "A_B_C");
    }

    #[test]
    fn test_sanitize_column_name_starts_with_digit() {
        // Leading digits get an underscore prefix (unquoted SQL identifiers
        // may not start with a digit).
        assert_eq!(sanitize_column_name("123abc"), "_123ABC");
        assert_eq!(sanitize_column_name("1st_column"), "_1ST_COLUMN");
    }

    #[test]
    fn test_sanitize_column_name_empty() {
        // Empty input degenerates to a single-underscore placeholder.
        assert_eq!(sanitize_column_name(""), "_");
    }

    #[test]
    fn test_sanitize_column_name_all_special() {
        assert_eq!(sanitize_column_name("@#$"), "___");
    }

    // --- quote_column_name / format_column_name ---------------------------

    #[test]
    fn test_quote_column_name_simple() {
        assert_eq!(quote_column_name("name"), "\"name\"");
        assert_eq!(quote_column_name("MyColumn"), "\"MyColumn\"");
    }

    #[test]
    fn test_quote_column_name_with_spaces() {
        assert_eq!(quote_column_name("my column"), "\"my column\"");
    }

    #[test]
    fn test_quote_column_name_with_quotes() {
        // Embedded double quotes are escaped by doubling them.
        assert_eq!(quote_column_name("col\"name"), "\"col\"\"name\"");
        assert_eq!(quote_column_name("\"quoted\""), "\"\"\"quoted\"\"\"");
    }

    #[test]
    fn test_format_column_name_quoted_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Quoted),
            "\"my Column\""
        );
    }

    #[test]
    fn test_format_column_name_sanitize_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Sanitize),
            "MY_COLUMN"
        );
    }

    // --- widen_type -------------------------------------------------------

    #[test]
    fn test_widen_type_identical() {
        // Identical inputs are returned unchanged.
        let t = ExasolType::Boolean;
        assert_eq!(widen_type(&t, &t), ExasolType::Boolean);
    }

    #[test]
    fn test_widen_type_decimal() {
        let t1 = ExasolType::Decimal {
            precision: 10,
            scale: 2,
        };
        let t2 = ExasolType::Decimal {
            precision: 15,
            scale: 4,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Decimal {
                precision: 15,
                scale: 4
            }
        );
    }

    #[test]
    fn test_widen_type_varchar() {
        let t1 = ExasolType::Varchar { size: 100 };
        let t2 = ExasolType::Varchar { size: 500 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 500 });
    }

    #[test]
    fn test_widen_type_char_varchar() {
        // CHAR widened with VARCHAR yields VARCHAR of the larger size.
        let t1 = ExasolType::Char { size: 50 };
        let t2 = ExasolType::Varchar { size: 100 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 100 });
    }

    #[test]
    fn test_widen_type_decimal_double() {
        // DECIMAL/DOUBLE widening is symmetric and yields DOUBLE.
        let t1 = ExasolType::Decimal {
            precision: 18,
            scale: 2,
        };
        let t2 = ExasolType::Double;
        assert_eq!(widen_type(&t1, &t2), ExasolType::Double);
        assert_eq!(widen_type(&t2, &t1), ExasolType::Double);
    }

    #[test]
    fn test_widen_type_incompatible() {
        // Unrelated families fall back to the widest VARCHAR.
        let t1 = ExasolType::Boolean;
        let t2 = ExasolType::Date;
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Varchar { size: 2_000_000 }
        );
    }

    #[test]
    fn test_widen_type_timestamp() {
        // "with local time zone" is sticky: set if either side has it.
        let t1 = ExasolType::Timestamp {
            with_local_time_zone: false,
        };
        let t2 = ExasolType::Timestamp {
            with_local_time_zone: true,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Timestamp {
                with_local_time_zone: true
            }
        );
    }

    // --- InferredTableSchema::to_ddl --------------------------------------

    #[test]
    fn test_inferred_table_schema_to_ddl_basic() {
        let schema = InferredTableSchema {
            columns: vec![
                InferredColumn {
                    original_name: "id".to_string(),
                    ddl_name: "\"id\"".to_string(),
                    exasol_type: ExasolType::Decimal {
                        precision: 18,
                        scale: 0,
                    },
                    nullable: false,
                },
                InferredColumn {
                    original_name: "name".to_string(),
                    ddl_name: "\"name\"".to_string(),
                    exasol_type: ExasolType::Varchar { size: 100 },
                    nullable: true,
                },
            ],
            source_files: vec![],
        };

        let ddl = schema.to_ddl("my_table", None);
        assert!(ddl.contains("CREATE TABLE my_table"));
        assert!(ddl.contains("\"id\" DECIMAL(18,0)"));
        assert!(ddl.contains("\"name\" VARCHAR(100)"));
    }

    #[test]
    fn test_inferred_table_schema_to_ddl_with_schema() {
        let schema = InferredTableSchema {
            columns: vec![InferredColumn {
                original_name: "col".to_string(),
                ddl_name: "\"col\"".to_string(),
                exasol_type: ExasolType::Boolean,
                nullable: false,
            }],
            source_files: vec![],
        };

        let ddl = schema.to_ddl("my_table", Some("my_schema"));
        assert!(ddl.contains("CREATE TABLE my_schema.my_table"));
    }
}