1use std::{collections::HashSet, fmt, sync::Arc};
6
7use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit};
8
9use serde::{Deserialize, Serialize};
10use snafu::prelude::*;
11
/// Resolution of a [`LogicalDataType::Timestamp`] value.
///
/// Each variant corresponds to one Arrow `TimeUnit` (see `to_arrow_time_unit`)
/// and is displayed using the short suffixes `ms`, `us` and `ns`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum LogicalTimestampUnit {
    /// Millisecond resolution (displayed as `ms`).
    Millis,
    /// Microsecond resolution (displayed as `us`).
    Micros,
    /// Nanosecond resolution (displayed as `ns`).
    Nanos,
}
22
23impl LogicalTimestampUnit {
24 fn to_arrow_time_unit(self) -> TimeUnit {
25 match self {
26 LogicalTimestampUnit::Millis => TimeUnit::Millisecond,
27 LogicalTimestampUnit::Micros => TimeUnit::Microsecond,
28 LogicalTimestampUnit::Nanos => TimeUnit::Nanosecond,
29 }
30 }
31}
32
33impl fmt::Display for LogicalTimestampUnit {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 LogicalTimestampUnit::Millis => write!(f, "ms"),
37 LogicalTimestampUnit::Micros => write!(f, "us"),
38 LogicalTimestampUnit::Nanos => write!(f, "ns"),
39 }
40 }
41}
42
/// A named, typed, optionally nullable field.
///
/// Used both for the top-level columns of a [`LogicalSchema`] and for nested
/// children (struct members, list elements, map keys and values).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LogicalField {
    /// Field name; becomes the Arrow field name for columns, struct members
    /// and list elements.
    pub name: String,
    /// Logical type of the values stored in this field.
    pub data_type: LogicalDataType,
    /// Whether the field may hold nulls.
    pub nullable: bool,
}
53
54impl LogicalField {
55 fn to_arrow_field_ref(&self, path: &str) -> Result<FieldRef, SchemaConvertError> {
56 let dt = self.data_type.to_arrow_datatype(path)?;
57 Ok(Arc::new(Field::new(self.name.clone(), dt, self.nullable)))
58 }
59}
60
61impl fmt::Display for LogicalField {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 if self.nullable {
64 write!(f, "{}?: {}", self.name, self.data_type)
65 } else {
66 write!(f, "{}: {}", self.name, self.data_type)
67 }
68 }
69}
70
/// Builds the dotted path of `child` underneath `parent`.
///
/// An empty `parent` yields `child` unchanged so that root-level paths do not
/// start with a dot.
fn join_path(parent: &str, child: &str) -> String {
    match parent {
        "" => child.to_owned(),
        prefix => [prefix, child].join("."),
    }
}
78
/// Logical data type of a [`LogicalField`].
///
/// Scalar variants carry no payload; parameterised and nested variants carry
/// the information needed to build the equivalent Arrow type in
/// `to_arrow_datatype`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum LogicalDataType {
    /// Boolean value.
    Bool,
    /// 32-bit signed integer.
    Int32,
    /// 64-bit signed integer.
    Int64,
    /// 32-bit floating point number.
    Float32,
    /// 64-bit floating point number.
    Float64,
    /// Variable-length binary data.
    Binary,
    /// Fixed-width binary data.
    FixedBinary {
        /// Width in bytes; validation and Arrow conversion both require `> 0`.
        byte_width: i32,
    },
    /// UTF-8 string.
    Utf8,
    /// 96-bit integer; representable here but rejected by Arrow conversion.
    Int96,

    /// Timestamp with a resolution and an optional timezone.
    Timestamp {
        /// Resolution of the stored values.
        unit: LogicalTimestampUnit,
        /// Optional timezone, forwarded to Arrow unchanged.
        timezone: Option<String>,
    },

    /// Fixed-point decimal.
    Decimal {
        /// Total number of digits; Arrow conversion requires `1..=76`.
        precision: i32,
        /// Digits after the decimal point; Arrow conversion requires
        /// `0 <= scale <= precision`.
        scale: i32,
    },

    /// Nested record.
    Struct {
        /// Member fields; validation requires at least one member and
        /// unique, non-blank names.
        fields: Vec<LogicalField>,
    },

    /// Variable-length list.
    List {
        /// Element field; validation requires a non-blank name.
        elements: Box<LogicalField>,
    },

    /// Key/value map.
    Map {
        /// Key field; must be non-nullable. Its name is not carried over to
        /// Arrow, where the key field is always called `key`.
        key: Box<LogicalField>,
        /// Value field; `None` converts to a nullable Arrow `Null` value field.
        value: Option<Box<LogicalField>>,
        /// Forwarded unchanged as Arrow's `keys_sorted` flag.
        keys_sorted: bool,
    },

    /// A type that is not modelled by the other variants, identified by name
    /// only; displayed verbatim and rejected by Arrow conversion.
    Other(String),
}
146
147impl LogicalDataType {
148 fn to_arrow_datatype(&self, column: &str) -> Result<DataType, SchemaConvertError> {
149 Ok(match self {
150 LogicalDataType::Bool => DataType::Boolean,
151 LogicalDataType::Int32 => DataType::Int32,
152 LogicalDataType::Int64 => DataType::Int64,
153 LogicalDataType::Float32 => DataType::Float32,
154 LogicalDataType::Float64 => DataType::Float64,
155 LogicalDataType::Binary => DataType::Binary,
156 LogicalDataType::Utf8 => DataType::Utf8,
157
158 LogicalDataType::FixedBinary { byte_width } => {
159 if *byte_width <= 0 {
160 return Err(SchemaConvertError::FixedBinaryInvalidWidth {
161 column: column.to_string(),
162 byte_width: *byte_width,
163 });
164 }
165 DataType::FixedSizeBinary(*byte_width)
166 }
167
168 LogicalDataType::Timestamp { unit, timezone } => {
169 let tz: Option<Arc<str>> = timezone.as_ref().map(|s| Arc::<str>::from(s.as_str()));
170 DataType::Timestamp(unit.to_arrow_time_unit(), tz)
171 }
172
173 LogicalDataType::Int96 => {
174 return Err(SchemaConvertError::Int96Unsupported {
175 column: column.to_string(),
176 });
177 }
178
179 LogicalDataType::Decimal { precision, scale } => {
180 let precision = *precision;
181 let scale = *scale;
182 if precision <= 0 {
183 return Err(SchemaConvertError::DecimalInvalid {
184 column: column.to_string(),
185 precision,
186 scale,
187 details: "precision must be > 0".to_string(),
188 });
189 }
190 if scale < 0 {
191 return Err(SchemaConvertError::DecimalInvalid {
192 column: column.to_string(),
193 precision,
194 scale,
195 details: "scale must be >= 0".to_string(),
196 });
197 }
198 if scale > precision {
199 return Err(SchemaConvertError::DecimalInvalid {
200 column: column.to_string(),
201 precision,
202 scale,
203 details: "scale must be <= precision".to_string(),
204 });
205 }
206
207 if precision <= 38 {
208 DataType::Decimal128(precision as u8, scale as i8)
209 } else if precision <= 76 {
210 DataType::Decimal256(precision as u8, scale as i8)
211 } else {
212 return Err(SchemaConvertError::DecimalInvalid {
213 column: column.to_string(),
214 precision,
215 scale,
216 details: "precision exceeds Arrow maximum (76 digits)".to_string(),
217 });
218 }
219 }
220
221 LogicalDataType::Struct { fields } => {
222 let mut arrow_children: Vec<FieldRef> = Vec::with_capacity(fields.len());
223 for f in fields {
224 let child_path = join_path(column, &f.name);
225 arrow_children.push(f.to_arrow_field_ref(&child_path)?);
226 }
227 DataType::Struct(Fields::from(arrow_children))
228 }
229
230 LogicalDataType::List { elements } => {
231 let child_path = join_path(column, &elements.name);
232 let element_field = elements.to_arrow_field_ref(&child_path)?;
233 DataType::List(element_field)
234 }
235
236 LogicalDataType::Map {
237 key,
238 value,
239 keys_sorted,
240 } => {
241 if key.nullable {
242 return Err(SchemaConvertError::MapKeyMustBeNonNull {
243 column: column.to_string(),
244 });
245 }
246
247 let key_path = format!("{column}.key");
249 let val_path = format!("{column}.value");
250
251 let key_dt = key.data_type.to_arrow_datatype(&key_path)?;
252
253 let (val_dt, val_nullable) = match value.as_deref() {
254 Some(v) => (v.data_type.to_arrow_datatype(&val_path)?, v.nullable),
255 None => (DataType::Null, true),
256 };
257
258 let key_field: FieldRef = Arc::new(Field::new("key", key_dt, false));
259 let val_field: FieldRef = Arc::new(Field::new("value", val_dt, val_nullable));
260
261 let entries_dt = DataType::Struct(Fields::from(vec![key_field, val_field]));
262 let entries_field: FieldRef = Arc::new(Field::new("entries", entries_dt, false));
263
264 DataType::Map(entries_field, *keys_sorted)
265 }
266
267 LogicalDataType::Other(name) => {
268 return Err(SchemaConvertError::OtherTypeUnsupported {
269 column: column.to_string(),
270 name: name.clone(),
271 });
272 }
273 })
274 }
275}
276
277impl fmt::Display for LogicalDataType {
278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279 match self {
280 LogicalDataType::Bool => write!(f, "bool"),
281 LogicalDataType::Int32 => write!(f, "int32"),
282 LogicalDataType::Int64 => write!(f, "int64"),
283 LogicalDataType::Float32 => write!(f, "float32"),
284 LogicalDataType::Float64 => write!(f, "float64"),
285 LogicalDataType::Binary => write!(f, "binary"),
286 LogicalDataType::FixedBinary { byte_width } => write!(f, "fixed_binary[{byte_width}]"),
287 LogicalDataType::Utf8 => write!(f, "utf8"),
288 LogicalDataType::Int96 => write!(f, "int96"),
289
290 LogicalDataType::Timestamp { unit, timezone } => match timezone {
291 Some(tz) => write!(f, "timestamp[{}]({})", unit, tz),
292 None => write!(f, "timestamp[{}]", unit),
293 },
294
295 LogicalDataType::Decimal { precision, scale } => {
296 write!(f, "decimal(precision={precision}, scale={scale})")
297 }
298
299 LogicalDataType::Struct { fields } => {
300 write!(f, "Struct{{")?;
301 for (i, field) in fields.iter().enumerate() {
302 if i > 0 {
303 write!(f, ", ")?;
304 }
305 write!(f, "{}", field)?;
306 }
307 write!(f, "}}")
308 }
309
310 LogicalDataType::List { elements } => {
311 write!(f, "List<{}>", elements)
312 }
313
314 LogicalDataType::Map {
315 key,
316 value,
317 keys_sorted,
318 } => match value.as_deref() {
319 Some(v) => write!(f, "Map<{}, {}, keys_sorted={}>", key, v, keys_sorted),
320 None => write!(
321 f,
322 "Map<{}, value=omitted, keys_sorted={}>",
323 key, keys_sorted
324 ),
325 },
326
327 LogicalDataType::Other(s) => write!(f, "{s}"),
328 }
329 }
330}
331
/// Ordered collection of top-level columns.
///
/// Build instances with [`LogicalSchema::new`], which rejects duplicate column
/// names and structurally invalid nested types.
///
/// NOTE(review): `Deserialize` is derived directly on the struct, so it looks
/// like deserialized values populate `columns` without going through `new`'s
/// validation — confirm whether that is intended.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LogicalSchema {
    /// Top-level columns in declaration order.
    columns: Vec<LogicalField>,
}
338
339impl LogicalSchema {
340 pub fn to_arrow_schema(&self) -> Result<Schema, SchemaConvertError> {
345 let mut fields = Vec::with_capacity(self.columns.len());
346 for c in &self.columns {
347 let fref = c.to_arrow_field_ref(&c.name)?;
348 fields.push(fref.as_ref().clone());
349 }
350
351 Ok(Schema::new(fields))
352 }
353
354 pub fn to_arrow_schema_ref(&self) -> Result<SchemaRef, SchemaConvertError> {
358 Ok(Arc::new(self.to_arrow_schema()?))
359 }
360}
361
/// Structural problems detected while building a [`LogicalSchema`].
///
/// Several variants (`FixedBinaryMissingLength` and the two
/// `UnsupportedParquet*Encoding` variants) are not constructed in the code
/// visible in this file; judging by their messages they are presumably raised
/// by Parquet schema import code elsewhere — verify against callers.
#[derive(Debug, Clone, Snafu, PartialEq, Eq)]
pub enum LogicalSchemaError {
    /// Two top-level columns share the same name.
    #[snafu(display("Duplicate column name: {column}"))]
    DuplicateColumn {
        /// The repeated column name.
        column: String,
    },

