datafusion_catalog_listing/helpers.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helper functions for the table implementation

use std::mem;
use std::sync::Arc;

use datafusion_catalog::Session;
use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::PartitionedFile;
use datafusion_expr::{BinaryExpr, Operator, lit, utils};

use arrow::{
    array::AsArray,
    datatypes::{DataType, Field},
    record_batch::RecordBatch,
};
use datafusion_expr::execution_props::ExecutionProps;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use log::{debug, trace};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema};
use datafusion_expr::{Expr, Volatility};
use datafusion_physical_expr::create_physical_expr;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

/// Check whether the given expression can be resolved using only the columns in `col_names`.
/// If this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
///   has been performed
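///
/// Illustrative sketch (marked `ignore`, so it is not run as a doc-test; the
/// `col`/`lit` helpers come from `datafusion_expr`, as in the tests below):
///
/// ```ignore
/// use datafusion_expr::{col, lit};
///
/// // `c1 = 'value'` only references the partition column `c1`
/// assert!(expr_applicable_for_cols(&["c1"], &col("c1").eq(lit("value"))));
/// // `c2 = 'value'` references a column outside of `col_names`
/// assert!(!expr_applicable_for_cols(&["c1"], &col("c2").eq(lit("value"))));
/// ```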
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
    let mut is_applicable = true;
    expr.apply(|expr| match expr {
        Expr::Column(Column { name, .. }) => {
            is_applicable &= col_names.contains(&name.as_str());
            if is_applicable {
                Ok(TreeNodeRecursion::Jump)
            } else {
                Ok(TreeNodeRecursion::Stop)
            }
        }
        Expr::Literal(_, _)
        | Expr::Alias(_)
        | Expr::OuterReferenceColumn(_, _)
        | Expr::ScalarVariable(_, _)
        | Expr::Not(_)
        | Expr::IsNotNull(_)
        | Expr::IsNull(_)
        | Expr::IsTrue(_)
        | Expr::IsFalse(_)
        | Expr::IsUnknown(_)
        | Expr::IsNotTrue(_)
        | Expr::IsNotFalse(_)
        | Expr::IsNotUnknown(_)
        | Expr::Negative(_)
        | Expr::Cast(_)
        | Expr::TryCast(_)
        | Expr::BinaryExpr(_)
        | Expr::Between(_)
        | Expr::Like(_)
        | Expr::SimilarTo(_)
        | Expr::InList(_)
        | Expr::Exists(_)
        | Expr::InSubquery(_)
        | Expr::ScalarSubquery(_)
        | Expr::GroupingSet(_)
        | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),

        Expr::ScalarFunction(scalar_function) => {
            match scalar_function.func.signature().volatility {
                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
                // TODO: Stable functions could be `applicable`, but that would require access to the context
                Volatility::Stable | Volatility::Volatile => {
                    is_applicable = false;
                    Ok(TreeNodeRecursion::Stop)
                }
            }
        }

        // TODO other expressions are not handled yet:
        // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
        // - Can `Wildcard` be considered as a `Literal`?
        // - ScalarVariable could be `applicable`, but that would require access to the context
        // TODO: remove the next line after `Expr::Wildcard` is removed
        #[expect(deprecated)]
        Expr::AggregateFunction { .. }
        | Expr::WindowFunction { .. }
        | Expr::Wildcard { .. }
        | Expr::Unnest { .. }
        | Expr::Placeholder(_) => {
            is_applicable = false;
            Ok(TreeNodeRecursion::Stop)
        }
    })
    .unwrap();
    is_applicable
}

/// The maximum number of concurrent listing requests
const CONCURRENCY_LIMIT: usize = 100;

/// Partition the list of files into `n` groups
#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
pub fn split_files(
    mut partitioned_files: Vec<PartitionedFile>,
    n: usize,
) -> Vec<Vec<PartitionedFile>> {
    if partitioned_files.is_empty() {
        return vec![];
    }

    // ObjectStore::list does not guarantee any consistent order and for some
    // implementations such as LocalFileSystem it may be inconsistent, so
    // sort files by path to ensure consistent plans when run more than once.
    partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));

    // effectively this is div with rounding up instead of truncating
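    // e.g. splitting 5 files into n = 2 groups gives chunk_size = 3, so the
    // resulting groups hold 3 and 2 files respectively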
    let chunk_size = partitioned_files.len().div_ceil(n);
    let mut chunks = Vec::with_capacity(n);
    let mut current_chunk = Vec::with_capacity(chunk_size);
    for file in partitioned_files.drain(..) {
        current_chunk.push(file);
        if current_chunk.len() == chunk_size {
            let full_chunk =
                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
            chunks.push(full_chunk);
        }
    }

    if !current_chunk.is_empty() {
        chunks.push(current_chunk)
    }

    chunks
}

#[derive(Debug)]
pub struct Partition {
    /// The path to the partition, including the table prefix
    path: Path,
    /// How many path segments below the table prefix `path` contains
    /// or equivalently the number of partition values in `path`
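    /// (e.g. `table/a=1/b=2` is two segments below the prefix `table`, so its depth is 2)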
    depth: usize,
    /// The files contained as direct children of this `Partition` if known
    files: Option<Vec<ObjectMeta>>,
}

impl Partition {
    /// List the direct children of this partition, updating `self.files` with
    /// any child files and returning a list of child "directories"
    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
        trace!("Listing partition {}", self.path);
        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
        let result = store.list_with_delimiter(prefix).await?;
        self.files = Some(
            result
                .objects
                .into_iter()
                .filter(|object_meta| object_meta.size > 0)
                .collect(),
        );
        Ok((self, result.common_prefixes))
    }
}

/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
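///
/// A minimal usage sketch (marked `ignore`; `store` and the object layout are
/// assumptions for illustration):
///
/// ```ignore
/// let url = ListingTableUrl::parse("file:///bucket/mytable")?;
/// // With objects under `mytable/a=1/...` and `mytable/a=2/...`, listing one
/// // level deep yields the table root plus one `Partition` per `a=...` prefix.
/// let partitions = list_partitions(store.as_ref(), &url, 1, None).await?;
/// ```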
pub async fn list_partitions(
    store: &dyn ObjectStore,
    table_path: &ListingTableUrl,
    max_depth: usize,
    partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
    let partition = Partition {
        path: match partition_prefix {
            Some(prefix) => Path::from_iter(
                Path::from(table_path.prefix().as_ref())
                    .parts()
                    .chain(Path::from(prefix.as_ref()).parts()),
            ),
            None => table_path.prefix().clone(),
        },
        depth: 0,
        files: None,
    };

    let mut out = Vec::with_capacity(64);

    let mut pending = vec![];
    let mut futures = FuturesUnordered::new();
    futures.push(partition.list(store));

    while let Some((partition, paths)) = futures.next().await.transpose()? {
        // If `pending` contains a future, then prior to this iteration
        // `futures.len() == CONCURRENCY_LIMIT`, so we can add a single
        // future from `pending` to the working set
        if let Some(next) = pending.pop() {
            futures.push(next)
        }

        let depth = partition.depth;
        out.push(partition);
        for path in paths {
            let child = Partition {
                path,
                depth: depth + 1,
                files: None,
            };
            match depth < max_depth {
                true => match futures.len() < CONCURRENCY_LIMIT {
                    true => futures.push(child.list(store)),
                    false => pending.push(child.list(store)),
                },
                false => out.push(child),
            }
        }
    }
    Ok(out)
}

#[derive(Debug)]
enum PartitionValue {
    Single(String),
    Multi,
}

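/// Collect `column = literal` constraints from `filter` into `partition_values`,
/// recursing through `AND`s. A column that is equated to more than one literal is
/// downgraded to [`PartitionValue::Multi`].
///
/// For example (values shown informally, as an illustration of the logic below):
/// `a = 'foo' AND b = 1` records `a -> Single("foo")` and `b -> Single("1")`,
/// while `a = 'foo' AND a = 'bar'` records `a -> Multi`.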
fn populate_partition_values<'a>(
    partition_values: &mut HashMap<&'a str, PartitionValue>,
    filter: &'a Expr,
) {
    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
        match op {
            Operator::Eq => match (left.as_ref(), right.as_ref()) {
                (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
                | (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
                    if partition_values
                        .insert(name, PartitionValue::Single(val.to_string()))
                        .is_some()
                    {
                        partition_values.insert(name, PartitionValue::Multi);
                    }
                }
                _ => {}
            },
            Operator::And => {
                populate_partition_values(partition_values, left);
                populate_partition_values(partition_values, right);
            }
            _ => {}
        }
    }
}

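/// Build the longest Hive-style `col=value/...` path prefix implied by `filters`,
/// walking `partition_cols` in order and stopping at the first column that is not
/// constrained to a single literal value. Returns `None` if no prefix can be derived.
///
/// Illustrative sketch (marked `ignore`; mirrors the unit tests below):
///
/// ```ignore
/// use datafusion_expr::{col, lit};
///
/// let partitions = &[("a".to_string(), DataType::Utf8), ("b".to_string(), DataType::Int16)];
/// assert_eq!(
///     evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))]),
///     Some(Path::from("a=foo/b=bar")),
/// );
/// ```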
pub fn evaluate_partition_prefix<'a>(
    partition_cols: &'a [(String, DataType)],
    filters: &'a [Expr],
) -> Option<Path> {
    let mut partition_values = HashMap::new();
    for filter in filters {
        populate_partition_values(&mut partition_values, filter);
    }

    if partition_values.is_empty() {
        return None;
    }

    let mut parts = vec![];
    for (p, _) in partition_cols {
        match partition_values.get(p.as_str()) {
            Some(PartitionValue::Single(val)) => {
                // if a partition only has a single literal value, then it can be added to the
                // prefix
                parts.push(format!("{p}={val}"));
            }
            _ => {
                // break on the first unconstrained partition to create a common prefix
                // for all covered partitions.
                break;
            }
        }
    }

    if parts.is_empty() {
        None
    } else {
        Some(Path::from_iter(parts))
    }
}

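/// Evaluate `filters` against the partition values of `pf`, returning `Some(pf)`
/// if the file matches (or if `filters` is empty) and `None` otherwise.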
fn filter_partitions(
    pf: PartitionedFile,
    filters: &[Expr],
    df_schema: &DFSchema,
) -> Result<Option<PartitionedFile>> {
    if pf.partition_values.is_empty() && !filters.is_empty() {
        return Ok(None);
    } else if filters.is_empty() {
        return Ok(Some(pf));
    }

    let arrays = pf
        .partition_values
        .iter()
        .map(|v| v.to_array())
        .collect::<Result<_, _>>()?;

    let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;

    let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
    let props = ExecutionProps::new();
    let expr = create_physical_expr(&filter, df_schema, &props)?;

    // Since we're only operating on a single file, the batch and the resulting "array" hold
    // only one value, indicating whether the input file matches the provided filters
    let matches = expr.evaluate(&batch)?.into_array(1)?;
    if matches.as_boolean().value(0) {
        return Ok(Some(pf));
    }

    Ok(None)
}

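/// Convert an [`ObjectMeta`] into a [`PartitionedFile`], parsing the Hive-style
/// `col=value` segments of its path into typed partition values according to
/// `partition_cols`.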
fn try_into_partitioned_file(
    object_meta: ObjectMeta,
    partition_cols: &[(String, DataType)],
    table_path: &ListingTableUrl,
) -> Result<PartitionedFile> {
    let cols = partition_cols.iter().map(|(name, _)| name.as_str());
    let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);

    let partition_values = parsed
        .into_iter()
        .flatten()
        .zip(partition_cols)
        .map(|(parsed, (_, datatype))| {
            ScalarValue::try_from_string(parsed.to_string(), datatype)
        })
        .collect::<Result<Vec<_>>>()?;

    let mut pf: PartitionedFile = object_meta.into();
    pf.partition_values = partition_values;

    Ok(pf)
}

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` should only contain expressions that can be evaluated
/// using only the partition columns.
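///
/// A minimal usage sketch (marked `ignore`; `session`, `store`, the extension and
/// the partition columns are assumptions for illustration):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let table_path = ListingTableUrl::parse("file:///bucket/mytable")?;
/// let partition_cols = vec![("a".to_string(), DataType::Utf8)];
/// let filters = vec![col("a").eq(lit("foo"))];
/// let files: Vec<PartitionedFile> =
///     pruned_partition_list(session, store, &table_path, &filters, ".parquet", &partition_cols)
///         .await?
///         .try_collect()
///         .await?;
/// ```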
pub async fn pruned_partition_list<'a>(
    ctx: &'a dyn Session,
    store: &'a dyn ObjectStore,
    table_path: &'a ListingTableUrl,
    filters: &'a [Expr],
    file_extension: &'a str,
    partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
    let prefix = if !partition_cols.is_empty() {
        evaluate_partition_prefix(partition_cols, filters)
    } else {
        None
    };

    let objects = table_path
        .list_prefixed_files(ctx, store, prefix, file_extension)
        .await?
        .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));

    if partition_cols.is_empty() {
        assert_or_internal_err!(
            filters.is_empty(),
            "Got partition filters for unpartitioned table {}",
            table_path
        );

        // if no partition col => simply list all the files
        Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
    } else {
        let df_schema = DFSchema::from_unqualified_fields(
            partition_cols
                .iter()
                .map(|(n, d)| Field::new(n, d.clone(), true))
                .collect(),
            Default::default(),
        )?;

        Ok(objects
            .map_ok(|object_meta| {
                try_into_partitioned_file(object_meta, partition_cols, table_path)
            })
            .try_filter_map(move |pf| {
                futures::future::ready(
                    pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
                )
            })
            .boxed())
    }
}

/// Extract the partition values for the given `file_path` (in the given `table_path`)
/// associated with the partitions defined by `table_partition_cols`
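///
/// Illustrative sketch (marked `ignore`; mirrors the unit tests below):
///
/// ```ignore
/// let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
/// let file_path = Path::from("bucket/mytable/mypartition=v1/file.csv");
/// assert_eq!(
///     parse_partitions_for_path(&table_path, &file_path, vec!["mypartition"]),
///     Some(vec!["v1"]),
/// );
/// ```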
pub fn parse_partitions_for_path<'a, I>(
    table_path: &ListingTableUrl,
    file_path: &'a Path,
    table_partition_cols: I,
) -> Option<Vec<&'a str>>
where
    I: IntoIterator<Item = &'a str>,
{
    let subpath = table_path.strip_prefix(file_path)?;

    let mut part_values = vec![];
    for (part, expected_partition) in subpath.zip(table_partition_cols) {
        match part.split_once('=') {
            Some((name, val)) if name == expected_partition => part_values.push(val),
            _ => {
                debug!(
                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
                );
                return None;
            }
        }
    }
    Some(part_values)
}

/// Describe a partition as a (path, depth, files) tuple for easier assertions
pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
    (
        partition.path.as_ref(),
        partition.depth,
        partition
            .files
            .as_ref()
            .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
            .unwrap_or_default(),
    )
}

#[cfg(test)]
mod tests {
    use datafusion_datasource::file_groups::FileGroup;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{Expr, case, col, lit};

    #[test]
    fn test_split_files() {
        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
        let files = FileGroup::new(vec![
            new_partitioned_file("a"),
            new_partitioned_file("b"),
            new_partitioned_file("c"),
            new_partitioned_file("d"),
            new_partitioned_file("e"),
        ]);

        let chunks = files.clone().split_files(1);
        assert_eq!(1, chunks.len());
        assert_eq!(5, chunks[0].len());

        let chunks = files.clone().split_files(2);
        assert_eq!(2, chunks.len());
        assert_eq!(3, chunks[0].len());
        assert_eq!(2, chunks[1].len());

        let chunks = files.clone().split_files(5);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        let chunks = files.clone().split_files(123);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        let empty_group = FileGroup::default();
        let chunks = empty_group.split_files(2);
        assert_eq!(0, chunks.len());
    }

    #[test]
    fn test_parse_partitions_for_path() {
        assert_eq!(
            Some(vec![]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Only hive style partitioning supported for now:
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition", "otherpartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition"]
            )
        );
    }

    #[test]
    fn test_expr_applicable_for_cols() {
        assert!(expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c2"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(case(col("c1"))
                .when(lit("v1"), lit(true))
                .otherwise(lit(false))
                .expect("valid case expr"))
        ));
        // A static expression is not relevant in this context, but we
        // test it as an edge case anyway in case we want to generalize
        // this helper function
        assert!(expr_applicable_for_cols(&[], &lit(true)));
    }

    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
            Some(Path::from("a=foo")),
        );

        assert_eq!(
            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
            Some(Path::from("a=foo")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                // list of filters should be evaluated as AND
                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
            ),
            Some(Path::from("a=foo/b=1/c=true")),
        );

        // no prefix when filter is empty
        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

        // b=foo results in no prefix because a is not restricted
        assert_eq!(
            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
            None,
        );

        // a=foo and c=baz only results in prefix a=foo because b is not restricted
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
            ),
            Some(Path::from("a=foo")),
        );

        // partition with multiple values results in no prefix
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );

        // no prefix because partition a is not restricted to a single literal
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
            None,
        );
    }

    #[test]
    fn test_evaluate_date_partition_prefix() {
        let partitions = &[("a".to_string(), DataType::Date32)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
            ),
            Some(Path::from("a=1970-01-04")),
        );

        let partitions = &[("a".to_string(), DataType::Date64)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(
                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
                    None
                )),],
            ),
            Some(Path::from("a=1970-01-05")),
        );
    }
}