Skip to main content

deltalake_core/kernel/schema/
partitions.rs

1//! Delta Table partition handling logic.
2use std::convert::TryFrom;
3
4use delta_kernel::expressions::{Expression, JunctionPredicateOp, Predicate, Scalar};
5use delta_kernel::schema::StructType;
6use serde::{Serialize, Serializer};
7
8use crate::errors::{DeltaResult, DeltaTableError};
9
/// Sentinel directory value that Hive-style partitioned layouts write in place of a
/// `NULL` partition value.
pub const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
12
/// Comparison operator, together with its raw operand(s), used when filtering the
/// partitions of a DeltaTable.
///
/// Operands are kept as untyped strings exactly as the caller supplied them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionValue {
    /// `key = value`
    Equal(String),
    /// `key != value`
    NotEqual(String),
    /// `key > value`
    GreaterThan(String),
    /// `key >= value`
    GreaterThanOrEqual(String),
    /// `key < value`
    LessThan(String),
    /// `key <= value`
    LessThanOrEqual(String),
    /// `key IN (value, ...)`
    In(Vec<String>),
    /// `key NOT IN (value, ...)`
    NotIn(Vec<String>),
}
33
34/// A Struct used for filtering a DeltaTable partition by key and value.
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub struct PartitionFilter {
37    /// The key of the PartitionFilter
38    pub key: String,
39    /// The value of the PartitionFilter
40    pub value: PartitionValue,
41}
42
43/// Create desired string representation for PartitionFilter.
44/// Used in places like predicate in operationParameters, etc.
45impl Serialize for PartitionFilter {
46    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
47    where
48        S: Serializer,
49    {
50        let s = match &self.value {
51            PartitionValue::Equal(value) => format!("{} = '{value}'", self.key),
52            PartitionValue::NotEqual(value) => format!("{} != '{value}'", self.key),
53            PartitionValue::GreaterThan(value) => format!("{} > '{value}'", self.key),
54            PartitionValue::GreaterThanOrEqual(value) => format!("{} >= '{value}'", self.key),
55            PartitionValue::LessThan(value) => format!("{} < '{value}'", self.key),
56            PartitionValue::LessThanOrEqual(value) => format!("{} <= '{value}'", self.key),
57            // used upper case for IN and NOT similar to SQL
58            PartitionValue::In(values) => {
59                let quoted_values: Vec<String> = values.iter().map(|v| format!("'{v}'")).collect();
60                format!("{} IN ({})", self.key, quoted_values.join(", "))
61            }
62            PartitionValue::NotIn(values) => {
63                let quoted_values: Vec<String> = values.iter().map(|v| format!("'{v}'")).collect();
64                format!("{} NOT IN ({})", self.key, quoted_values.join(", "))
65            }
66        };
67        serializer.serialize_str(&s)
68    }
69}
70
71/// Create a PartitionFilter from a filter Tuple with the structure (key, operation, value).
72impl TryFrom<(&str, &str, &str)> for PartitionFilter {
73    type Error = DeltaTableError;
74
75    /// Try to create a PartitionFilter from a Tuple of (key, operation, value).
76    /// Returns a DeltaTableError in case of a malformed filter.
77    fn try_from(filter: (&str, &str, &str)) -> Result<Self, DeltaTableError> {
78        match filter {
79            (key, "=", value) if !key.is_empty() => Ok(PartitionFilter {
80                key: key.to_owned(),
81                value: PartitionValue::Equal(value.to_owned()),
82            }),
83            (key, "!=", value) if !key.is_empty() => Ok(PartitionFilter {
84                key: key.to_owned(),
85                value: PartitionValue::NotEqual(value.to_owned()),
86            }),
87            (key, ">", value) if !key.is_empty() => Ok(PartitionFilter {
88                key: key.to_owned(),
89                value: PartitionValue::GreaterThan(value.to_owned()),
90            }),
91            (key, ">=", value) if !key.is_empty() => Ok(PartitionFilter {
92                key: key.to_owned(),
93                value: PartitionValue::GreaterThanOrEqual(value.to_owned()),
94            }),
95            (key, "<", value) if !key.is_empty() => Ok(PartitionFilter {
96                key: key.to_owned(),
97                value: PartitionValue::LessThan(value.to_owned()),
98            }),
99            (key, "<=", value) if !key.is_empty() => Ok(PartitionFilter {
100                key: key.to_owned(),
101                value: PartitionValue::LessThanOrEqual(value.to_owned()),
102            }),
103            (_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
104                partition_filter: format!("{filter:?}"),
105            }),
106        }
107    }
108}
109
110/// Create a PartitionFilter from a filter Tuple with the structure (key, operation, list(value)).
111impl TryFrom<(&str, &str, &[&str])> for PartitionFilter {
112    type Error = DeltaTableError;
113
114    /// Try to create a PartitionFilter from a Tuple of (key, operation, list(value)).
115    /// Returns a DeltaTableError in case of a malformed filter.
116    fn try_from(filter: (&str, &str, &[&str])) -> Result<Self, DeltaTableError> {
117        match filter {
118            (key, "in", value) if !key.is_empty() => Ok(PartitionFilter {
119                key: key.to_owned(),
120                value: PartitionValue::In(value.iter().map(|x| x.to_string()).collect()),
121            }),
122            (key, "not in", value) if !key.is_empty() => Ok(PartitionFilter {
123                key: key.to_owned(),
124                value: PartitionValue::NotIn(value.iter().map(|x| x.to_string()).collect()),
125            }),
126            (_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
127                partition_filter: format!("{filter:?}"),
128            }),
129        }
130    }
131}
132
133/// A Struct DeltaTablePartition used to represent a partition of a DeltaTable.
134#[derive(Clone, Debug, PartialEq)]
135pub struct DeltaTablePartition {
136    /// The key of the DeltaTable partition.
137    pub key: String,
138    /// The value of the DeltaTable partition.
139    pub value: Scalar,
140}
141
142impl Eq for DeltaTablePartition {}
143
144impl DeltaTablePartition {
145    /// Create a DeltaTable partition from a Tuple of (key, value).
146    pub fn from_partition_value(partition_value: (&str, &Scalar)) -> Self {
147        let (k, v) = partition_value;
148        DeltaTablePartition {
149            key: k.to_owned(),
150            value: v.to_owned(),
151        }
152    }
153}
154
155///
156/// A HivePartition string is represented by a "key=value" format.
157///
158/// ```rust
159/// # use delta_kernel::expressions::Scalar;
160/// use deltalake_core::DeltaTablePartition;
161///
162/// let hive_part = "ds=2023-01-01";
163/// let partition = DeltaTablePartition::try_from(hive_part).unwrap();
164/// assert_eq!("ds", partition.key);
165/// assert_eq!(Scalar::String("2023-01-01".into()), partition.value);
166/// ```
167impl TryFrom<&str> for DeltaTablePartition {
168    type Error = DeltaTableError;
169
170    /// Try to create a DeltaTable partition from a HivePartition string.
171    /// Returns a DeltaTableError if the string is not in the form of a HivePartition.
172    fn try_from(partition: &str) -> Result<Self, DeltaTableError> {
173        let partition_split: Vec<&str> = partition.split('=').collect();
174        match partition_split {
175            partition_split if partition_split.len() == 2 => Ok(DeltaTablePartition {
176                key: partition_split[0].to_owned(),
177                value: Scalar::String(partition_split[1].to_owned()),
178            }),
179            _ => Err(DeltaTableError::PartitionError {
180                partition: partition.to_string(),
181            }),
182        }
183    }
184}
185
186#[allow(unused)] // TODO: remove once we use this in kernel log replay
187pub(crate) fn to_kernel_predicate(
188    filters: &[PartitionFilter],
189    table_schema: &StructType,
190) -> DeltaResult<Predicate> {
191    let predicates = filters
192        .iter()
193        .map(|filter| filter_to_kernel_predicate(filter, table_schema))
194        .collect::<DeltaResult<Vec<_>>>()?;
195    Ok(Predicate::junction(JunctionPredicateOp::And, predicates))
196}
197
198fn filter_to_kernel_predicate(
199    filter: &PartitionFilter,
200    table_schema: &StructType,
201) -> DeltaResult<Predicate> {
202    let Some(field) = table_schema.field(&filter.key) else {
203        return Err(DeltaTableError::SchemaMismatch {
204            msg: format!("Field '{}' is not a root table field.", filter.key),
205        });
206    };
207    let Some(dt) = field.data_type().as_primitive_opt() else {
208        return Err(DeltaTableError::SchemaMismatch {
209            msg: format!("Field '{}' is not a primitive type", field.name()),
210        });
211    };
212
213    let column = Expression::column([field.name()]);
214    Ok(match &filter.value {
215        // NOTE: In SQL NULL is not equal to anything, including itself. However when specifying partition filters
216        // we have allowed to equality against null. So here we have to handle null values explicitly by using
217        // is_null and is_not_null methods directly.
218        PartitionValue::Equal(raw) => {
219            let scalar = dt.parse_scalar(raw)?;
220            if scalar.is_null() {
221                column.is_null()
222            } else {
223                column.eq(scalar)
224            }
225        }
226        PartitionValue::NotEqual(raw) => {
227            let scalar = dt.parse_scalar(raw)?;
228            if scalar.is_null() {
229                column.is_not_null()
230            } else {
231                column.ne(scalar)
232            }
233        }
234        PartitionValue::LessThan(raw) => column.lt(dt.parse_scalar(raw)?),
235        PartitionValue::LessThanOrEqual(raw) => column.le(dt.parse_scalar(raw)?),
236        PartitionValue::GreaterThan(raw) => column.gt(dt.parse_scalar(raw)?),
237        PartitionValue::GreaterThanOrEqual(raw) => column.ge(dt.parse_scalar(raw)?),
238        op @ PartitionValue::In(raw_values) | op @ PartitionValue::NotIn(raw_values) => {
239            let values = raw_values
240                .iter()
241                .map(|v| dt.parse_scalar(v))
242                .collect::<Result<Vec<_>, _>>()?;
243            let (expr, operator): (Box<dyn Fn(Scalar) -> Predicate>, _) = match op {
244                PartitionValue::In(_) => {
245                    (Box::new(|v| column.clone().eq(v)), JunctionPredicateOp::Or)
246                }
247                PartitionValue::NotIn(_) => {
248                    (Box::new(|v| column.clone().ne(v)), JunctionPredicateOp::And)
249                }
250                _ => unreachable!(),
251            };
252            let predicates = values.into_iter().map(expr).collect::<Vec<_>>();
253            Predicate::junction(operator, predicates)
254        }
255    })
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::StructField;
    use delta_kernel::schema::{DataType, PrimitiveType};
    use serde_json::json;

    /// Assert that `filter` serializes to the JSON string `expected_json`.
    fn check_json_serialize(filter: PartitionFilter, expected_json: &str) {
        assert_eq!(serde_json::to_value(filter).unwrap(), json!(expected_json))
    }

    #[test]
    fn test_serialize_partition_filter() {
        check_json_serialize(
            PartitionFilter::try_from(("date", "=", "2022-05-22")).unwrap(),
            "date = '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "!=", "2022-05-22")).unwrap(),
            "date != '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", ">", "2022-05-22")).unwrap(),
            "date > '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", ">=", "2022-05-22")).unwrap(),
            "date >= '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "<", "2022-05-22")).unwrap(),
            "date < '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "<=", "2022-05-22")).unwrap(),
            "date <= '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "in", vec!["2023-11-04", "2023-06-07"].as_slice()))
                .unwrap(),
            "date IN ('2023-11-04', '2023-06-07')",
        );
        check_json_serialize(
            PartitionFilter::try_from((
                "date",
                "not in",
                vec!["2023-11-04", "2023-06-07"].as_slice(),
            ))
            .unwrap(),
            "date NOT IN ('2023-11-04', '2023-06-07')",
        );
    }

    #[test]
    fn test_partition_filter_try_from_invalid() {
        // Empty keys are rejected for both tuple shapes.
        assert!(matches!(
            PartitionFilter::try_from(("", "=", "x")),
            Err(DeltaTableError::InvalidPartitionFilter { .. })
        ));
        assert!(matches!(
            PartitionFilter::try_from(("", "in", &["x"][..])),
            Err(DeltaTableError::InvalidPartitionFilter { .. })
        ));
        // Unknown operators, and operators used with the wrong tuple shape, are rejected.
        assert!(matches!(
            PartitionFilter::try_from(("date", "~", "x")),
            Err(DeltaTableError::InvalidPartitionFilter { .. })
        ));
        assert!(matches!(
            PartitionFilter::try_from(("date", "=", &["x"][..])),
            Err(DeltaTableError::InvalidPartitionFilter { .. })
        ));
    }

    #[test]
    fn tryfrom_invalid() {
        let buf = "this-is-not-a-partition";
        let partition = DeltaTablePartition::try_from(buf);
        assert!(partition.is_err());
    }

    #[test]
    fn tryfrom_valid() {
        let buf = "ds=2024-04-01";
        let partition = DeltaTablePartition::try_from(buf);
        assert!(partition.is_ok());
        let partition = partition.unwrap();
        assert_eq!(partition.key, "ds");
        assert_eq!(partition.value, Scalar::String("2024-04-01".into()));
    }

    #[test]
    fn test_create_delta_table_partition() {
        let year = "2021".to_string();
        let path = format!("year={year}");
        assert_eq!(
            DeltaTablePartition::try_from(path.as_ref()).unwrap(),
            DeltaTablePartition {
                key: "year".into(),
                value: Scalar::String(year),
            }
        );

        let wrong_path = "year=2021/month=";
        // Bind the field and compare it in a guard: a bare identifier inside a struct
        // pattern introduces a fresh binding rather than comparing against a local, which
        // would make this assertion vacuous.
        assert!(matches!(
            DeltaTablePartition::try_from(wrong_path).unwrap_err(),
            DeltaTableError::PartitionError { partition } if partition == wrong_path
        ))
    }

    #[test]
    fn test_to_kernel_predicate_conjunction() {
        let schema = StructType::try_new(vec![
            StructField::new("name", DataType::Primitive(PrimitiveType::String), true),
            StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true),
        ])
        .unwrap();
        let filters = [
            PartitionFilter {
                key: "name".to_string(),
                value: PartitionValue::Equal("Alice".to_string()),
            },
            PartitionFilter {
                key: "age".to_string(),
                value: PartitionValue::GreaterThan("30".to_string()),
            },
        ];

        let predicate = to_kernel_predicate(&filters, &schema).unwrap();

        let expected = Predicate::junction(
            JunctionPredicateOp::And,
            vec![
                Expression::column(["name"]).eq(Scalar::String("Alice".into())),
                Expression::column(["age"]).gt(Scalar::Integer(30)),
            ],
        );
        assert_eq!(predicate, expected);

        // A single bad filter fails the whole conversion.
        let filters = [PartitionFilter {
            key: "missing".to_string(),
            value: PartitionValue::Equal("x".to_string()),
        }];
        assert!(matches!(
            to_kernel_predicate(&filters, &schema),
            Err(DeltaTableError::SchemaMismatch { .. })
        ));
    }

    #[test]
    fn test_filter_to_kernel_predicate_equal() {
        let schema = StructType::try_new(vec![
            StructField::new("name", DataType::Primitive(PrimitiveType::String), true),
            StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true),
        ])
        .unwrap();
        let filter = PartitionFilter {
            key: "name".to_string(),
            value: PartitionValue::Equal("Alice".to_string()),
        };

        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();

        let expected = Expression::column(["name"]).eq(Scalar::String("Alice".into()));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_not_equal() {
        let schema = StructType::try_new(vec![StructField::new(
            "status",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();
        let filter = PartitionFilter {
            key: "status".to_string(),
            value: PartitionValue::NotEqual("inactive".to_string()),
        };

        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();

        let expected = Expression::column(["status"]).ne(Scalar::String("inactive".into()));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_comparisons() {
        let schema = StructType::try_new(vec![
            StructField::new("score", DataType::Primitive(PrimitiveType::Integer), true),
            StructField::new("price", DataType::Primitive(PrimitiveType::Long), true),
        ])
        .unwrap();

        // Test less than
        let filter = PartitionFilter {
            key: "score".to_string(),
            value: PartitionValue::LessThan("100".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["score"]).lt(Scalar::Integer(100));
        assert_eq!(predicate, expected);

        // Test less than or equal
        let filter = PartitionFilter {
            key: "score".to_string(),
            value: PartitionValue::LessThanOrEqual("100".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["score"]).le(Scalar::Integer(100));
        assert_eq!(predicate, expected);

        // Test greater than
        let filter = PartitionFilter {
            key: "price".to_string(),
            value: PartitionValue::GreaterThan("50".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["price"]).gt(Scalar::Long(50));
        assert_eq!(predicate, expected);

        // Test greater than or equal
        let filter = PartitionFilter {
            key: "price".to_string(),
            value: PartitionValue::GreaterThanOrEqual("50".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["price"]).ge(Scalar::Long(50));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_in_operations() {
        let schema = StructType::try_new(vec![StructField::new(
            "category",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let column = Expression::column(["category"]);
        let categories = [
            Scalar::String("books".to_string()),
            Scalar::String("electronics".to_string()),
        ];

        // Test In operation
        let filter = PartitionFilter {
            key: "category".to_string(),
            value: PartitionValue::In(vec!["books".to_string(), "electronics".to_string()]),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected_inner = categories
            .clone()
            .into_iter()
            .map(|s| column.clone().eq(s))
            .collect::<Vec<_>>();
        let expected = Predicate::junction(JunctionPredicateOp::Or, expected_inner);
        assert_eq!(predicate, expected);

        // Test NotIn operation
        let filter = PartitionFilter {
            key: "category".to_string(),
            value: PartitionValue::NotIn(vec!["books".to_string(), "electronics".to_string()]),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected_inner = categories
            .into_iter()
            .map(|s| column.clone().ne(s))
            .collect::<Vec<_>>();
        let expected = Predicate::junction(JunctionPredicateOp::And, expected_inner);
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_empty_in_list() {
        let schema = StructType::try_new(vec![StructField::new(
            "tag",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "tag".to_string(),
            value: PartitionValue::In(vec![]),
        };
        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_ok());
    }

    #[test]
    fn test_filter_to_kernel_predicate_field_not_found() {
        let schema = StructType::try_new(vec![StructField::new(
            "existing_field",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "nonexistent_field".to_string(),
            value: PartitionValue::Equal("value".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DeltaTableError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn test_filter_to_kernel_predicate_non_primitive_field() {
        let nested_struct = StructType::try_new(vec![StructField::new(
            "inner",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();
        let schema = StructType::try_new(vec![StructField::new(
            "nested",
            DataType::Struct(Box::new(nested_struct)),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "nested".to_string(),
            value: PartitionValue::Equal("value".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DeltaTableError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn test_filter_to_kernel_predicate_different_data_types() {
        let schema = StructType::try_new(vec![
            StructField::new(
                "bool_field",
                DataType::Primitive(PrimitiveType::Boolean),
                true,
            ),
            StructField::new("date_field", DataType::Primitive(PrimitiveType::Date), true),
            StructField::new(
                "timestamp_field",
                DataType::Primitive(PrimitiveType::Timestamp),
                true,
            ),
            StructField::new(
                "double_field",
                DataType::Primitive(PrimitiveType::Double),
                true,
            ),
            StructField::new(
                "float_field",
                DataType::Primitive(PrimitiveType::Float),
                true,
            ),
        ])
        .unwrap();

        // Test boolean field
        let filter = PartitionFilter {
            key: "bool_field".to_string(),
            value: PartitionValue::Equal("true".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        // Test date field
        let filter = PartitionFilter {
            key: "date_field".to_string(),
            value: PartitionValue::GreaterThan("2023-01-01".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        // Test double field
        let filter = PartitionFilter {
            key: "double_field".to_string(),
            value: PartitionValue::GreaterThanOrEqual("1.5".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        // Test float field
        let filter = PartitionFilter {
            key: "float_field".to_string(),
            value: PartitionValue::LessThan("3.14".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());
    }

    #[test]
    fn test_filter_to_kernel_predicate_invalid_scalar_value() {
        let schema = StructType::try_new(vec![StructField::new(
            "number",
            DataType::Primitive(PrimitiveType::Integer),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "number".to_string(),
            value: PartitionValue::Equal("not_a_number".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
    }
}