1use std::convert::TryFrom;
3
4use delta_kernel::expressions::{Expression, JunctionPredicateOp, Predicate, Scalar};
5use delta_kernel::schema::StructType;
6use serde::{Serialize, Serializer};
7
8use crate::errors::{DeltaResult, DeltaTableError};
9
/// Placeholder string used in data file paths in place of a null partition value.
///
/// NOTE(review): this follows the Hive convention of a "default partition" name; presumably
/// rendered as `key=__HIVE_DEFAULT_PARTITION__` by writers — confirm against callers.
pub const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
12
/// The comparison operator and raw (unparsed) operand(s) of a [`PartitionFilter`].
///
/// Operands are kept as strings; they are parsed against the partition column's type only
/// when the filter is converted into a kernel predicate.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionValue {
    /// `key = 'value'`
    Equal(String),
    /// `key != 'value'`
    NotEqual(String),
    /// `key > 'value'`
    GreaterThan(String),
    /// `key >= 'value'`
    GreaterThanOrEqual(String),
    /// `key < 'value'`
    LessThan(String),
    /// `key <= 'value'`
    LessThanOrEqual(String),
    /// `key IN ('v1', 'v2', ...)` — matches if the column equals any listed value.
    In(Vec<String>),
    /// `key NOT IN ('v1', 'v2', ...)` — matches if the column differs from every listed value.
    NotIn(Vec<String>),
}
33
/// A filter on a single partition column, e.g. `date >= '2022-05-22'`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PartitionFilter {
    /// Name of the partition column the filter applies to.
    pub key: String,
    /// Comparison operator together with its raw string operand(s).
    pub value: PartitionValue,
}
42
43impl Serialize for PartitionFilter {
46 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
47 where
48 S: Serializer,
49 {
50 let s = match &self.value {
51 PartitionValue::Equal(value) => format!("{} = '{value}'", self.key),
52 PartitionValue::NotEqual(value) => format!("{} != '{value}'", self.key),
53 PartitionValue::GreaterThan(value) => format!("{} > '{value}'", self.key),
54 PartitionValue::GreaterThanOrEqual(value) => format!("{} >= '{value}'", self.key),
55 PartitionValue::LessThan(value) => format!("{} < '{value}'", self.key),
56 PartitionValue::LessThanOrEqual(value) => format!("{} <= '{value}'", self.key),
57 PartitionValue::In(values) => {
59 let quoted_values: Vec<String> = values.iter().map(|v| format!("'{v}'")).collect();
60 format!("{} IN ({})", self.key, quoted_values.join(", "))
61 }
62 PartitionValue::NotIn(values) => {
63 let quoted_values: Vec<String> = values.iter().map(|v| format!("'{v}'")).collect();
64 format!("{} NOT IN ({})", self.key, quoted_values.join(", "))
65 }
66 };
67 serializer.serialize_str(&s)
68 }
69}
70
71impl TryFrom<(&str, &str, &str)> for PartitionFilter {
73 type Error = DeltaTableError;
74
75 fn try_from(filter: (&str, &str, &str)) -> Result<Self, DeltaTableError> {
78 match filter {
79 (key, "=", value) if !key.is_empty() => Ok(PartitionFilter {
80 key: key.to_owned(),
81 value: PartitionValue::Equal(value.to_owned()),
82 }),
83 (key, "!=", value) if !key.is_empty() => Ok(PartitionFilter {
84 key: key.to_owned(),
85 value: PartitionValue::NotEqual(value.to_owned()),
86 }),
87 (key, ">", value) if !key.is_empty() => Ok(PartitionFilter {
88 key: key.to_owned(),
89 value: PartitionValue::GreaterThan(value.to_owned()),
90 }),
91 (key, ">=", value) if !key.is_empty() => Ok(PartitionFilter {
92 key: key.to_owned(),
93 value: PartitionValue::GreaterThanOrEqual(value.to_owned()),
94 }),
95 (key, "<", value) if !key.is_empty() => Ok(PartitionFilter {
96 key: key.to_owned(),
97 value: PartitionValue::LessThan(value.to_owned()),
98 }),
99 (key, "<=", value) if !key.is_empty() => Ok(PartitionFilter {
100 key: key.to_owned(),
101 value: PartitionValue::LessThanOrEqual(value.to_owned()),
102 }),
103 (_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
104 partition_filter: format!("{filter:?}"),
105 }),
106 }
107 }
108}
109
110impl TryFrom<(&str, &str, &[&str])> for PartitionFilter {
112 type Error = DeltaTableError;
113
114 fn try_from(filter: (&str, &str, &[&str])) -> Result<Self, DeltaTableError> {
117 match filter {
118 (key, "in", value) if !key.is_empty() => Ok(PartitionFilter {
119 key: key.to_owned(),
120 value: PartitionValue::In(value.iter().map(|x| x.to_string()).collect()),
121 }),
122 (key, "not in", value) if !key.is_empty() => Ok(PartitionFilter {
123 key: key.to_owned(),
124 value: PartitionValue::NotIn(value.iter().map(|x| x.to_string()).collect()),
125 }),
126 (_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
127 partition_filter: format!("{filter:?}"),
128 }),
129 }
130 }
131}
132
/// A single partition column and its value, e.g. parsed from a `key=value` path segment.
#[derive(Clone, Debug, PartialEq)]
pub struct DeltaTablePartition {
    /// Partition column name.
    pub key: String,
    /// Partition value as a kernel scalar.
    pub value: Scalar,
}

