deltalake_core/kernel/schema/
schema.rs1use std::any::Any;
4use std::sync::Arc;
5
6pub use delta_kernel::schema::{
7 ArrayType, ColumnMetadataKey, DataType, DecimalType, MapType, MetadataValue, PrimitiveType,
8 StructField, StructType,
9};
10use serde_json::Value;
11
12use crate::kernel::error::Error;
13use crate::schema::DataCheck;
14use crate::table::GeneratedColumn;
15
16pub type Schema = StructType;
18pub type SchemaRef = Arc<StructType>;
20
/// A SQL constraint attached to a single (possibly nested) column of a schema.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Invariant {
    /// Column path the invariant belongs to; nested segments are joined with `.`.
    pub field_name: String,
    /// SQL predicate text, e.g. `x > 2`.
    pub invariant_sql: String,
}

impl Invariant {
    /// Builds an invariant for `field_name` whose condition is `invariant_sql`.
    ///
    /// Both arguments are copied into owned `String`s.
    pub fn new(field_name: &str, invariant_sql: &str) -> Self {
        let field_name = String::from(field_name);
        let invariant_sql = String::from(invariant_sql);
        Self {
            field_name,
            invariant_sql,
        }
    }
}

40impl DataCheck for Invariant {
41 fn get_name(&self) -> &str {
42 &self.field_name
43 }
44
45 fn get_expression(&self) -> &str {
46 &self.invariant_sql
47 }
48
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52}
53
54pub trait StructTypeExt {
56 fn get_invariants(&self) -> Result<Vec<Invariant>, Error>;
58
59 fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error>;
61}
62
63impl StructTypeExt for StructType {
64 fn get_generated_columns(&self) -> Result<Vec<GeneratedColumn>, Error> {
66 let mut remaining_fields: Vec<(String, StructField)> = self
67 .fields()
68 .map(|field| (field.name.clone(), field.clone()))
69 .collect();
70 let mut generated_cols: Vec<GeneratedColumn> = Vec::new();
71
72 while let Some((field_path, field)) = remaining_fields.pop() {
73 if let Some(MetadataValue::String(generated_col_string)) = field
74 .metadata
75 .get(ColumnMetadataKey::GenerationExpression.as_ref())
76 {
77 generated_cols.push(GeneratedColumn::new(
78 &field_path,
79 generated_col_string,
80 field.data_type(),
81 ));
82 }
83 }
84 Ok(generated_cols)
85 }
86
87 fn get_invariants(&self) -> Result<Vec<Invariant>, Error> {
89 let mut remaining_fields: Vec<(String, StructField)> = self
90 .fields()
91 .map(|field| (field.name.clone(), field.clone()))
92 .collect();
93 let mut invariants: Vec<Invariant> = Vec::new();
94
95 let add_segment = |prefix: &str, segment: &str| -> String {
96 if prefix.is_empty() {
97 segment.to_owned()
98 } else {
99 format!("{prefix}.{segment}")
100 }
101 };
102
103 while let Some((field_path, field)) = remaining_fields.pop() {
104 match field.data_type() {
105 DataType::Struct(inner) => {
106 remaining_fields.extend(
107 inner
108 .fields()
109 .map(|field| {
110 let new_prefix = add_segment(&field_path, &field.name);
111 (new_prefix, field.clone())
112 })
113 .collect::<Vec<(String, StructField)>>(),
114 );
115 }
116 DataType::Array(inner) => {
117 let element_field_name = add_segment(&field_path, "element");
118 remaining_fields.push((
119 element_field_name,
120 StructField::new("".to_string(), inner.element_type.clone(), false),
121 ));
122 }
123 DataType::Map(inner) => {
124 let key_field_name = add_segment(&field_path, "key");
125 remaining_fields.push((
126 key_field_name,
127 StructField::new("".to_string(), inner.key_type.clone(), false),
128 ));
129 let value_field_name = add_segment(&field_path, "value");
130 remaining_fields.push((
131 value_field_name,
132 StructField::new("".to_string(), inner.value_type.clone(), false),
133 ));
134 }
135 _ => {}
136 }
137 if let Some(MetadataValue::String(invariant_json)) =
139 field.metadata.get(ColumnMetadataKey::Invariants.as_ref())
140 {
141 let json: Value = serde_json::from_str(invariant_json).map_err(|e| {
142 Error::InvalidInvariantJson {
143 json_err: e,
144 line: invariant_json.to_string(),
145 }
146 })?;
147 if let Value::Object(json) = json
148 && let Some(Value::Object(expr1)) = json.get("expression")
149 && let Some(Value::String(sql)) = expr1.get("expression")
150 {
151 invariants.push(Invariant::new(&field_path, sql));
152 }
153 }
154 }
155 Ok(invariants)
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Deserializes a test schema, panicking with a clear message if the fixture is malformed.
    fn schema_from(value: serde_json::Value) -> StructType {
        serde_json::from_value(value).expect("test schema should deserialize")
    }

    #[test]
    fn test_get_generated_columns() {
        let schema = schema_from(json!({
            "type": "struct",
            "fields": [
                {"name": "id", "type": "integer", "nullable": true, "metadata": {}},
                {"name": "gc", "type": "integer", "nullable": true, "metadata": {}}
            ]
        }));
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 0);

        let schema = schema_from(json!({
            "type": "struct",
            "fields": [
                {"name": "id", "type": "integer", "nullable": true, "metadata": {}},
                {"name": "gc", "type": "integer", "nullable": true,
                 "metadata": {"delta.generationExpression": "5"}}
            ]
        }));
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 1);
        assert_eq!(cols[0].data_type, DataType::INTEGER);
        assert_eq!(cols[0].validation_expr, "gc <=> 5");

        let schema = schema_from(json!({
            "type": "struct",
            "fields": [
                {"name": "id", "type": "integer", "nullable": true, "metadata": {}},
                {"name": "gc", "type": "integer", "nullable": true,
                 "metadata": {"delta.generationExpression": "5"}},
                {"name": "id2", "type": "integer", "nullable": true,
                 "metadata": {"delta.generationExpression": "id * 10"}}
            ]
        }));
        let cols = schema.get_generated_columns().unwrap();
        assert_eq!(cols.len(), 2);
    }

    #[test]
    fn test_get_invariants() {
        let schema = schema_from(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "string", "nullable": true, "metadata": {}}]
        }));
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 0);

        let schema = schema_from(json!({
            "type": "struct",
            "fields": [
                {"name": "x", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"x > 2\"} }"
                }},
                {"name": "y", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"y < 4\"} }"
                }}
            ]
        }));
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 2);
        assert!(invariants.contains(&Invariant::new("x", "x > 2")));
        assert!(invariants.contains(&Invariant::new("y", "y < 4")));

        let schema = schema_from(json!({
            "type": "struct",
            "fields": [{
                "name": "a_map",
                "type": {
                    "type": "map",
                    "keyType": "string",
                    "valueType": {
                        "type": "array",
                        "elementType": {
                            "type": "struct",
                            "fields": [{
                                "name": "d",
                                "type": "integer",
                                "metadata": {
                                    "delta.invariants": "{\"expression\": { \"expression\": \"a_map.value.element.d < 4\"} }"
                                },
                                "nullable": false
                            }]
                        },
                        "containsNull": false
                    },
                    "valueContainsNull": false
                },
                "nullable": false,
                "metadata": {}
            }]
        }));
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants.len(), 1);
        assert_eq!(
            invariants[0],
            Invariant::new("a_map.value.element.d", "a_map.value.element.d < 4")
        );
    }

    #[test]
    fn test_get_invariants_nested_struct_path() {
        let schema = schema_from(json!({
            "type": "struct",
            "fields": [{
                "name": "s",
                "type": {
                    "type": "struct",
                    "fields": [{
                        "name": "t",
                        "type": "integer",
                        "nullable": true,
                        "metadata": {
                            "delta.invariants": "{\"expression\": { \"expression\": \"s.t > 0\"} }"
                        }
                    }]
                },
                "nullable": true,
                "metadata": {}
            }]
        }));
        let invariants = schema.get_invariants().unwrap();
        assert_eq!(invariants, vec![Invariant::new("s.t", "s.t > 0")]);
    }

    #[test]
    fn test_get_invariants_rejects_malformed_json() {
        let schema = schema_from(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {
                "delta.invariants": "not json"
            }}]
        }));
        assert!(matches!(
            schema.get_invariants(),
            Err(Error::InvalidInvariantJson { .. })
        ));
    }

    #[test]
    fn test_get_invariants_skips_unexpected_shape() {
        // Valid JSON, but `expression` is a string rather than a nested object.
        let schema = schema_from(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {
                "delta.invariants": "{\"expression\": \"x > 2\"}"
            }}]
        }));
        let invariants = schema.get_invariants().unwrap();
        assert!(invariants.is_empty());
    }

    #[test]
    fn test_identity_columns() {
        let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#;
        let schema: StructType = serde_json::from_str(buf).expect("Failed to load");
        assert_eq!(schema.fields().count(), 2);
    }
}