1use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
25use datafusion_datasource::ListingTableUrl;
26use datafusion_datasource::PartitionedFile;
27use datafusion_expr::{BinaryExpr, Operator, lit, utils};
28
29use arrow::{
30 array::AsArray,
31 datatypes::{DataType, Field},
32 record_batch::RecordBatch,
33};
34use datafusion_expr::execution_props::ExecutionProps;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt, stream::BoxStream};
37use log::{debug, trace};
38
39use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
40use datafusion_common::{Column, DFSchema};
41use datafusion_expr::{Expr, Volatility};
42use datafusion_physical_expr::create_physical_expr;
43use object_store::path::Path;
44use object_store::{ObjectMeta, ObjectStore};
45
46pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
52 let mut is_applicable = true;
53 expr.apply(|expr| match expr {
54 Expr::Column(Column { name, .. }) => {
55 is_applicable &= col_names.contains(&name.as_str());
56 if is_applicable {
57 Ok(TreeNodeRecursion::Jump)
58 } else {
59 Ok(TreeNodeRecursion::Stop)
60 }
61 }
62 Expr::Literal(_, _)
63 | Expr::Alias(_)
64 | Expr::OuterReferenceColumn(_, _)
65 | Expr::ScalarVariable(_, _)
66 | Expr::Not(_)
67 | Expr::IsNotNull(_)
68 | Expr::IsNull(_)
69 | Expr::IsTrue(_)
70 | Expr::IsFalse(_)
71 | Expr::IsUnknown(_)
72 | Expr::IsNotTrue(_)
73 | Expr::IsNotFalse(_)
74 | Expr::IsNotUnknown(_)
75 | Expr::Negative(_)
76 | Expr::Cast(_)
77 | Expr::TryCast(_)
78 | Expr::BinaryExpr(_)
79 | Expr::Between(_)
80 | Expr::Like(_)
81 | Expr::SimilarTo(_)
82 | Expr::InList(_)
83 | Expr::Exists(_)
84 | Expr::InSubquery(_)
85 | Expr::ScalarSubquery(_)
86 | Expr::SetComparison(_)
87 | Expr::GroupingSet(_)
88 | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
89
90 Expr::ScalarFunction(scalar_function) => {
91 match scalar_function.func.signature().volatility {
92 Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
93 Volatility::Stable | Volatility::Volatile => {
95 is_applicable = false;
96 Ok(TreeNodeRecursion::Stop)
97 }
98 }
99 }
100
101 #[expect(deprecated)]
107 Expr::AggregateFunction { .. }
108 | Expr::WindowFunction { .. }
109 | Expr::Wildcard { .. }
110 | Expr::Unnest { .. }
111 | Expr::Placeholder(_) => {
112 is_applicable = false;
113 Ok(TreeNodeRecursion::Stop)
114 }
115 })
116 .unwrap();
117 is_applicable
118}
119
/// Maximum number of partition listings that [`list_partitions`] keeps in
/// flight at once; further listings are queued until a running one finishes.
const CONCURRENCY_LIMIT: usize = 100;
122
123#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
125pub fn split_files(
126 mut partitioned_files: Vec<PartitionedFile>,
127 n: usize,
128) -> Vec<Vec<PartitionedFile>> {
129 if partitioned_files.is_empty() {
130 return vec![];
131 }
132
133 partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
137
138 let chunk_size = partitioned_files.len().div_ceil(n);
140 let mut chunks = Vec::with_capacity(n);
141 let mut current_chunk = Vec::with_capacity(chunk_size);
142 for file in partitioned_files.drain(..) {
143 current_chunk.push(file);
144 if current_chunk.len() == chunk_size {
145 let full_chunk =
146 mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
147 chunks.push(full_chunk);
148 }
149 }
150
151 if !current_chunk.is_empty() {
152 chunks.push(current_chunk)
153 }
154
155 chunks
156}
157
158#[derive(Debug)]
159pub struct Partition {
160 path: Path,
162 depth: usize,
165 files: Option<Vec<ObjectMeta>>,
167}
168
169impl Partition {
170 async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
173 trace!("Listing partition {}", self.path);
174 let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
175 let result = store.list_with_delimiter(prefix).await?;
176 self.files = Some(
177 result
178 .objects
179 .into_iter()
180 .filter(|object_meta| object_meta.size > 0)
181 .collect(),
182 );
183 Ok((self, result.common_prefixes))
184 }
185}
186
187pub async fn list_partitions(
189 store: &dyn ObjectStore,
190 table_path: &ListingTableUrl,
191 max_depth: usize,
192 partition_prefix: Option<Path>,
193) -> Result<Vec<Partition>> {
194 let partition = Partition {
195 path: match partition_prefix {
196 Some(prefix) => Path::from_iter(
197 Path::from(table_path.prefix().as_ref())
198 .parts()
199 .chain(Path::from(prefix.as_ref()).parts()),
200 ),
201 None => table_path.prefix().clone(),
202 },
203 depth: 0,
204 files: None,
205 };
206
207 let mut out = Vec::with_capacity(64);
208
209 let mut pending = vec![];
210 let mut futures = FuturesUnordered::new();
211 futures.push(partition.list(store));
212
213 while let Some((partition, paths)) = futures.next().await.transpose()? {
214 if let Some(next) = pending.pop() {
218 futures.push(next)
219 }
220
221 let depth = partition.depth;
222 out.push(partition);
223 for path in paths {
224 let child = Partition {
225 path,
226 depth: depth + 1,
227 files: None,
228 };
229 match depth < max_depth {
230 true => match futures.len() < CONCURRENCY_LIMIT {
231 true => futures.push(child.list(store)),
232 false => pending.push(child.list(store)),
233 },
234 false => out.push(child),
235 }
236 }
237 }
238 Ok(out)
239}
240
241#[derive(Debug)]
242enum PartitionValue {
243 Single(String),
244 Multi,
245}
246
247fn populate_partition_values<'a>(
248 partition_values: &mut HashMap<&'a str, PartitionValue>,
249 filter: &'a Expr,
250) {
251 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
252 match op {
253 Operator::Eq => match (left.as_ref(), right.as_ref()) {
254 (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
255 | (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
256 if partition_values
257 .insert(name, PartitionValue::Single(val.to_string()))
258 .is_some()
259 {
260 partition_values.insert(name, PartitionValue::Multi);
261 }
262 }
263 _ => {}
264 },
265 Operator::And => {
266 populate_partition_values(partition_values, left);
267 populate_partition_values(partition_values, right);
268 }
269 _ => {}
270 }
271 }
272}
273
274pub fn evaluate_partition_prefix<'a>(
275 partition_cols: &'a [(String, DataType)],
276 filters: &'a [Expr],
277) -> Option<Path> {
278 let mut partition_values = HashMap::new();
279 for filter in filters {
280 populate_partition_values(&mut partition_values, filter);
281 }
282
283 if partition_values.is_empty() {
284 return None;
285 }
286
287 let mut parts = vec![];
288 for (p, _) in partition_cols {
289 match partition_values.get(p.as_str()) {
290 Some(PartitionValue::Single(val)) => {
291 parts.push(format!("{p}={val}"));
294 }
295 _ => {
296 break;
299 }
300 }
301 }
302
303 if parts.is_empty() {
304 None
305 } else {
306 Some(Path::from_iter(parts))
307 }
308}
309
310fn filter_partitions(
311 pf: PartitionedFile,
312 filters: &[Expr],
313 df_schema: &DFSchema,
314) -> Result<Option<PartitionedFile>> {
315 if pf.partition_values.is_empty() && !filters.is_empty() {
316 return Ok(None);
317 } else if filters.is_empty() {
318 return Ok(Some(pf));
319 }
320
321 let arrays = pf
322 .partition_values
323 .iter()
324 .map(|v| v.to_array())
325 .collect::<Result<_, _>>()?;
326
327 let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;
328
329 let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
330 let props = ExecutionProps::new();
331 let expr = create_physical_expr(&filter, df_schema, &props)?;
332
333 let matches = expr.evaluate(&batch)?.into_array(1)?;
336 if matches.as_boolean().value(0) {
337 return Ok(Some(pf));
338 }
339
340 Ok(None)
341}
342
343fn try_into_partitioned_file(
344 object_meta: ObjectMeta,
345 partition_cols: &[(String, DataType)],
346 table_path: &ListingTableUrl,
347) -> Result<PartitionedFile> {
348 let cols = partition_cols.iter().map(|(name, _)| name.as_str());
349 let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
350
351 let partition_values = parsed
352 .into_iter()
353 .flatten()
354 .zip(partition_cols)
355 .map(|(parsed, (_, datatype))| {
356 ScalarValue::try_from_string(parsed.to_string(), datatype)
357 })
358 .collect::<Result<Vec<_>>>()?;
359
360 let mut pf: PartitionedFile = object_meta.into();
361 pf.partition_values = partition_values;
362
363 Ok(pf)
364}
365
366pub async fn pruned_partition_list<'a>(
371 ctx: &'a dyn Session,
372 store: &'a dyn ObjectStore,
373 table_path: &'a ListingTableUrl,
374 filters: &'a [Expr],
375 file_extension: &'a str,
376 partition_cols: &'a [(String, DataType)],
377) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
378 let prefix = if !partition_cols.is_empty() {
379 evaluate_partition_prefix(partition_cols, filters)
380 } else {
381 None
382 };
383
384 let objects = table_path
385 .list_prefixed_files(ctx, store, prefix, file_extension)
386 .await?
387 .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
388
389 if partition_cols.is_empty() {
390 assert_or_internal_err!(
391 filters.is_empty(),
392 "Got partition filters for unpartitioned table {}",
393 table_path
394 );
395
396 Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
398 } else {
399 let df_schema = DFSchema::from_unqualified_fields(
400 partition_cols
401 .iter()
402 .map(|(n, d)| Field::new(n, d.clone(), true))
403 .collect(),
404 Default::default(),
405 )?;
406
407 Ok(objects
408 .map_ok(|object_meta| {
409 try_into_partitioned_file(object_meta, partition_cols, table_path)
410 })
411 .try_filter_map(move |pf| {
412 futures::future::ready(
413 pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
414 )
415 })
416 .boxed())
417 }
418}
419
420pub fn parse_partitions_for_path<'a, I>(
423 table_path: &ListingTableUrl,
424 file_path: &'a Path,
425 table_partition_cols: I,
426) -> Option<Vec<&'a str>>
427where
428 I: IntoIterator<Item = &'a str>,
429{
430 let subpath = table_path.strip_prefix(file_path)?;
431
432 let mut part_values = vec![];
433 for (part, expected_partition) in subpath.zip(table_partition_cols) {
434 match part.split_once('=') {
435 Some((name, val)) if name == expected_partition => part_values.push(val),
436 _ => {
437 debug!(
438 "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
439 );
440 return None;
441 }
442 }
443 }
444 Some(part_values)
445}
446pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
448 (
449 partition.path.as_ref(),
450 partition.depth,
451 partition
452 .files
453 .as_ref()
454 .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
455 .unwrap_or_default(),
456 )
457}
458
#[cfg(test)]
mod tests {
    use datafusion_datasource::file_groups::FileGroup;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{Expr, case, col, lit};

