use std::path::{Path, PathBuf};
use arrow::datatypes::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use super::mapping::{ColumnNameMode, ExasolType, TypeMapper};
use crate::import::ImportError;
/// A single column whose Exasol type was inferred from a Parquet field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InferredColumn {
    /// Column name exactly as it appears in the source Parquet file.
    pub original_name: String,
    /// Name to emit in DDL (quoted or sanitized, per `ColumnNameMode`).
    pub ddl_name: String,
    /// The Exasol column type mapped from the Arrow data type.
    pub exasol_type: ExasolType,
    /// Whether the source field may contain NULLs (OR-ed across files
    /// when schemas are merged).
    pub nullable: bool,
}
/// A table schema inferred from one or more Parquet files.
#[derive(Debug, Clone)]
pub struct InferredTableSchema {
    /// Inferred columns in the order they appear in the (first) source file.
    pub columns: Vec<InferredColumn>,
    /// The Parquet files the schema was inferred from.
    pub source_files: Vec<PathBuf>,
}
impl InferredTableSchema {
    /// Renders a `CREATE TABLE` statement for the inferred columns.
    ///
    /// When `schema_name` is given the table reference is qualified as
    /// `schema.table`. Columns inferred as non-nullable carry a
    /// `NOT NULL` constraint; previously the tracked `nullable` flag was
    /// computed (and merged across files) but never reflected in the DDL.
    #[must_use]
    pub fn to_ddl(&self, table_name: &str, schema_name: Option<&str>) -> String {
        let table_ref = match schema_name {
            Some(schema) => format!("{schema}.{table_name}"),
            None => table_name.to_string(),
        };
        let column_defs: Vec<String> = self
            .columns
            .iter()
            .map(|col| {
                // Nullable columns get no constraint (NULL is the default).
                let null_clause = if col.nullable { "" } else { " NOT NULL" };
                format!(
                    " {} {}{}",
                    col.ddl_name,
                    col.exasol_type.to_ddl_type(),
                    null_clause
                )
            })
            .collect();
        format!(
            "CREATE TABLE {} (\n{}\n);",
            table_ref,
            column_defs.join(",\n")
        )
    }
}
/// Sanitizes an arbitrary column name into a bare (unquoted) Exasol
/// identifier.
///
/// ASCII alphanumerics and `_` are upper-cased and kept; every other
/// character (including each non-ASCII character) becomes a single `_`.
/// A result starting with a digit is prefixed with `_`, and an empty
/// input yields `"_"`.
#[must_use]
pub fn sanitize_column_name(name: &str) -> String {
    // The original loop carried a dead, empty `if i == 0 && is_digit` block;
    // the leading-digit case is handled once after the scan instead.
    let mut result: String = name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();
    // A bare identifier must not start with a digit.
    if result.chars().next().is_some_and(|c| c.is_ascii_digit()) {
        result.insert(0, '_');
    }
    if result.is_empty() {
        return "_".to_string();
    }
    result
}
/// Wraps `name` in double quotes, doubling any embedded `"` so the
/// result is a valid quoted SQL identifier.
#[must_use]
pub fn quote_column_name(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
/// Quotes an arbitrary SQL identifier (same escaping rules as
/// [`quote_column_name`]: surrounding double quotes, embedded `"` doubled).
#[must_use]
pub fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
#[must_use]
pub fn format_column_name(name: &str, mode: ColumnNameMode) -> String {
match mode {
ColumnNameMode::Quoted => quote_column_name(name),
ColumnNameMode::Sanitize => sanitize_column_name(name),
}
}
/// Infers an Exasol table schema from a single Parquet file's metadata.
///
/// # Errors
/// Returns [`ImportError::SchemaInferenceError`] when the file cannot be
/// opened, its Parquet metadata cannot be read, or a column type cannot
/// be mapped to an Exasol type.
pub fn infer_schema_from_parquet(
    file_path: &Path,
    column_name_mode: ColumnNameMode,
) -> Result<InferredTableSchema, ImportError> {
    let file = match std::fs::File::open(file_path) {
        Ok(f) => f,
        Err(e) => {
            return Err(ImportError::SchemaInferenceError(format!(
                "Failed to open file '{}': {}",
                file_path.display(),
                e
            )))
        }
    };
    let builder = match ParquetRecordBatchReaderBuilder::try_new(file) {
        Ok(b) => b,
        Err(e) => {
            return Err(ImportError::SchemaInferenceError(format!(
                "Failed to read Parquet metadata from '{}': {}",
                file_path.display(),
                e
            )))
        }
    };
    // Only the footer metadata is read; no record batches are decoded.
    let columns = arrow_schema_to_columns(builder.schema(), column_name_mode)?;
    Ok(InferredTableSchema {
        columns,
        source_files: vec![file_path.to_path_buf()],
    })
}
/// Infers a single table schema from multiple Parquet files, widening
/// column types so every file's data fits.
///
/// Column names, order, and DDL names are taken from the first file;
/// subsequent files are merged positionally: types via [`widen_type`],
/// nullability OR-ed across files. All files must have the same column
/// count (names are not compared — merging is purely positional).
///
/// # Errors
/// - [`ImportError::SchemaInferenceError`] if no files are given, a file
///   cannot be opened or its metadata read, or a type cannot be mapped.
/// - [`ImportError::SchemaMismatchError`] if files disagree on column count.
pub fn infer_schema_from_parquet_files(
    file_paths: &[PathBuf],
    column_name_mode: ColumnNameMode,
) -> Result<InferredTableSchema, ImportError> {
    if file_paths.is_empty() {
        return Err(ImportError::SchemaInferenceError(
            "No files provided for schema inference".to_string(),
        ));
    }
    // Fast path: a single file needs no merging.
    if file_paths.len() == 1 {
        return infer_schema_from_parquet(&file_paths[0], column_name_mode);
    }
    // Read every file's Arrow schema up front so open/read errors surface
    // before any column-count mismatch is reported.
    let mut schemas: Vec<(PathBuf, Schema)> = Vec::with_capacity(file_paths.len());
    for path in file_paths {
        let file = std::fs::File::open(path).map_err(|e| {
            ImportError::SchemaInferenceError(format!(
                "Failed to open file '{}': {}",
                path.display(),
                e
            ))
        })?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
            ImportError::SchemaInferenceError(format!(
                "Failed to read Parquet metadata from '{}': {}",
                path.display(),
                e
            ))
        })?;
        schemas.push((path.clone(), builder.schema().as_ref().clone()));
    }
    // Seed the result from the first file, then merge the rest into it.
    let (first_path, first_schema) = &schemas[0];
    let mut columns = arrow_schema_to_columns(first_schema, column_name_mode)?;
    for (path, schema) in schemas.iter().skip(1) {
        if schema.fields().len() != columns.len() {
            return Err(ImportError::SchemaMismatchError(format!(
                "Schema mismatch: '{}' has {} columns, but '{}' has {} columns",
                first_path.display(),
                columns.len(),
                path.display(),
                schema.fields().len()
            )));
        }
        for (i, field) in schema.fields().iter().enumerate() {
            let other_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
                ImportError::SchemaInferenceError(format!(
                    "Failed to map type for column '{}' in '{}': {}",
                    field.name(),
                    path.display(),
                    e
                ))
            })?;
            // Widen positionally; a column is nullable if ANY file allows NULLs.
            columns[i].exasol_type = widen_type(&columns[i].exasol_type, &other_type);
            columns[i].nullable = columns[i].nullable || field.is_nullable();
        }
    }
    Ok(InferredTableSchema {
        columns,
        source_files: file_paths.to_vec(),
    })
}
/// Maps every Arrow field in `schema` to an [`InferredColumn`],
/// short-circuiting on the first unmappable type.
fn arrow_schema_to_columns(
    schema: &Schema,
    column_name_mode: ColumnNameMode,
) -> Result<Vec<InferredColumn>, ImportError> {
    schema
        .fields()
        .iter()
        .map(|field| {
            let exasol_type = TypeMapper::arrow_to_exasol(field.data_type()).map_err(|e| {
                ImportError::SchemaInferenceError(format!(
                    "Failed to map type for column '{}': {}",
                    field.name(),
                    e
                ))
            })?;
            Ok(InferredColumn {
                original_name: field.name().clone(),
                ddl_name: format_column_name(field.name(), column_name_mode),
                exasol_type,
                nullable: field.is_nullable(),
            })
        })
        .collect()
}
/// Computes the narrowest Exasol type able to hold values of both `a`
/// and `b`.
///
/// Identical types are returned unchanged. Compatible pairs are widened
/// (decimal/decimal, varchar/varchar, char/char, char/varchar,
/// decimal/double, timestamp/timestamp, interval/interval); any other
/// combination falls back to the maximum-size `VARCHAR(2000000)` so no
/// data is lost.
#[must_use]
pub fn widen_type(a: &ExasolType, b: &ExasolType) -> ExasolType {
    if a == b {
        return a.clone();
    }
    match (a, b) {
        (
            ExasolType::Decimal {
                precision: p1,
                scale: s1,
            },
            ExasolType::Decimal {
                precision: p2,
                scale: s2,
            },
        ) => {
            // Widen both the fractional AND the integer part. Taking
            // max(precision) alone can shrink integer-digit capacity:
            // DECIMAL(10,0) + DECIMAL(10,9) needs 10 integer digits and
            // 9 fractional digits, i.e. DECIMAL(19,9) — not DECIMAL(10,9),
            // which holds only 1 integer digit.
            let scale = (*s1).max(*s2).min(36);
            let int_digits = (*p1).saturating_sub(*s1).max((*p2).saturating_sub(*s2));
            ExasolType::Decimal {
                // Exasol caps DECIMAL precision at 36 digits.
                precision: (int_digits + scale).min(36),
                scale,
            }
        }
        (ExasolType::Varchar { size: s1 }, ExasolType::Varchar { size: s2 }) => {
            ExasolType::Varchar {
                size: (*s1).max(*s2).min(2_000_000),
            }
        }
        (ExasolType::Char { size: s1 }, ExasolType::Char { size: s2 }) => ExasolType::Char {
            size: (*s1).max(*s2).min(2_000),
        },
        // Mixed CHAR/VARCHAR widens to VARCHAR of the larger size.
        (ExasolType::Char { size: s1 }, ExasolType::Varchar { size: s2 })
        | (ExasolType::Varchar { size: s2 }, ExasolType::Char { size: s1 }) => {
            ExasolType::Varchar {
                size: (*s1).max(*s2).min(2_000_000),
            }
        }
        (ExasolType::Decimal { .. }, ExasolType::Double)
        | (ExasolType::Double, ExasolType::Decimal { .. }) => ExasolType::Double,
        (
            ExasolType::Timestamp {
                with_local_time_zone: tz1,
            },
            ExasolType::Timestamp {
                with_local_time_zone: tz2,
            },
        ) => ExasolType::Timestamp {
            // Prefer the time-zone-aware variant when files disagree.
            with_local_time_zone: *tz1 || *tz2,
        },
        (
            ExasolType::IntervalDayToSecond { precision: p1 },
            ExasolType::IntervalDayToSecond { precision: p2 },
        ) => ExasolType::IntervalDayToSecond {
            precision: (*p1).max(*p2),
        },
        // Incompatible types: fall back to the widest possible VARCHAR.
        _ => ExasolType::Varchar { size: 2_000_000 },
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- sanitize_column_name -------------------------------------------

    #[test]
    fn test_sanitize_column_name_simple() {
        assert_eq!(sanitize_column_name("name"), "NAME");
        assert_eq!(sanitize_column_name("MyColumn"), "MYCOLUMN");
    }

    #[test]
    fn test_sanitize_column_name_with_spaces() {
        assert_eq!(sanitize_column_name("my column"), "MY_COLUMN");
        assert_eq!(sanitize_column_name("first name"), "FIRST_NAME");
    }

    #[test]
    fn test_sanitize_column_name_special_chars() {
        // Each disallowed character maps to one underscore.
        assert_eq!(sanitize_column_name("col@#$%"), "COL____");
        assert_eq!(sanitize_column_name("a-b-c"), "A_B_C");
    }

    #[test]
    fn test_sanitize_column_name_starts_with_digit() {
        // A leading digit is prefixed with `_` to form a valid identifier.
        assert_eq!(sanitize_column_name("123abc"), "_123ABC");
        assert_eq!(sanitize_column_name("1st_column"), "_1ST_COLUMN");
    }

    #[test]
    fn test_sanitize_column_name_empty() {
        assert_eq!(sanitize_column_name(""), "_");
    }

    #[test]
    fn test_sanitize_column_name_all_special() {
        assert_eq!(sanitize_column_name("@#$"), "___");
    }

    // --- quote_column_name / format_column_name -------------------------

    #[test]
    fn test_quote_column_name_simple() {
        assert_eq!(quote_column_name("name"), "\"name\"");
        assert_eq!(quote_column_name("MyColumn"), "\"MyColumn\"");
    }

    #[test]
    fn test_quote_column_name_with_spaces() {
        assert_eq!(quote_column_name("my column"), "\"my column\"");
    }

    #[test]
    fn test_quote_column_name_with_quotes() {
        // Embedded double quotes are doubled, SQL-style.
        assert_eq!(quote_column_name("col\"name"), "\"col\"\"name\"");
        assert_eq!(quote_column_name("\"quoted\""), "\"\"\"quoted\"\"\"");
    }

    #[test]
    fn test_format_column_name_quoted_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Quoted),
            "\"my Column\""
        );
    }

    #[test]
    fn test_format_column_name_sanitize_mode() {
        assert_eq!(
            format_column_name("my Column", ColumnNameMode::Sanitize),
            "MY_COLUMN"
        );
    }

    // --- widen_type -----------------------------------------------------

    #[test]
    fn test_widen_type_identical() {
        let t = ExasolType::Boolean;
        assert_eq!(widen_type(&t, &t), ExasolType::Boolean);
    }

    #[test]
    fn test_widen_type_decimal() {
        let t1 = ExasolType::Decimal {
            precision: 10,
            scale: 2,
        };
        let t2 = ExasolType::Decimal {
            precision: 15,
            scale: 4,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Decimal {
                precision: 15,
                scale: 4
            }
        );
    }

    #[test]
    fn test_widen_type_varchar() {
        let t1 = ExasolType::Varchar { size: 100 };
        let t2 = ExasolType::Varchar { size: 500 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 500 });
    }

    #[test]
    fn test_widen_type_char_varchar() {
        let t1 = ExasolType::Char { size: 50 };
        let t2 = ExasolType::Varchar { size: 100 };
        assert_eq!(widen_type(&t1, &t2), ExasolType::Varchar { size: 100 });
    }

    #[test]
    fn test_widen_type_decimal_double() {
        // Decimal/double widening is symmetric and always yields DOUBLE.
        let t1 = ExasolType::Decimal {
            precision: 18,
            scale: 2,
        };
        let t2 = ExasolType::Double;
        assert_eq!(widen_type(&t1, &t2), ExasolType::Double);
        assert_eq!(widen_type(&t2, &t1), ExasolType::Double);
    }

    #[test]
    fn test_widen_type_incompatible() {
        // Incompatible pairs fall back to the widest VARCHAR.
        let t1 = ExasolType::Boolean;
        let t2 = ExasolType::Date;
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Varchar { size: 2_000_000 }
        );
    }

    #[test]
    fn test_widen_type_timestamp() {
        let t1 = ExasolType::Timestamp {
            with_local_time_zone: false,
        };
        let t2 = ExasolType::Timestamp {
            with_local_time_zone: true,
        };
        assert_eq!(
            widen_type(&t1, &t2),
            ExasolType::Timestamp {
                with_local_time_zone: true
            }
        );
    }

    // --- InferredTableSchema::to_ddl ------------------------------------

    #[test]
    fn test_inferred_table_schema_to_ddl_basic() {
        let schema = InferredTableSchema {
            columns: vec![
                InferredColumn {
                    original_name: "id".to_string(),
                    ddl_name: "\"id\"".to_string(),
                    exasol_type: ExasolType::Decimal {
                        precision: 18,
                        scale: 0,
                    },
                    nullable: false,
                },
                InferredColumn {
                    original_name: "name".to_string(),
                    ddl_name: "\"name\"".to_string(),
                    exasol_type: ExasolType::Varchar { size: 100 },
                    nullable: true,
                },
            ],
            source_files: vec![],
        };
        let ddl = schema.to_ddl("my_table", None);
        assert!(ddl.contains("CREATE TABLE my_table"));
        assert!(ddl.contains("\"id\" DECIMAL(18,0)"));
        assert!(ddl.contains("\"name\" VARCHAR(100)"));
    }

    #[test]
    fn test_inferred_table_schema_to_ddl_with_schema() {
        let schema = InferredTableSchema {
            columns: vec![InferredColumn {
                original_name: "col".to_string(),
                ddl_name: "\"col\"".to_string(),
                exasol_type: ExasolType::Boolean,
                nullable: false,
            }],
            source_files: vec![],
        };
        let ddl = schema.to_ddl("my_table", Some("my_schema"));
        assert!(ddl.contains("CREATE TABLE my_schema.my_table"));
    }
}