1use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
25use datafusion_datasource::ListingTableUrl;
26use datafusion_datasource::PartitionedFile;
27use datafusion_expr::{BinaryExpr, Operator, lit, utils};
28
29use arrow::{
30 array::AsArray,
31 datatypes::{DataType, Field},
32 record_batch::RecordBatch,
33};
34use datafusion_expr::execution_props::ExecutionProps;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt, stream::BoxStream};
37use log::{debug, trace};
38
39use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
40use datafusion_common::{Column, DFSchema};
41use datafusion_expr::{Expr, Volatility};
42use datafusion_physical_expr::create_physical_expr;
43use object_store::path::Path;
44use object_store::{ObjectMeta, ObjectStore};
45
46pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
52 let mut is_applicable = true;
53 expr.apply(|expr| match expr {
54 Expr::Column(Column { name, .. }) => {
55 is_applicable &= col_names.contains(&name.as_str());
56 if is_applicable {
57 Ok(TreeNodeRecursion::Jump)
58 } else {
59 Ok(TreeNodeRecursion::Stop)
60 }
61 }
62 Expr::Literal(_, _)
63 | Expr::Alias(_)
64 | Expr::OuterReferenceColumn(_, _)
65 | Expr::ScalarVariable(_, _)
66 | Expr::Not(_)
67 | Expr::IsNotNull(_)
68 | Expr::IsNull(_)
69 | Expr::IsTrue(_)
70 | Expr::IsFalse(_)
71 | Expr::IsUnknown(_)
72 | Expr::IsNotTrue(_)
73 | Expr::IsNotFalse(_)
74 | Expr::IsNotUnknown(_)
75 | Expr::Negative(_)
76 | Expr::Cast(_)
77 | Expr::TryCast(_)
78 | Expr::BinaryExpr(_)
79 | Expr::Between(_)
80 | Expr::Like(_)
81 | Expr::SimilarTo(_)
82 | Expr::InList(_)
83 | Expr::Exists(_)
84 | Expr::InSubquery(_)
85 | Expr::ScalarSubquery(_)
86 | Expr::GroupingSet(_)
87 | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
88
89 Expr::ScalarFunction(scalar_function) => {
90 match scalar_function.func.signature().volatility {
91 Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
92 Volatility::Stable | Volatility::Volatile => {
94 is_applicable = false;
95 Ok(TreeNodeRecursion::Stop)
96 }
97 }
98 }
99
100 #[expect(deprecated)]
106 Expr::AggregateFunction { .. }
107 | Expr::WindowFunction { .. }
108 | Expr::Wildcard { .. }
109 | Expr::Unnest { .. }
110 | Expr::Placeholder(_) => {
111 is_applicable = false;
112 Ok(TreeNodeRecursion::Stop)
113 }
114 })
115 .unwrap();
116 is_applicable
117}
118
/// Upper bound on the number of directory listings `list_partitions` keeps in
/// flight at once; additional listings are queued until a running one completes.
const CONCURRENCY_LIMIT: usize = 100;
121
122#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
124pub fn split_files(
125 mut partitioned_files: Vec<PartitionedFile>,
126 n: usize,
127) -> Vec<Vec<PartitionedFile>> {
128 if partitioned_files.is_empty() {
129 return vec![];
130 }
131
132 partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
136
137 let chunk_size = partitioned_files.len().div_ceil(n);
139 let mut chunks = Vec::with_capacity(n);
140 let mut current_chunk = Vec::with_capacity(chunk_size);
141 for file in partitioned_files.drain(..) {
142 current_chunk.push(file);
143 if current_chunk.len() == chunk_size {
144 let full_chunk =
145 mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
146 chunks.push(full_chunk);
147 }
148 }
149
150 if !current_chunk.is_empty() {
151 chunks.push(current_chunk)
152 }
153
154 chunks
155}
156
157#[derive(Debug)]
158pub struct Partition {
159 path: Path,
161 depth: usize,
164 files: Option<Vec<ObjectMeta>>,
166}
167
168impl Partition {
169 async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
172 trace!("Listing partition {}", self.path);
173 let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
174 let result = store.list_with_delimiter(prefix).await?;
175 self.files = Some(
176 result
177 .objects
178 .into_iter()
179 .filter(|object_meta| object_meta.size > 0)
180 .collect(),
181 );
182 Ok((self, result.common_prefixes))
183 }
184}
185
186pub async fn list_partitions(
188 store: &dyn ObjectStore,
189 table_path: &ListingTableUrl,
190 max_depth: usize,
191 partition_prefix: Option<Path>,
192) -> Result<Vec<Partition>> {
193 let partition = Partition {
194 path: match partition_prefix {
195 Some(prefix) => Path::from_iter(
196 Path::from(table_path.prefix().as_ref())
197 .parts()
198 .chain(Path::from(prefix.as_ref()).parts()),
199 ),
200 None => table_path.prefix().clone(),
201 },
202 depth: 0,
203 files: None,
204 };
205
206 let mut out = Vec::with_capacity(64);
207
208 let mut pending = vec![];
209 let mut futures = FuturesUnordered::new();
210 futures.push(partition.list(store));
211
212 while let Some((partition, paths)) = futures.next().await.transpose()? {
213 if let Some(next) = pending.pop() {
217 futures.push(next)
218 }
219
220 let depth = partition.depth;
221 out.push(partition);
222 for path in paths {
223 let child = Partition {
224 path,
225 depth: depth + 1,
226 files: None,
227 };
228 match depth < max_depth {
229 true => match futures.len() < CONCURRENCY_LIMIT {
230 true => futures.push(child.list(store)),
231 false => pending.push(child.list(store)),
232 },
233 false => out.push(child),
234 }
235 }
236 }
237 Ok(out)
238}
239
240#[derive(Debug)]
241enum PartitionValue {
242 Single(String),
243 Multi,
244}
245
246fn populate_partition_values<'a>(
247 partition_values: &mut HashMap<&'a str, PartitionValue>,
248 filter: &'a Expr,
249) {
250 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
251 match op {
252 Operator::Eq => match (left.as_ref(), right.as_ref()) {
253 (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
254 | (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
255 if partition_values
256 .insert(name, PartitionValue::Single(val.to_string()))
257 .is_some()
258 {
259 partition_values.insert(name, PartitionValue::Multi);
260 }
261 }
262 _ => {}
263 },
264 Operator::And => {
265 populate_partition_values(partition_values, left);
266 populate_partition_values(partition_values, right);
267 }
268 _ => {}
269 }
270 }
271}
272
273pub fn evaluate_partition_prefix<'a>(
274 partition_cols: &'a [(String, DataType)],
275 filters: &'a [Expr],
276) -> Option<Path> {
277 let mut partition_values = HashMap::new();
278 for filter in filters {
279 populate_partition_values(&mut partition_values, filter);
280 }
281
282 if partition_values.is_empty() {
283 return None;
284 }
285
286 let mut parts = vec![];
287 for (p, _) in partition_cols {
288 match partition_values.get(p.as_str()) {
289 Some(PartitionValue::Single(val)) => {
290 parts.push(format!("{p}={val}"));
293 }
294 _ => {
295 break;
298 }
299 }
300 }
301
302 if parts.is_empty() {
303 None
304 } else {
305 Some(Path::from_iter(parts))
306 }
307}
308
309fn filter_partitions(
310 pf: PartitionedFile,
311 filters: &[Expr],
312 df_schema: &DFSchema,
313) -> Result<Option<PartitionedFile>> {
314 if pf.partition_values.is_empty() && !filters.is_empty() {
315 return Ok(None);
316 } else if filters.is_empty() {
317 return Ok(Some(pf));
318 }
319
320 let arrays = pf
321 .partition_values
322 .iter()
323 .map(|v| v.to_array())
324 .collect::<Result<_, _>>()?;
325
326 let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;
327
328 let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
329 let props = ExecutionProps::new();
330 let expr = create_physical_expr(&filter, df_schema, &props)?;
331
332 let matches = expr.evaluate(&batch)?.into_array(1)?;
335 if matches.as_boolean().value(0) {
336 return Ok(Some(pf));
337 }
338
339 Ok(None)
340}
341
342fn try_into_partitioned_file(
343 object_meta: ObjectMeta,
344 partition_cols: &[(String, DataType)],
345 table_path: &ListingTableUrl,
346) -> Result<PartitionedFile> {
347 let cols = partition_cols.iter().map(|(name, _)| name.as_str());
348 let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
349
350 let partition_values = parsed
351 .into_iter()
352 .flatten()
353 .zip(partition_cols)
354 .map(|(parsed, (_, datatype))| {
355 ScalarValue::try_from_string(parsed.to_string(), datatype)
356 })
357 .collect::<Result<Vec<_>>>()?;
358
359 let mut pf: PartitionedFile = object_meta.into();
360 pf.partition_values = partition_values;
361
362 Ok(pf)
363}
364
365pub async fn pruned_partition_list<'a>(
370 ctx: &'a dyn Session,
371 store: &'a dyn ObjectStore,
372 table_path: &'a ListingTableUrl,
373 filters: &'a [Expr],
374 file_extension: &'a str,
375 partition_cols: &'a [(String, DataType)],
376) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
377 let prefix = if !partition_cols.is_empty() {
378 evaluate_partition_prefix(partition_cols, filters)
379 } else {
380 None
381 };
382
383 let objects = table_path
384 .list_prefixed_files(ctx, store, prefix, file_extension)
385 .await?
386 .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
387
388 if partition_cols.is_empty() {
389 assert_or_internal_err!(
390 filters.is_empty(),
391 "Got partition filters for unpartitioned table {}",
392 table_path
393 );
394
395 Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
397 } else {
398 let df_schema = DFSchema::from_unqualified_fields(
399 partition_cols
400 .iter()
401 .map(|(n, d)| Field::new(n, d.clone(), true))
402 .collect(),
403 Default::default(),
404 )?;
405
406 Ok(objects
407 .map_ok(|object_meta| {
408 try_into_partitioned_file(object_meta, partition_cols, table_path)
409 })
410 .try_filter_map(move |pf| {
411 futures::future::ready(
412 pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
413 )
414 })
415 .boxed())
416 }
417}
418
419pub fn parse_partitions_for_path<'a, I>(
422 table_path: &ListingTableUrl,
423 file_path: &'a Path,
424 table_partition_cols: I,
425) -> Option<Vec<&'a str>>
426where
427 I: IntoIterator<Item = &'a str>,
428{
429 let subpath = table_path.strip_prefix(file_path)?;
430
431 let mut part_values = vec![];
432 for (part, expected_partition) in subpath.zip(table_partition_cols) {
433 match part.split_once('=') {
434 Some((name, val)) if name == expected_partition => part_values.push(val),
435 _ => {
436 debug!(
437 "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
438 );
439 return None;
440 }
441 }
442 }
443 Some(part_values)
444}
445pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
447 (
448 partition.path.as_ref(),
449 partition.depth,
450 partition
451 .files
452 .as_ref()
453 .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
454 .unwrap_or_default(),
455 )
456}
457
#[cfg(test)]
mod tests {
    use datafusion_datasource::file_groups::FileGroup;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{Expr, case, col, lit};

