supertable_core/
validation.rs1use arrow::array::RecordBatch;
2use arrow::datatypes::SchemaRef;
3use thiserror::Error;
4
5#[derive(Debug, Error)]
7pub enum ValidationError {
8 #[error("schema mismatch: expected {expected}, found {found}")]
9 SchemaMismatch { expected: String, found: String },
10
11 #[error("null constraint violation: column {column} is not nullable but found nulls")]
12 NotNullViolation { column: String },
13
14 #[error("compatibility error: {message}")]
15 CompatibilityError { message: String },
16}
17
18pub struct SchemaValidator {
20 target_schema: SchemaRef,
21}
22
23impl SchemaValidator {
24 pub fn new(target_schema: SchemaRef) -> Self {
25 Self { target_schema }
26 }
27
28 pub fn validate(&self, batch: &RecordBatch) -> Result<(), ValidationError> {
30 if batch.schema() != self.target_schema {
32 if !self.schemas_are_compatible(&batch.schema(), &self.target_schema) {
34 return Err(ValidationError::SchemaMismatch {
35 expected: format!("{:?}", self.target_schema),
36 found: format!("{:?}", batch.schema()),
37 });
38 }
39 }
40
41 for field in self.target_schema.fields() {
43 if !field.is_nullable() {
44 let column = batch.column_by_name(field.name()).ok_or_else(|| {
45 ValidationError::SchemaMismatch {
46 expected: field.name().to_string(),
47 found: "missing column".to_string(),
48 }
49 })?;
50
51 if column.null_count() > 0 {
52 return Err(ValidationError::NotNullViolation {
53 column: field.name().to_string(),
54 });
55 }
56 }
57 }
58
59 Ok(())
60 }
61
62 fn schemas_are_compatible(&self, s1: &SchemaRef, s2: &SchemaRef) -> bool {
63 if s1.fields().len() != s2.fields().len() {
64 return false;
65 }
66
67 for (f1, f2) in s1.fields().iter().zip(s2.fields().iter()) {
68 if f1.name() != f2.name() || f1.data_type() != f2.data_type() {
69 return false;
70 }
71 }
72
73 true
74 }
75}
76
77pub struct SchemaCompatibilityValidator {
79 old_schema: crate::schema::Schema,
80}
81
82impl SchemaCompatibilityValidator {
83 pub fn new(old_schema: crate::schema::Schema) -> Self {
84 Self { old_schema }
85 }
86
87 pub fn validate(&self, new_schema: &crate::schema::Schema) -> Result<(), ValidationError> {
89 for old_field in &self.old_schema.fields {
90 let new_field = match new_schema.find_field(old_field.id) {
91 Some(f) => f,
92 None => continue, };
94
95 if old_field.field_type != new_field.field_type {
97 if !old_field.field_type.can_widen_to(&new_field.field_type) {
98 return Err(ValidationError::CompatibilityError {
99 message: format!(
100 "Cannot change type of field {} from {:?} to {:?}",
101 old_field.name, old_field.field_type, new_field.field_type
102 ),
103 });
104 }
105 }
106
107 if !old_field.required && new_field.required {
109 return Err(ValidationError::CompatibilityError {
110 message: format!("Cannot make optional field {} required", old_field.name),
111 });
112 }
113 }
114
115 for new_field in &new_schema.fields {
117 if self.old_schema.find_field(new_field.id).is_none() && new_field.required {
118 return Err(ValidationError::CompatibilityError {
119 message: format!("New field {} must be optional", new_field.name),
120 });
121 }
122 }
123
124 Ok(())
125 }
126}