Skip to main content

arrow_tiberius/schema/
table_mapping.rs

1//! Bidirectional Arrow/MSSQL schema mapping.
2//!
3//! The initial mapping function starts from an Arrow schema because the first
4//! operation is Arrow-to-SQL Server writing. The resulting `SchemaMapping`
5//! values keep Arrow field metadata and MSSQL column metadata as peer concepts
6//! so future SQL Server-to-Arrow read planning can reuse the shared
7//! representation instead of inheriting a write-only column model.
8
9use arrow_schema::{Field, Schema};
10
11use crate::schema::type_conversion::plan_arrow_data_type_as_mssql_type;
12use crate::write::PlanOptions;
13use crate::{
14    ArrowFieldRef, Diagnostic, DiagnosticCode, DiagnosticSet, FieldRef, Identifier, MssqlColumn,
15    MssqlProfile, PlanOutcome, Result, SchemaMapping, TableName, create_table_sql,
16};
17
18/// Plans Arrow/MSSQL column mappings from an Arrow schema.
19pub fn plan_arrow_schema_to_mssql_mappings(
20    schema: impl AsRef<Schema>,
21    _profile: MssqlProfile,
22    options: PlanOptions,
23) -> Result<PlanOutcome<Vec<SchemaMapping>>> {
24    let schema = schema.as_ref();
25    let mut mappings = Vec::with_capacity(schema.fields().len());
26    let mut diagnostics = DiagnosticSet::new();
27
28    for (index, field) in schema.fields().iter().enumerate() {
29        match plan_arrow_field_to_mssql_column_mapping(index, field, &options) {
30            Ok(mapping) => mappings.push(mapping),
31            Err(diagnostic) => diagnostics.push(diagnostic),
32        }
33    }
34
35    if diagnostics.has_errors() {
36        return Err(crate::Error::Planning { diagnostics });
37    }
38
39    Ok(PlanOutcome::new(mappings, diagnostics))
40}
41
42/// Returns the planned MSSQL columns in mapping order.
43pub fn mssql_columns_from_mappings(mappings: &[SchemaMapping]) -> Vec<MssqlColumn> {
44    mappings
45        .iter()
46        .map(|mapping| mapping.mssql().clone())
47        .collect()
48}
49
50/// Renders deterministic `CREATE TABLE` SQL from mapping metadata.
51pub fn create_table_sql_from_mappings(table: &TableName, mappings: &[SchemaMapping]) -> String {
52    create_table_sql(
53        table,
54        &mssql_columns_from_mappings(mappings),
55        crate::CreateTableOptions,
56    )
57}
58
59fn plan_arrow_field_to_mssql_column_mapping(
60    index: usize,
61    field: &Field,
62    options: &PlanOptions,
63) -> std::result::Result<SchemaMapping, Diagnostic> {
64    let name = Identifier::new(field.name()).map_err(|err| {
65        Diagnostic::error(DiagnosticCode::IdentifierInvalid, err.to_string())
66            .with_field(FieldRef::new(index, field.name()))
67    })?;
68
69    let ty = plan_arrow_data_type_as_mssql_type(index, field, options)?;
70
71    let arrow = ArrowFieldRef::new(
72        index,
73        field.name().clone(),
74        field.is_nullable(),
75        field.data_type().clone(),
76    );
77    let mssql = MssqlColumn::new(name, ty, field.is_nullable());
78
79    Ok(SchemaMapping::new(arrow, mssql))
80}
81
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, UnionFields, UnionMode};

    use crate::{
        DiagnosticCode, Error, MssqlProfile, MssqlType, PlanOptions, TableName,
        create_table_sql_from_mappings, mssql_columns_from_mappings,
        plan_arrow_schema_to_mssql_mappings,
    };

    #[test]
    fn plans_boolean_and_int32_mappings() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ]));

        let outcome = plan_arrow_schema_to_mssql_mappings(
            Arc::clone(&schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .unwrap();
        let mappings = outcome.value();

        assert_eq!(mappings.len(), 2);

        let is_active = &mappings[0];
        assert_eq!(is_active.arrow().index(), 0);
        assert_eq!(is_active.arrow().name(), "is_active");
        assert_eq!(is_active.arrow().data_type(), &DataType::Boolean);
        assert!(!is_active.arrow().nullable());
        assert_eq!(is_active.mssql().name().quoted_sql(), "[is_active]");
        assert!(!is_active.mssql().nullable());
        assert_eq!(is_active.mssql().ty(), &MssqlType::Bit);

        let quantity = &mappings[1];
        assert_eq!(quantity.arrow().index(), 1);
        assert_eq!(quantity.arrow().name(), "quantity");
        assert_eq!(quantity.arrow().data_type(), &DataType::Int32);
        assert!(quantity.arrow().nullable());
        assert_eq!(quantity.mssql().name().quoted_sql(), "[quantity]");
        assert!(quantity.mssql().nullable());
        assert_eq!(quantity.mssql().ty(), &MssqlType::Int);
    }

    #[test]
    fn renders_create_table_sql_from_mssql_side() {
        let schema = Schema::new(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ]);
        let outcome = plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .unwrap();
        let table = TableName::new("dbo", "target").unwrap();

        let sql = create_table_sql_from_mappings(&table, outcome.value());

        assert_eq!(
            sql,
            "CREATE TABLE [dbo].[target] (\n    [is_active] bit NOT NULL,\n    [quantity] int NULL\n);"
        );
    }

    #[test]
    fn exposes_mssql_columns_without_arrow_identity() {
        let schema = Schema::new(vec![Field::new("is_active", DataType::Boolean, false)]);
        let outcome = plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .unwrap();

        let columns = mssql_columns_from_mappings(outcome.value());

        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].name().as_str(), "is_active");
        assert_eq!(columns[0].ty(), &MssqlType::Bit);
        assert!(!columns[0].nullable());
    }

    // Edge case: a schema with no fields is not an error and plans nothing.
    #[test]
    fn empty_schema_plans_no_mappings() {
        let outcome = plan_arrow_schema_to_mssql_mappings(
            Arc::new(Schema::empty()),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .unwrap();

        assert!(outcome.value().is_empty());
        assert!(mssql_columns_from_mappings(outcome.value()).is_empty());
    }

    #[test]
    fn unsupported_nested_and_encoded_types_collect_schema_order_diagnostics() {
        let union_fields = UnionFields::try_new(
            [1_i8, 2],
            [
                Field::new("left", DataType::Int32, true),
                Field::new("right", DataType::Utf8, true),
            ],
        )
        .unwrap();
        let schema = Schema::new(vec![
            Field::new("ok", DataType::Int32, false),
            Field::new("list_col", DataType::new_list(DataType::Int64, true), true),
            Field::new(
                "struct_col",
                DataType::Struct(
                    vec![Field::new("child", DataType::Boolean, true)]
                        .into_iter()
                        .collect(),
                ),
                true,
            ),
            Field::new(
                "union_col",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new(
                "run_end_col",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
                    Arc::new(Field::new("values", DataType::Utf8, true)),
                ),
                true,
            ),
        ]);

        let err = plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .expect_err("unsupported fields should produce diagnostics");

        let Error::Planning { diagnostics } = err else {
            panic!("expected planning error");
        };

        assert_eq!(diagnostics.len(), 4);
        assert!(
            diagnostics
                .all()
                .iter()
                .all(|diagnostic| diagnostic.code() == DiagnosticCode::UnsupportedArrowType)
        );

        let field_refs = diagnostics
            .all()
            .iter()
            .map(|diagnostic| {
                let field = diagnostic.field().unwrap();
                (field.index(), field.name())
            })
            .collect::<Vec<_>>();

        assert_eq!(
            field_refs,
            vec![
                (1, "list_col"),
                (2, "struct_col"),
                (3, "union_col"),
                (4, "run_end_col"),
            ]
        );

        let messages = diagnostics
            .all()
            .iter()
            .map(crate::Diagnostic::message)
            .collect::<Vec<_>>();
        assert!(messages[0].contains("nested"));
        assert!(messages[1].contains("nested"));
        assert!(messages[2].contains("nested"));
        assert!(messages[3].contains("encoded"));
    }

    #[test]
    fn invalid_identifier_returns_structured_planning_diagnostic() {
        let schema = Schema::new(vec![Field::new("", DataType::Boolean, false)]);

        let err = plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .expect_err("empty field name should be rejected");

        let Error::Planning { diagnostics } = err else {
            panic!("expected planning error");
        };

        assert_eq!(diagnostics.len(), 1);

        let diagnostic = &diagnostics.all()[0];
        assert_eq!(diagnostic.code(), DiagnosticCode::IdentifierInvalid);
        assert_eq!(diagnostic.field().unwrap().index(), 0);
        assert_eq!(diagnostic.field().unwrap().name(), "");
    }

    // Identifier and data-type failures in different fields must both be collected, and
    // they must appear in schema order; a valid field between them is not reported.
    #[test]
    fn mixed_identifier_and_type_failures_are_reported_in_schema_order() {
        let schema = Schema::new(vec![
            Field::new("", DataType::Boolean, false),
            Field::new("ok", DataType::Int32, true),
            Field::new("list_col", DataType::new_list(DataType::Int64, true), true),
        ]);

        let err = plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
        .expect_err("invalid name and unsupported type should both be rejected");

        let Error::Planning { diagnostics } = err else {
            panic!("expected planning error");
        };

        let summary = diagnostics
            .all()
            .iter()
            .map(|diagnostic| (diagnostic.code(), diagnostic.field().unwrap().index()))
            .collect::<Vec<_>>();

        assert_eq!(
            summary,
            vec![
                (DiagnosticCode::IdentifierInvalid, 0),
                (DiagnosticCode::UnsupportedArrowType, 2),
            ]
        );
    }
}