Skip to main content

exarrow_rs/types/
infer.rs

//! Schema inference and DDL generation for Parquet files.
//!
//! This module provides functionality to infer Exasol table schemas from Parquet files
//! and generate CREATE TABLE DDL statements automatically.

use std::path::{Path, PathBuf};

use arrow::datatypes::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use super::mapping::{ColumnNameMode, ExasolType, TypeMapper};
use crate::import::ImportError;
13
/// An inferred column definition for DDL generation.
///
/// Produced during schema inference and consumed by
/// `InferredTableSchema::to_ddl` when emitting column definitions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InferredColumn {
    /// Original column name from the source schema.
    pub original_name: String,
    /// Column name formatted for DDL (quoted or sanitized),
    /// according to the `ColumnNameMode` chosen at inference time.
    pub ddl_name: String,
    /// Inferred Exasol data type.
    pub exasol_type: ExasolType,
    /// Whether the column allows NULL values.
    /// NOTE(review): `to_ddl` does not currently emit NOT NULL constraints
    /// from this flag; it is carried for schema merging and future use.
    pub nullable: bool,
}
26
/// An inferred table schema from one or more source files.
///
/// Column order follows the first source file's schema.
#[derive(Debug, Clone)]
pub struct InferredTableSchema {
    /// Column definitions in order.
    pub columns: Vec<InferredColumn>,
    /// Source files used for inference (for error context).
    pub source_files: Vec<PathBuf>,
}
35
36impl InferredTableSchema {
37    /// Generate a CREATE TABLE DDL statement from this schema.
38    ///
39    /// # Arguments
40    ///
41    /// * `table_name` - The name of the table to create
42    /// * `schema_name` - Optional schema/database name prefix
43    ///
44    /// # Returns
45    ///
46    /// A complete CREATE TABLE DDL statement as a string.
47    ///
48    /// # Example
49    ///
50    /// ```ignore
51    /// let ddl = schema.to_ddl("my_table", Some("my_schema"));
52    /// // CREATE TABLE my_schema.my_table (
53    /// //     "id" DECIMAL(18,0),
54    /// //     "name" VARCHAR(2000000),
55    /// //     ...
56    /// // );
57    /// ```
58    ///
59    /// Note: Table and schema names are NOT quoted to match how IMPORT statements
60    /// reference tables. In Exasol, unquoted identifiers are converted to uppercase.
61    /// Column names are quoted/sanitized according to the `column_name_mode` used
62    /// during schema inference.
63    #[must_use]
64    pub fn to_ddl(&self, table_name: &str, schema_name: Option<&str>) -> String {
65        // Don't quote table/schema names - they should match IMPORT statement format
66        // Exasol converts unquoted identifiers to uppercase
67        let table_ref = if let Some(schema) = schema_name {
68            format!("{schema}.{table_name}")
69        } else {
70            table_name.to_string()
71        };
72
73        let column_defs: Vec<String> = self
74            .columns
75            .iter()
76            .map(|col| format!("    {} {}", col.ddl_name, col.exasol_type.to_ddl_type()))
77            .collect();
78
79        format!(
80            "CREATE TABLE {} (\n{}\n);",
81            table_ref,
82            column_defs.join(",\n")
83        )
84    }
85}
86
/// Sanitize a column name to be a valid unquoted Exasol identifier.
///
/// This function:
/// - Converts the name to uppercase
/// - Replaces invalid identifier characters with underscores (one underscore
///   per source character, including non-ASCII characters)
/// - Prefixes names starting with digits with an underscore
///
/// # Arguments
///
/// * `name` - The original column name
///
/// # Returns
///
/// A sanitized column name suitable for use as an unquoted identifier.
/// An empty input yields `"_"`.
///
/// # Example
///
/// ```ignore
/// assert_eq!(sanitize_column_name("my Column"), "MY_COLUMN");
/// assert_eq!(sanitize_column_name("123abc"), "_123ABC");
/// ```
#[must_use]
pub fn sanitize_column_name(name: &str) -> String {
    // Map each character: ASCII alphanumerics and '_' are uppercased,
    // everything else collapses to a single underscore.
    let mut result: String = name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();

    // Unquoted identifiers may not start with a digit; prefix an underscore.
    if result.chars().next().is_some_and(|c| c.is_ascii_digit()) {
        result.insert(0, '_');
    }

    // An empty name would be an invalid identifier; fall back to "_".
    if result.is_empty() {
        return "_".to_string();
    }

    result
}
136
/// Quote a column name for use in Exasol DDL.
///
/// Wraps the name in double quotes; any double quote inside the name
/// is escaped by doubling it, per standard SQL identifier quoting.
///
/// # Arguments
///
/// * `name` - The original column name
///
/// # Returns
///
/// A quoted column name suitable for DDL statements.
///
/// # Example
///
/// ```ignore
/// assert_eq!(quote_column_name("my Column"), "\"my Column\"");
/// assert_eq!(quote_column_name("col\"name"), "\"col\"\"name\"");
/// ```
#[must_use]
pub fn quote_column_name(name: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for c in name.chars() {
        if c == '"' {
            // Escape an embedded quote by doubling it.
            quoted.push('"');
        }
        quoted.push(c);
    }
    quoted.push('"');
    quoted
}
161
/// Quote an identifier (table or schema name) for Exasol DDL.
///
/// Same escaping rules as `quote_column_name`: the name is wrapped in
/// double quotes and internal double quotes are doubled.
#[must_use]
pub fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
169
170/// Format a column name according to the specified mode.
171#[must_use]
172pub fn format_column_name(name: &str, mode: ColumnNameMode) -> String {
173    match mode {
174        ColumnNameMode::Quoted => quote_column_name(name),
175        ColumnNameMode::Sanitize => sanitize_column_name(name),
176    }
177}
178
179/// Infer an Exasol table schema from a single Parquet file.
180///
181/// This function reads only the Parquet file metadata (not the actual data)
182/// to determine the schema.
183///
184/// # Arguments
185///
186/// * `file_path` - Path to the Parquet file
187/// * `column_name_mode` - How to handle column names in DDL
188///
189/// # Returns
190///
191/// An `InferredTableSchema` containing column definitions.
192///
193/// # Errors
194///
195/// Returns `ImportError::SchemaInferenceError` if:
196/// - The file cannot be opened
197/// - The file is not a valid Parquet file
198/// - A column type cannot be mapped to Exasol
199pub fn infer_schema_from_parquet(
200    file_path: &Path,
201    column_name_mode: ColumnNameMode,
202) -> Result<InferredTableSchema, ImportError> {
203    let file = std::fs::File::open(file_path).map_err(|e| {
204        ImportError::SchemaInferenceError(format!(
205            "Failed to open file '{}': {}",
206            file_path.display(),
207            e
208        ))
209    })?;
210
211    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
212        ImportError::SchemaInferenceError(format!(
213            "Failed to read Parquet metadata from '{}': {}",
214            file_path.display(),
215            e
216        ))
217    })?;
218
219    let arrow_schema = builder.schema();
220
221    let columns = arrow_schema_to_columns(arrow_schema, column_name_mode)?;
222
223    Ok(InferredTableSchema {
224        columns,
225        source_files: vec![file_path.to_path_buf()],
226    })
227}
228
/// Infer a union schema from multiple Parquet files.
///
/// This function reads metadata from all files and computes a schema
/// that can accommodate data from all of them using type widening.
///
/// # Arguments
///
/// * `file_paths` - Paths to the Parquet files
/// * `column_name_mode` - How to handle column names in DDL
///
/// # Returns
///
/// An `InferredTableSchema` with widened types. Column names and order
/// come from the first file's schema.
///
/// # Errors
///
/// Returns `ImportError::SchemaInferenceError` if:
/// - Any file cannot be read
/// - Schemas have incompatible column counts
/// - A type cannot be mapped
pub fn infer_schema_from_parquet_files(
    file_paths: &[PathBuf],
    column_name_mode: ColumnNameMode,
) -> Result<InferredTableSchema, ImportError> {
    if file_paths.is_empty() {
        return Err(ImportError::SchemaInferenceError(
            "No files provided for schema inference".to_string(),
        ));
    }

    // Single file: no widening needed, delegate to the single-file path.
    if file_paths.len() == 1 {
        return infer_schema_from_parquet(&file_paths[0], column_name_mode);
    }

    // Phase 1: read all schemas up front. This guarantees that unreadable
    // files are reported before any schema-mismatch errors from phase 2.
    let mut schemas: Vec<(PathBuf, Schema)> = Vec::with_capacity(file_paths.len());

    for path in file_paths {
        let file = std::fs::File::open(path).map_err(|e| {
            ImportError::SchemaInferenceError(format!(
                "Failed to open file '{}': {}",
                path.display(),
                e
            ))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
            ImportError::SchemaInferenceError(format!(
                "Failed to read Parquet metadata from '{}': {}",
                path.display(),
                e
            ))
        })?;

        schemas.push((path.clone(), builder.schema().as_ref().clone()));
    }

    // Phase 2: start with the first schema (it fixes column names and order)
    // and widen types against every remaining file.
    let (first_path, first_schema) = &schemas[0];
    let mut columns = arrow_schema_to_columns(first_schema, column_name_mode)?;

    for (path, schema) in schemas.iter().skip(1) {
        if schema.fields().len() != columns.len() {
            return Err(ImportError::SchemaMismatchError(format!(
                "Schema mismatch: '{}' has {} columns, but '{}' has {} columns",
                first_path.display(),
                columns.len(),
                path.display(),
                schema.fields().len()
            )));
        }

        // Columns are matched by position (index), not by name.
        // NOTE(review): files containing the same columns in a different
        // order will be merged silently but incorrectly — consider matching
        // by name; confirm whether positional matching is intended.
        for (i, field) in schema.fields().iter().enumerate() {
            let other_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
                ImportError::SchemaInferenceError(format!(
                    "Failed to map type for column '{}' in '{}': {}",
                    field.name(),
                    path.display(),
                    e
                ))
            })?;

            // Widen the accumulated type; a column is nullable if ANY file
            // declares it nullable.
            columns[i].exasol_type = widen_type(&columns[i].exasol_type, &other_type);
            columns[i].nullable = columns[i].nullable || field.is_nullable();
        }
    }

    Ok(InferredTableSchema {
        columns,
        source_files: file_paths.to_vec(),
    })
}
322
323/// Convert an Arrow schema to a list of inferred columns.
324fn arrow_schema_to_columns(
325    schema: &Schema,
326    column_name_mode: ColumnNameMode,
327) -> Result<Vec<InferredColumn>, ImportError> {
328    let mut columns = Vec::with_capacity(schema.fields().len());
329
330    for field in schema.fields() {
331        let exasol_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
332            ImportError::SchemaInferenceError(format!(
333                "Failed to map type for column '{}': {}",
334                field.name(),
335                e
336            ))
337        })?;
338
339        columns.push(InferredColumn {
340            original_name: field.name().clone(),
341            ddl_name: format_column_name(field.name(), column_name_mode),
342            exasol_type,
343            nullable: field.is_nullable(),
344        });
345    }
346
347    Ok(columns)
348}
349
350/// Widen two Exasol types to a common supertype.
351///
352/// Type widening rules:
353/// - Identical types remain unchanged
354/// - DECIMAL: max(precision), max(scale)
355/// - VARCHAR: max(size)
356/// - DECIMAL + DOUBLE -> DOUBLE
357/// - Incompatible types -> VARCHAR(2000000) as fallback
358///
359/// # Arguments
360///
361/// * `a` - First type
362/// * `b` - Second type
363///
364/// # Returns
365///
366/// A type that can represent values from both input types.
367#[must_use]
368pub fn widen_type(a: &ExasolType, b: &ExasolType) -> ExasolType {
369    if a == b {
370        return a.clone();
371    }
372
373    match (a, b) {
374        // Decimal widening: max precision and scale
375        (
376            ExasolType::Decimal {
377                precision: p1,
378                scale: s1,
379            },
380            ExasolType::Decimal {
381                precision: p2,
382                scale: s2,
383            },
384        ) => {
385            let max_precision = (*p1).max(*p2).min(36); // Cap at Exasol's max precision
386            let max_scale = (*s1).max(*s2);
387            ExasolType::Decimal {
388                precision: max_precision,
389                scale: max_scale,
390            }
391        }
392
393        // VARCHAR widening: max size
394        (ExasolType::Varchar { size: s1 }, ExasolType::Varchar { size: s2 }) => {
395            ExasolType::Varchar {
396                size: (*s1).max(*s2).min(2_000_000),
397            }
398        }
399
400        // CHAR widening: max size
401        (ExasolType::Char { size: s1 }, ExasolType::Char { size: s2 }) => ExasolType::Char {
402            size: (*s1).max(*s2).min(2_000),
403        },
404
405        // CHAR + VARCHAR -> VARCHAR
406        (ExasolType::Char { size: s1 }, ExasolType::Varchar { size: s2 })
407        | (ExasolType::Varchar { size: s2 }, ExasolType::Char { size: s1 }) => {
408            ExasolType::Varchar {
409                size: (*s1).max(*s2).min(2_000_000),
410            }
411        }
412
413        // Decimal + Double -> Double
414        (ExasolType::Decimal { .. }, ExasolType::Double)
415        | (ExasolType::Double, ExasolType::Decimal { .. }) => ExasolType::Double,
416
417        // Timestamp widening: prefer with timezone
418        (
419            ExasolType::Timestamp {
420                with_local_time_zone: tz1,
421            },
422            ExasolType::Timestamp {
423                with_local_time_zone: tz2,
424            },
425        ) => ExasolType::Timestamp {
426            with_local_time_zone: *tz1 || *tz2,
427        },
428
429        // Interval widening
430        (
431            ExasolType::IntervalDayToSecond { precision: p1 },
432            ExasolType::IntervalDayToSecond { precision: p2 },
433        ) => ExasolType::IntervalDayToSecond {
434            precision: (*p1).max(*p2),
435        },
436
437        // Incompatible types: fall back to VARCHAR
438        _ => ExasolType::Varchar { size: 2_000_000 },
439    }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    // --- sanitize_column_name ------------------------------------------

    #[test]
    fn test_sanitize_column_name_simple() {
        assert_eq!(sanitize_column_name("name"), "NAME");
        assert_eq!(sanitize_column_name("MyColumn"), "MYCOLUMN");
    }

    #[test]
    fn test_sanitize_column_name_with_spaces() {
        assert_eq!(sanitize_column_name("my column"), "MY_COLUMN");
        assert_eq!(sanitize_column_name("first name"), "FIRST_NAME");
    }

    #[test]
    fn test_sanitize_column_name_special_chars() {
        // Each invalid character maps to exactly one underscore.
        assert_eq!(sanitize_column_name("col@#$%"), "COL____");
        assert_eq!(sanitize_column_name("a-b-c"), "A_B_C");
    }

    #[test]
    fn test_sanitize_column_name_starts_with_digit() {
        assert_eq!(sanitize_column_name("123abc"), "_123ABC");
        assert_eq!(sanitize_column_name("1st_column"), "_1ST_COLUMN");
    }

    #[test]
    fn test_sanitize_column_name_empty() {
        // Empty input falls back to a single underscore.
        assert_eq!(sanitize_column_name(""), "_");
    }

    #[test]
    fn test_sanitize_column_name_all_special() {
        assert_eq!(sanitize_column_name("@#$"), "___");
    }

    // --- quote_column_name ----------------------------------------------

    #[test]
    fn test_quote_column_name_simple() {
        assert_eq!(quote_column_name("name"), "\"name\"");
        assert_eq!(quote_column_name("MyColumn"), "\"MyColumn\"");
    }

    #[test]
    fn test_quote_column_name_with_spaces() {
        assert_eq!(quote_column_name("my column"), "\"my column\"");
    }

    #[test]
    fn test_quote_column_name_with_quotes() {
        // Embedded double quotes are escaped by doubling.
        assert_eq!(quote_column_name("col\"name"), "\"col\"\"name\"");
        assert_eq!(quote_column_name("\"quoted\""), "\"\"\"quoted\"\"\"");
    }

    // --- format_column_name ----------------------------------------------

    #[test]
    fn test_format_column_name_quoted_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Quoted),
            "\"my Column\""
        );
    }

    #[test]
    fn test_format_column_name_sanitize_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Sanitize),
            "MY_COLUMN"
        );
    }

    // --- widen_type -------------------------------------------------------

    #[test]
    fn test_widen_type_identical() {
        let t = ExasolType::Boolean;
        assert_eq!(widen_type(&t, &t), ExasolType::Boolean);
    }

    #[test]
    fn test_widen_type_decimal() {
        let t1 = ExasolType::Decimal {
            precision: 10,
            scale: 2,
        };
        let t2 = ExasolType::Decimal {
            precision: 15,
            scale: 4,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Decimal {
                precision: 15,
                scale: 4
            }
        );
    }

    #[test]
    fn test_widen_type_varchar() {
        let t1 = ExasolType::Varchar { size: 100 };
        let t2 = ExasolType::Varchar { size: 500 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 500 });
    }

    #[test]
    fn test_widen_type_char_varchar() {
        let t1 = ExasolType::Char { size: 50 };
        let t2 = ExasolType::Varchar { size: 100 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 100 });
    }

    #[test]
    fn test_widen_type_decimal_double() {
        // Widening is symmetric: both argument orders yield DOUBLE.
        let t1 = ExasolType::Decimal {
            precision: 18,
            scale: 2,
        };
        let t2 = ExasolType::Double;
        assert_eq!(widen_type(&t1, &t2), ExasolType::Double);
        assert_eq!(widen_type(&t2, &t1), ExasolType::Double);
    }

    #[test]
    fn test_widen_type_incompatible() {
        // Types with no common supertype fall back to the widest VARCHAR.
        let t1 = ExasolType::Boolean;
        let t2 = ExasolType::Date;
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Varchar { size: 2_000_000 }
        );
    }

    #[test]
    fn test_widen_type_timestamp() {
        // The time-zone-aware variant wins.
        let t1 = ExasolType::Timestamp {
            with_local_time_zone: false,
        };
        let t2 = ExasolType::Timestamp {
            with_local_time_zone: true,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Timestamp {
                with_local_time_zone: true
            }
        );
    }

    // --- to_ddl ------------------------------------------------------------

    #[test]
    fn test_inferred_table_schema_to_ddl_basic() {
        let schema = InferredTableSchema {
            columns: vec![
                InferredColumn {
                    original_name: "id".to_string(),
                    ddl_name: "\"id\"".to_string(),
                    exasol_type: ExasolType::Decimal {
                        precision: 18,
                        scale: 0,
                    },
                    nullable: false,
                },
                InferredColumn {
                    original_name: "name".to_string(),
                    ddl_name: "\"name\"".to_string(),
                    exasol_type: ExasolType::Varchar { size: 100 },
                    nullable: true,
                },
            ],
            source_files: vec![],
        };

        let ddl = schema.to_ddl("my_table", None);
        // Table name is NOT quoted (matches IMPORT statement format)
        assert!(ddl.contains("CREATE TABLE my_table"));
        // Column names ARE quoted (based on ColumnNameMode)
        assert!(ddl.contains("\"id\" DECIMAL(18,0)"));
        assert!(ddl.contains("\"name\" VARCHAR(100)"));
    }

    #[test]
    fn test_inferred_table_schema_to_ddl_with_schema() {
        let schema = InferredTableSchema {
            columns: vec![InferredColumn {
                original_name: "col".to_string(),
                ddl_name: "\"col\"".to_string(),
                exasol_type: ExasolType::Boolean,
                nullable: false,
            }],
            source_files: vec![],
        };

        let ddl = schema.to_ddl("my_table", Some("my_schema"));
        // Table and schema names are NOT quoted (matches IMPORT statement format)
        assert!(ddl.contains("CREATE TABLE my_schema.my_table"));
    }

    // Note: Integration tests for infer_schema_from_parquet would require
    // creating actual Parquet files, which is better suited for the
    // integration test suite.
}