1use arrow_schema::{Field, Schema};
10
11use crate::schema::type_conversion::plan_arrow_data_type_as_mssql_type;
12use crate::write::PlanOptions;
13use crate::{
14 ArrowFieldRef, Diagnostic, DiagnosticCode, DiagnosticSet, FieldRef, Identifier, MssqlColumn,
15 MssqlProfile, PlanOutcome, Result, SchemaMapping, TableName, create_table_sql,
16};
17
18pub fn plan_arrow_schema_to_mssql_mappings(
20 schema: impl AsRef<Schema>,
21 _profile: MssqlProfile,
22 options: PlanOptions,
23) -> Result<PlanOutcome<Vec<SchemaMapping>>> {
24 let schema = schema.as_ref();
25 let mut mappings = Vec::with_capacity(schema.fields().len());
26 let mut diagnostics = DiagnosticSet::new();
27
28 for (index, field) in schema.fields().iter().enumerate() {
29 match plan_arrow_field_to_mssql_column_mapping(index, field, &options) {
30 Ok(mapping) => mappings.push(mapping),
31 Err(diagnostic) => diagnostics.push(diagnostic),
32 }
33 }
34
35 if diagnostics.has_errors() {
36 return Err(crate::Error::Planning { diagnostics });
37 }
38
39 Ok(PlanOutcome::new(mappings, diagnostics))
40}
41
42pub fn mssql_columns_from_mappings(mappings: &[SchemaMapping]) -> Vec<MssqlColumn> {
44 mappings
45 .iter()
46 .map(|mapping| mapping.mssql().clone())
47 .collect()
48}
49
50pub fn create_table_sql_from_mappings(table: &TableName, mappings: &[SchemaMapping]) -> String {
52 create_table_sql(
53 table,
54 &mssql_columns_from_mappings(mappings),
55 crate::CreateTableOptions,
56 )
57}
58
59fn plan_arrow_field_to_mssql_column_mapping(
60 index: usize,
61 field: &Field,
62 options: &PlanOptions,
63) -> std::result::Result<SchemaMapping, Diagnostic> {
64 let name = Identifier::new(field.name()).map_err(|err| {
65 Diagnostic::error(DiagnosticCode::IdentifierInvalid, err.to_string())
66 .with_field(FieldRef::new(index, field.name()))
67 })?;
68
69 let ty = plan_arrow_data_type_as_mssql_type(index, field, options)?;
70
71 let arrow = ArrowFieldRef::new(
72 index,
73 field.name().clone(),
74 field.is_nullable(),
75 field.data_type().clone(),
76 );
77 let mssql = MssqlColumn::new(name, ty, field.is_nullable());
78
79 Ok(SchemaMapping::new(arrow, mssql))
80}
81
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, UnionFields, UnionMode};

    use crate::{
        DiagnosticCode, Error, MssqlProfile, MssqlType, PlanOptions, TableName,
        create_table_sql_from_mappings, mssql_columns_from_mappings,
        plan_arrow_schema_to_mssql_mappings,
    };

    /// Plans `schema` using `MssqlProfile::sql_server_2016_compat_100()` and
    /// `PlanOptions::default()`, the configuration shared by every test here.
    fn plan_with_defaults(
        schema: Schema,
    ) -> crate::Result<crate::PlanOutcome<Vec<crate::SchemaMapping>>> {
        plan_arrow_schema_to_mssql_mappings(
            Arc::new(schema),
            MssqlProfile::sql_server_2016_compat_100(),
            PlanOptions::default(),
        )
    }

    /// Unwraps the diagnostics of a planning error, panicking on any other error.
    fn expect_planning_diagnostics(err: Error) -> crate::DiagnosticSet {
        let Error::Planning { diagnostics } = err else {
            panic!("expected planning error");
        };
        diagnostics
    }

    #[test]
    fn plans_boolean_and_int32_mappings() {
        let outcome = plan_with_defaults(Schema::new(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ]))
        .unwrap();
        let planned = outcome.value();

        assert_eq!(planned.len(), 2);

        // Non-nullable Boolean at position 0 maps to a non-nullable `MssqlType::Bit`.
        let boolean_mapping = &planned[0];
        let (arrow_side, mssql_side) = (boolean_mapping.arrow(), boolean_mapping.mssql());
        assert_eq!(arrow_side.index(), 0);
        assert_eq!(arrow_side.name(), "is_active");
        assert_eq!(arrow_side.data_type(), &DataType::Boolean);
        assert!(!arrow_side.nullable());
        assert_eq!(mssql_side.name().quoted_sql(), "[is_active]");
        assert!(!mssql_side.nullable());
        assert_eq!(mssql_side.ty(), &MssqlType::Bit);

        // Nullable Int32 at position 1 maps to a nullable `MssqlType::Int`.
        let integer_mapping = &planned[1];
        let (arrow_side, mssql_side) = (integer_mapping.arrow(), integer_mapping.mssql());
        assert_eq!(arrow_side.index(), 1);
        assert_eq!(arrow_side.name(), "quantity");
        assert_eq!(arrow_side.data_type(), &DataType::Int32);
        assert!(arrow_side.nullable());
        assert_eq!(mssql_side.name().quoted_sql(), "[quantity]");
        assert!(mssql_side.nullable());
        assert_eq!(mssql_side.ty(), &MssqlType::Int);
    }

    #[test]
    fn renders_create_table_sql_from_mssql_side() {
        let outcome = plan_with_defaults(Schema::new(vec![
            Field::new("is_active", DataType::Boolean, false),
            Field::new("quantity", DataType::Int32, true),
        ]))
        .unwrap();
        let target = TableName::new("dbo", "target").unwrap();

        let rendered = create_table_sql_from_mappings(&target, outcome.value());

        assert_eq!(
            rendered,
            "CREATE TABLE [dbo].[target] (\n [is_active] bit NOT NULL,\n [quantity] int NULL\n);"
        );
    }

    #[test]
    fn exposes_mssql_columns_without_arrow_identity() {
        let outcome = plan_with_defaults(Schema::new(vec![Field::new(
            "is_active",
            DataType::Boolean,
            false,
        )]))
        .unwrap();

        let columns = mssql_columns_from_mappings(outcome.value());

        assert_eq!(columns.len(), 1);
        let only_column = &columns[0];
        assert_eq!(only_column.name().as_str(), "is_active");
        assert_eq!(only_column.ty(), &MssqlType::Bit);
        assert!(!only_column.nullable());
    }

    #[test]
    fn unsupported_nested_and_encoded_types_collect_schema_order_diagnostics() {
        let union_fields = UnionFields::try_new(
            [1_i8, 2],
            [
                Field::new("left", DataType::Int32, true),
                Field::new("right", DataType::Utf8, true),
            ],
        )
        .unwrap();

        // One supported column followed by four columns the planner must reject.
        let supported = Field::new("ok", DataType::Int32, false);
        let list_col = Field::new("list_col", DataType::new_list(DataType::Int64, true), true);
        let struct_col = Field::new(
            "struct_col",
            DataType::Struct(
                vec![Field::new("child", DataType::Boolean, true)]
                    .into_iter()
                    .collect(),
            ),
            true,
        );
        let union_col = Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Sparse),
            true,
        );
        let run_end_col = Field::new(
            "run_end_col",
            DataType::RunEndEncoded(
                Arc::new(Field::new("run_ends", DataType::Int32, false)),
                Arc::new(Field::new("values", DataType::Utf8, true)),
            ),
            true,
        );
        let schema = Schema::new(vec![supported, list_col, struct_col, union_col, run_end_col]);

        let err = plan_with_defaults(schema)
            .expect_err("unsupported fields should produce diagnostics");
        let diagnostics = expect_planning_diagnostics(err);

        assert_eq!(diagnostics.len(), 4);
        for diagnostic in diagnostics.all().iter() {
            assert!(diagnostic.code() == DiagnosticCode::UnsupportedArrowType);
        }

        let flagged_fields: Vec<_> = diagnostics
            .all()
            .iter()
            .map(|diagnostic| {
                let flagged = diagnostic.field().unwrap();
                (flagged.index(), flagged.name())
            })
            .collect();

        assert_eq!(
            flagged_fields,
            vec![
                (1, "list_col"),
                (2, "struct_col"),
                (3, "union_col"),
                (4, "run_end_col"),
            ]
        );

        // The length was asserted to be 4 above, so the zip covers every diagnostic.
        let expected_keywords = ["nested", "nested", "nested", "encoded"];
        for (diagnostic, keyword) in diagnostics.all().iter().zip(expected_keywords) {
            assert!(diagnostic.message().contains(keyword));
        }
    }

    #[test]
    fn invalid_identifier_returns_structured_planning_diagnostic() {
        let schema = Schema::new(vec![Field::new("", DataType::Boolean, false)]);

        let err = plan_with_defaults(schema).expect_err("empty field name should be rejected");
        let diagnostics = expect_planning_diagnostics(err);

        assert_eq!(diagnostics.len(), 1);

        let only_diagnostic = &diagnostics.all()[0];
        assert_eq!(only_diagnostic.code(), DiagnosticCode::IdentifierInvalid);

        let flagged = only_diagnostic.field().unwrap();
        assert_eq!(flagged.index(), 0);
        assert_eq!(flagged.name(), "");
    }
}