    /// A `FixedBinary` type has a width that is zero or negative.
    #[snafu(display(
        "invalid FixedBinary byte_width for column '{column}': {byte_width} (must be > 0)"
    ))]
    FixedBinaryInvalidWidthInSchema {
        /// Dotted path of the offending field.
        column: String,
        /// The rejected width.
        byte_width: i32,
    },

    /// A fixed-length byte array column has no declared length.
    #[snafu(display(
        "FIXED_LEN_BYTE_ARRAY column '{column}' missing type_length in Parquet schema"
    ))]
    FixedBinaryMissingLength {
        /// Name or path of the offending column.
        column: String,
    },

    /// A struct contains two members with the same name.
    #[snafu(display("Duplicate field name: column={column_path}, field={field}"))]
    DuplicatedFieldName {
        /// Dotted path of the struct.
        column_path: String,
        /// The repeated member name.
        field: String,
    },

    /// A map key field is declared nullable.
    #[snafu(display("Invalid Map Key: map key should not be null for column={column_path}"))]
    InvalidMapKeyNullability {
        /// Dotted path of the map.
        column_path: String,
    },

    /// A struct has no members.
    #[snafu(display("Struct must have at least one field: column={column_path}"))]
    EmptyStruct {
        /// Dotted path of the struct.
        column_path: String,
    },

    /// A list element field has an empty or whitespace-only name.
    #[snafu(display("List element field name must be non-empty: column={column_path}"))]
    ListElementNameEmpty {
        /// Dotted path of the list.
        column_path: String,
    },

    /// A struct member has an empty or whitespace-only name.
    #[snafu(display("Struct field name must be non-empty: column={column_path}, field={field}"))]
    StructFieldNameEmpty {
        /// Dotted path of the struct.
        column_path: String,
        /// The rejected member name, exactly as supplied.
        field: String,
    },

    /// A Parquet LIST layout that is not supported.
    #[snafu(display("Unsupported Parquet LIST encoding: column={column_path}, details={details}"))]
    UnsupportedParquetListEncoding {
        /// Dotted path of the list column.
        column_path: String,
        /// Free-form explanation of what was unsupported.
        details: String,
    },

    /// A Parquet MAP layout that is not supported.
    #[snafu(display("Unsupported Parquet MAP encoding: column={column_path}, details={details}"))]
    UnsupportedParquetMapEncoding {
        /// Dotted path of the map column.
        column_path: String,
        /// Free-form explanation of what was unsupported.
        details: String,
    },
}
449
450impl LogicalSchema {
451 pub fn new(columns: Vec<LogicalField>) -> Result<Self, LogicalSchemaError> {
453 let mut seen = HashSet::new();
454 for col in &columns {
455 if !seen.insert(col.name.clone()) {
456 return DuplicateColumnSnafu {
457 column: col.name.clone(),
458 }
459 .fail();
460 }
461 validate_field(col, &col.name)?;
462 }
463
464 Ok(Self { columns })
465 }
466
467 pub fn columns(&self) -> &[LogicalField] {
469 &self.columns
470 }
471}
472
473fn validate_field(field: &LogicalField, path: &str) -> Result<(), LogicalSchemaError> {
474 validate_dtype(&field.data_type, path)
475}
476
477fn validate_dtype(dt: &LogicalDataType, path: &str) -> Result<(), LogicalSchemaError> {
478 match dt {
479 LogicalDataType::FixedBinary { byte_width } => {
480 if *byte_width <= 0 {
481 return Err(LogicalSchemaError::FixedBinaryInvalidWidthInSchema {
482 column: path.to_string(),
483 byte_width: *byte_width,
484 });
485 }
486 Ok(())
487 }
488
489 LogicalDataType::Struct { fields } => {
490 if fields.is_empty() {
491 return Err(LogicalSchemaError::EmptyStruct {
492 column_path: path.to_string(),
493 });
494 }
495
496 let mut seen = HashSet::with_capacity(fields.len());
497 for child in fields {
498 if child.name.trim().is_empty() {
499 return Err(LogicalSchemaError::StructFieldNameEmpty {
500 column_path: path.to_string(),
501 field: child.name.clone(),
502 });
503 }
504
505 if !seen.insert(child.name.clone()) {
506 return Err(LogicalSchemaError::DuplicatedFieldName {
507 column_path: path.to_string(),
508 field: child.name.clone(),
509 });
510 }
511 let child_path = format!("{}.{}", path, child.name);
512 validate_field(child, &child_path)?;
513 }
514 Ok(())
515 }
516
517 LogicalDataType::List { elements } => {
518 if elements.name.trim().is_empty() {
519 return Err(LogicalSchemaError::ListElementNameEmpty {
520 column_path: path.to_string(),
521 });
522 }
523 let child_path = format!("{}.{}", path, elements.name);
524 validate_field(elements, &child_path)
525 }
526
527 LogicalDataType::Map { key, value, .. } => {
528 if key.nullable {
529 return Err(LogicalSchemaError::InvalidMapKeyNullability {
530 column_path: path.to_string(),
531 });
532 }
533 validate_field(key, &format!("{}.key", path))?;
534 if let Some(v) = value.as_deref() {
535 validate_field(v, &format!("{}.value", path))?;
536 }
537
538 Ok(())
539 }
540
541 _ => Ok(()),
542 }
543}
544
/// Failures raised while converting a logical schema into an Arrow schema.
///
/// In every variant `column` is the dotted path of the field that could not
/// be converted.
#[derive(Debug, Snafu)]
pub enum SchemaConvertError {
    /// A logical type that has no Arrow mapping.
    ///
    /// Not constructed in the code visible in this file.
    #[snafu(display("unsupported logical type for column '{column}': {type_name} ({details})"))]
    UnsupportedLogicalType {
        /// Dotted path of the offending field.
        column: String,
        /// Name of the unsupported type.
        type_name: String,
        /// Free-form explanation.
        details: String,
    },