// NOTE(review): `Eq` is implemented by hand because only `PartialEq` is derived above
// (presumably `Scalar` is not `Eq`). If `Scalar` can hold floating-point values, NaN breaks
// the reflexivity `Eq` promises — confirm this is acceptable for all uses.
impl Eq for DeltaTablePartition {}
143
144impl DeltaTablePartition {
145 pub fn from_partition_value(partition_value: (&str, &Scalar)) -> Self {
147 let (k, v) = partition_value;
148 DeltaTablePartition {
149 key: k.to_owned(),
150 value: v.to_owned(),
151 }
152 }
153}
154
155impl TryFrom<&str> for DeltaTablePartition {
168 type Error = DeltaTableError;
169
170 fn try_from(partition: &str) -> Result<Self, DeltaTableError> {
173 let partition_split: Vec<&str> = partition.split('=').collect();
174 match partition_split {
175 partition_split if partition_split.len() == 2 => Ok(DeltaTablePartition {
176 key: partition_split[0].to_owned(),
177 value: Scalar::String(partition_split[1].to_owned()),
178 }),
179 _ => Err(DeltaTableError::PartitionError {
180 partition: partition.to_string(),
181 }),
182 }
183 }
184}
185
186#[allow(unused)] pub(crate) fn to_kernel_predicate(
188 filters: &[PartitionFilter],
189 table_schema: &StructType,
190) -> DeltaResult<Predicate> {
191 let predicates = filters
192 .iter()
193 .map(|filter| filter_to_kernel_predicate(filter, table_schema))
194 .collect::<DeltaResult<Vec<_>>>()?;
195 Ok(Predicate::junction(JunctionPredicateOp::And, predicates))
196}
197
198fn filter_to_kernel_predicate(
199 filter: &PartitionFilter,
200 table_schema: &StructType,
201) -> DeltaResult<Predicate> {
202 let Some(field) = table_schema.field(&filter.key) else {
203 return Err(DeltaTableError::SchemaMismatch {
204 msg: format!("Field '{}' is not a root table field.", filter.key),
205 });
206 };
207 let Some(dt) = field.data_type().as_primitive_opt() else {
208 return Err(DeltaTableError::SchemaMismatch {
209 msg: format!("Field '{}' is not a primitive type", field.name()),
210 });
211 };
212
213 let column = Expression::column([field.name()]);
214 Ok(match &filter.value {
215 PartitionValue::Equal(raw) => {
219 let scalar = dt.parse_scalar(raw)?;
220 if scalar.is_null() {
221 column.is_null()
222 } else {
223 column.eq(scalar)
224 }
225 }
226 PartitionValue::NotEqual(raw) => {
227 let scalar = dt.parse_scalar(raw)?;
228 if scalar.is_null() {
229 column.is_not_null()
230 } else {
231 column.ne(scalar)
232 }
233 }
234 PartitionValue::LessThan(raw) => column.lt(dt.parse_scalar(raw)?),
235 PartitionValue::LessThanOrEqual(raw) => column.le(dt.parse_scalar(raw)?),
236 PartitionValue::GreaterThan(raw) => column.gt(dt.parse_scalar(raw)?),
237 PartitionValue::GreaterThanOrEqual(raw) => column.ge(dt.parse_scalar(raw)?),
238 op @ PartitionValue::In(raw_values) | op @ PartitionValue::NotIn(raw_values) => {
239 let values = raw_values
240 .iter()
241 .map(|v| dt.parse_scalar(v))
242 .collect::<Result<Vec<_>, _>>()?;
243 let (expr, operator): (Box<dyn Fn(Scalar) -> Predicate>, _) = match op {
244 PartitionValue::In(_) => {
245 (Box::new(|v| column.clone().eq(v)), JunctionPredicateOp::Or)
246 }
247 PartitionValue::NotIn(_) => {
248 (Box::new(|v| column.clone().ne(v)), JunctionPredicateOp::And)
249 }
250 _ => unreachable!(),
251 };
252 let predicates = values.into_iter().map(expr).collect::<Vec<_>>();
253 Predicate::junction(operator, predicates)
254 }
255 })
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::StructField;
    use delta_kernel::schema::{DataType, PrimitiveType};
    use serde_json::json;

    /// Asserts that `filter` serializes to the JSON string `expected_json`.
    fn check_json_serialize(filter: PartitionFilter, expected_json: &str) {
        assert_eq!(serde_json::to_value(filter).unwrap(), json!(expected_json))
    }

    #[test]
    fn test_serialize_partition_filter() {
        check_json_serialize(
            PartitionFilter::try_from(("date", "=", "2022-05-22")).unwrap(),
            "date = '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "!=", "2022-05-22")).unwrap(),
            "date != '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", ">", "2022-05-22")).unwrap(),
            "date > '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", ">=", "2022-05-22")).unwrap(),
            "date >= '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "<", "2022-05-22")).unwrap(),
            "date < '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "<=", "2022-05-22")).unwrap(),
            "date <= '2022-05-22'",
        );
        check_json_serialize(
            PartitionFilter::try_from(("date", "in", vec!["2023-11-04", "2023-06-07"].as_slice()))
                .unwrap(),
            "date IN ('2023-11-04', '2023-06-07')",
        );
        check_json_serialize(
            PartitionFilter::try_from((
                "date",
                "not in",
                vec!["2023-11-04", "2023-06-07"].as_slice(),
            ))
            .unwrap(),
            "date NOT IN ('2023-11-04', '2023-06-07')",
        );
    }

    #[test]
    fn tryfrom_invalid() {
        let buf = "this-is-not-a-partition";
        let partition = DeltaTablePartition::try_from(buf);
        assert!(partition.is_err());
    }

    #[test]
    fn tryfrom_valid() {
        let buf = "ds=2024-04-01";
        let partition = DeltaTablePartition::try_from(buf);
        assert!(partition.is_ok());
        let partition = partition.unwrap();
        assert_eq!(partition.key, "ds");
        assert_eq!(partition.value, Scalar::String("2024-04-01".into()));
    }

    #[test]
    fn test_create_delta_table_partition() {
        let year = "2021".to_string();
        let path = format!("year={year}");
        assert_eq!(
            DeltaTablePartition::try_from(path.as_ref()).unwrap(),
            DeltaTablePartition {
                key: "year".into(),
                value: Scalar::String(year),
            }
        );

        // The expected path must be compared in a guard: writing a variable name inside the
        // pattern would only bind a fresh variable and accept any `PartitionError`.
        let wrong_path = "year=2021/month=";
        assert!(matches!(
            DeltaTablePartition::try_from(wrong_path).unwrap_err(),
            DeltaTableError::PartitionError { partition } if partition == wrong_path
        ))
    }

    #[test]
    fn test_filter_to_kernel_predicate_equal() {
        let schema = StructType::try_new(vec![
            StructField::new("name", DataType::Primitive(PrimitiveType::String), true),
            StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true),
        ])
        .unwrap();
        let filter = PartitionFilter {
            key: "name".to_string(),
            value: PartitionValue::Equal("Alice".to_string()),
        };

        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();

        let expected = Expression::column(["name"]).eq(Scalar::String("Alice".into()));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_not_equal() {
        let schema = StructType::try_new(vec![StructField::new(
            "status",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();
        let filter = PartitionFilter {
            key: "status".to_string(),
            value: PartitionValue::NotEqual("inactive".to_string()),
        };

        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();

        let expected = Expression::column(["status"]).ne(Scalar::String("inactive".into()));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_comparisons() {
        let schema = StructType::try_new(vec![
            StructField::new("score", DataType::Primitive(PrimitiveType::Integer), true),
            StructField::new("price", DataType::Primitive(PrimitiveType::Long), true),
        ])
        .unwrap();

        let filter = PartitionFilter {
            key: "score".to_string(),
            value: PartitionValue::LessThan("100".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["score"]).lt(Scalar::Integer(100));
        assert_eq!(predicate, expected);

        let filter = PartitionFilter {
            key: "score".to_string(),
            value: PartitionValue::LessThanOrEqual("100".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["score"]).le(Scalar::Integer(100));
        assert_eq!(predicate, expected);

        let filter = PartitionFilter {
            key: "price".to_string(),
            value: PartitionValue::GreaterThan("50".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["price"]).gt(Scalar::Long(50));
        assert_eq!(predicate, expected);

        let filter = PartitionFilter {
            key: "price".to_string(),
            value: PartitionValue::GreaterThanOrEqual("50".to_string()),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected = Expression::column(["price"]).ge(Scalar::Long(50));
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_in_operations() {
        let schema = StructType::try_new(vec![StructField::new(
            "category",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let column = Expression::column(["category"]);
        let categories = [
            Scalar::String("books".to_string()),
            Scalar::String("electronics".to_string()),
        ];

        let filter = PartitionFilter {
            key: "category".to_string(),
            value: PartitionValue::In(vec!["books".to_string(), "electronics".to_string()]),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected_inner = categories
            .clone()
            .into_iter()
            .map(|s| column.clone().eq(s))
            .collect::<Vec<_>>();
        let expected = Predicate::junction(JunctionPredicateOp::Or, expected_inner);
        assert_eq!(predicate, expected);

        let filter = PartitionFilter {
            key: "category".to_string(),
            value: PartitionValue::NotIn(vec!["books".to_string(), "electronics".to_string()]),
        };
        let predicate = filter_to_kernel_predicate(&filter, &schema).unwrap();
        let expected_inner = categories
            .into_iter()
            .map(|s| column.clone().ne(s))
            .collect::<Vec<_>>();
        let expected = Predicate::junction(JunctionPredicateOp::And, expected_inner);
        assert_eq!(predicate, expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_empty_in_list() {
        let schema = StructType::try_new(vec![StructField::new(
            "tag",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "tag".to_string(),
            value: PartitionValue::In(vec![]),
        };
        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_ok());
        // An empty IN list becomes an OR junction with no children.
        let expected = Predicate::junction(JunctionPredicateOp::Or, Vec::<Predicate>::new());
        assert_eq!(result.unwrap(), expected);
    }

    #[test]
    fn test_filter_to_kernel_predicate_field_not_found() {
        let schema = StructType::try_new(vec![StructField::new(
            "existing_field",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "nonexistent_field".to_string(),
            value: PartitionValue::Equal("value".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DeltaTableError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn test_filter_to_kernel_predicate_non_primitive_field() {
        let nested_struct = StructType::try_new(vec![StructField::new(
            "inner",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .unwrap();
        let schema = StructType::try_new(vec![StructField::new(
            "nested",
            DataType::Struct(Box::new(nested_struct)),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "nested".to_string(),
            value: PartitionValue::Equal("value".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DeltaTableError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn test_filter_to_kernel_predicate_different_data_types() {
        let schema = StructType::try_new(vec![
            StructField::new(
                "bool_field",
                DataType::Primitive(PrimitiveType::Boolean),
                true,
            ),
            StructField::new("date_field", DataType::Primitive(PrimitiveType::Date), true),
            StructField::new(
                "timestamp_field",
                DataType::Primitive(PrimitiveType::Timestamp),
                true,
            ),
            StructField::new(
                "double_field",
                DataType::Primitive(PrimitiveType::Double),
                true,
            ),
            StructField::new(
                "float_field",
                DataType::Primitive(PrimitiveType::Float),
                true,
            ),
        ])
        .unwrap();

        let filter = PartitionFilter {
            key: "bool_field".to_string(),
            value: PartitionValue::Equal("true".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        let filter = PartitionFilter {
            key: "date_field".to_string(),
            value: PartitionValue::GreaterThan("2023-01-01".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        let filter = PartitionFilter {
            key: "double_field".to_string(),
            value: PartitionValue::GreaterThanOrEqual("1.5".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());

        let filter = PartitionFilter {
            key: "float_field".to_string(),
            value: PartitionValue::LessThan("3.14".to_string()),
        };
        assert!(filter_to_kernel_predicate(&filter, &schema).is_ok());
    }

    #[test]
    fn test_filter_to_kernel_predicate_invalid_scalar_value() {
        let schema = StructType::try_new(vec![StructField::new(
            "number",
            DataType::Primitive(PrimitiveType::Integer),
            true,
        )])
        .unwrap();

        let filter = PartitionFilter {
            key: "number".to_string(),
            value: PartitionValue::Equal("not_a_number".to_string()),
        };

        let result = filter_to_kernel_predicate(&filter, &schema);
        assert!(result.is_err());
    }
}