1use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::{
25 HashMap, Result, ScalarValue, TableReference, assert_or_internal_err,
26};
27use datafusion_datasource::PartitionedFile;
28use datafusion_datasource::{FileExtensions, ListingTableUrl};
29use datafusion_expr::{BinaryExpr, Operator, lit, utils};
30
31use arrow::{
32 array::AsArray,
33 datatypes::{DataType, Field},
34 record_batch::RecordBatch,
35};
36use datafusion_expr::execution_props::ExecutionProps;
37use futures::stream::FuturesUnordered;
38use futures::{StreamExt, TryStreamExt, stream::BoxStream};
39use log::{debug, trace};
40
41use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
42use datafusion_common::{Column, DFSchema};
43use datafusion_expr::{Expr, Volatility};
44use datafusion_physical_expr::create_physical_expr;
45use object_store::path::Path;
46use object_store::{ObjectMeta, ObjectStore};
47
48pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
54 let mut is_applicable = true;
55 expr.apply(|expr| match expr {
56 Expr::Column(Column { name, .. }) => {
57 is_applicable &= col_names.contains(&name.as_str());
58 if is_applicable {
59 Ok(TreeNodeRecursion::Jump)
60 } else {
61 Ok(TreeNodeRecursion::Stop)
62 }
63 }
64 Expr::Literal(_, _)
65 | Expr::Alias(_)
66 | Expr::OuterReferenceColumn(_, _)
67 | Expr::ScalarVariable(_, _)
68 | Expr::Not(_)
69 | Expr::IsNotNull(_)
70 | Expr::IsNull(_)
71 | Expr::IsTrue(_)
72 | Expr::IsFalse(_)
73 | Expr::IsUnknown(_)
74 | Expr::IsNotTrue(_)
75 | Expr::IsNotFalse(_)
76 | Expr::IsNotUnknown(_)
77 | Expr::Negative(_)
78 | Expr::Cast(_)
79 | Expr::TryCast(_)
80 | Expr::BinaryExpr(_)
81 | Expr::Between(_)
82 | Expr::Like(_)
83 | Expr::SimilarTo(_)
84 | Expr::InList(_)
85 | Expr::Exists(_)
86 | Expr::InSubquery(_)
87 | Expr::ScalarSubquery(_)
88 | Expr::SetComparison(_)
89 | Expr::GroupingSet(_)
90 | Expr::Case(_)
91 | Expr::Lambda(_)
92 | Expr::LambdaVariable(_) => Ok(TreeNodeRecursion::Continue),
93
94 Expr::ScalarFunction(scalar_function) => {
95 match scalar_function.func.signature().volatility {
96 Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
97 Volatility::Stable | Volatility::Volatile => {
100 is_applicable = false;
101 Ok(TreeNodeRecursion::Stop)
102 }
103 }
104 }
105 Expr::HigherOrderFunction(hof) => {
106 match hof.func.signature().volatility {
107 Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
108 Volatility::Stable | Volatility::Volatile => {
111 is_applicable = false;
112 Ok(TreeNodeRecursion::Stop)
113 }
114 }
115 }
116
117 #[expect(deprecated)]
124 Expr::AggregateFunction { .. }
125 | Expr::WindowFunction { .. }
126 | Expr::Wildcard { .. }
127 | Expr::Unnest { .. }
128 | Expr::Placeholder(_) => {
129 is_applicable = false;
130 Ok(TreeNodeRecursion::Stop)
131 }
132 })
133 .unwrap();
134 is_applicable
135}
136
137const CONCURRENCY_LIMIT: usize = 100;
139
140#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
142pub fn split_files(
143 mut partitioned_files: Vec<PartitionedFile>,
144 n: usize,
145) -> Vec<Vec<PartitionedFile>> {
146 if partitioned_files.is_empty() {
147 return vec![];
148 }
149
150 partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
154
155 let chunk_size = partitioned_files.len().div_ceil(n);
157 let mut chunks = Vec::with_capacity(n);
158 let mut current_chunk = Vec::with_capacity(chunk_size);
159 for file in partitioned_files.drain(..) {
160 current_chunk.push(file);
161 if current_chunk.len() == chunk_size {
162 let full_chunk =
163 mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
164 chunks.push(full_chunk);
165 }
166 }
167
168 if !current_chunk.is_empty() {
169 chunks.push(current_chunk)
170 }
171
172 chunks
173}
174
175#[derive(Debug)]
176pub struct Partition {
177 path: Path,
179 depth: usize,
182 files: Option<Vec<ObjectMeta>>,
184}
185
186impl Partition {
187 async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
190 trace!("Listing partition {}", self.path);
191 let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
192 let result = store.list_with_delimiter(prefix).await?;
193 self.files = Some(
194 result
195 .objects
196 .into_iter()
197 .filter(|object_meta| object_meta.size > 0)
198 .collect(),
199 );
200 Ok((self, result.common_prefixes))
201 }
202}
203
204pub async fn list_partitions(
206 store: &dyn ObjectStore,
207 table_path: &ListingTableUrl,
208 max_depth: usize,
209 partition_prefix: Option<Path>,
210) -> Result<Vec<Partition>> {
211 let partition = Partition {
212 path: match partition_prefix {
213 Some(prefix) => Path::from_iter(
214 Path::from(table_path.prefix().as_ref())
215 .parts()
216 .chain(Path::from(prefix.as_ref()).parts()),
217 ),
218 None => table_path.prefix().clone(),
219 },
220 depth: 0,
221 files: None,
222 };
223
224 let mut out = Vec::with_capacity(64);
225
226 let mut pending = vec![];
227 let mut futures = FuturesUnordered::new();
228 futures.push(partition.list(store));
229
230 while let Some((partition, paths)) = futures.next().await.transpose()? {
231 if let Some(next) = pending.pop() {
235 futures.push(next)
236 }
237
238 let depth = partition.depth;
239 out.push(partition);
240 for path in paths {
241 let child = Partition {
242 path,
243 depth: depth + 1,
244 files: None,
245 };
246 match depth < max_depth {
247 true => match futures.len() < CONCURRENCY_LIMIT {
248 true => futures.push(child.list(store)),
249 false => pending.push(child.list(store)),
250 },
251 false => out.push(child),
252 }
253 }
254 }
255 Ok(out)
256}
257
258#[derive(Debug)]
259enum PartitionValue {
260 Single(String),
261 Multi,
262}
263
264fn populate_partition_values<'a>(
265 partition_values: &mut HashMap<&'a str, PartitionValue>,
266 filter: &'a Expr,
267) {
268 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
269 match op {
270 Operator::Eq => match (left.as_ref(), right.as_ref()) {
271 (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
272 | (Expr::Literal(val, _), Expr::Column(Column { name, .. }))
273 if partition_values
274 .insert(name, PartitionValue::Single(val.to_string()))
275 .is_some() =>
276 {
277 partition_values.insert(name, PartitionValue::Multi);
278 }
279 (Expr::Column(Column { .. }), Expr::Literal(_, _))
280 | (Expr::Literal(_, _), Expr::Column(Column { .. })) => {}
281 _ => {}
282 },
283 Operator::And => {
284 populate_partition_values(partition_values, left);
285 populate_partition_values(partition_values, right);
286 }
287 _ => {}
288 }
289 }
290}
291
292pub fn evaluate_partition_prefix<'a>(
293 partition_cols: &'a [(String, DataType)],
294 filters: &'a [Expr],
295) -> Option<Path> {
296 let mut partition_values = HashMap::new();
297 for filter in filters {
298 populate_partition_values(&mut partition_values, filter);
299 }
300
301 if partition_values.is_empty() {
302 return None;
303 }
304
305 let mut parts = vec![];
306 for (p, _) in partition_cols {
307 match partition_values.get(p.as_str()) {
308 Some(PartitionValue::Single(val)) => {
309 parts.push(format!("{p}={val}"));
312 }
313 _ => {
314 break;
317 }
318 }
319 }
320
321 if parts.is_empty() {
322 None
323 } else {
324 Some(Path::from_iter(parts))
325 }
326}
327
328fn filter_partitions(
329 pf: PartitionedFile,
330 filters: &[Expr],
331 df_schema: &DFSchema,
332) -> Result<Option<PartitionedFile>> {
333 if pf.partition_values.is_empty() && !filters.is_empty() {
334 return Ok(None);
335 } else if filters.is_empty() {
336 return Ok(Some(pf));
337 }
338
339 let arrays = pf
340 .partition_values
341 .iter()
342 .map(|v| v.to_array())
343 .collect::<Result<_, _>>()?;
344
345 let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;
346
347 let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
348 let props = ExecutionProps::new();
349 let expr = create_physical_expr(&filter, df_schema, &props)?;
350
351 let matches = expr.evaluate(&batch)?.into_array(1)?;
354 if matches.as_boolean().value(0) {
355 return Ok(Some(pf));
356 }
357
358 Ok(None)
359}
360
361fn try_into_partitioned_file(
366 object_meta: ObjectMeta,
367 partition_cols: &[(String, DataType)],
368 table_path: &ListingTableUrl,
369) -> Result<Option<PartitionedFile>> {
370 let cols = partition_cols.iter().map(|(name, _)| name.as_str());
371 let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
372
373 let Some(parsed) = parsed else {
374 return Ok(None);
376 };
377
378 let partition_values = parsed
379 .into_iter()
380 .zip(partition_cols)
381 .map(|(parsed, (_, datatype))| {
382 ScalarValue::try_from_string(parsed.to_string(), datatype)
383 })
384 .collect::<Result<Vec<_>>>()?;
385
386 let mut pf: PartitionedFile = object_meta.into();
387 pf.partition_values = partition_values;
388 pf.table_reference.clone_from(table_path.get_table_ref());
389
390 Ok(Some(pf))
391}
392
393pub async fn pruned_partition_list<'a>(
398 ctx: &'a dyn Session,
399 store: &'a dyn ObjectStore,
400 table_path: &'a ListingTableUrl,
401 filters: &'a [Expr],
402 file_extension: &'a str,
403 partition_cols: &'a [(String, DataType)],
404) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
405 let prefix = if !partition_cols.is_empty() {
406 evaluate_partition_prefix(partition_cols, filters)
407 } else {
408 None
409 };
410
411 let objects = table_path
412 .list_prefixed_files(ctx, store, prefix, file_extension)
413 .await?
414 .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
415
416 if partition_cols.is_empty() {
417 assert_or_internal_err!(
418 filters.is_empty(),
419 "Got partition filters for unpartitioned table {}",
420 table_path
421 );
422
423 Ok(objects
425 .try_filter_map(|object_meta| {
426 futures::future::ready(object_meta_to_partitioned_file(
427 object_meta,
428 table_path.get_table_ref(),
429 ))
430 })
431 .boxed())
432 } else {
433 let df_schema = DFSchema::from_unqualified_fields(
434 partition_cols
435 .iter()
436 .map(|(n, d)| Field::new(n, d.clone(), true))
437 .collect(),
438 Default::default(),
439 )?;
440
441 Ok(objects
442 .try_filter_map(|object_meta| {
443 futures::future::ready(try_into_partitioned_file(
444 object_meta,
445 partition_cols,
446 table_path,
447 ))
448 })
449 .try_filter_map(move |pf| {
450 futures::future::ready(filter_partitions(pf, filters, &df_schema))
451 })
452 .boxed())
453 }
454}
455
456fn object_meta_to_partitioned_file(
457 object_meta: ObjectMeta,
458 table_ref: &Option<TableReference>,
459) -> Result<Option<PartitionedFile>> {
460 Ok(Some(PartitionedFile {
461 object_meta,
462 partition_values: vec![],
463 range: None,
464 statistics: None,
465 ordering: None,
466 extensions: FileExtensions::new(),
467 metadata_size_hint: None,
468 table_reference: table_ref.clone(),
469 }))
470}
471
472pub fn parse_partitions_for_path<'a, I>(
475 table_path: &ListingTableUrl,
476 file_path: &'a Path,
477 table_partition_cols: I,
478) -> Option<Vec<&'a str>>
479where
480 I: IntoIterator<Item = &'a str>,
481{
482 let subpath = table_path.strip_prefix(file_path)?;
483
484 let mut part_values = vec![];
485 for (part, expected_partition) in subpath.zip(table_partition_cols) {
486 match part.split_once('=') {
487 Some((name, val)) if name == expected_partition => part_values.push(val),
488 _ => {
489 debug!(
490 "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
491 );
492 return None;
493 }
494 }
495 }
496 Some(part_values)
497}
498pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
500 (
501 partition.path.as_ref(),
502 partition.depth,
503 partition
504 .files
505 .as_ref()
506 .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
507 .unwrap_or_default(),
508 )
509}
510
511#[cfg(test)]
512mod tests {
513 use datafusion_datasource::file_groups::FileGroup;
514 use std::ops::Not;
515
516 use super::*;
517 use datafusion_expr::{case, col};
518
519 #[test]
520 fn test_split_files() {
521 let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
522 let files = FileGroup::new(vec![
523 new_partitioned_file("a"),
524 new_partitioned_file("b"),
525 new_partitioned_file("c"),
526 new_partitioned_file("d"),
527 new_partitioned_file("e"),
528 ]);
529
530 let chunks = files.clone().split_files(1);
531 assert_eq!(1, chunks.len());
532 assert_eq!(5, chunks[0].len());
533
534 let chunks = files.clone().split_files(2);
535 assert_eq!(2, chunks.len());
536 assert_eq!(3, chunks[0].len());
537 assert_eq!(2, chunks[1].len());
538
539 let chunks = files.clone().split_files(5);
540 assert_eq!(5, chunks.len());
541 assert_eq!(1, chunks[0].len());
542 assert_eq!(1, chunks[1].len());
543 assert_eq!(1, chunks[2].len());
544 assert_eq!(1, chunks[3].len());
545 assert_eq!(1, chunks[4].len());
546
547 let chunks = files.clone().split_files(123);
548 assert_eq!(5, chunks.len());
549 assert_eq!(1, chunks[0].len());
550 assert_eq!(1, chunks[1].len());
551 assert_eq!(1, chunks[2].len());
552 assert_eq!(1, chunks[3].len());
553 assert_eq!(1, chunks[4].len());
554
555 let empty_group = FileGroup::default();
556 let chunks = empty_group.split_files(2);
557 assert_eq!(0, chunks.len());
558 }
559
560 #[test]
561 fn test_parse_partitions_for_path() {
562 assert_eq!(
563 Some(vec![]),
564 parse_partitions_for_path(
565 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
566 &Path::from("bucket/mytable/file.csv"),
567 vec![]
568 )
569 );
570 assert_eq!(
571 None,
572 parse_partitions_for_path(
573 &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
574 &Path::from("bucket/mytable/file.csv"),
575 vec![]
576 )
577 );
578 assert_eq!(
579 None,
580 parse_partitions_for_path(
581 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
582 &Path::from("bucket/mytable/file.csv"),
583 vec!["mypartition"]
584 )
585 );
586 assert_eq!(
587 Some(vec!["v1"]),
588 parse_partitions_for_path(
589 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
590 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
591 vec!["mypartition"]
592 )
593 );
594 assert_eq!(
595 Some(vec!["v1"]),
596 parse_partitions_for_path(
597 &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
598 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
599 vec!["mypartition"]
600 )
601 );
602 assert_eq!(
604 None,
605 parse_partitions_for_path(
606 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
607 &Path::from("bucket/mytable/v1/file.csv"),
608 vec!["mypartition"]
609 )
610 );
611 assert_eq!(
612 Some(vec!["v1", "v2"]),
613 parse_partitions_for_path(
614 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
615 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
616 vec!["mypartition", "otherpartition"]
617 )
618 );
619 assert_eq!(
620 Some(vec!["v1"]),
621 parse_partitions_for_path(
622 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
623 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
624 vec!["mypartition"]
625 )
626 );
627 }
628
629 #[test]
630 fn test_try_into_partitioned_file_valid_partition() {
631 let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
632 let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
633 let meta = ObjectMeta {
634 location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
635 last_modified: chrono::Utc::now(),
636 size: 100,
637 e_tag: None,
638 version: None,
639 };
640
641 let result =
642 try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
643 assert!(result.is_some());
644 let pf = result.unwrap();
645 assert_eq!(pf.partition_values.len(), 1);
646 assert_eq!(
647 pf.partition_values[0],
648 ScalarValue::Utf8(Some("2024-01".to_string()))
649 );
650 }
651
652 #[test]
653 fn test_try_into_partitioned_file_root_file_skipped() {
654 let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
658 let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
659 let meta = ObjectMeta {
660 location: Path::from("bucket/mytable/data.parquet"),
661 last_modified: chrono::Utc::now(),
662 size: 100,
663 e_tag: None,
664 version: None,
665 };
666
667 let result =
668 try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
669 assert!(
670 result.is_none(),
671 "Files outside partition structure should be skipped"
672 );
673 }
674
675 #[test]
676 fn test_try_into_partitioned_file_wrong_partition_name() {
677 let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
679 let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
680 let meta = ObjectMeta {
681 location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
682 last_modified: chrono::Utc::now(),
683 size: 100,
684 e_tag: None,
685 version: None,
686 };
687
688 let result =
689 try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
690 assert!(
691 result.is_none(),
692 "Files with wrong partition column name should be skipped"
693 );
694 }
695
696 #[test]
697 fn test_try_into_partitioned_file_multiple_partitions() {
698 let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
699 let partition_cols = vec![
700 ("year".to_string(), DataType::Utf8),
701 ("month".to_string(), DataType::Utf8),
702 ];
703 let meta = ObjectMeta {
704 location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
705 last_modified: chrono::Utc::now(),
706 size: 100,
707 e_tag: None,
708 version: None,
709 };
710
711 let result =
712 try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
713 assert!(result.is_some());
714 let pf = result.unwrap();
715 assert_eq!(pf.partition_values.len(), 2);
716 assert_eq!(
717 pf.partition_values[0],
718 ScalarValue::Utf8(Some("2024".to_string()))
719 );
720 assert_eq!(
721 pf.partition_values[1],
722 ScalarValue::Utf8(Some("01".to_string()))
723 );
724 }
725
726 #[test]
727 fn test_try_into_partitioned_file_partial_partition_skipped() {
728 let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
730 let partition_cols = vec![
731 ("year".to_string(), DataType::Utf8),
732 ("month".to_string(), DataType::Utf8),
733 ];
734 let meta = ObjectMeta {
735 location: Path::from("bucket/mytable/year=2024/data.parquet"),
736 last_modified: chrono::Utc::now(),
737 size: 100,
738 e_tag: None,
739 version: None,
740 };
741
742 let result =
743 try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
744 assert!(
748 result.is_none(),
749 "Files with incomplete partition structure should be skipped"
750 );
751 }
752
753 #[test]
754 fn test_expr_applicable_for_cols() {
755 assert!(expr_applicable_for_cols(
756 &["c1"],
757 &Expr::eq(col("c1"), lit("value"))
758 ));
759 assert!(!expr_applicable_for_cols(
760 &["c1"],
761 &Expr::eq(col("c2"), lit("value"))
762 ));
763 assert!(!expr_applicable_for_cols(
764 &["c1"],
765 &Expr::eq(col("c1"), col("c2"))
766 ));
767 assert!(expr_applicable_for_cols(
768 &["c1", "c2"],
769 &Expr::eq(col("c1"), col("c2"))
770 ));
771 assert!(expr_applicable_for_cols(
772 &["c1", "c2"],
773 &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
774 ));
775 assert!(expr_applicable_for_cols(
776 &["c1", "c2"],
777 &(case(col("c1"))
778 .when(lit("v1"), lit(true))
779 .otherwise(lit(false))
780 .expect("valid case expr"))
781 ));
782 assert!(expr_applicable_for_cols(&[], &lit(true)));
786 }
787
788 #[test]
789 fn test_evaluate_partition_prefix() {
790 let partitions = &[
791 ("a".to_string(), DataType::Utf8),
792 ("b".to_string(), DataType::Int16),
793 ("c".to_string(), DataType::Boolean),
794 ];
795
796 assert_eq!(
797 evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
798 Some(Path::from("a=foo")),
799 );
800
801 assert_eq!(
802 evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
803 Some(Path::from("a=foo")),
804 );
805
806 assert_eq!(
807 evaluate_partition_prefix(
808 partitions,
809 &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
810 ),
811 Some(Path::from("a=foo/b=bar")),
812 );
813
814 assert_eq!(
815 evaluate_partition_prefix(
816 partitions,
817 &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
819 ),
820 Some(Path::from("a=foo/b=bar")),
821 );
822
823 assert_eq!(
824 evaluate_partition_prefix(
825 partitions,
826 &[col("a")
827 .eq(lit("foo"))
828 .and(col("b").eq(lit("1")))
829 .and(col("c").eq(lit("true")))],
830 ),
831 Some(Path::from("a=foo/b=1/c=true")),
832 );
833
834 assert_eq!(evaluate_partition_prefix(partitions, &[]), None);
836
837 assert_eq!(
839 evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
840 None,
841 );
842
843 assert_eq!(
845 evaluate_partition_prefix(
846 partitions,
847 &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
848 ),
849 Some(Path::from("a=foo")),
850 );
851
852 assert_eq!(
854 evaluate_partition_prefix(
855 partitions,
856 &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
857 ),
858 None,
859 );
860
861 assert_eq!(
863 evaluate_partition_prefix(
864 partitions,
865 &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
866 ),
867 None,
868 );
869 assert_eq!(
870 evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
871 None,
872 );
873 }
874
875 #[test]
876 fn test_evaluate_date_partition_prefix() {
877 let partitions = &[("a".to_string(), DataType::Date32)];
878 assert_eq!(
879 evaluate_partition_prefix(
880 partitions,
881 &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
882 ),
883 Some(Path::from("a=1970-01-04")),
884 );
885
886 let partitions = &[("a".to_string(), DataType::Date64)];
887 assert_eq!(
888 evaluate_partition_prefix(
889 partitions,
890 &[col("a").eq(Expr::Literal(
891 ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
892 None
893 )),],
894 ),
895 Some(Path::from("a=1970-01-05")),
896 );
897 }
898}