Skip to main content

timeseries_table_core/metadata/
schema_compat.rs

1//! Schema compatibility helpers (pure metadata).
2//!
3//! v0.1 rule: **no schema evolution**.
4//! Every appended segment must have a [`LogicalSchema`] that matches the table's
5//! canonical schema exactly.
6
7use std::collections::HashMap;
8
9use snafu::prelude::*;
10
11use crate::metadata::{
12    logical_schema::{LogicalDataType, LogicalField, LogicalSchema, LogicalSchemaError},
13    table_metadata::{TableMeta, TimeIndexSpec},
14};
15
/// Errors raised when a segment's schema is not compatible with the table.
///
/// The `Snafu` derive generates the `Display` text from the
/// `#[snafu(display(...))]` attribute on each variant, plus a context
/// selector per variant (for example `MissingTableSchemaSnafu`).
#[derive(Debug, Snafu)]
pub enum SchemaCompatibilityError {
    /// The table does not yet have a canonical logical schema.
    ///
    /// Many call sites (like append) may choose to *not* use this and
    /// instead adopt the first segment's schema, but we keep the error
    /// available for operations that require a fixed schema.
    /// Returned by [`require_table_schema`].
    #[snafu(display("Table has no logical_schema; v0.1 cannot append without a canonical schema"))]
    MissingTableSchema,

    /// The segment is missing a column that exists in the table schema.
    #[snafu(display("Segment schema is missing required column {column}"))]
    MissingColumn {
        /// The name of the column present in the table but absent from the segment.
        column: String,
    },

    /// The segment has an extra column that does not exist in the table schema.
    #[snafu(display("Segment schema has extra column {column} not present in table schema"))]
    ExtraColumn {
        /// The name of the column present in the segment but absent from the table.
        column: String,
    },

    /// Column exists in both schemas, but the logical type / nullability differ.
    ///
    /// Only the data types are carried and printed. When the two fields
    /// differ solely in nullability, `table_type` and `segment_type` are equal
    /// and the message shows the same type twice.
    #[snafu(display(
        "Type mismatch for column {column}: table has {table_type}, segment has {segment_type}"
    ))]
    TypeMismatch {
        /// The name of the column with mismatched type.
        column: String,
        /// The data type declared for the column in the table schema.
        table_type: LogicalDataType,
        /// The data type declared for the column in the segment schema.
        segment_type: LogicalDataType,
    },

    /// Specialised version of TypeMismatch for the time index column.
    ///
    /// Used instead of `TypeMismatch` when the offending column is the one
    /// named by the table's time index spec. As with `TypeMismatch`, a
    /// nullability-only difference yields identical `table_type` and
    /// `segment_type` values.
    #[snafu(display(
        "Time index column {column} has incompatible type: table has {table_type}, \
         segment has {segment_type}"
    ))]
    TimeIndexTypeMismatch {
        /// The name of the time index column.
        column: String,
        /// The data type declared for the column in the table schema.
        table_type: LogicalDataType,
        /// The data type declared for the column in the segment schema.
        segment_type: LogicalDataType,
    },

    /// Logical schema construction or validation failed.
    ///
    /// Wraps a [`LogicalSchemaError`] so it can be surfaced through
    /// [`SchemaResult`].
    #[snafu(display("Logical schema is invalid: {source}"))]
    LogicalSchema {
        /// The underlying logical schema error.
        #[snafu(source)]
        source: LogicalSchemaError,
    },
}
76
/// Result alias for schema compatibility operations: `Ok(T)` on success,
/// otherwise a [`SchemaCompatibilityError`] describing the incompatibility.
pub type SchemaResult<T> = Result<T, SchemaCompatibilityError>;
79
80/// Convenience helper if you want to require a schema to be present.
81pub fn require_table_schema(meta: &TableMeta) -> SchemaResult<&LogicalSchema> {
82    match &meta.logical_schema {
83        Some(schema) => Ok(schema),
84        None => MissingTableSchemaSnafu.fail(),
85    }
86}
87
88fn columns_by_name(schema: &LogicalSchema) -> HashMap<&str, &LogicalField> {
89    schema
90        .columns()
91        .iter()
92        .map(|col| (col.name.as_str(), col))
93        .collect()
94}
95
96/// Enforce the v0.1 "no schema evolution" rule.
97///
98/// - Every table column must appear in the segment schema.
99/// - No extra columns may appear in the segment schema.
100/// - For every column, logical type and nullability must match exactly.
101/// - If the mismatch is on the time index column (from `index`), we use a
102///   more specific `TimeIndexTypeMismatch` error.
103pub fn ensure_schema_exact_match(
104    table_schema: &LogicalSchema,
105    segment_schema: &LogicalSchema,
106    index: &TimeIndexSpec,
107) -> SchemaResult<()> {
108    let time_col_name = index.timestamp_column.as_str();
109
110    let table_cols = columns_by_name(table_schema);
111    let seg_cols = columns_by_name(segment_schema);
112
113    for (name, table_field) in &table_cols {
114        let seg_field =
115            seg_cols
116                .get(name)
117                .ok_or_else(|| SchemaCompatibilityError::MissingColumn {
118                    column: (*name).to_string(),
119                })?;
120
121        if table_field.data_type != seg_field.data_type
122            || table_field.nullable != seg_field.nullable
123        {
124            let err = if *name == time_col_name {
125                SchemaCompatibilityError::TimeIndexTypeMismatch {
126                    column: (*name).to_string(),
127                    table_type: table_field.data_type.clone(),
128                    segment_type: seg_field.data_type.clone(),
129                }
130            } else {
131                SchemaCompatibilityError::TypeMismatch {
132                    column: (*name).to_string(),
133                    table_type: table_field.data_type.clone(),
134                    segment_type: seg_field.data_type.clone(),
135                }
136            };
137            return Err(err);
138        }
139    }
140
141    for name in seg_cols.keys() {
142        if !table_cols.contains_key(name) {
143            return Err(SchemaCompatibilityError::ExtraColumn {
144                column: (*name).to_string(),
145            });
146        }
147    }
148
149    Ok(())
150}