// datafusion_catalog_listing/helpers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Helper functions for the table implementation
19
20use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
25use datafusion_datasource::ListingTableUrl;
26use datafusion_datasource::PartitionedFile;
27use datafusion_expr::{BinaryExpr, Operator, lit, utils};
28
29use arrow::{
30    array::AsArray,
31    datatypes::{DataType, Field},
32    record_batch::RecordBatch,
33};
34use datafusion_expr::execution_props::ExecutionProps;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt, stream::BoxStream};
37use log::{debug, trace};
38
39use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
40use datafusion_common::{Column, DFSchema};
41use datafusion_expr::{Expr, Volatility};
42use datafusion_physical_expr::create_physical_expr;
43use object_store::path::Path;
44use object_store::{ObjectMeta, ObjectStore};
45
/// Check whether the given expression can be resolved using only the columns `col_names`.
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
///   was performed
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
    let mut is_applicable = true;
    expr.apply(|expr| match expr {
        // A column reference is applicable only when it names one of the allowed columns.
        Expr::Column(Column { name, .. }) => {
            is_applicable &= col_names.contains(&name.as_str());
            if is_applicable {
                // A column is a leaf node: `Jump` skips its (non-existent) children.
                Ok(TreeNodeRecursion::Jump)
            } else {
                // One disallowed column makes the whole expression inapplicable;
                // no need to look any further.
                Ok(TreeNodeRecursion::Stop)
            }
        }
        // Expression nodes whose applicability depends only on their children:
        // keep traversing.
        Expr::Literal(_, _)
        | Expr::Alias(_)
        | Expr::OuterReferenceColumn(_, _)
        | Expr::ScalarVariable(_, _)
        | Expr::Not(_)
        | Expr::IsNotNull(_)
        | Expr::IsNull(_)
        | Expr::IsTrue(_)
        | Expr::IsFalse(_)
        | Expr::IsUnknown(_)
        | Expr::IsNotTrue(_)
        | Expr::IsNotFalse(_)
        | Expr::IsNotUnknown(_)
        | Expr::Negative(_)
        | Expr::Cast(_)
        | Expr::TryCast(_)
        | Expr::BinaryExpr(_)
        | Expr::Between(_)
        | Expr::Like(_)
        | Expr::SimilarTo(_)
        | Expr::InList(_)
        | Expr::Exists(_)
        | Expr::InSubquery(_)
        | Expr::ScalarSubquery(_)
        | Expr::SetComparison(_)
        | Expr::GroupingSet(_)
        | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),

        Expr::ScalarFunction(scalar_function) => {
            match scalar_function.func.signature().volatility {
                // Immutable functions produce the same output for the same input,
                // so they can be evaluated against partition values alone.
                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
                // TODO: Stable functions could be `applicable`, but that would require access to the context
                Volatility::Stable | Volatility::Volatile => {
                    is_applicable = false;
                    Ok(TreeNodeRecursion::Stop)
                }
            }
        }

        // TODO other expressions are not handled yet:
        // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
        // - Can `Wildcard` be considered as a `Literal`?
        // - ScalarVariable could be `applicable`, but that would require access to the context
        // TODO: remove the next line after `Expr::Wildcard` is removed
        #[expect(deprecated)]
        Expr::AggregateFunction { .. }
        | Expr::WindowFunction { .. }
        | Expr::Wildcard { .. }
        | Expr::Unnest { .. }
        | Expr::Placeholder(_) => {
            is_applicable = false;
            Ok(TreeNodeRecursion::Stop)
        }
    })
    // The closure above never returns `Err`, so the traversal cannot fail.
    .unwrap();
    is_applicable
}
119
/// The maximum number of concurrent listing requests
/// kept in flight by [`list_partitions`] while walking a partitioned table.
const CONCURRENCY_LIMIT: usize = 100;
122
123/// Partition the list of files into `n` groups
124#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
125pub fn split_files(
126    mut partitioned_files: Vec<PartitionedFile>,
127    n: usize,
128) -> Vec<Vec<PartitionedFile>> {
129    if partitioned_files.is_empty() {
130        return vec![];
131    }
132
133    // ObjectStore::list does not guarantee any consistent order and for some
134    // implementations such as LocalFileSystem, it may be inconsistent. Thus
135    // Sort files by path to ensure consistent plans when run more than once.
136    partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
137
138    // effectively this is div with rounding up instead of truncating
139    let chunk_size = partitioned_files.len().div_ceil(n);
140    let mut chunks = Vec::with_capacity(n);
141    let mut current_chunk = Vec::with_capacity(chunk_size);
142    for file in partitioned_files.drain(..) {
143        current_chunk.push(file);
144        if current_chunk.len() == chunk_size {
145            let full_chunk =
146                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
147            chunks.push(full_chunk);
148        }
149    }
150
151    if !current_chunk.is_empty() {
152        chunks.push(current_chunk)
153    }
154
155    chunks
156}
157
/// One "directory" level discovered while walking a partitioned table.
#[derive(Debug)]
pub struct Partition {
    /// The path to the partition, including the table prefix
    path: Path,
    /// How many path segments below the table prefix `path` contains
    /// or equivalently the number of partition values in `path`
    depth: usize,
    /// The files contained as direct children of this `Partition` if known;
    /// `None` until [`Partition::list`] has populated it
    files: Option<Vec<ObjectMeta>>,
}
168
169impl Partition {
170    /// List the direct children of this partition updating `self.files` with
171    /// any child files, and returning a list of child "directories"
172    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
173        trace!("Listing partition {}", self.path);
174        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
175        let result = store.list_with_delimiter(prefix).await?;
176        self.files = Some(
177            result
178                .objects
179                .into_iter()
180                .filter(|object_meta| object_meta.size > 0)
181                .collect(),
182        );
183        Ok((self, result.common_prefixes))
184    }
185}
186
/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
///
/// Listings run concurrently with at most [`CONCURRENCY_LIMIT`] requests in
/// flight; partitions discovered at `max_depth` are returned without listing
/// their contents (their `files` stay `None`).
pub async fn list_partitions(
    store: &dyn ObjectStore,
    table_path: &ListingTableUrl,
    max_depth: usize,
    partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
    // Root partition: the table prefix, optionally narrowed by `partition_prefix`.
    let partition = Partition {
        path: match partition_prefix {
            // Join the table prefix and the partition prefix segment by segment.
            Some(prefix) => Path::from_iter(
                Path::from(table_path.prefix().as_ref())
                    .parts()
                    .chain(Path::from(prefix.as_ref()).parts()),
            ),
            None => table_path.prefix().clone(),
        },
        depth: 0,
        files: None,
    };

    let mut out = Vec::with_capacity(64);

    // `futures` is the bounded working set of in-flight listings; `pending`
    // holds listings that could not start because the working set was full.
    let mut pending = vec![];
    let mut futures = FuturesUnordered::new();
    futures.push(partition.list(store));

    while let Some((partition, paths)) = futures.next().await.transpose()? {
        // If pending contains a future it implies prior to this iteration
        // `futures.len == CONCURRENCY_LIMIT`. We can therefore add a single
        // future from `pending` to the working set
        if let Some(next) = pending.pop() {
            futures.push(next)
        }

        let depth = partition.depth;
        out.push(partition);
        for path in paths {
            let child = Partition {
                path,
                depth: depth + 1,
                files: None,
            };
            match depth < max_depth {
                // Below the depth limit: list the child, respecting the
                // concurrency limit.
                true => match futures.len() < CONCURRENCY_LIMIT {
                    true => futures.push(child.list(store)),
                    false => pending.push(child.list(store)),
                },
                // At the depth limit: record the child without listing it.
                false => out.push(child),
            }
        }
    }
    Ok(out)
}
240
/// The value(s) a partition column is constrained to by the pushed-down filters.
#[derive(Debug)]
enum PartitionValue {
    /// The column is pinned to exactly one literal value
    Single(String),
    /// The column was constrained more than once, so it cannot contribute
    /// to a common listing prefix
    Multi,
}
246
247fn populate_partition_values<'a>(
248    partition_values: &mut HashMap<&'a str, PartitionValue>,
249    filter: &'a Expr,
250) {
251    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
252        match op {
253            Operator::Eq => match (left.as_ref(), right.as_ref()) {
254                (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
255                | (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
256                    if partition_values
257                        .insert(name, PartitionValue::Single(val.to_string()))
258                        .is_some()
259                    {
260                        partition_values.insert(name, PartitionValue::Multi);
261                    }
262                }
263                _ => {}
264            },
265            Operator::And => {
266                populate_partition_values(partition_values, left);
267                populate_partition_values(partition_values, right);
268            }
269            _ => {}
270        }
271    }
272}
273
274pub fn evaluate_partition_prefix<'a>(
275    partition_cols: &'a [(String, DataType)],
276    filters: &'a [Expr],
277) -> Option<Path> {
278    let mut partition_values = HashMap::new();
279    for filter in filters {
280        populate_partition_values(&mut partition_values, filter);
281    }
282
283    if partition_values.is_empty() {
284        return None;
285    }
286
287    let mut parts = vec![];
288    for (p, _) in partition_cols {
289        match partition_values.get(p.as_str()) {
290            Some(PartitionValue::Single(val)) => {
291                // if a partition only has a single literal value, then it can be added to the
292                // prefix
293                parts.push(format!("{p}={val}"));
294            }
295            _ => {
296                // break on the first unconstrainted partition to create a common prefix
297                // for all covered partitions.
298                break;
299            }
300        }
301    }
302
303    if parts.is_empty() {
304        None
305    } else {
306        Some(Path::from_iter(parts))
307    }
308}
309
310fn filter_partitions(
311    pf: PartitionedFile,
312    filters: &[Expr],
313    df_schema: &DFSchema,
314) -> Result<Option<PartitionedFile>> {
315    if pf.partition_values.is_empty() && !filters.is_empty() {
316        return Ok(None);
317    } else if filters.is_empty() {
318        return Ok(Some(pf));
319    }
320
321    let arrays = pf
322        .partition_values
323        .iter()
324        .map(|v| v.to_array())
325        .collect::<Result<_, _>>()?;
326
327    let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;
328
329    let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
330    let props = ExecutionProps::new();
331    let expr = create_physical_expr(&filter, df_schema, &props)?;
332
333    // Since we're only operating on a single file, our batch and resulting "array" holds only one
334    // value indicating if the input file matches the provided filters
335    let matches = expr.evaluate(&batch)?.into_array(1)?;
336    if matches.as_boolean().value(0) {
337        return Ok(Some(pf));
338    }
339
340    Ok(None)
341}
342
343fn try_into_partitioned_file(
344    object_meta: ObjectMeta,
345    partition_cols: &[(String, DataType)],
346    table_path: &ListingTableUrl,
347) -> Result<PartitionedFile> {
348    let cols = partition_cols.iter().map(|(name, _)| name.as_str());
349    let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
350
351    let partition_values = parsed
352        .into_iter()
353        .flatten()
354        .zip(partition_cols)
355        .map(|(parsed, (_, datatype))| {
356            ScalarValue::try_from_string(parsed.to_string(), datatype)
357        })
358        .collect::<Result<Vec<_>>>()?;
359
360    let mut pf: PartitionedFile = object_meta.into();
361    pf.partition_values = partition_values;
362
363    Ok(pf)
364}
365
/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` should only contain expressions that can be evaluated
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
    ctx: &'a dyn Session,
    store: &'a dyn ObjectStore,
    table_path: &'a ListingTableUrl,
    filters: &'a [Expr],
    file_extension: &'a str,
    partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
    // For partitioned tables, derive a common path prefix from the filters so
    // the listing only touches partitions that can possibly match.
    let prefix = if !partition_cols.is_empty() {
        evaluate_partition_prefix(partition_cols, filters)
    } else {
        None
    };

    // List all objects under the prefix, skipping zero-length files.
    let objects = table_path
        .list_prefixed_files(ctx, store, prefix, file_extension)
        .await?
        .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));

    if partition_cols.is_empty() {
        // Partition filters are meaningless without partition columns.
        assert_or_internal_err!(
            filters.is_empty(),
            "Got partition filters for unpartitioned table {}",
            table_path
        );

        // if no partition col => simply list all the files
        Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
    } else {
        // Schema over just the partition columns, used to evaluate `filters`
        // against each file's parsed partition values.
        let df_schema = DFSchema::from_unqualified_fields(
            partition_cols
                .iter()
                .map(|(n, d)| Field::new(n, d.clone(), true))
                .collect(),
            Default::default(),
        )?;

        // Parse each object's partition values, then keep only the files whose
        // values satisfy the filters.
        Ok(objects
            .map_ok(|object_meta| {
                try_into_partitioned_file(object_meta, partition_cols, table_path)
            })
            .try_filter_map(move |pf| {
                futures::future::ready(
                    pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
                )
            })
            .boxed())
    }
}
419
420/// Extract the partition values for the given `file_path` (in the given `table_path`)
421/// associated to the partitions defined by `table_partition_cols`
422pub fn parse_partitions_for_path<'a, I>(
423    table_path: &ListingTableUrl,
424    file_path: &'a Path,
425    table_partition_cols: I,
426) -> Option<Vec<&'a str>>
427where
428    I: IntoIterator<Item = &'a str>,
429{
430    let subpath = table_path.strip_prefix(file_path)?;
431
432    let mut part_values = vec![];
433    for (part, expected_partition) in subpath.zip(table_partition_cols) {
434        match part.split_once('=') {
435            Some((name, val)) if name == expected_partition => part_values.push(val),
436            _ => {
437                debug!(
438                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
439                );
440                return None;
441            }
442        }
443    }
444    Some(part_values)
445}
446/// Describe a partition as a (path, depth, files) tuple for easier assertions
447pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
448    (
449        partition.path.as_ref(),
450        partition.depth,
451        partition
452            .files
453            .as_ref()
454            .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
455            .unwrap_or_default(),
456    )
457}
458
#[cfg(test)]
mod tests {
    use datafusion_datasource::file_groups::FileGroup;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{Expr, case, col, lit};

    /// `FileGroup::split_files` caps the number of groups at the file count
    /// and front-loads larger groups.
    #[test]
    fn test_split_files() {
        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
        let files = FileGroup::new(vec![
            new_partitioned_file("a"),
            new_partitioned_file("b"),
            new_partitioned_file("c"),
            new_partitioned_file("d"),
            new_partitioned_file("e"),
        ]);

        let chunks = files.clone().split_files(1);
        assert_eq!(1, chunks.len());
        assert_eq!(5, chunks[0].len());

        let chunks = files.clone().split_files(2);
        assert_eq!(2, chunks.len());
        assert_eq!(3, chunks[0].len());
        assert_eq!(2, chunks[1].len());

        let chunks = files.clone().split_files(5);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        // Requesting more groups than files yields one file per group.
        let chunks = files.clone().split_files(123);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        let empty_group = FileGroup::default();
        let chunks = empty_group.split_files(2);
        assert_eq!(0, chunks.len());
    }

    /// Partition values are extracted only when the file path sits under the
    /// table path and each segment matches `col=value` for the expected column.
    #[test]
    fn test_parse_partitions_for_path() {
        assert_eq!(
            Some(vec![]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Only hive style partitioning supported for now:
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition", "otherpartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition"]
            )
        );
    }

    /// An expression is applicable only when every column it references is in
    /// the allowed set.
    #[test]
    fn test_expr_applicable_for_cols() {
        assert!(expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c2"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(case(col("c1"))
                .when(lit("v1"), lit(true))
                .otherwise(lit(false))
                .expect("valid case expr"))
        ));
        // static expression not relevant in this context but we
        // test it as an edge case anyway in case we want to generalize
        // this helper function
        assert!(expr_applicable_for_cols(&[], &lit(true)));
    }

    /// The listing prefix covers the leading run of partition columns pinned
    /// to a single literal value.
    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
            Some(Path::from("a=foo")),
        );

        assert_eq!(
            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
            Some(Path::from("a=foo")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                // list of filters should be evaluated as AND
                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
            ),
            Some(Path::from("a=foo/b=1/c=true")),
        );

        // no prefix when filter is empty
        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

        // b=foo results in no prefix because a is not restricted
        assert_eq!(
            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
            None,
        );

        // a=foo and c=baz only results in prefix a=foo because b is not restricted
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
            ),
            Some(Path::from("a=foo")),
        );

        // partition with multiple values results in no prefix
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );

        // no prefix because partition a is not restricted to a single literal
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
            None,
        );
    }

    /// Date-typed partition literals are rendered in their string form
    /// (`YYYY-MM-DD`) inside the prefix.
    #[test]
    fn test_evaluate_date_partition_prefix() {
        let partitions = &[("a".to_string(), DataType::Date32)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
            ),
            Some(Path::from("a=1970-01-04")),
        );

        let partitions = &[("a".to_string(), DataType::Date64)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(
                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
                    None
                )),],
            ),
            Some(Path::from("a=1970-01-05")),
        );
    }
}