timeseries_table_core/metadata/
schema_compat.rs1use std::collections::HashMap;
8
9use snafu::prelude::*;
10
11use crate::metadata::{
12 logical_schema::{LogicalDataType, LogicalField, LogicalSchema, LogicalSchemaError},
13 table_metadata::{TableMeta, TimeIndexSpec},
14};
15
16#[derive(Debug, Snafu)]
18pub enum SchemaCompatibilityError {
19 #[snafu(display("Table has no logical_schema; v0.1 cannot append without a canonical schema"))]
25 MissingTableSchema,
26
27 #[snafu(display("Segment schema is missing required column {column}"))]
29 MissingColumn {
30 column: String,
32 },
33
34 #[snafu(display("Segment schema has extra column {column} not present in table schema"))]
36 ExtraColumn {
37 column: String,
39 },
40
41 #[snafu(display(
43 "Type mismatch for column {column}: table has {table_type}, segment has {segment_type}"
44 ))]
45 TypeMismatch {
46 column: String,
48 table_type: LogicalDataType,
50 segment_type: LogicalDataType,
52 },
53
54 #[snafu(display(
56 "Time index column {column} has incompatible type: table has {table_type}, \
57 segment has {segment_type}"
58 ))]
59 TimeIndexTypeMismatch {
60 column: String,
62 table_type: LogicalDataType,
64 segment_type: LogicalDataType,
66 },
67
68 #[snafu(display("Logical schema is invalid: {source}"))]
70 LogicalSchema {
71 #[snafu(source)]
73 source: LogicalSchemaError,
74 },
75}
76
77pub type SchemaResult<T> = Result<T, SchemaCompatibilityError>;
79
80pub fn require_table_schema(meta: &TableMeta) -> SchemaResult<&LogicalSchema> {
82 match &meta.logical_schema {
83 Some(schema) => Ok(schema),
84 None => MissingTableSchemaSnafu.fail(),
85 }
86}
87
88fn columns_by_name(schema: &LogicalSchema) -> HashMap<&str, &LogicalField> {
89 schema
90 .columns()
91 .iter()
92 .map(|col| (col.name.as_str(), col))
93 .collect()
94}
95
96pub fn ensure_schema_exact_match(
104 table_schema: &LogicalSchema,
105 segment_schema: &LogicalSchema,
106 index: &TimeIndexSpec,
107) -> SchemaResult<()> {
108 let time_col_name = index.timestamp_column.as_str();
109
110 let table_cols = columns_by_name(table_schema);
111 let seg_cols = columns_by_name(segment_schema);
112
113 for (name, table_field) in &table_cols {
114 let seg_field =
115 seg_cols
116 .get(name)
117 .ok_or_else(|| SchemaCompatibilityError::MissingColumn {
118 column: (*name).to_string(),
119 })?;
120
121 if table_field.data_type != seg_field.data_type
122 || table_field.nullable != seg_field.nullable
123 {
124 let err = if *name == time_col_name {
125 SchemaCompatibilityError::TimeIndexTypeMismatch {
126 column: (*name).to_string(),
127 table_type: table_field.data_type.clone(),
128 segment_type: seg_field.data_type.clone(),
129 }
130 } else {
131 SchemaCompatibilityError::TypeMismatch {
132 column: (*name).to_string(),
133 table_type: table_field.data_type.clone(),
134 segment_type: seg_field.data_type.clone(),
135 }
136 };
137 return Err(err);
138 }
139 }
140
141 for name in seg_cols.keys() {
142 if !table_cols.contains_key(name) {
143 return Err(SchemaCompatibilityError::ExtraColumn {
144 column: (*name).to_string(),
145 });
146 }
147 }
148
149 Ok(())
150}