    #[test]
    fn test_split_files() {
        let files = FileGroup::new(
            ["a", "b", "c", "d", "e"]
                .into_iter()
                .map(|path| PartitionedFile::new(path.to_owned(), 10))
                .collect(),
        );

        // (requested number of groups, expected size of each resulting group)
        let cases: [(usize, Vec<usize>); 4] = [
            (1, vec![5]),
            (2, vec![3, 2]),
            (5, vec![1, 1, 1, 1, 1]),
            // more groups requested than files: one file per group
            (123, vec![1, 1, 1, 1, 1]),
        ];
        for (n, expected_sizes) in cases {
            let sizes: Vec<usize> = files
                .clone()
                .split_files(n)
                .iter()
                .map(|group| group.len())
                .collect();
            assert_eq!(expected_sizes, sizes, "split_files({n})");
        }

        assert!(FileGroup::default().split_files(2).is_empty());
    }

    #[test]
    fn test_parse_partitions_for_path() {
        // (table url, file path, partition columns, expected partition values)
        let cases: Vec<(&str, &str, Vec<&str>, Option<Vec<&str>>)> = vec![
            (
                "file:///bucket/mytable",
                "bucket/mytable/file.csv",
                vec![],
                Some(vec![]),
            ),
            // file outside the table
            (
                "file:///bucket/othertable",
                "bucket/mytable/file.csv",
                vec![],
                None,
            ),
            // partition column expected but file sits at the table root
            (
                "file:///bucket/mytable",
                "bucket/mytable/file.csv",
                vec!["mypartition"],
                None,
            ),
            (
                "file:///bucket/mytable",
                "bucket/mytable/mypartition=v1/file.csv",
                vec!["mypartition"],
                Some(vec!["v1"]),
            ),
            // trailing slash on the table url
            (
                "file:///bucket/mytable/",
                "bucket/mytable/mypartition=v1/file.csv",
                vec!["mypartition"],
                Some(vec!["v1"]),
            ),
            // directory without the `name=` form is not a partition
            (
                "file:///bucket/mytable",
                "bucket/mytable/v1/file.csv",
                vec!["mypartition"],
                None,
            ),
            (
                "file:///bucket/mytable",
                "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                vec!["mypartition", "otherpartition"],
                Some(vec!["v1", "v2"]),
            ),
            // fewer columns than partition directories
            (
                "file:///bucket/mytable",
                "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                vec!["mypartition"],
                Some(vec!["v1"]),
            ),
        ];

        for (table, file, cols, expected) in cases {
            let table_path = ListingTableUrl::parse(table).unwrap();
            let file_path = Path::from(file);
            assert_eq!(
                expected,
                parse_partitions_for_path(&table_path, &file_path, cols),
                "table={table} file={file}"
            );
        }
    }

