// timeseries_table_core/metadata/logical_schema.rs

//! Logical schema definitions and validation for table metadata.
//!
//! This module models logical fields and data types stored in the transaction
//! log, along with validation and conversion to Arrow schemas.
5use std::{collections::HashSet, fmt, sync::Arc};
6
7use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit};
8
9use serde::{Deserialize, Serialize};
10use snafu::prelude::*;
11
12/// Units for logical timestamps recorded in the table metadata.
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
14pub enum LogicalTimestampUnit {
15    /// Millisecond precision timestamps.
16    Millis,
17    /// Microsecond precision timestamps.
18    Micros,
19    /// Nanosecond precision timestamps.
20    Nanos,
21}
22
23impl LogicalTimestampUnit {
24    fn to_arrow_time_unit(self) -> TimeUnit {
25        match self {
26            LogicalTimestampUnit::Millis => TimeUnit::Millisecond,
27            LogicalTimestampUnit::Micros => TimeUnit::Microsecond,
28            LogicalTimestampUnit::Nanos => TimeUnit::Nanosecond,
29        }
30    }
31}
32
33impl fmt::Display for LogicalTimestampUnit {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match self {
36            LogicalTimestampUnit::Millis => write!(f, "ms"),
37            LogicalTimestampUnit::Micros => write!(f, "us"),
38            LogicalTimestampUnit::Nanos => write!(f, "ns"),
39        }
40    }
41}
42
43/// Logical column definition in a schema.
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
45pub struct LogicalField {
46    /// Column name as stored in the schema.
47    pub name: String,
48    /// Logical data type for the column.
49    pub data_type: LogicalDataType,
50    /// Whether the column allows null values.
51    pub nullable: bool,
52}
53
54impl LogicalField {
55    fn to_arrow_field_ref(&self, path: &str) -> Result<FieldRef, SchemaConvertError> {
56        let dt = self.data_type.to_arrow_datatype(path)?;
57        Ok(Arc::new(Field::new(self.name.clone(), dt, self.nullable)))
58    }
59}
60
61impl fmt::Display for LogicalField {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        if self.nullable {
64            write!(f, "{}?: {}", self.name, self.data_type)
65        } else {
66            write!(f, "{}: {}", self.name, self.data_type)
67        }
68    }
69}
70
/// Append `child` to a dotted `parent` path.
///
/// An empty `parent` yields `child` unchanged; otherwise the two parts are
/// joined with a single `.`.
fn join_path(parent: &str, child: &str) -> String {
    match parent {
        "" => child.to_owned(),
        prefix => [prefix, child].join("."),
    }
}
78
79/// Logical data types that can be stored in the table schema metadata.
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
81pub enum LogicalDataType {
82    /// Boolean value.
83    Bool,
84    /// 32-bit signed integer.
85    Int32,
86    /// 64-bit signed integer.
87    Int64,
88    /// 32-bit floating point.
89    Float32,
90    /// 64-bit floating point.
91    Float64,
92    /// Variable-length binary data.
93    Binary,
94    /// Fixed-length binary data.
95    FixedBinary {
96        /// Fixed byte width for each value (in bytes).
97        byte_width: i32,
98    },
99    /// UTF-8 encoded string.
100    Utf8,
101    /// Legacy 96-bit integer (primarily for Parquet compatibility).
102    Int96,
103
104    /// Timestamp value with a precision unit and optional timezone.
105    Timestamp {
106        /// Timestamp precision unit (millis, micros, nanos).
107        unit: LogicalTimestampUnit,
108        /// Optional IANA timezone identifier.
109        timezone: Option<String>, // keep Option for future TZ support
110    },
111
112    /// Fixed-precision decimal value with declared precision and scale.
113    Decimal {
114        /// Total number of decimal digits (both sides of the decimal point).
115        precision: i32,
116        /// Number of digits to the right of the decimal point.
117        scale: i32,
118    },
119
120    /// Struct with named child fields.
121    Struct {
122        /// Ordered set of child fields for the struct.
123        fields: Vec<LogicalField>,
124    },
125
126    /// List (array) with a single element field definition.
127    List {
128        /// Element field definition for list items.
129        elements: Box<LogicalField>,
130    },
131
132    /// Map with key/value field definitions.
133    /// If `value` is None, this represents Parquet MAP "keys-only" semantics (set of keys).
134    Map {
135        /// Key field definition (must be non-nullable for Arrow compatibility).
136        key: Box<LogicalField>,
137        /// Value field definition.
138        value: Option<Box<LogicalField>>,
139        /// Whether entries are sorted by key.
140        keys_sorted: bool,
141    },
142
143    /// Catch-all logical data type referenced by name.
144    Other(String),
145}
146
147impl LogicalDataType {
148    fn to_arrow_datatype(&self, column: &str) -> Result<DataType, SchemaConvertError> {
149        Ok(match self {
150            LogicalDataType::Bool => DataType::Boolean,
151            LogicalDataType::Int32 => DataType::Int32,
152            LogicalDataType::Int64 => DataType::Int64,
153            LogicalDataType::Float32 => DataType::Float32,
154            LogicalDataType::Float64 => DataType::Float64,
155            LogicalDataType::Binary => DataType::Binary,
156            LogicalDataType::Utf8 => DataType::Utf8,
157
158            LogicalDataType::FixedBinary { byte_width } => {
159                if *byte_width <= 0 {
160                    return Err(SchemaConvertError::FixedBinaryInvalidWidth {
161                        column: column.to_string(),
162                        byte_width: *byte_width,
163                    });
164                }
165                DataType::FixedSizeBinary(*byte_width)
166            }
167
168            LogicalDataType::Timestamp { unit, timezone } => {
169                let tz: Option<Arc<str>> = timezone.as_ref().map(|s| Arc::<str>::from(s.as_str()));
170                DataType::Timestamp(unit.to_arrow_time_unit(), tz)
171            }
172
173            LogicalDataType::Int96 => {
174                return Err(SchemaConvertError::Int96Unsupported {
175                    column: column.to_string(),
176                });
177            }
178
179            LogicalDataType::Decimal { precision, scale } => {
180                let precision = *precision;
181                let scale = *scale;
182                if precision <= 0 {
183                    return Err(SchemaConvertError::DecimalInvalid {
184                        column: column.to_string(),
185                        precision,
186                        scale,
187                        details: "precision must be > 0".to_string(),
188                    });
189                }
190                if scale < 0 {
191                    return Err(SchemaConvertError::DecimalInvalid {
192                        column: column.to_string(),
193                        precision,
194                        scale,
195                        details: "scale must be >= 0".to_string(),
196                    });
197                }
198                if scale > precision {
199                    return Err(SchemaConvertError::DecimalInvalid {
200                        column: column.to_string(),
201                        precision,
202                        scale,
203                        details: "scale must be <= precision".to_string(),
204                    });
205                }
206
207                if precision <= 38 {
208                    DataType::Decimal128(precision as u8, scale as i8)
209                } else if precision <= 76 {
210                    DataType::Decimal256(precision as u8, scale as i8)
211                } else {
212                    return Err(SchemaConvertError::DecimalInvalid {
213                        column: column.to_string(),
214                        precision,
215                        scale,
216                        details: "precision exceeds Arrow maximum (76 digits)".to_string(),
217                    });
218                }
219            }
220
221            LogicalDataType::Struct { fields } => {
222                let mut arrow_children: Vec<FieldRef> = Vec::with_capacity(fields.len());
223                for f in fields {
224                    let child_path = join_path(column, &f.name);
225                    arrow_children.push(f.to_arrow_field_ref(&child_path)?);
226                }
227                DataType::Struct(Fields::from(arrow_children))
228            }
229
230            LogicalDataType::List { elements } => {
231                let child_path = join_path(column, &elements.name);
232                let element_field = elements.to_arrow_field_ref(&child_path)?;
233                DataType::List(element_field)
234            }
235
236            LogicalDataType::Map {
237                key,
238                value,
239                keys_sorted,
240            } => {
241                if key.nullable {
242                    return Err(SchemaConvertError::MapKeyMustBeNonNull {
243                        column: column.to_string(),
244                    });
245                }
246
247                // Canonical Arrow Map field names are "entries", "key", "value"
248                let key_path = format!("{column}.key");
249                let val_path = format!("{column}.value");
250
251                let key_dt = key.data_type.to_arrow_datatype(&key_path)?;
252
253                let (val_dt, val_nullable) = match value.as_deref() {
254                    Some(v) => (v.data_type.to_arrow_datatype(&val_path)?, v.nullable),
255                    None => (DataType::Null, true),
256                };
257
258                let key_field: FieldRef = Arc::new(Field::new("key", key_dt, false));
259                let val_field: FieldRef = Arc::new(Field::new("value", val_dt, val_nullable));
260
261                let entries_dt = DataType::Struct(Fields::from(vec![key_field, val_field]));
262                let entries_field: FieldRef = Arc::new(Field::new("entries", entries_dt, false));
263
264                DataType::Map(entries_field, *keys_sorted)
265            }
266
267            LogicalDataType::Other(name) => {
268                return Err(SchemaConvertError::OtherTypeUnsupported {
269                    column: column.to_string(),
270                    name: name.clone(),
271                });
272            }
273        })
274    }
275}
276
277impl fmt::Display for LogicalDataType {
278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279        match self {
280            LogicalDataType::Bool => write!(f, "bool"),
281            LogicalDataType::Int32 => write!(f, "int32"),
282            LogicalDataType::Int64 => write!(f, "int64"),
283            LogicalDataType::Float32 => write!(f, "float32"),
284            LogicalDataType::Float64 => write!(f, "float64"),
285            LogicalDataType::Binary => write!(f, "binary"),
286            LogicalDataType::FixedBinary { byte_width } => write!(f, "fixed_binary[{byte_width}]"),
287            LogicalDataType::Utf8 => write!(f, "utf8"),
288            LogicalDataType::Int96 => write!(f, "int96"),
289
290            LogicalDataType::Timestamp { unit, timezone } => match timezone {
291                Some(tz) => write!(f, "timestamp[{}]({})", unit, tz),
292                None => write!(f, "timestamp[{}]", unit),
293            },
294
295            LogicalDataType::Decimal { precision, scale } => {
296                write!(f, "decimal(precision={precision}, scale={scale})")
297            }
298
299            LogicalDataType::Struct { fields } => {
300                write!(f, "Struct{{")?;
301                for (i, field) in fields.iter().enumerate() {
302                    if i > 0 {
303                        write!(f, ", ")?;
304                    }
305                    write!(f, "{}", field)?;
306                }
307                write!(f, "}}")
308            }
309
310            LogicalDataType::List { elements } => {
311                write!(f, "List<{}>", elements)
312            }
313
314            LogicalDataType::Map {
315                key,
316                value,
317                keys_sorted,
318            } => match value.as_deref() {
319                Some(v) => write!(f, "Map<{}, {}, keys_sorted={}>", key, v, keys_sorted),
320                None => write!(
321                    f,
322                    "Map<{}, value=omitted, keys_sorted={}>",
323                    key, keys_sorted
324                ),
325            },
326
327            LogicalDataType::Other(s) => write!(f, "{s}"),
328        }
329    }
330}
331
332/// Logical schema metadata describing the ordered collection of logical columns.
333#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
334pub struct LogicalSchema {
335    /// All logical columns that compose the schema in their defined order.
336    columns: Vec<LogicalField>,
337}
338
339impl LogicalSchema {
340    /// Convert this logical schema to an owned Arrow [`Schema`].
341    ///
342    /// Fails if any column uses a logical type that cannot be represented in
343    /// Arrow (see [`SchemaConvertError`]).
344    pub fn to_arrow_schema(&self) -> Result<Schema, SchemaConvertError> {
345        let mut fields = Vec::with_capacity(self.columns.len());
346        for c in &self.columns {
347            let fref = c.to_arrow_field_ref(&c.name)?;
348            fields.push(fref.as_ref().clone());
349        }
350
351        Ok(Schema::new(fields))
352    }
353
354    /// Convert this logical schema to a shared Arrow [`SchemaRef`].
355    ///
356    /// This is a convenience wrapper around [`Self::to_arrow_schema`].
357    pub fn to_arrow_schema_ref(&self) -> Result<SchemaRef, SchemaConvertError> {
358        Ok(Arc::new(self.to_arrow_schema()?))
359    }
360}
361
362/// Errors that can occur while constructing or validating a logical schema.
363#[derive(Debug, Clone, Snafu, PartialEq, Eq)]
364pub enum LogicalSchemaError {
365    /// Duplicate column names are not allowed.
366    #[snafu(display("Duplicate column name: {column}"))]
367    DuplicateColumn {
368        /// The duplicate column name.
369        column: String,
370    },
371
372    /// FixedBinary columns must include a positive byte width.
373    #[snafu(display(
374        "invalid FixedBinary byte_width for column '{column}': {byte_width} (must be > 0)"
375    ))]
376    FixedBinaryInvalidWidthInSchema {
377        /// Column name that failed validation.
378        column: String,
379        /// Declared byte width.
380        byte_width: i32,
381    },
382
383    /// Parquet FIXED_LEN_BYTE_ARRAY columns must include a type_length.
384    #[snafu(display(
385        "FIXED_LEN_BYTE_ARRAY column '{column}' missing type_length in Parquet schema"
386    ))]
387    FixedBinaryMissingLength {
388        /// Column name that failed validation.
389        column: String,
390    },
391
392    /// Duplicate field names within a struct are not allowed.
393    #[snafu(display("Duplicate field name: column={column_path}, field={field}"))]
394    DuplicatedFieldName {
395        /// Column path for the struct that contains the duplicate field.
396        column_path: String,
397        /// Duplicate field name.
398        field: String,
399    },
400
401    /// Map key fields must be non-nullable in schema validation.
402    #[snafu(display("Invalid Map Key: map key should not be null for column={column_path}"))]
403    InvalidMapKeyNullability {
404        /// Column path for the map with an invalid key nullability.
405        column_path: String,
406    },
407
408    /// Struct fields must be non-empty.
409    #[snafu(display("Struct must have at least one field: column={column_path}"))]
410    EmptyStruct {
411        /// Column path for the empty struct.
412        column_path: String,
413    },
414
415    /// List element fields must have a non-empty name.
416    #[snafu(display("List element field name must be non-empty: column={column_path}"))]
417    ListElementNameEmpty {
418        /// Column path for the list with an empty element name.
419        column_path: String,
420    },
421
422    /// Struct fields must have a non-empty name.
423    #[snafu(display("Struct field name must be non-empty: column={column_path}, field={field}"))]
424    StructFieldNameEmpty {
425        /// Column path for the struct with an empty field name.
426        column_path: String,
427        /// Empty field name.
428        field: String,
429    },
430
431    /// Parquet LIST encoding does not match the supported layout.
432    #[snafu(display("Unsupported Parquet LIST encoding: column={column_path}, details={details}"))]
433    UnsupportedParquetListEncoding {
434        /// Column path for the list with unsupported encoding.
435        column_path: String,
436        /// Details describing why the LIST encoding is unsupported.
437        details: String,
438    },
439
440    /// Parquet MAP encoding does not match the supported layout.
441    #[snafu(display("Unsupported Parquet MAP encoding: column={column_path}, details={details}"))]
442    UnsupportedParquetMapEncoding {
443        /// Column path for the map with unsupported encoding.
444        column_path: String,
445        /// Details describing why the MAP encoding is unsupported.
446        details: String,
447    },
448}
449
450impl LogicalSchema {
451    /// Construct a validated logical schema (rejects duplicate column names).
452    pub fn new(columns: Vec<LogicalField>) -> Result<Self, LogicalSchemaError> {
453        let mut seen = HashSet::new();
454        for col in &columns {
455            if !seen.insert(col.name.clone()) {
456                return DuplicateColumnSnafu {
457                    column: col.name.clone(),
458                }
459                .fail();
460            }
461            validate_field(col, &col.name)?;
462        }
463
464        Ok(Self { columns })
465    }
466
467    /// Borrow the logical columns.
468    pub fn columns(&self) -> &[LogicalField] {
469        &self.columns
470    }
471}
472
473fn validate_field(field: &LogicalField, path: &str) -> Result<(), LogicalSchemaError> {
474    validate_dtype(&field.data_type, path)
475}
476
477fn validate_dtype(dt: &LogicalDataType, path: &str) -> Result<(), LogicalSchemaError> {
478    match dt {
479        LogicalDataType::FixedBinary { byte_width } => {
480            if *byte_width <= 0 {
481                return Err(LogicalSchemaError::FixedBinaryInvalidWidthInSchema {
482                    column: path.to_string(),
483                    byte_width: *byte_width,
484                });
485            }
486            Ok(())
487        }
488
489        LogicalDataType::Struct { fields } => {
490            if fields.is_empty() {
491                return Err(LogicalSchemaError::EmptyStruct {
492                    column_path: path.to_string(),
493                });
494            }
495
496            let mut seen = HashSet::with_capacity(fields.len());
497            for child in fields {
498                if child.name.trim().is_empty() {
499                    return Err(LogicalSchemaError::StructFieldNameEmpty {
500                        column_path: path.to_string(),
501                        field: child.name.clone(),
502                    });
503                }
504
505                if !seen.insert(child.name.clone()) {
506                    return Err(LogicalSchemaError::DuplicatedFieldName {
507                        column_path: path.to_string(),
508                        field: child.name.clone(),
509                    });
510                }
511                let child_path = format!("{}.{}", path, child.name);
512                validate_field(child, &child_path)?;
513            }
514            Ok(())
515        }
516
517        LogicalDataType::List { elements } => {
518            if elements.name.trim().is_empty() {
519                return Err(LogicalSchemaError::ListElementNameEmpty {
520                    column_path: path.to_string(),
521                });
522            }
523            let child_path = format!("{}.{}", path, elements.name);
524            validate_field(elements, &child_path)
525        }
526
527        LogicalDataType::Map { key, value, .. } => {
528            if key.nullable {
529                return Err(LogicalSchemaError::InvalidMapKeyNullability {
530                    column_path: path.to_string(),
531                });
532            }
533            validate_field(key, &format!("{}.key", path))?;
534            if let Some(v) = value.as_deref() {
535                validate_field(v, &format!("{}.value", path))?;
536            }
537
538            Ok(())
539        }
540
541        _ => Ok(()),
542    }
543}
544
545/// Errors encountered while converting between logical schema representations.
546#[derive(Debug, Snafu)]
547pub enum SchemaConvertError {
548    /// The logical type is not supported by the target representation.
549    #[snafu(display("unsupported logical type for column '{column}': {type_name} ({details})"))]
550    UnsupportedLogicalType {
551        /// Column name that failed conversion.
552        column: String,
553        /// High-level type name (for diagnostics).
554        type_name: String,
555        /// Additional details describing why it is unsupported.
556        details: String,
557    },
558
559    /// FixedBinary fields must declare a positive byte width.
560    #[snafu(display(
561        "invalid FixedBinary byte_width for column '{column}': {byte_width} (must be > 0)"
562    ))]
563    FixedBinaryInvalidWidth {
564        /// Column name that failed validation.
565        column: String,
566        /// Declared byte width.
567        byte_width: i32,
568    },
569
570    /// Int96 is rejected for now to avoid legacy timestamp ambiguity.
571    #[snafu(display("Int96 is not supported in v0.1 for column '{column}'"))]
572    Int96Unsupported {
573        /// Column name that failed conversion.
574        column: String,
575    },
576
577    /// Catch-all "Other" types are not accepted in v0.1.
578    #[snafu(display("Other type '{name}' is not supported in v0.1 for column '{column}'"))]
579    OtherTypeUnsupported {
580        /// Column name that failed conversion.
581        column: String,
582        /// Type name reported by the source.
583        name: String,
584    },
585
586    /// Decimal precision/scale is out of supported bounds for Arrow conversion.
587    #[snafu(display(
588        "invalid decimal definition for column '{column}': precision={precision}, scale={scale} ({details})"
589    ))]
590    DecimalInvalid {
591        /// Column name that failed conversion.
592        column: String,
593        /// Declared total precision.
594        precision: i32,
595        /// Declared scale (digits to the right of the decimal point).
596        scale: i32,
597        /// Human-readable details describing the constraint violation.
598        details: String,
599    },
600
601    /// Map keys must be non-nullable when converting to Arrow.
602    #[snafu(display("map key must be non-nullable for column '{column}'"))]
603    MapKeyMustBeNonNull {
604        /// Column name that failed conversion.
605        column: String,
606    },
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612
613    fn sample_logical_schema_all_supported() -> LogicalSchema {
614        LogicalSchema::new(vec![
615            LogicalField {
616                name: "flag".to_string(),
617                data_type: LogicalDataType::Bool,
618                nullable: false,
619            },
620            LogicalField {
621                name: "i32".to_string(),
622                data_type: LogicalDataType::Int32,
623                nullable: false,
624            },
625            LogicalField {
626                name: "i64".to_string(),
627                data_type: LogicalDataType::Int64,
628                nullable: true,
629            },
630            LogicalField {
631                name: "f32".to_string(),
632                data_type: LogicalDataType::Float32,
633                nullable: false,
634            },
635            LogicalField {
636                name: "f64".to_string(),
637                data_type: LogicalDataType::Float64,
638                nullable: true,
639            },
640            LogicalField {
641                name: "text".to_string(),
642                data_type: LogicalDataType::Utf8,
643                nullable: true,
644            },
645            LogicalField {
646                name: "bytes".to_string(),
647                data_type: LogicalDataType::Binary,
648                nullable: true,
649            },
650            LogicalField {
651                name: "fixed".to_string(),
652                data_type: LogicalDataType::FixedBinary { byte_width: 16 },
653                nullable: false,
654            },
655            LogicalField {
656                name: "ts".to_string(),
657                data_type: LogicalDataType::Timestamp {
658                    unit: LogicalTimestampUnit::Micros,
659                    timezone: Some("UTC".to_string()),
660                },
661                nullable: false,
662            },
663        ])
664        .expect("valid logical schema")
665    }
666
667    #[test]
668    fn logical_schema_to_arrow_schema_happy_path() {
669        let logical = sample_logical_schema_all_supported();
670        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
671
672        let expected = Schema::new(vec![
673            Field::new("flag", DataType::Boolean, false),
674            Field::new("i32", DataType::Int32, false),
675            Field::new("i64", DataType::Int64, true),
676            Field::new("f32", DataType::Float32, false),
677            Field::new("f64", DataType::Float64, true),
678            Field::new("text", DataType::Utf8, true),
679            Field::new("bytes", DataType::Binary, true),
680            Field::new("fixed", DataType::FixedSizeBinary(16), false),
681            Field::new(
682                "ts",
683                DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::<str>::from("UTC"))),
684                false,
685            ),
686        ]);
687
688        assert_eq!(schema, expected);
689    }
690
691    #[test]
692    fn logical_schema_rejects_fixed_binary_invalid_width() {
693        for width in [0, -1] {
694            let err = LogicalSchema::new(vec![LogicalField {
695                name: "bad_fixed".to_string(),
696                data_type: LogicalDataType::FixedBinary { byte_width: width },
697                nullable: false,
698            }])
699            .expect_err("expected invalid schema to be rejected");
700
701            assert!(
702                matches!(
703                    &err,
704                    LogicalSchemaError::FixedBinaryInvalidWidthInSchema {
705                        column,
706                        byte_width
707                    } if column == "bad_fixed" && *byte_width == width
708                ),
709                "unexpected error: {err:?}"
710            );
711        }
712    }
713
714    #[test]
715    fn logical_schema_rejects_int96() {
716        let logical = LogicalSchema::new(vec![LogicalField {
717            name: "legacy_ts".to_string(),
718            data_type: LogicalDataType::Int96,
719            nullable: false,
720        }])
721        .expect("valid schema structure");
722
723        let err = logical.to_arrow_schema().unwrap_err();
724        assert!(
725            matches!(
726                &err,
727                SchemaConvertError::Int96Unsupported { column } if column == "legacy_ts"
728            ),
729            "unexpected error: {err:?}"
730        );
731    }
732
733    #[test]
734    fn logical_schema_map_entries_field_is_non_nullable() {
735        let logical = LogicalSchema::new(vec![LogicalField {
736            name: "attrs".to_string(),
737            data_type: LogicalDataType::Map {
738                key: Box::new(LogicalField {
739                    name: "key".to_string(),
740                    data_type: LogicalDataType::Utf8,
741                    nullable: false,
742                }),
743                value: Some(Box::new(LogicalField {
744                    name: "value".to_string(),
745                    data_type: LogicalDataType::Int64,
746                    nullable: true,
747                })),
748                keys_sorted: false,
749            },
750            nullable: true,
751        }])
752        .expect("valid schema");
753
754        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
755        let field = schema.field(0);
756        let DataType::Map(entries_field, _) = field.data_type() else {
757            panic!("expected map type, got {:?}", field.data_type());
758        };
759        assert!(
760            !entries_field.is_nullable(),
761            "map entries field should be non-nullable"
762        );
763    }
764
765    #[test]
766    fn logical_schema_map_value_none_maps_to_null_field() {
767        let logical = LogicalSchema::new(vec![LogicalField {
768            name: "attrs".to_string(),
769            data_type: LogicalDataType::Map {
770                key: Box::new(LogicalField {
771                    name: "key".to_string(),
772                    data_type: LogicalDataType::Utf8,
773                    nullable: false,
774                }),
775                value: None,
776                keys_sorted: false,
777            },
778            nullable: false,
779        }])
780        .expect("valid schema");
781
782        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
783        let field = schema.field(0);
784        let DataType::Map(entries_field, _) = field.data_type() else {
785            panic!("expected map type, got {:?}", field.data_type());
786        };
787        let DataType::Struct(fields) = entries_field.data_type() else {
788            panic!(
789                "expected entries struct, got {:?}",
790                entries_field.data_type()
791            );
792        };
793        let value_field = fields
794            .iter()
795            .find(|f| f.name() == "value")
796            .expect("value field");
797        assert!(
798            matches!(value_field.data_type(), DataType::Null) && value_field.is_nullable(),
799            "value field should be Null and nullable"
800        );
801    }
802
803    #[test]
804    fn logical_schema_rejects_empty_struct_field_name() {
805        let err = LogicalSchema::new(vec![LogicalField {
806            name: "root".to_string(),
807            data_type: LogicalDataType::Struct {
808                fields: vec![LogicalField {
809                    name: "".to_string(),
810                    data_type: LogicalDataType::Int32,
811                    nullable: false,
812                }],
813            },
814            nullable: false,
815        }])
816        .expect_err("expected invalid schema");
817
818        assert!(
819            matches!(
820                &err,
821                LogicalSchemaError::StructFieldNameEmpty { column_path, field }
822                if column_path == "root" && field.is_empty()
823            ),
824            "unexpected error: {err:?}"
825        );
826    }
827
828    #[test]
829    fn logical_schema_rejects_other_type() {
830        let logical = LogicalSchema::new(vec![LogicalField {
831            name: "opaque".to_string(),
832            data_type: LogicalDataType::Other("parquet::Map".to_string()),
833            nullable: true,
834        }])
835        .expect("valid schema structure");
836
837        let err = logical.to_arrow_schema().unwrap_err();
838        assert!(
839            matches!(
840                &err,
841                SchemaConvertError::OtherTypeUnsupported { column, name }
842                    if column == "opaque" && name == "parquet::Map"
843            ),
844            "unexpected error: {err:?}"
845        );
846    }
847
848    #[test]
849    fn logical_schema_timestamp_without_timezone() {
850        let logical = LogicalSchema::new(vec![LogicalField {
851            name: "ts".to_string(),
852            data_type: LogicalDataType::Timestamp {
853                unit: LogicalTimestampUnit::Millis,
854                timezone: None,
855            },
856            nullable: false,
857        }])
858        .expect("valid schema structure");
859
860        let schema = logical.to_arrow_schema().expect("arrow schema conversion");
861        let expected = Schema::new(vec![Field::new(
862            "ts",
863            DataType::Timestamp(TimeUnit::Millisecond, None),
864            false,
865        )]);
866        assert_eq!(schema, expected);
867    }
868
869    #[test]
870    fn logical_schema_decimal_conversion_bounds() {
871        let valid_128 = LogicalSchema::new(vec![LogicalField {
872            name: "dec128".to_string(),
873            data_type: LogicalDataType::Decimal {
874                precision: 38,
875                scale: 10,
876            },
877            nullable: false,
878        }])
879        .expect("valid schema structure");
880        let schema = valid_128
881            .to_arrow_schema()
882            .expect("arrow schema conversion");
883        assert_eq!(
884            schema,
885            Schema::new(vec![Field::new(
886                "dec128",
887                DataType::Decimal128(38, 10),
888                false
889            )])
890        );
891
892        let valid_256 = LogicalSchema::new(vec![LogicalField {
893            name: "dec256".to_string(),
894            data_type: LogicalDataType::Decimal {
895                precision: 76,
896                scale: 5,
897            },
898            nullable: false,
899        }])
900        .expect("valid schema structure");
901        let schema = valid_256
902            .to_arrow_schema()
903            .expect("arrow schema conversion");
904        assert_eq!(
905            schema,
906            Schema::new(vec![Field::new(
907                "dec256",
908                DataType::Decimal256(76, 5),
909                false
910            )])
911        );
912
913        let invalid = LogicalSchema::new(vec![LogicalField {
914            name: "dec_too_large".to_string(),
915            data_type: LogicalDataType::Decimal {
916                precision: 77,
917                scale: 0,
918            },
919            nullable: false,
920        }])
921        .expect("valid schema structure");
922        let err = invalid.to_arrow_schema().unwrap_err();
923        assert!(
924            matches!(
925                &err,
926                SchemaConvertError::DecimalInvalid { column, precision, scale, .. }
927                    if column == "dec_too_large" && *precision == 77 && *scale == 0
928            ),
929            "unexpected error: {err:?}"
930        );
931    }
932
933    #[test]
934    fn logical_schema_decimal_validation_errors() {
935        let cases = vec![
936            ("dec_precision_zero", 0, 0, "precision must be > 0"),
937            ("dec_scale_negative", 10, -1, "scale must be >= 0"),
938            ("dec_scale_gt_precision", 4, 5, "scale must be <= precision"),
939        ];
940
941        for (name, precision, scale, details_substr) in cases {
942            let logical = LogicalSchema::new(vec![LogicalField {
943                name: name.to_string(),
944                data_type: LogicalDataType::Decimal { precision, scale },
945                nullable: false,
946            }])
947            .expect("valid schema structure");
948
949            let err = logical.to_arrow_schema().unwrap_err();
950            assert!(
951                matches!(
952                    &err,
953                    SchemaConvertError::DecimalInvalid { column, precision: p, scale: s, details }
954                        if column == name && *p == precision && *s == scale && details.contains(details_substr)
955                ),
956                "unexpected error: {err:?}"
957            );
958        }
959    }
960
961    #[test]
962    fn logical_schema_fixed_binary_json_roundtrip() {
963        let logical = LogicalSchema::new(vec![LogicalField {
964            name: "fixed".to_string(),
965            data_type: LogicalDataType::FixedBinary { byte_width: 8 },
966            nullable: false,
967        }])
968        .expect("valid schema structure");
969
970        let json = serde_json::to_string(&logical).unwrap();
971        let back: LogicalSchema = serde_json::from_str(&json).unwrap();
972        assert_eq!(back, logical);
973    }
974
975    #[test]
976    fn logical_schema_decimal_json_roundtrip() {
977        let logical = LogicalSchema::new(vec![LogicalField {
978            name: "amount".to_string(),
979            data_type: LogicalDataType::Decimal {
980                precision: 18,
981                scale: 4,
982            },
983            nullable: true,
984        }])
985        .expect("valid schema structure");
986
987        let json = serde_json::to_string(&logical).unwrap();
988        let back: LogicalSchema = serde_json::from_str(&json).unwrap();
989        assert_eq!(back, logical);
990    }
991}