Skip to main content

datafusion_catalog_listing/
helpers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Helper functions for the table implementation
19
20use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::{
25    HashMap, Result, ScalarValue, TableReference, assert_or_internal_err,
26};
27use datafusion_datasource::PartitionedFile;
28use datafusion_datasource::{FileExtensions, ListingTableUrl};
29use datafusion_expr::{BinaryExpr, Operator, lit, utils};
30
31use arrow::{
32    array::AsArray,
33    datatypes::{DataType, Field},
34    record_batch::RecordBatch,
35};
36use datafusion_expr::execution_props::ExecutionProps;
37use futures::stream::FuturesUnordered;
38use futures::{StreamExt, TryStreamExt, stream::BoxStream};
39use log::{debug, trace};
40
41use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
42use datafusion_common::{Column, DFSchema};
43use datafusion_expr::{Expr, Volatility};
44use datafusion_physical_expr::create_physical_expr;
45use object_store::path::Path;
46use object_store::{ObjectMeta, ObjectStore};
47
48/// Check whether the given expression can be resolved using only the columns `col_names`.
49/// This means that if this function returns true:
50/// - the table provider can filter the table partition values with this expression
51/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
52///   was performed
53pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
54    let mut is_applicable = true;
55    expr.apply(|expr| match expr {
56        Expr::Column(Column { name, .. }) => {
57            is_applicable &= col_names.contains(&name.as_str());
58            if is_applicable {
59                Ok(TreeNodeRecursion::Jump)
60            } else {
61                Ok(TreeNodeRecursion::Stop)
62            }
63        }
64        Expr::Literal(_, _)
65        | Expr::Alias(_)
66        | Expr::OuterReferenceColumn(_, _)
67        | Expr::ScalarVariable(_, _)
68        | Expr::Not(_)
69        | Expr::IsNotNull(_)
70        | Expr::IsNull(_)
71        | Expr::IsTrue(_)
72        | Expr::IsFalse(_)
73        | Expr::IsUnknown(_)
74        | Expr::IsNotTrue(_)
75        | Expr::IsNotFalse(_)
76        | Expr::IsNotUnknown(_)
77        | Expr::Negative(_)
78        | Expr::Cast(_)
79        | Expr::TryCast(_)
80        | Expr::BinaryExpr(_)
81        | Expr::Between(_)
82        | Expr::Like(_)
83        | Expr::SimilarTo(_)
84        | Expr::InList(_)
85        | Expr::Exists(_)
86        | Expr::InSubquery(_)
87        | Expr::ScalarSubquery(_)
88        | Expr::SetComparison(_)
89        | Expr::GroupingSet(_)
90        | Expr::Case(_)
91        | Expr::Lambda(_)
92        | Expr::LambdaVariable(_) => Ok(TreeNodeRecursion::Continue),
93
94        Expr::ScalarFunction(scalar_function) => {
95            match scalar_function.func.signature().volatility {
96                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
97                // TODO: Stable functions could be `applicable`, but that would require access to the context
98                // https://github.com/apache/datafusion/issues/21690
99                Volatility::Stable | Volatility::Volatile => {
100                    is_applicable = false;
101                    Ok(TreeNodeRecursion::Stop)
102                }
103            }
104        }
105        Expr::HigherOrderFunction(hof) => {
106            match hof.func.signature().volatility {
107                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
108                // TODO: Stable functions could be `applicable`, but that would require access to the context
109                // https://github.com/apache/datafusion/issues/21690
110                Volatility::Stable | Volatility::Volatile => {
111                    is_applicable = false;
112                    Ok(TreeNodeRecursion::Stop)
113                }
114            }
115        }
116
117        // TODO other expressions are not handled yet:
118        // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
119        // - Can `Wildcard` be considered as a `Literal`?
120        // - ScalarVariable could be `applicable`, but that would require access to the context
121        //   https://github.com/apache/datafusion/issues/21690
122        // TODO: remove the next line after `Expr::Wildcard` is removed
123        #[expect(deprecated)]
124        Expr::AggregateFunction { .. }
125        | Expr::WindowFunction { .. }
126        | Expr::Wildcard { .. }
127        | Expr::Unnest { .. }
128        | Expr::Placeholder(_) => {
129            is_applicable = false;
130            Ok(TreeNodeRecursion::Stop)
131        }
132    })
133    .unwrap();
134    is_applicable
135}
136
/// The maximum number of concurrent listing requests
///
/// `list_partitions` keeps at most this many `Partition::list` futures in its
/// working set; further listings are queued until one of them completes.
const CONCURRENCY_LIMIT: usize = 100;
139
140/// Partition the list of files into `n` groups
141#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
142pub fn split_files(
143    mut partitioned_files: Vec<PartitionedFile>,
144    n: usize,
145) -> Vec<Vec<PartitionedFile>> {
146    if partitioned_files.is_empty() {
147        return vec![];
148    }
149
150    // ObjectStore::list does not guarantee any consistent order and for some
151    // implementations such as LocalFileSystem, it may be inconsistent. Thus
152    // Sort files by path to ensure consistent plans when run more than once.
153    partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
154
155    // effectively this is div with rounding up instead of truncating
156    let chunk_size = partitioned_files.len().div_ceil(n);
157    let mut chunks = Vec::with_capacity(n);
158    let mut current_chunk = Vec::with_capacity(chunk_size);
159    for file in partitioned_files.drain(..) {
160        current_chunk.push(file);
161        if current_chunk.len() == chunk_size {
162            let full_chunk =
163                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
164            chunks.push(full_chunk);
165        }
166    }
167
168    if !current_chunk.is_empty() {
169        chunks.push(current_chunk)
170    }
171
172    chunks
173}
174
175#[derive(Debug)]
176pub struct Partition {
177    /// The path to the partition, including the table prefix
178    path: Path,
179    /// How many path segments below the table prefix `path` contains
180    /// or equivalently the number of partition values in `path`
181    depth: usize,
182    /// The files contained as direct children of this `Partition` if known
183    files: Option<Vec<ObjectMeta>>,
184}
185
186impl Partition {
187    /// List the direct children of this partition updating `self.files` with
188    /// any child files, and returning a list of child "directories"
189    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
190        trace!("Listing partition {}", self.path);
191        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
192        let result = store.list_with_delimiter(prefix).await?;
193        self.files = Some(
194            result
195                .objects
196                .into_iter()
197                .filter(|object_meta| object_meta.size > 0)
198                .collect(),
199        );
200        Ok((self, result.common_prefixes))
201    }
202}
203
204/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
205pub async fn list_partitions(
206    store: &dyn ObjectStore,
207    table_path: &ListingTableUrl,
208    max_depth: usize,
209    partition_prefix: Option<Path>,
210) -> Result<Vec<Partition>> {
211    let partition = Partition {
212        path: match partition_prefix {
213            Some(prefix) => Path::from_iter(
214                Path::from(table_path.prefix().as_ref())
215                    .parts()
216                    .chain(Path::from(prefix.as_ref()).parts()),
217            ),
218            None => table_path.prefix().clone(),
219        },
220        depth: 0,
221        files: None,
222    };
223
224    let mut out = Vec::with_capacity(64);
225
226    let mut pending = vec![];
227    let mut futures = FuturesUnordered::new();
228    futures.push(partition.list(store));
229
230    while let Some((partition, paths)) = futures.next().await.transpose()? {
231        // If pending contains a future it implies prior to this iteration
232        // `futures.len == CONCURRENCY_LIMIT`. We can therefore add a single
233        // future from `pending` to the working set
234        if let Some(next) = pending.pop() {
235            futures.push(next)
236        }
237
238        let depth = partition.depth;
239        out.push(partition);
240        for path in paths {
241            let child = Partition {
242                path,
243                depth: depth + 1,
244                files: None,
245            };
246            match depth < max_depth {
247                true => match futures.len() < CONCURRENCY_LIMIT {
248                    true => futures.push(child.list(store)),
249                    false => pending.push(child.list(store)),
250                },
251                false => out.push(child),
252            }
253        }
254    }
255    Ok(out)
256}
257
/// The equality constraint that the filters place on one partition column,
/// as collected by `populate_partition_values`.
#[derive(Debug)]
enum PartitionValue {
    /// Exactly one `col = literal` term was seen; holds the literal's display string
    Single(String),
    /// More than one equality term was seen for the column, so no single value applies
    Multi,
}
263
264fn populate_partition_values<'a>(
265    partition_values: &mut HashMap<&'a str, PartitionValue>,
266    filter: &'a Expr,
267) {
268    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
269        match op {
270            Operator::Eq => match (left.as_ref(), right.as_ref()) {
271                (Expr::Column(Column { name, .. }), Expr::Literal(val, _))
272                | (Expr::Literal(val, _), Expr::Column(Column { name, .. }))
273                    if partition_values
274                        .insert(name, PartitionValue::Single(val.to_string()))
275                        .is_some() =>
276                {
277                    partition_values.insert(name, PartitionValue::Multi);
278                }
279                (Expr::Column(Column { .. }), Expr::Literal(_, _))
280                | (Expr::Literal(_, _), Expr::Column(Column { .. })) => {}
281                _ => {}
282            },
283            Operator::And => {
284                populate_partition_values(partition_values, left);
285                populate_partition_values(partition_values, right);
286            }
287            _ => {}
288        }
289    }
290}
291
292pub fn evaluate_partition_prefix<'a>(
293    partition_cols: &'a [(String, DataType)],
294    filters: &'a [Expr],
295) -> Option<Path> {
296    let mut partition_values = HashMap::new();
297    for filter in filters {
298        populate_partition_values(&mut partition_values, filter);
299    }
300
301    if partition_values.is_empty() {
302        return None;
303    }
304
305    let mut parts = vec![];
306    for (p, _) in partition_cols {
307        match partition_values.get(p.as_str()) {
308            Some(PartitionValue::Single(val)) => {
309                // if a partition only has a single literal value, then it can be added to the
310                // prefix
311                parts.push(format!("{p}={val}"));
312            }
313            _ => {
314                // break on the first unconstrainted partition to create a common prefix
315                // for all covered partitions.
316                break;
317            }
318        }
319    }
320
321    if parts.is_empty() {
322        None
323    } else {
324        Some(Path::from_iter(parts))
325    }
326}
327
328fn filter_partitions(
329    pf: PartitionedFile,
330    filters: &[Expr],
331    df_schema: &DFSchema,
332) -> Result<Option<PartitionedFile>> {
333    if pf.partition_values.is_empty() && !filters.is_empty() {
334        return Ok(None);
335    } else if filters.is_empty() {
336        return Ok(Some(pf));
337    }
338
339    let arrays = pf
340        .partition_values
341        .iter()
342        .map(|v| v.to_array())
343        .collect::<Result<_, _>>()?;
344
345    let batch = RecordBatch::try_new(Arc::clone(df_schema.inner()), arrays)?;
346
347    let filter = utils::conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true));
348    let props = ExecutionProps::new();
349    let expr = create_physical_expr(&filter, df_schema, &props)?;
350
351    // Since we're only operating on a single file, our batch and resulting "array" holds only one
352    // value indicating if the input file matches the provided filters
353    let matches = expr.evaluate(&batch)?.into_array(1)?;
354    if matches.as_boolean().value(0) {
355        return Ok(Some(pf));
356    }
357
358    Ok(None)
359}
360
361/// Returns `Ok(None)` when the file is not inside a valid partition path
362/// (e.g. a stale file in the table root directory). Such files are skipped
363/// because hive-style partition values are never null and there is no valid
364/// value to assign for non-partitioned files.
365fn try_into_partitioned_file(
366    object_meta: ObjectMeta,
367    partition_cols: &[(String, DataType)],
368    table_path: &ListingTableUrl,
369) -> Result<Option<PartitionedFile>> {
370    let cols = partition_cols.iter().map(|(name, _)| name.as_str());
371    let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
372
373    let Some(parsed) = parsed else {
374        // parse_partitions_for_path already logs a debug message
375        return Ok(None);
376    };
377
378    let partition_values = parsed
379        .into_iter()
380        .zip(partition_cols)
381        .map(|(parsed, (_, datatype))| {
382            ScalarValue::try_from_string(parsed.to_string(), datatype)
383        })
384        .collect::<Result<Vec<_>>>()?;
385
386    let mut pf: PartitionedFile = object_meta.into();
387    pf.partition_values = partition_values;
388    pf.table_reference.clone_from(table_path.get_table_ref());
389
390    Ok(Some(pf))
391}
392
393/// Discover the partitions on the given path and prune out files
394/// that belong to irrelevant partitions using `filters` expressions.
395/// `filters` should only contain expressions that can be evaluated
396/// using only the partition columns.
397pub async fn pruned_partition_list<'a>(
398    ctx: &'a dyn Session,
399    store: &'a dyn ObjectStore,
400    table_path: &'a ListingTableUrl,
401    filters: &'a [Expr],
402    file_extension: &'a str,
403    partition_cols: &'a [(String, DataType)],
404) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
405    let prefix = if !partition_cols.is_empty() {
406        evaluate_partition_prefix(partition_cols, filters)
407    } else {
408        None
409    };
410
411    let objects = table_path
412        .list_prefixed_files(ctx, store, prefix, file_extension)
413        .await?
414        .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
415
416    if partition_cols.is_empty() {
417        assert_or_internal_err!(
418            filters.is_empty(),
419            "Got partition filters for unpartitioned table {}",
420            table_path
421        );
422
423        // if no partition col => list all the files
424        Ok(objects
425            .try_filter_map(|object_meta| {
426                futures::future::ready(object_meta_to_partitioned_file(
427                    object_meta,
428                    table_path.get_table_ref(),
429                ))
430            })
431            .boxed())
432    } else {
433        let df_schema = DFSchema::from_unqualified_fields(
434            partition_cols
435                .iter()
436                .map(|(n, d)| Field::new(n, d.clone(), true))
437                .collect(),
438            Default::default(),
439        )?;
440
441        Ok(objects
442            .try_filter_map(|object_meta| {
443                futures::future::ready(try_into_partitioned_file(
444                    object_meta,
445                    partition_cols,
446                    table_path,
447                ))
448            })
449            .try_filter_map(move |pf| {
450                futures::future::ready(filter_partitions(pf, filters, &df_schema))
451            })
452            .boxed())
453    }
454}
455
456fn object_meta_to_partitioned_file(
457    object_meta: ObjectMeta,
458    table_ref: &Option<TableReference>,
459) -> Result<Option<PartitionedFile>> {
460    Ok(Some(PartitionedFile {
461        object_meta,
462        partition_values: vec![],
463        range: None,
464        statistics: None,
465        ordering: None,
466        extensions: FileExtensions::new(),
467        metadata_size_hint: None,
468        table_reference: table_ref.clone(),
469    }))
470}
471
472/// Extract the partition values for the given `file_path` (in the given `table_path`)
473/// associated to the partitions defined by `table_partition_cols`
474pub fn parse_partitions_for_path<'a, I>(
475    table_path: &ListingTableUrl,
476    file_path: &'a Path,
477    table_partition_cols: I,
478) -> Option<Vec<&'a str>>
479where
480    I: IntoIterator<Item = &'a str>,
481{
482    let subpath = table_path.strip_prefix(file_path)?;
483
484    let mut part_values = vec![];
485    for (part, expected_partition) in subpath.zip(table_partition_cols) {
486        match part.split_once('=') {
487            Some((name, val)) if name == expected_partition => part_values.push(val),
488            _ => {
489                debug!(
490                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
491                );
492                return None;
493            }
494        }
495    }
496    Some(part_values)
497}
498/// Describe a partition as a (path, depth, files) tuple for easier assertions
499pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
500    (
501        partition.path.as_ref(),
502        partition.depth,
503        partition
504            .files
505            .as_ref()
506            .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
507            .unwrap_or_default(),
508    )
509}
510
// Unit tests for the partition-listing helpers in this module.
#[cfg(test)]
mod tests {
    use datafusion_datasource::file_groups::FileGroup;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{case, col};