    #[test]
    fn test_expr_applicable_for_cols() {
        let applicable: Vec<(Vec<&str>, Expr)> = vec![
            (vec!["c1"], Expr::eq(col("c1"), lit("value"))),
            (vec!["c1", "c2"], Expr::eq(col("c1"), col("c2"))),
            (
                vec!["c1", "c2"],
                Expr::eq(col("c1"), col("c2").alias("c2_alias")).not(),
            ),
            (
                vec!["c1", "c2"],
                case(col("c1"))
                    .when(lit("v1"), lit(true))
                    .otherwise(lit(false))
                    .expect("valid case expr"),
            ),
            (vec![], lit(true)),
        ];
        for (cols, expr) in &applicable {
            assert!(
                expr_applicable_for_cols(cols, expr),
                "expected applicable: {expr}"
            );
        }

        let not_applicable: Vec<(Vec<&str>, Expr)> = vec![
            (vec!["c1"], Expr::eq(col("c2"), lit("value"))),
            (vec!["c1"], Expr::eq(col("c1"), col("c2"))),
        ];
        for (cols, expr) in &not_applicable {
            assert!(
                !expr_applicable_for_cols(cols, expr),
                "expected not applicable: {expr}"
            );
        }
    }

    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        // (filters, expected prefix)
        let cases: Vec<(Vec<Expr>, Option<&str>)> = vec![
            (vec![col("a").eq(lit("foo"))], Some("a=foo")),
            (vec![lit("foo").eq(col("a"))], Some("a=foo")),
            (
                vec![col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
                Some("a=foo/b=bar"),
            ),
            // separate filters are combined like a conjunction
            (
                vec![col("a").eq(lit("foo")), col("b").eq(lit("bar"))],
                Some("a=foo/b=bar"),
            ),
            (
                vec![col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
                Some("a=foo/b=1/c=true"),
            ),
            (vec![], None),
            // first partition column unconstrained
            (vec![Expr::eq(col("b"), lit("foo"))], None),
            // prefix stops at the first gap (`b`)
            (
                vec![col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
                Some("a=foo"),
            ),
            // column constrained twice
            (
                vec![Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
                None,
            ),
            // disjunctions are not used
            (
                vec![Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
                None,
            ),
            // non-equality comparisons are not used
            (vec![col("b").lt(lit(5))], None),
        ];

        for (filters, expected) in cases {
            assert_eq!(
                evaluate_partition_prefix(partitions, &filters),
                expected.map(Path::from),
                "filters: {filters:?}"
            );
        }
    }

    #[test]
    fn test_evaluate_date_partition_prefix() {
        let date32_cols = &[("a".to_string(), DataType::Date32)];
        // Date32 counts days since the epoch.
        let day_3 = Expr::Literal(ScalarValue::Date32(Some(3)), None);
        assert_eq!(
            evaluate_partition_prefix(date32_cols, &[col("a").eq(day_3)]),
            Some(Path::from("a=1970-01-04")),
        );

        let date64_cols = &[("a".to_string(), DataType::Date64)];
        // Date64 counts milliseconds since the epoch.
        let day_4_ms = 4 * 24 * 60 * 60 * 1000;
        let day_4 = Expr::Literal(ScalarValue::Date64(Some(day_4_ms)), None);
        assert_eq!(
            evaluate_partition_prefix(date64_cols, &[col("a").eq(day_4)]),
            Some(Path::from("a=1970-01-05")),
        );
    }
}