Skip to main content

deltalake_core/kernel/schema/
schema.rs

//! Delta table schema: type aliases, column invariants, and schema helper traits.
2
3use std::any::Any;
4use std::sync::Arc;
5
6pub use delta_kernel::schema::{
7    ArrayType, ColumnMetadataKey, DataType, DecimalType, MapType, MetadataValue, PrimitiveType,
8    StructField, StructType,
9};
10use serde_json::Value;
11
12use crate::kernel::error::Error;
13use crate::schema::DataCheck;
14use crate::table::GeneratedColumn;
15
16/// Type alias for a top level schema
17pub type Schema = StructType;
18/// Schema reference type
19pub type SchemaRef = Arc<StructType>;
20
/// A column-level invariant: a SQL predicate that every write to a Delta table must satisfy.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Invariant {
    /// Complete path of the column the invariant is attached to.
    pub field_name: String,
    /// SQL expression that is required to evaluate to true.
    pub invariant_sql: String,
}
29
30impl Invariant {
31    /// Create a new invariant
32    pub fn new(field_name: &str, invariant_sql: &str) -> Self {
33        Self {
34            field_name: field_name.to_string(),
35            invariant_sql: invariant_sql.to_string(),
36        }
37    }
38}
39
40impl DataCheck for Invariant {
41    fn get_name(&self) -> &str {
42        &self.field_name
43    }
44
45    fn get_expression(&self) -> &str {
46        &self.invariant_sql
47    }
48
49    fn as_any(&self) -> &dyn Any {
50        self
51    }
52}
53
54/// Trait to add convenience functions to struct type
55pub trait StructTypeExt {
56    /// Get all invariants in the schemas
57    fn get_invariants(&self) -> Result<Vec<Invariant>, Error>;
58
59    /// Get all generated column expressions
60    fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error>;
61}
62
63impl StructTypeExt for StructType {
64    /// Get all get_generated_columns in the schemas
65    fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error> {
66        let mut remaining_fields: Vec<(String, StructField)> = self
67            .fields()
68            .map(|field| (field.name.clone(), field.clone()))
69            .collect();
70        let mut generated_cols: Vec<GeneratedColumn> = Vec::new();
71
72        while let Some((field_path, field)) = remaining_fields.pop() {
73            if let Some(MetadataValue::String(generated_col_string)) = field
74                .metadata
75                .get(ColumnMetadataKey::GenerationExpression.as_ref())
76            {
77                generated_cols.push(GeneratedColumn::new(
78                    &field_path,
79                    generated_col_string,
80                    field.data_type(),
81                ));
82            }
83        }
84        Ok(generated_cols)
85    }
86
87    /// Get all invariants in the schemas
88    fn get_invariants(&self) -> Result<Vec<Invariant>, Error> {
89        let mut remaining_fields: Vec<(String, StructField)> = self
90            .fields()
91            .map(|field| (field.name.clone(), field.clone()))
92            .collect();
93        let mut invariants: Vec<Invariant> = Vec::new();
94
95        let add_segment = |prefix: &str, segment: &str| -> String {
96            if prefix.is_empty() {
97                segment.to_owned()
98            } else {
99                format!("{prefix}.{segment}")
100            }
101        };
102
103        while let Some((field_path, field)) = remaining_fields.pop() {
104            match field.data_type() {
105                DataType::Struct(inner) => {
106                    remaining_fields.extend(
107                        inner
108                            .fields()
109                            .map(|field| {
110                                let new_prefix = add_segment(&field_path, &field.name);
111                                (new_prefix, field.clone())
112                            })
113                            .collect::<Vec<(String, StructField)>>(),
114                    );
115                }
116                DataType::Array(inner) => {
117                    let element_field_name = add_segment(&field_path, "element");
118                    remaining_fields.push((
119                        element_field_name,
120                        StructField::new("".to_string(), inner.element_type.clone(), false),
121                    ));
122                }
123                DataType::Map(inner) => {
124                    let key_field_name = add_segment(&field_path, "key");
125                    remaining_fields.push((
126                        key_field_name,
127                        StructField::new("".to_string(), inner.key_type.clone(), false),
128                    ));
129                    let value_field_name = add_segment(&field_path, "value");
130                    remaining_fields.push((
131                        value_field_name,
132                        StructField::new("".to_string(), inner.value_type.clone(), false),
133                    ));
134                }
135                _ => {}
136            }
137            // JSON format: {"expression": {"expression": "<SQL STRING>"} }
138            if let Some(MetadataValue::String(invariant_json)) =
139                field.metadata.get(ColumnMetadataKey::Invariants.as_ref())
140            {
141                let json: Value = serde_json::from_str(invariant_json).map_err(|e| {
142                    Error::InvalidInvariantJson {
143                        json_err: e,
144                        line: invariant_json.to_string(),
145                    }
146                })?;
147                if let Value::Object(json) = json
148                    && let Some(Value::Object(expr1)) = json.get("expression")
149                    && let Some(Value::String(sql)) = expr1.get("expression")
150                {
151                    invariants.push(Invariant::new(&field_path, sql));
152                }
153            }
154        }
155        Ok(invariants)
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_get_generated_columns() {
        let schema: StructType = serde_json::from_value(json!(
            {
                "type":"struct",
                "fields":[
                    {"name":"id","type":"integer","nullable":true,"metadata":{}},
                    {"name":"gc","type":"integer","nullable":true,"metadata":{}}]
            }
        ))
        .unwrap();
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 0);

        let schema: StructType = serde_json::from_value(json!(
            {
                "type":"struct",
                "fields":[
                    {"name":"id","type":"integer","nullable":true,"metadata":{}},
                    {"name":"gc","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"5"}}]
            }
        ))
        .unwrap();
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 1);
        assert_eq!(cols[0].data_type, DataType::INTEGER);
        assert_eq!(cols[0].validation_expr, "gc <=> 5");

        let schema: StructType = serde_json::from_value(json!(
            {
                "type":"struct",
                "fields":[
                    {"name":"id","type":"integer","nullable":true,"metadata":{}},
                    {"name":"gc","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"5"}},
                    {"name":"id2","type":"integer","nullable":true,"metadata":{"delta.generationExpression":"id * 10"}}]
            }
        ))
        .unwrap();
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 2);
    }

    #[test]
    fn test_get_invariants() {
        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "string", "nullable": true, "metadata": {}}]
        }))
        .unwrap();
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 0);

        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [
                {"name": "x", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"x > 2\"} }"
                }},
                {"name": "y", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"y < 4\"} }"
                }}
            ]
        }))
        .unwrap();
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 2);
        assert!(invariants.contains(&Invariant::new("x", "x > 2")));
        assert!(invariants.contains(&Invariant::new("y", "y < 4")));

        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [{
                "name": "a_map",
                "type": {
                    "type": "map",
                    "keyType": "string",
                    "valueType": {
                        "type": "array",
                        "elementType": {
                            "type": "struct",
                            "fields": [{
                                "name": "d",
                                "type": "integer",
                                "metadata": {
                                    "delta.invariants": "{\"expression\": { \"expression\": \"a_map.value.element.d < 4\"} }"
                                },
                                "nullable": false
                            }]
                        },
                        "containsNull": false
                    },
                    "valueContainsNull": false
                },
                "nullable": false,
                "metadata": {}
            }]
        }))
        .unwrap();
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 1);
        assert_eq!(
            invariants[0],
            Invariant::new("a_map.value.element.d", "a_map.value.element.d < 4")
        );
    }

    /// Invariants on fields of a nested struct are reported with their dotted path.
    #[test]
    fn test_get_invariants_nested_struct() {
        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [{
                "name": "outer",
                "type": {
                    "type": "struct",
                    "fields": [{
                        "name": "inner",
                        "type": "integer",
                        "nullable": true,
                        "metadata": {
                            "delta.invariants": "{\"expression\": { \"expression\": \"outer.inner > 0\"} }"
                        }
                    }]
                },
                "nullable": true,
                "metadata": {}
            }]
        }))
        .unwrap();
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(
            invariants,
            vec![Invariant::new("outer.inner", "outer.inner > 0")]
        );
    }

    /// Invariant metadata that is not valid JSON is reported as an error.
    #[test]
    fn test_get_invariants_invalid_json() {
        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {
                "delta.invariants": "not json"
            }}]
        }))
        .unwrap();
        assert!(matches!(
            schema.get_invariants(),
            Err(Error::InvalidInvariantJson { .. })
        ));
    }

    /// Valid JSON that lacks the nested `expression.expression` string is ignored.
    #[test]
    fn test_get_invariants_unexpected_shape_is_ignored() {
        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {
                "delta.invariants": "{\"expression\": \"x > 2\"}"
            }}]
        }))
        .unwrap();
        let invariants = schema.get_invariants().unwrap();
        assert!(invariants.is_empty());
    }

    /// <https://github.com/delta-io/delta-rs/issues/2152>
    #[test]
    fn test_identity_columns() {
        let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#;
        let _schema: StructType = serde_json::from_str(buf).expect("Failed to load");
    }
}
274}