use arrow_schema::{Field, Schema};
use crate::schema::type_conversion::plan_arrow_data_type_as_mssql_type;
use crate::write::PlanOptions;
use crate::{
ArrowFieldRef, Diagnostic, DiagnosticCode, DiagnosticSet, FieldRef, Identifier, MssqlColumn,
MssqlProfile, PlanOutcome, Result, SchemaMapping, TableName, create_table_sql,
};
pub fn plan_arrow_schema_to_mssql_mappings(
schema: impl AsRef<Schema>,
_profile: MssqlProfile,
options: PlanOptions,
) -> Result<PlanOutcome<Vec<SchemaMapping>>> {
let schema = schema.as_ref();
let mut mappings = Vec::with_capacity(schema.fields().len());
let mut diagnostics = DiagnosticSet::new();
for (index, field) in schema.fields().iter().enumerate() {
match plan_arrow_field_to_mssql_column_mapping(index, field, &options) {
Ok(mapping) => mappings.push(mapping),
Err(diagnostic) => diagnostics.push(diagnostic),
}
}
if diagnostics.has_errors() {
return Err(crate::Error::Planning { diagnostics });
}
Ok(PlanOutcome::new(mappings, diagnostics))
}
/// Returns a clone of the MSSQL column of every mapping, in the same order as
/// `mappings`.
pub fn mssql_columns_from_mappings(mappings: &[SchemaMapping]) -> Vec<MssqlColumn> {
    let mut columns = Vec::with_capacity(mappings.len());
    for entry in mappings {
        columns.push(entry.mssql().clone());
    }
    columns
}
/// Renders the SQL produced by [`create_table_sql`] for `table`, using the
/// MSSQL columns extracted from `mappings` and `crate::CreateTableOptions`.
pub fn create_table_sql_from_mappings(table: &TableName, mappings: &[SchemaMapping]) -> String {
    let columns = mssql_columns_from_mappings(mappings);
    create_table_sql(table, &columns, crate::CreateTableOptions)
}
fn plan_arrow_field_to_mssql_column_mapping(
index: usize,
field: &Field,
options: &PlanOptions,
) -> std::result::Result<SchemaMapping, Diagnostic> {
let name = Identifier::new(field.name()).map_err(|err| {
Diagnostic::error(DiagnosticCode::IdentifierInvalid, err.to_string())
.with_field(FieldRef::new(index, field.name()))
})?;
let ty = plan_arrow_data_type_as_mssql_type(index, field, options)?;
let arrow = ArrowFieldRef::new(
index,
field.name().clone(),
field.is_nullable(),
field.data_type().clone(),
);
let mssql = MssqlColumn::new(name, ty, field.is_nullable());
Ok(SchemaMapping::new(arrow, mssql))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, UnionFields, UnionMode};

    use crate::{
        DiagnosticCode, Error, MssqlProfile, MssqlType, PlanOptions, TableName,
        create_table_sql_from_mappings, mssql_columns_from_mappings,
        plan_arrow_schema_to_mssql_mappings,
    };

    /// Plans a schema built from `fields` with the `sql_server_2016_compat_100`
    /// profile and default plan options.
    fn plan_fields(
        fields: Vec<Field>,
    ) -> crate::Result<crate::PlanOutcome<Vec<crate::SchemaMapping>>> {
        plan_arrow_schema_to_mssql_mappings(
            Arc::new(Schema::new(fields)),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
    }

    /// Plans `fields`, requires the planning to fail (panicking with `why`
    /// otherwise) and returns the diagnostics carried by the planning error.
    fn planning_diagnostics(fields: Vec<Field>, why: &str) -> crate::DiagnosticSet {
        let failure = plan_fields(fields).expect_err(why);
        let Error::Planning { diagnostics } = failure else {
            panic!("expected planning error");
        };
        diagnostics
    }

    /// Boolean and Int32 fields map to `bit` and `int`, keeping index, name
    /// and nullability on both sides of the mapping.
    #[test]
    fn plans_boolean_and_int32_mappings() {
        let outcome = plan_fields(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ])
        .unwrap();
        let mappings = outcome.value();

        // (index, name, arrow type, nullable, quoted MSSQL name, MSSQL type)
        let expected = [
            (
                0,
                "is_active",
                DataType::Boolean,
                false,
                "[is_active]",
                MssqlType::Bit,
            ),
            (
                1,
                "quantity",
                DataType::Int32,
                true,
                "[quantity]",
                MssqlType::Int,
            ),
        ];
        assert_eq!(mappings.len(), 2);

        for (mapping, (index, name, arrow_type, nullable, quoted, sql_type)) in
            mappings.iter().zip(expected)
        {
            assert_eq!(mapping.arrow().index(), index);
            assert_eq!(mapping.arrow().name(), name);
            assert_eq!(mapping.arrow().data_type(), &arrow_type);
            assert_eq!(mapping.arrow().nullable(), nullable);
            assert_eq!(mapping.mssql().name().quoted_sql(), quoted);
            assert_eq!(mapping.mssql().nullable(), nullable);
            assert_eq!(mapping.mssql().ty(), &sql_type);
        }
    }

    /// The CREATE TABLE statement is rendered from the MSSQL side of the
    /// planned mappings.
    #[test]
    fn renders_create_table_sql_from_mssql_side() {
        let outcome = plan_fields(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ])
        .unwrap();
        let target = TableName::new("dbo", "target").unwrap();

        let rendered = create_table_sql_from_mappings(&target, outcome.value());

        assert_eq!(
            rendered,
            "CREATE TABLE [dbo].[target] (\n [is_active] bit NOT NULL,\n [quantity] int NULL\n);"
        );
    }

    /// MSSQL columns can be extracted from mappings on their own.
    #[test]
    fn exposes_mssql_columns_without_arrow_identity() {
        let outcome =
            plan_fields(vec![Field::new("is_active", DataType::Boolean, false)]).unwrap();

        let columns = mssql_columns_from_mappings(outcome.value());

        assert_eq!(columns.len(), 1);
        let only = &columns[0];
        assert_eq!(only.name().as_str(), "is_active");
        assert_eq!(only.ty(), &MssqlType::Bit);
        assert!(!only.nullable());
    }

    /// Every unsupported field yields its own diagnostic, reported in schema
    /// order, while the supported leading field yields none.
    #[test]
    fn unsupported_nested_and_encoded_types_collect_schema_order_diagnostics() {
        let union_fields = UnionFields::try_new(
            [1_i8, 2],
            [
                Field::new("left", DataType::Int32, true),
                Field::new("right", DataType::Utf8, true),
            ],
        )
        .unwrap();
        let struct_type = DataType::Struct(
            vec![Field::new("child", DataType::Boolean, true)]
                .into_iter()
                .collect(),
        );
        let run_end_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );

        let diagnostics = planning_diagnostics(
            vec![
                Field::new("ok", DataType::Int32, false),
                Field::new("list_col", DataType::new_list(DataType::Int64, true), true),
                Field::new("struct_col", struct_type, true),
                Field::new(
                    "union_col",
                    DataType::Union(union_fields, UnionMode::Sparse),
                    true,
                ),
                Field::new("run_end_col", run_end_type, true),
            ],
            "unsupported fields should produce diagnostics",
        );

        assert_eq!(diagnostics.len(), 4);
        for diagnostic in diagnostics.all().iter() {
            assert!(diagnostic.code() == DiagnosticCode::UnsupportedArrowType);
        }

        let reported = diagnostics
            .all()
            .iter()
            .map(|diagnostic| {
                let location = diagnostic.field().unwrap();
                (location.index(), location.name())
            })
            .collect::<Vec<_>>();
        assert_eq!(
            reported,
            vec![
                (1, "list_col"),
                (2, "struct_col"),
                (3, "union_col"),
                (4, "run_end_col"),
            ]
        );

        // Exactly four diagnostics were asserted above, so the zip below
        // covers every message.
        let messages = diagnostics
            .all()
            .iter()
            .map(|diagnostic| diagnostic.message())
            .collect::<Vec<_>>();
        let keywords = ["nested", "nested", "nested", "encoded"];
        for (message, keyword) in messages.iter().zip(keywords) {
            assert!(message.contains(keyword));
        }
    }

    /// An empty field name is rejected with a structured diagnostic that
    /// points back at the offending field.
    #[test]
    fn invalid_identifier_returns_structured_planning_diagnostic() {
        let diagnostics = planning_diagnostics(
            vec![Field::new("", DataType::Boolean, false)],
            "empty field name should be rejected",
        );

        assert_eq!(diagnostics.len(), 1);
        let only = &diagnostics.all()[0];
        assert_eq!(only.code(), DiagnosticCode::IdentifierInvalid);
        let location = only.field().unwrap();
        assert_eq!(location.index(), 0);
        assert_eq!(location.name(), "");
    }
}