    /// A `FixedBinary` type has a width that is zero or negative.
    #[snafu(display(
        "invalid FixedBinary byte_width for column '{column}': {byte_width} (must be > 0)"
    ))]
    FixedBinaryInvalidWidth {
        /// Dotted path of the offending field.
        column: String,
        /// The rejected width.
        byte_width: i32,
    },

    /// The field uses `Int96`, which the conversion rejects.
    #[snafu(display("Int96 is not supported in v0.1 for column '{column}'"))]
    Int96Unsupported {
        /// Dotted path of the offending field.
        column: String,
    },

    /// The field uses an `Other` type, which the conversion rejects.
    #[snafu(display("Other type '{name}' is not supported in v0.1 for column '{column}'"))]
    OtherTypeUnsupported {
        /// Dotted path of the offending field.
        column: String,
        /// The name carried by the `Other` variant.
        name: String,
    },

    /// A decimal's precision or scale is outside the accepted range.
    #[snafu(display(
        "invalid decimal definition for column '{column}': precision={precision}, scale={scale} ({details})"
    ))]
    DecimalInvalid {
        /// Dotted path of the offending field.
        column: String,
        /// The declared precision.
        precision: i32,
        /// The declared scale.
        scale: i32,
        /// Which constraint was violated.
        details: String,
    },

    /// A map key field is declared nullable.
    #[snafu(display("map key must be non-nullable for column '{column}'"))]
    MapKeyMustBeNonNull {
        /// Dotted path of the map.
        column: String,
    },
}
608
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for a [`LogicalField`].
    fn column(name: &str, data_type: LogicalDataType, nullable: bool) -> LogicalField {
        LogicalField {
            name: name.to_string(),
            data_type,
            nullable,
        }
    }

    /// Runs schema validation over exactly one top-level column.
    fn single_column(
        name: &str,
        data_type: LogicalDataType,
        nullable: bool,
    ) -> Result<LogicalSchema, LogicalSchemaError> {
        LogicalSchema::new(vec![column(name, data_type, nullable)])
    }

    /// Unsorted map keyed by a non-nullable UTF-8 `key` field.
    fn utf8_keyed_map(value: Option<LogicalField>) -> LogicalDataType {
        LogicalDataType::Map {
            key: Box::new(column("key", LogicalDataType::Utf8, false)),
            value: value.map(Box::new),
            keys_sorted: false,
        }
    }

    /// Returns the entries field of the first schema column, which must be a map.
    fn map_entries_of(schema: &Schema) -> &FieldRef {
        let field = schema.field(0);
        let DataType::Map(entries_field, _) = field.data_type() else {
            panic!("expected map type, got {:?}", field.data_type());
        };
        entries_field
    }

    /// One column for every scalar type that converts to Arrow.
    fn sample_logical_schema_all_supported() -> LogicalSchema {
        let columns = vec![
            column("flag", LogicalDataType::Bool, false),
            column("i32", LogicalDataType::Int32, false),
            column("i64", LogicalDataType::Int64, true),
            column("f32", LogicalDataType::Float32, false),
            column("f64", LogicalDataType::Float64, true),
            column("text", LogicalDataType::Utf8, true),
            column("bytes", LogicalDataType::Binary, true),
            column(
                "fixed",
                LogicalDataType::FixedBinary { byte_width: 16 },
                false,
            ),
            column(
                "ts",
                LogicalDataType::Timestamp {
                    unit: LogicalTimestampUnit::Micros,
                    timezone: Some("UTC".to_string()),
                },
                false,
            ),
        ];
        LogicalSchema::new(columns).expect("valid logical schema")
    }

    #[test]
    fn logical_schema_to_arrow_schema_happy_path() {
        let schema = sample_logical_schema_all_supported()
            .to_arrow_schema()
            .expect("arrow schema conversion");

        let utc_micros = DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::<str>::from("UTC")));
        let expected = Schema::new(vec![
            Field::new("flag", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, true),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, true),
            Field::new("text", DataType::Utf8, true),
            Field::new("bytes", DataType::Binary, true),
            Field::new("fixed", DataType::FixedSizeBinary(16), false),
            Field::new("ts", utc_micros, false),
        ]);

        assert_eq!(schema, expected);
    }

    #[test]
    fn logical_schema_rejects_fixed_binary_invalid_width() {
        for width in [0, -1] {
            let err = single_column(
                "bad_fixed",
                LogicalDataType::FixedBinary { byte_width: width },
                false,
            )
            .expect_err("expected invalid schema to be rejected");

            assert!(
                matches!(
                    &err,
                    LogicalSchemaError::FixedBinaryInvalidWidthInSchema {
                        column,
                        byte_width
                    } if column == "bad_fixed" && *byte_width == width
                ),
                "unexpected error: {err:?}"
            );
        }
    }

    #[test]
    fn logical_schema_rejects_int96() {
        let logical = single_column("legacy_ts", LogicalDataType::Int96, false)
            .expect("valid schema structure");

        let err = logical.to_arrow_schema().unwrap_err();
        assert!(
            matches!(
                &err,
                SchemaConvertError::Int96Unsupported { column } if column == "legacy_ts"
            ),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn logical_schema_map_entries_field_is_non_nullable() {
        let map_type = utf8_keyed_map(Some(column("value", LogicalDataType::Int64, true)));
        let logical = single_column("attrs", map_type, true).expect("valid schema");

        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
        let entries_field = map_entries_of(&schema);
        assert!(
            !entries_field.is_nullable(),
            "map entries field should be non-nullable"
        );
    }

    #[test]
    fn logical_schema_map_value_none_maps_to_null_field() {
        let logical = single_column("attrs", utf8_keyed_map(None), false).expect("valid schema");

        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
        let entries_field = map_entries_of(&schema);
        let DataType::Struct(fields) = entries_field.data_type() else {
            panic!(
                "expected entries struct, got {:?}",
                entries_field.data_type()
            );
        };
        let value_field = fields
            .iter()
            .find(|f| f.name() == "value")
            .expect("value field");
        assert!(
            matches!(value_field.data_type(), DataType::Null) && value_field.is_nullable(),
            "value field should be Null and nullable"
        );
    }

    #[test]
    fn logical_schema_rejects_empty_struct_field_name() {
        let struct_type = LogicalDataType::Struct {
            fields: vec![column("", LogicalDataType::Int32, false)],
        };
        let err = single_column("root", struct_type, false).expect_err("expected invalid schema");

        assert!(
            matches!(
                &err,
                LogicalSchemaError::StructFieldNameEmpty { column_path, field }
                    if column_path == "root" && field.is_empty()
            ),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn logical_schema_rejects_other_type() {
        let logical = single_column(
            "opaque",
            LogicalDataType::Other("parquet::Map".to_string()),
            true,
        )
        .expect("valid schema structure");

        let err = logical.to_arrow_schema().unwrap_err();
        assert!(
            matches!(
                &err,
                SchemaConvertError::OtherTypeUnsupported { column, name }
                    if column == "opaque" && name == "parquet::Map"
            ),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn logical_schema_timestamp_without_timezone() {
        let logical = single_column(
            "ts",
            LogicalDataType::Timestamp {
                unit: LogicalTimestampUnit::Millis,
                timezone: None,
            },
            false,
        )
        .expect("valid schema structure");

        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
        let expected = Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        assert_eq!(schema, expected);
    }

    #[test]
    fn logical_schema_decimal_conversion_bounds() {
        // Largest precision accepted by each Arrow decimal width.
        let accepted = [
            ("dec128", 38, 10, DataType::Decimal128(38, 10)),
            ("dec256", 76, 5, DataType::Decimal256(76, 5)),
        ];
        for (name, precision, scale, arrow_type) in accepted {
            let logical = single_column(
                name,
                LogicalDataType::Decimal { precision, scale },
                false,
            )
            .expect("valid schema structure");
            let schema = logical.to_arrow_schema().expect("arrow schema conversion");
            assert_eq!(
                schema,
                Schema::new(vec![Field::new(name, arrow_type, false)])
            );
        }

        // One digit past the widest Arrow decimal must be rejected.
        let invalid = single_column(
            "dec_too_large",
            LogicalDataType::Decimal {
                precision: 77,
                scale: 0,
            },
            false,
        )
        .expect("valid schema structure");
        let err = invalid.to_arrow_schema().unwrap_err();
        assert!(
            matches!(
                &err,
                SchemaConvertError::DecimalInvalid { column, precision, scale, .. }
                    if column == "dec_too_large" && *precision == 77 && *scale == 0
            ),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn logical_schema_decimal_validation_errors() {
        let cases = vec![
            ("dec_precision_zero", 0, 0, "precision must be > 0"),
            ("dec_scale_negative", 10, -1, "scale must be >= 0"),
            ("dec_scale_gt_precision", 4, 5, "scale must be <= precision"),
        ];

        for (name, precision, scale, details_substr) in cases {
            let logical = single_column(
                name,
                LogicalDataType::Decimal { precision, scale },
                false,
            )
            .expect("valid schema structure");

            let err = logical.to_arrow_schema().unwrap_err();
            assert!(
                matches!(
                    &err,
                    SchemaConvertError::DecimalInvalid { column, precision: p, scale: s, details }
                        if column == name && *p == precision && *s == scale && details.contains(details_substr)
                ),
                "unexpected error: {err:?}"
            );
        }
    }

    #[test]
    fn logical_schema_fixed_binary_json_roundtrip() {
        let logical = single_column(
            "fixed",
            LogicalDataType::FixedBinary { byte_width: 8 },
            false,
        )
        .expect("valid schema structure");

        let json = serde_json::to_string(&logical).unwrap();
        let back: LogicalSchema = serde_json::from_str(&json).unwrap();
        assert_eq!(back, logical);
    }

    #[test]
    fn logical_schema_decimal_json_roundtrip() {
        let logical = single_column(
            "amount",
            LogicalDataType::Decimal {
                precision: 18,
                scale: 4,
            },
            true,
        )
        .expect("valid schema structure");

        let json = serde_json::to_string(&logical).unwrap();
        let back: LogicalSchema = serde_json::from_str(&json).unwrap();
        assert_eq!(back, logical);
    }
}