Skip to main content

supertable_core/
validation.rs

1use arrow::array::RecordBatch;
2use arrow::datatypes::SchemaRef;
3use thiserror::Error;
4
/// Errors that can occur during validation.
///
/// Returned by [`SchemaValidator`] when checking a `RecordBatch` against a target Arrow schema,
/// and by [`SchemaCompatibilityValidator`] when checking whether one table schema can evolve
/// into another.
#[derive(Debug, Error)]
pub enum ValidationError {
    /// The observed schema (or column set) does not match what was expected.
    ///
    /// `expected` and `found` are free-form, human-readable descriptions intended for the
    /// error message only; callers should not parse them.
    #[error("schema mismatch: expected {expected}, found {found}")]
    SchemaMismatch { expected: String, found: String },

    /// A column whose target field is declared non-nullable contains at least one null.
    /// `column` is the field name.
    #[error("null constraint violation: column {column} is not nullable but found nulls")]
    NotNullViolation { column: String },

    /// A proposed schema change is not a permitted evolution of the existing schema.
    /// `message` explains which rule was violated.
    #[error("compatibility error: {message}")]
    CompatibilityError { message: String },
}
17
18/// Validates record batches against a target schema.
19pub struct SchemaValidator {
20    target_schema: SchemaRef,
21}
22
23impl SchemaValidator {
24    pub fn new(target_schema: SchemaRef) -> Self {
25        Self { target_schema }
26    }
27
28    /// Validates a RecordBatch against the target schema.
29    pub fn validate(&self, batch: &RecordBatch) -> Result<(), ValidationError> {
30        // 1. Check if schemas are compatible
31        if batch.schema() != self.target_schema {
32            // Deep check: sometimes the metadata differs but fields are same
33            if !self.schemas_are_compatible(&batch.schema(), &self.target_schema) {
34                return Err(ValidationError::SchemaMismatch {
35                    expected: format!("{:?}", self.target_schema),
36                    found: format!("{:?}", batch.schema()),
37                });
38            }
39        }
40
41        // 2. Check for null constraints
42        for field in self.target_schema.fields() {
43            if !field.is_nullable() {
44                let column = batch.column_by_name(field.name()).ok_or_else(|| {
45                    ValidationError::SchemaMismatch {
46                        expected: field.name().to_string(),
47                        found: "missing column".to_string(),
48                    }
49                })?;
50
51                if column.null_count() > 0 {
52                    return Err(ValidationError::NotNullViolation {
53                        column: field.name().to_string(),
54                    });
55                }
56            }
57        }
58
59        Ok(())
60    }
61
62    fn schemas_are_compatible(&self, s1: &SchemaRef, s2: &SchemaRef) -> bool {
63        if s1.fields().len() != s2.fields().len() {
64            return false;
65        }
66
67        for (f1, f2) in s1.fields().iter().zip(s2.fields().iter()) {
68            if f1.name() != f2.name() || f1.data_type() != f2.data_type() {
69                return false;
70            }
71        }
72
73        true
74    }
75}
76
77/// Validates if a new schema is compatible with an old schema.
78pub struct SchemaCompatibilityValidator {
79    old_schema: crate::schema::Schema,
80}
81
82impl SchemaCompatibilityValidator {
83    pub fn new(old_schema: crate::schema::Schema) -> Self {
84        Self { old_schema }
85    }
86
87    /// Validates if new_schema can safely evolve from old_schema.
88    pub fn validate(&self, new_schema: &crate::schema::Schema) -> Result<(), ValidationError> {
89        for old_field in &self.old_schema.fields {
90            let new_field = match new_schema.find_field(old_field.id) {
91                Some(f) => f,
92                None => continue, // Dropping columns is generally allowed in Iceberg
93            };
94
95            // 1. Type widening check
96            if old_field.field_type != new_field.field_type {
97                if !old_field.field_type.can_widen_to(&new_field.field_type) {
98                    return Err(ValidationError::CompatibilityError {
99                        message: format!(
100                            "Cannot change type of field {} from {:?} to {:?}",
101                            old_field.name, old_field.field_type, new_field.field_type
102                        ),
103                    });
104                }
105            }
106
107            // 2. Nullability check: Cannot make an optional field required
108            if !old_field.required && new_field.required {
109                return Err(ValidationError::CompatibilityError {
110                    message: format!("Cannot make optional field {} required", old_field.name),
111                });
112            }
113        }
114
115        // 3. New fields must be optional
116        for new_field in &new_schema.fields {
117            if self.old_schema.find_field(new_field.id).is_none() && new_field.required {
118                return Err(ValidationError::CompatibilityError {
119                    message: format!("New field {} must be optional", new_field.name),
120                });
121            }
122        }
123
124        Ok(())
125    }
126}