    // Exercises `FileGroup::split_files`, the replacement for the deprecated
    // free function `split_files` in this module.
    #[test]
    fn test_split_files() {
        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
        let files = FileGroup::new(vec![
            new_partitioned_file("a"),
            new_partitioned_file("b"),
            new_partitioned_file("c"),
            new_partitioned_file("d"),
            new_partitioned_file("e"),
        ]);

        let chunks = files.clone().split_files(1);
        assert_eq!(1, chunks.len());
        assert_eq!(5, chunks[0].len());

        // 5 files into 2 groups: the first group gets 3 files, the second 2
        let chunks = files.clone().split_files(2);
        assert_eq!(2, chunks.len());
        assert_eq!(3, chunks[0].len());
        assert_eq!(2, chunks[1].len());

        let chunks = files.clone().split_files(5);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        // asking for more groups than there are files yields one group per file
        let chunks = files.clone().split_files(123);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        let empty_group = FileGroup::default();
        let chunks = empty_group.split_files(2);
        assert_eq!(0, chunks.len());
    }

    #[test]
    fn test_parse_partitions_for_path() {
        // no partition columns: any file under the table path parses to no values
        assert_eq!(
            Some(vec![]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // file outside the table path
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // expected partition directory is missing
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // a trailing slash on the table URL makes no difference
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Only hive style partitioning supported for now:
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition", "otherpartition"]
            )
        );
        // extra directory levels beyond the declared columns are ignored
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition"]
            )
        );
    }

    #[test]
    fn test_try_into_partitioned_file_valid_partition() {
        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
        let meta = ObjectMeta {
            location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };

        let result =
            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
        assert!(result.is_some());
        let pf = result.unwrap();
        assert_eq!(pf.partition_values.len(), 1);
        assert_eq!(
            pf.partition_values[0],
            ScalarValue::Utf8(Some("2024-01".to_string()))
        );
    }

    #[test]
    fn test_try_into_partitioned_file_root_file_skipped() {
        // File in root directory (not inside any partition path) should be
        // skipped — this is the case where a stale file exists from before
        // hive partitioning was added.
        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
        let meta = ObjectMeta {
            location: Path::from("bucket/mytable/data.parquet"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };

        let result =
            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
        assert!(
            result.is_none(),
            "Files outside partition structure should be skipped"
        );
    }

    #[test]
    fn test_try_into_partitioned_file_wrong_partition_name() {
        // File in a directory that doesn't match the expected partition column
        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
        let meta = ObjectMeta {
            location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };

        let result =
            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
        assert!(
            result.is_none(),
            "Files with wrong partition column name should be skipped"
        );
    }

    #[test]
    fn test_try_into_partitioned_file_multiple_partitions() {
        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
        let partition_cols = vec![
            ("year".to_string(), DataType::Utf8),
            ("month".to_string(), DataType::Utf8),
        ];
        let meta = ObjectMeta {
            location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };

        let result =
            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
        assert!(result.is_some());
        let pf = result.unwrap();
        assert_eq!(pf.partition_values.len(), 2);
        assert_eq!(
            pf.partition_values[0],
            ScalarValue::Utf8(Some("2024".to_string()))
        );
        assert_eq!(
            pf.partition_values[1],
            ScalarValue::Utf8(Some("01".to_string()))
        );
    }

    #[test]
    fn test_try_into_partitioned_file_partial_partition_skipped() {
        // File has first partition but not second — should be skipped
        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
        let partition_cols = vec![
            ("year".to_string(), DataType::Utf8),
            ("month".to_string(), DataType::Utf8),
        ];
        let meta = ObjectMeta {
            location: Path::from("bucket/mytable/year=2024/data.parquet"),
            last_modified: chrono::Utc::now(),
            size: 100,
            e_tag: None,
            version: None,
        };

        let result =
            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
        // File has year=2024 but no month= directory — parse_partitions_for_path
        // returns None because the path component "data.parquet" doesn't match
        // the expected "month=..." pattern.
        assert!(
            result.is_none(),
            "Files with incomplete partition structure should be skipped"
        );
    }

    #[test]
    fn test_expr_applicable_for_cols() {
        assert!(expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), lit("value"))
        ));
        // references a column outside the allowed set
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c2"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        // aliases and negation are transparent
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(case(col("c1"))
                .when(lit("v1"), lit(true))
                .otherwise(lit(false))
                .expect("valid case expr"))
        ));
        // static expression not relevant in this context but we
        // test it as an edge case anyway in case we want to generalize
        // this helper function
        assert!(expr_applicable_for_cols(&[], &lit(true)));
    }

    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
            Some(Path::from("a=foo")),
        );

        // operand order of the equality does not matter
        assert_eq!(
            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
            Some(Path::from("a=foo")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                // list of filters should be evaluated as AND
                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
            ),
            Some(Path::from("a=foo/b=1/c=true")),
        );

        // no prefix when filter is empty
        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

        // b=foo results in no prefix because a is not restricted
        assert_eq!(
            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
            None,
        );

        // a=foo and c=baz only results in prefix a=foo because b is not restricted
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
            ),
            Some(Path::from("a=foo")),
        );

        // partition with multiple values results in no prefix
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );

        // no prefix because partition a is not restricted to a single literal
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );
        // non-equality comparisons never contribute to the prefix
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
            None,
        );
    }

    // Date literals are rendered in their display form (YYYY-MM-DD) in the prefix
    #[test]
    fn test_evaluate_date_partition_prefix() {
        let partitions = &[("a".to_string(), DataType::Date32)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
            ),
            Some(Path::from("a=1970-01-04")),
        );

        let partitions = &[("a".to_string(), DataType::Date64)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(
                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
                    None
                )),],
            ),
            Some(Path::from("a=1970-01-05")),
        );
    }
}