    // Checks how `FileGroup::split_files` distributes five files across
    // several requested group counts.
    #[test]
    fn test_split_files() {
        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
        let files = FileGroup::new(vec![
            new_partitioned_file("a"),
            new_partitioned_file("b"),
            new_partitioned_file("c"),
            new_partitioned_file("d"),
            new_partitioned_file("e"),
        ]);

        // A single group receives every file.
        let chunks = files.clone().split_files(1);
        assert_eq!(1, chunks.len());
        assert_eq!(5, chunks[0].len());

        // Two groups: sizes 3 and 2.
        let chunks = files.clone().split_files(2);
        assert_eq!(2, chunks.len());
        assert_eq!(3, chunks[0].len());
        assert_eq!(2, chunks[1].len());

        // As many groups as files: one file per group.
        let chunks = files.clone().split_files(5);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        // More groups requested than files: still only five non-empty groups.
        let chunks = files.clone().split_files(123);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        // An empty group yields no chunks at all.
        let empty_group = FileGroup::default();
        let chunks = empty_group.split_files(2);
        assert_eq!(0, chunks.len());
    }

    #[test]
    fn test_parse_partitions_for_path() {
        // No partition columns: a file under the table path yields no values.
        assert_eq!(
            Some(vec![]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // The file is not under the table path.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // A partition column is expected but the file sits directly in the table.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec!["mypartition"]
            )
        );
        // One hive-style partition level.
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // A trailing slash on the table URL makes no difference.
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Only `name=value` directory segments are recognized as partitions.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Two partition levels, both requested.
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition", "otherpartition"]
            )
        );
        // Fewer columns than partition levels: only the requested prefix is parsed.
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition"]
            )
        );
    }

    #[test]
    fn test_expr_applicable_for_cols() {
        // The referenced column is allowed.
        assert!(expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), lit("value"))
        ));
        // The referenced column is not allowed.
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c2"), lit("value"))
        ));
        // One of two referenced columns is not allowed.
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        // Both referenced columns are allowed.
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        // Aliases and NOT are traversed transparently.
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
        ));
        // CASE expressions are traversed transparently.
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(case(col("c1"))
                .when(lit("v1"), lit(true))
                .otherwise(lit(false))
                .expect("valid case expr"))
        ));
        // A column-free expression is applicable even with no columns.
        assert!(expr_applicable_for_cols(&[], &lit(true)));
    }

    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        // Equality on the first partition column.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
            Some(Path::from("a=foo")),
        );

        // The literal may appear on either side of `=`.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
            Some(Path::from("a=foo")),
        );

        // Equalities joined with AND extend the prefix.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        // Separate filters are combined as well.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        // All three partition columns pinned.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
            ),
            Some(Path::from("a=foo/b=1/c=true")),
        );

        // No filters: no prefix.
        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

        // The first partition column is not pinned, so no prefix can be built.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
            None,
        );

        // A gap at `b` stops the prefix after `a`.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
            ),
            Some(Path::from("a=foo")),
        );

        // Two equalities on the same column: no single value, no prefix.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );

        // OR is not used for prefix derivation.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );
        // Non-equality comparisons are not used either.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
            None,
        );
    }

    // Date literals are rendered in `YYYY-MM-DD` form in the prefix.
    #[test]
    fn test_evaluate_date_partition_prefix() {
        // Date32 counts days since the epoch: 3 -> 1970-01-04.
        let partitions = &[("a".to_string(), DataType::Date32)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
            ),
            Some(Path::from("a=1970-01-04")),
        );

        // Date64 counts milliseconds since the epoch: 4 days -> 1970-01-05.
        let partitions = &[("a".to_string(), DataType::Date64)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(
                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
                    None
                )),],
            ),
            Some(Path::from("a=1970-01-05")),
        );
    }
}