Skip to main content

datafusion_datasource/
file_groups.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
19
20use crate::{FileRange, PartitionedFile};
21use arrow::compute::SortOptions;
22use datafusion_common::Statistics;
23use datafusion_common::utils::compare_rows;
24use itertools::Itertools;
25use std::cmp::{Ordering, min};
26use std::collections::{BinaryHeap, HashMap};
27use std::iter::repeat_with;
28use std::mem;
29use std::ops::{Deref, DerefMut, Index, IndexMut};
30use std::sync::Arc;
31
/// Repartition input files into `target_partitions` partitions, if the total file size
/// is at least `repartition_file_min_size`.
34///
/// This partitions evenly by file byte range, and does not have any knowledge
/// of how data is laid out in specific files. The specific `FileOpener`s are
/// responsible for the actual partitioning for each data source type (e.g.
/// the `CsvOpener` will read the lines that overlap with its byte range and
/// handle boundaries to ensure every line is read exactly once).
40///
41/// # Example
42///
43/// For example, if there are two files `A` and `B` that we wish to read with 4
44/// partitions (with 4 threads) they will be divided as follows:
45///
46/// ```text
47///                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
48///                                      ┌─────────────────┐
49///                                    │ │                 │ │
50///                                      │     File A      │
51///                                    │ │  Range: 0-2MB   │ │
52///                                      │                 │
53///                                    │ └─────────────────┘ │
54///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
55/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
56/// │                 │                  ┌─────────────────┐
57/// │                 │                │ │                 │ │
58/// │                 │                  │     File A      │
59/// │                 │                │ │   Range 2-4MB   │ │
60/// │                 │                  │                 │
61/// │                 │                │ └─────────────────┘ │
62/// │  File A (7MB)   │   ────────▶     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
63/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
64/// │                 │                  ┌─────────────────┐
65/// │                 │                │ │                 │ │
66/// │                 │                  │     File A      │
67/// │                 │                │ │  Range: 4-6MB   │ │
68/// │                 │                  │                 │
69/// │                 │                │ └─────────────────┘ │
70/// └─────────────────┘                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
72/// │  File B (1MB)   │                  ┌─────────────────┐
73/// │                 │                │ │     File A      │ │
74/// └─────────────────┘                  │  Range: 6-7MB   │
75///                                    │ └─────────────────┘ │
76///                                      ┌─────────────────┐
77///                                    │ │  File B (1MB)   │ │
78///                                      │                 │
79///                                    │ └─────────────────┘ │
80///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
81///
82///                                    If target_partitions = 4,
83///                                      divides into 4 groups
84/// ```
85///
86/// # Maintaining Order
87///
88/// Within each group files are read sequentially. Thus, if the overall order of
89/// tuples must be preserved, multiple files can not be mixed in the same group.
90///
91/// In this case, the code will split the largest files evenly into any
92/// available empty groups, but the overall distribution may not be as even
93/// as if the order did not need to be preserved.
94///
95/// ```text
96///                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
97///                                      ┌─────────────────┐
98///                                    │ │                 │ │
99///                                      │     File A      │
100///                                    │ │  Range: 0-2MB   │ │
101///                                      │                 │
102/// ┌─────────────────┐                │ └─────────────────┘ │
103/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
104/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
105/// │                 │                  ┌─────────────────┐
106/// │                 │                │ │                 │ │
107/// │                 │                  │     File A      │
108/// │                 │                │ │   Range 2-4MB   │ │
109/// │  File A (6MB)   │   ────────▶      │                 │
110/// │    (ordered)    │                │ └─────────────────┘ │
111/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
112/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
113/// │                 │                  ┌─────────────────┐
114/// │                 │                │ │                 │ │
115/// │                 │                  │     File A      │
116/// │                 │                │ │  Range: 4-6MB   │ │
117/// └─────────────────┘                  │                 │
118/// ┌─────────────────┐                │ └─────────────────┘ │
119/// │  File B (1MB)   │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
120/// │    (ordered)    │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
121/// └─────────────────┘                  ┌─────────────────┐
122///                                    │ │  File B (1MB)   │ │
123///                                      │                 │
124///                                    │ └─────────────────┘ │
125///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
126///
127///                                    If target_partitions = 4,
128///                                      divides into 4 groups
129/// ```
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
    /// How many partitions (file groups) the repartitioned output should target
    target_partitions: usize,
    /// Size threshold for size-based repartitioning: when the combined size of
    /// all input files is below this value, the files are left as they are
    repartition_file_min_size: usize,
    /// Whether the order in which files are read within each group must be
    /// preserved (selects the order-preserving repartitioning strategy)
    preserve_order_within_groups: bool,
}
139
impl Default for FileGroupPartitioner {
    /// Equivalent to [`FileGroupPartitioner::new`]
    fn default() -> Self {
        Self::new()
    }
}
145
146impl FileGroupPartitioner {
147    /// Creates a new [`FileGroupPartitioner`] with default values:
148    /// 1. `target_partitions = 1`
149    /// 2. `repartition_file_min_size = 10MB`
150    /// 3. `preserve_order_within_groups = false`
151    pub fn new() -> Self {
152        Self {
153            target_partitions: 1,
154            repartition_file_min_size: 10 * 1024 * 1024,
155            preserve_order_within_groups: false,
156        }
157    }
158
159    /// Set the target partitions
160    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
161        self.target_partitions = target_partitions;
162        self
163    }
164
165    /// Set the minimum size at which to repartition a file
166    pub fn with_repartition_file_min_size(
167        mut self,
168        repartition_file_min_size: usize,
169    ) -> Self {
170        self.repartition_file_min_size = repartition_file_min_size;
171        self
172    }
173
174    /// Set whether the order of tuples within a file must be preserved
175    pub fn with_preserve_order_within_groups(
176        mut self,
177        preserve_order_within_groups: bool,
178    ) -> Self {
179        self.preserve_order_within_groups = preserve_order_within_groups;
180        self
181    }
182
183    /// Repartition input files according to the settings on this [`FileGroupPartitioner`].
184    ///
185    /// If no repartitioning is needed or possible, return `None`.
186    pub fn repartition_file_groups(
187        &self,
188        file_groups: &[FileGroup],
189    ) -> Option<Vec<FileGroup>> {
190        if file_groups.is_empty() {
191            return None;
192        }
193
194        //  special case when order must be preserved
195        if self.preserve_order_within_groups {
196            self.repartition_preserving_order(file_groups)
197        } else {
198            self.repartition_evenly_by_size(file_groups)
199        }
200    }
201
202    /// Evenly repartition files across partitions by size, ignoring any
203    /// existing grouping / ordering
204    fn repartition_evenly_by_size(
205        &self,
206        file_groups: &[FileGroup],
207    ) -> Option<Vec<FileGroup>> {
208        let target_partitions = self.target_partitions;
209        let repartition_file_min_size = self.repartition_file_min_size;
210        let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
211
212        let total_size = flattened_files
213            .iter()
214            .map(|f| f.effective_size())
215            .sum::<u64>();
216        if total_size < (repartition_file_min_size as u64) || total_size == 0 {
217            return None;
218        }
219
220        let target_partition_size = total_size.div_ceil(target_partitions as u64);
221
222        let current_partition_index: usize = 0;
223        let current_partition_size: u64 = 0;
224
225        // Partition byte range evenly for all `PartitionedFile`s
226        let repartitioned_files = flattened_files
227            .into_iter()
228            .scan(
229                (current_partition_index, current_partition_size),
230                |(current_partition_index, current_partition_size), source_file| {
231                    let mut produced_files = vec![];
232                    let (mut range_start, file_end) = source_file.range();
233                    while range_start < file_end {
234                        let range_end = min(
235                            range_start
236                                + (target_partition_size - *current_partition_size),
237                            file_end,
238                        );
239
240                        let mut produced_file = source_file.clone();
241                        produced_file.range = Some(FileRange {
242                            start: range_start as i64,
243                            end: range_end as i64,
244                        });
245                        produced_files.push((*current_partition_index, produced_file));
246
247                        if *current_partition_size + (range_end - range_start)
248                            >= target_partition_size
249                        {
250                            *current_partition_index += 1;
251                            *current_partition_size = 0;
252                        } else {
253                            *current_partition_size += range_end - range_start;
254                        }
255                        range_start = range_end;
256                    }
257                    Some(produced_files)
258                },
259            )
260            .flatten()
261            .chunk_by(|(partition_idx, _)| *partition_idx)
262            .into_iter()
263            .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
264            .collect_vec();
265
266        Some(repartitioned_files)
267    }
268
    /// Redistribute files across groups while preserving the order within
    /// each group.
    ///
    /// Only groups holding exactly one file are candidates for splitting.
    /// `target_partitions - file_groups.len()` empty groups are appended, and
    /// every empty group is handed to the candidate that currently has the
    /// largest per-group byte range. Each candidate file is then cut into
    /// consecutive ranges, one per group assigned to it (its own group first).
    ///
    /// Returns `None` when there are already at least `target_partitions`
    /// groups, or when no group holds exactly one file. A single group with a
    /// single file is delegated to `repartition_evenly_by_size`.
    fn repartition_preserving_order(
        &self,
        file_groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        // Can't repartition and preserve order if there are more groups
        // than partitions
        if file_groups.len() >= self.target_partitions {
            return None;
        }
        let num_new_groups = self.target_partitions - file_groups.len();

        // If there is only a single file
        if file_groups.len() == 1 && file_groups[0].len() == 1 {
            return self.repartition_evenly_by_size(file_groups);
        }

        // Find which files could be split (single file groups)
        let mut heap: BinaryHeap<_> = file_groups
            .iter()
            .enumerate()
            .filter_map(|(group_index, group)| {
                // ignore groups that do not have exactly 1 file
                if group.len() == 1 {
                    Some(ToRepartition {
                        source_index: group_index,
                        file_size: group[0].effective_size(),
                        new_groups: vec![group_index],
                    })
                } else {
                    None
                }
            })
            .map(CompareByRangeSize)
            .collect();

        // No files can be redistributed
        if heap.is_empty() {
            return None;
        }

        // Add new empty groups to which we will redistribute ranges of existing files
        let mut file_groups: Vec<_> = file_groups
            .iter()
            .cloned()
            .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
            .collect();

        // Divide up empty groups
        for (group_index, group) in file_groups.iter().enumerate() {
            if !group.is_empty() {
                continue;
            }
            // Pick the file that has the largest ranges to read so far
            // (the heap is known to be non-empty, see the check above)
            let mut largest_group = heap.pop().unwrap();
            largest_group.new_groups.push(group_index);
            // re-insert so the entry is re-ranked by its new, smaller range size
            heap.push(largest_group);
        }

        // Distribute files to their newly assigned groups
        while let Some(to_repartition) = heap.pop() {
            let range_size = to_repartition.range_size();
            let ToRepartition {
                source_index,
                file_size: _,
                new_groups,
            } = to_repartition.into_inner();
            assert_eq!(file_groups[source_index].len(), 1);
            // take the file out of its group; the first entry of `new_groups`
            // is `source_index`, so the first range is pushed back into it
            let original_file = file_groups[source_index].pop().unwrap();

            let last_group = new_groups.len() - 1;
            let (mut range_start, file_end) = original_file.range();
            let mut range_end = range_start + range_size;
            for (i, group_index) in new_groups.into_iter().enumerate() {
                let target_group = &mut file_groups[group_index];
                assert!(target_group.is_empty());

                // adjust last range to include the entire file
                // (`range_size` is an integer division and may leave a remainder)
                if i == last_group {
                    range_end = file_end;
                }
                target_group.push(
                    original_file
                        .clone()
                        .with_range(range_start as i64, range_end as i64),
                );
                range_start = range_end;
                range_end += range_size;
            }
        }

        Some(file_groups)
    }
363}
364
/// Represents a group of partitioned files that'll be processed by a single thread.
/// Maintains optional statistics across all files in the group.
///
/// # Statistics
///
/// The group-level statistics (returned by [`FileGroup::file_statistics`] when it is
/// called with `None`) contain merged statistics from all files
/// in the group for the **full table schema** (file columns + partition columns).
///
/// Partition column statistics are derived from the individual file partition values:
/// - `min` = minimum partition value across all files in the group
/// - `max` = maximum partition value across all files in the group
/// - `null_count` = 0 (partition values are never null)
///
/// This allows query optimizers to prune entire file groups based on partition bounds.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// The files in this group
    files: Vec<PartitionedFile>,
    /// Optional statistics for the data across all files in the group.
    ///
    /// These statistics cover the full table schema: file columns plus partition columns.
    /// Partition column statistics are merged from individual [`PartitionedFile::statistics`],
    /// which compute exact values from [`PartitionedFile::partition_values`].
    statistics: Option<Arc<Statistics>>,
}
390
391impl FileGroup {
    /// Creates a new [`FileGroup`] from a vector of [`PartitionedFile`] objects.
    ///
    /// The group starts without group-level statistics.
    pub fn new(files: Vec<PartitionedFile>) -> Self {
        Self {
            files,
            statistics: None,
        }
    }
399
    /// Returns the number of files in this group
    /// (file ranges are counted as separate entries)
    pub fn len(&self) -> usize {
        self.files.len()
    }
404
405    /// Set the statistics for this group
406    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
407        self.statistics = Some(statistics);
408        self
409    }
410
411    /// Returns a slice of the files in this group
412    pub fn files(&self) -> &[PartitionedFile] {
413        &self.files
414    }
415
    /// Returns an iterator over references to the files in this group,
    /// in their stored order
    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
        self.files.iter()
    }
419
420    pub fn into_inner(self) -> Vec<PartitionedFile> {
421        self.files
422    }
423
    /// Returns `true` if this group contains no files
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }
427
    /// Removes the last element from the files vector and returns it, or None if empty.
    ///
    /// Only the file list is modified; the group-level statistics are left as they are.
    pub fn pop(&mut self) -> Option<PartitionedFile> {
        self.files.pop()
    }
432
    /// Adds a file to the end of the group.
    ///
    /// Only the file list is modified; the group-level statistics are left as they are.
    pub fn push(&mut self, partitioned_file: PartitionedFile) {
        self.files.push(partitioned_file);
    }
437
438    /// Get the specific file statistics for the given index
439    /// If the index is None, return the `FileGroup` statistics
440    pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
441        if let Some(index) = index {
442            self.files.get(index).and_then(|f| f.statistics.as_deref())
443        } else {
444            self.statistics.as_deref()
445        }
446    }
447
448    /// Get the mutable reference to the statistics for this group
449    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
450        self.statistics.as_mut().map(Arc::make_mut)
451    }
452
453    /// Partition the list of files into `n` groups
454    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
455        if self.is_empty() {
456            return vec![];
457        }
458
459        // ObjectStore::list does not guarantee any consistent order and for some
460        // implementations such as LocalFileSystem, it may be inconsistent. Thus
461        // Sort files by path to ensure consistent plans when run more than once.
462        self.files.sort_by(|a, b| a.path().cmp(b.path()));
463
464        // effectively this is div with rounding up instead of truncating
465        let chunk_size = self.len().div_ceil(n);
466        let mut chunks = Vec::with_capacity(n);
467        let mut current_chunk = Vec::with_capacity(chunk_size);
468        for file in self.files.drain(..) {
469            current_chunk.push(file);
470            if current_chunk.len() == chunk_size {
471                let full_chunk = FileGroup::new(mem::replace(
472                    &mut current_chunk,
473                    Vec::with_capacity(chunk_size),
474                ));
475                chunks.push(full_chunk);
476            }
477        }
478
479        if !current_chunk.is_empty() {
480            chunks.push(FileGroup::new(current_chunk))
481        }
482
483        chunks
484    }
485
486    /// Groups files by their partition values, ensuring all files with same
487    /// partition values are in the same group.
488    ///
489    /// Note: May return fewer groups than `max_target_partitions` when the
490    /// number of unique partition values is less than the target.
491    #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // ScalarValue has interior mutability but is intentionally used as hash key
492    pub fn group_by_partition_values(
493        self,
494        max_target_partitions: usize,
495    ) -> Vec<FileGroup> {
496        if self.is_empty() || max_target_partitions == 0 {
497            return vec![];
498        }
499
500        let mut partition_groups: HashMap<
501            Vec<datafusion_common::ScalarValue>,
502            Vec<PartitionedFile>,
503        > = HashMap::new();
504
505        for file in self.files {
506            partition_groups
507                .entry(file.partition_values.clone())
508                .or_default()
509                .push(file);
510        }
511
512        let num_unique_partitions = partition_groups.len();
513
514        // Sort for deterministic bucket assignment across query executions.
515        let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
516        let sort_options =
517            vec![
518                SortOptions::default();
519                sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
520            ];
521        sorted_partitions.sort_by(|a, b| {
522            compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
523        });
524
525        if num_unique_partitions <= max_target_partitions {
526            sorted_partitions
527                .into_iter()
528                .map(|(_, files)| FileGroup::new(files))
529                .collect()
530        } else {
531            // Merge into max_target_partitions buckets using round-robin.
532            // This maintains grouping by partition value as we are merging groups which already
533            // contain all values for a partition key.
534            let mut target_groups = vec![vec![]; max_target_partitions];
535
536            for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
537                let bucket = idx % max_target_partitions;
538                target_groups[bucket].extend(files);
539            }
540
541            target_groups.into_iter().map(FileGroup::new).collect()
542        }
543    }
544}
545
/// Read access to the file at a given position in the group.
impl Index<usize> for FileGroup {
    type Output = PartitionedFile;

    /// Panics if `index` is out of bounds (plain `Vec` indexing).
    fn index(&self, index: usize) -> &Self::Output {
        &self.files[index]
    }
}
553
/// Mutable access to the file at a given position in the group.
impl IndexMut<usize> for FileGroup {
    /// Panics if `index` is out of bounds (plain `Vec` indexing).
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        &mut self.files[index]
    }
}
559
560impl FromIterator<PartitionedFile> for FileGroup {
561    fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
562        let files = iter.into_iter().collect();
563        FileGroup::new(files)
564    }
565}
566
/// Wraps a vector of files in a [`FileGroup`]; same as [`FileGroup::new`].
impl From<Vec<PartitionedFile>> for FileGroup {
    fn from(files: Vec<PartitionedFile>) -> Self {
        FileGroup::new(files)
    }
}
572
/// The default [`FileGroup`] has no files and no statistics.
impl Default for FileGroup {
    fn default() -> Self {
        Self::new(Vec::new())
    }
}
578
/// Tracks how an individual file will be repartitioned
#[derive(Debug, Clone)]
struct ToRepartition {
    /// index of the group from which the original file will be taken
    source_index: usize,
    /// the size of the original file
    file_size: u64,
    /// indexes of the group(s) this file will be distributed to (including `source_index`)
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each file range once this file is read in its new groups.
    ///
    /// This is an integer division, so a remainder is not reflected here.
    fn range_size(&self) -> u64 {
        let num_groups = self.new_groups.len() as u64;
        self.file_size / num_groups
    }
}
596
597struct CompareByRangeSize(ToRepartition);
598impl CompareByRangeSize {
599    fn into_inner(self) -> ToRepartition {
600        self.0
601    }
602}
603impl Ord for CompareByRangeSize {
604    fn cmp(&self, other: &Self) -> Ordering {
605        self.0.range_size().cmp(&other.0.range_size())
606    }
607}
608impl PartialOrd for CompareByRangeSize {
609    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
610        Some(self.cmp(other))
611    }
612}
613impl PartialEq for CompareByRangeSize {
614    fn eq(&self, other: &Self) -> bool {
615        // PartialEq must be consistent with PartialOrd
616        self.cmp(other) == Ordering::Equal
617    }
618}
619impl Eq for CompareByRangeSize {}
620impl Deref for CompareByRangeSize {
621    type Target = ToRepartition;
622    fn deref(&self) -> &Self::Target {
623        &self.0
624    }
625}
626impl DerefMut for CompareByRangeSize {
627    fn deref_mut(&mut self) -> &mut Self::Target {
628        &mut self.0
629    }
630}
631
632#[cfg(test)]
633mod test {
634    use super::*;
635    use datafusion_common::ScalarValue;
636
637    /// Empty file won't get partitioned
638    #[test]
639    fn repartition_empty_file_only() {
640        let partitioned_file_empty = pfile("empty", 0);
641        let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
642
643        let partitioned_files = FileGroupPartitioner::new()
644            .with_target_partitions(4)
645            .with_repartition_file_min_size(0)
646            .repartition_file_groups(&file_group);
647
648        assert_partitioned_files(None, partitioned_files);
649    }
650
651    /// Repartition when there is a empty file in file groups
652    #[test]
653    fn repartition_empty_files() {
654        let pfile_a = pfile("a", 10);
655        let pfile_b = pfile("b", 10);
656        let pfile_empty = pfile("empty", 0);
657
658        let empty_first = vec![
659            FileGroup::new(vec![pfile_empty.clone()]),
660            FileGroup::new(vec![pfile_a.clone()]),
661            FileGroup::new(vec![pfile_b.clone()]),
662        ];
663        let empty_middle = vec![
664            FileGroup::new(vec![pfile_a.clone()]),
665            FileGroup::new(vec![pfile_empty.clone()]),
666            FileGroup::new(vec![pfile_b.clone()]),
667        ];
668        let empty_last = vec![
669            FileGroup::new(vec![pfile_a]),
670            FileGroup::new(vec![pfile_b]),
671            FileGroup::new(vec![pfile_empty]),
672        ];
673
674        // Repartition file groups into x partitions
675        let expected_2 = vec![
676            FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
677            FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
678        ];
679        let expected_3 = vec![
680            FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
681            FileGroup::new(vec![
682                pfile("a", 10).with_range(7, 10),
683                pfile("b", 10).with_range(0, 4),
684            ]),
685            FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
686        ];
687
688        let file_groups_tests = [empty_first, empty_middle, empty_last];
689
690        for fg in file_groups_tests {
691            let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
692            for (n_partition, expected) in all_expected {
693                let actual = FileGroupPartitioner::new()
694                    .with_target_partitions(n_partition)
695                    .with_repartition_file_min_size(10)
696                    .repartition_file_groups(&fg);
697
698                assert_partitioned_files(Some(expected), actual);
699            }
700        }
701    }
702
703    #[test]
704    fn repartition_single_file() {
705        // Single file, single partition into multiple partitions
706        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
707
708        let actual = FileGroupPartitioner::new()
709            .with_target_partitions(4)
710            .with_repartition_file_min_size(10)
711            .repartition_file_groups(&single_partition);
712
713        let expected = Some(vec![
714            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
715            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
716            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
717            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
718        ]);
719        assert_partitioned_files(expected, actual);
720    }
721
722    #[test]
723    fn repartition_single_file_with_range() {
724        // Single file, single partition into multiple partitions
725        let single_partition =
726            vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];
727
728        let actual = FileGroupPartitioner::new()
729            .with_target_partitions(4)
730            .with_repartition_file_min_size(10)
731            .repartition_file_groups(&single_partition);
732
733        let expected = Some(vec![
734            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
735            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
736            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
737            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
738        ]);
739        assert_partitioned_files(expected, actual);
740    }
741
742    #[test]
743    fn repartition_single_file_with_incomplete_range() {
744        // Single file, single partition into multiple partitions
745        let single_partition =
746            vec![FileGroup::new(vec![pfile("a", 123).with_range(10, 100)])];
747
748        let actual = FileGroupPartitioner::new()
749            .with_target_partitions(4)
750            .with_repartition_file_min_size(10)
751            .repartition_file_groups(&single_partition);
752
753        let expected = Some(vec![
754            FileGroup::new(vec![pfile("a", 123).with_range(10, 33)]),
755            FileGroup::new(vec![pfile("a", 123).with_range(33, 56)]),
756            FileGroup::new(vec![pfile("a", 123).with_range(56, 79)]),
757            FileGroup::new(vec![pfile("a", 123).with_range(79, 100)]),
758        ]);
759        assert_partitioned_files(expected, actual);
760    }
761
762    #[test]
763    fn repartition_single_file_duplicated_with_range() {
764        // Single file, two partitions into multiple partitions
765        let single_partition = vec![FileGroup::new(vec![
766            pfile("a", 100).with_range(0, 50),
767            pfile("a", 100).with_range(50, 100),
768        ])];
769
770        let actual = FileGroupPartitioner::new()
771            .with_target_partitions(4)
772            .with_repartition_file_min_size(10)
773            .repartition_file_groups(&single_partition);
774
775        let expected = Some(vec![
776            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
777            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
778            FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
779            FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
780        ]);
781        assert_partitioned_files(expected, actual);
782    }
783
784    #[test]
785    fn repartition_too_much_partitions() {
786        // Single file, single partition into 96 partitions
787        let partitioned_file = pfile("a", 8);
788        let single_partition = vec![FileGroup::new(vec![partitioned_file])];
789
790        let actual = FileGroupPartitioner::new()
791            .with_target_partitions(96)
792            .with_repartition_file_min_size(5)
793            .repartition_file_groups(&single_partition);
794
795        let expected = Some(vec![
796            FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
797            FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
798            FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
799            FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
800            FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
801            FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
802            FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
803            FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
804        ]);
805
806        assert_partitioned_files(expected, actual);
807    }
808
809    #[test]
810    fn repartition_multiple_partitions() {
811        // Multiple files in single partition after redistribution
812        let source_partitions = vec![
813            FileGroup::new(vec![pfile("a", 40)]),
814            FileGroup::new(vec![pfile("b", 60)]),
815        ];
816
817        let actual = FileGroupPartitioner::new()
818            .with_target_partitions(3)
819            .with_repartition_file_min_size(10)
820            .repartition_file_groups(&source_partitions);
821
822        let expected = Some(vec![
823            FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
824            FileGroup::new(vec![
825                pfile("a", 40).with_range(34, 40),
826                pfile("b", 60).with_range(0, 28),
827            ]),
828            FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
829        ]);
830        assert_partitioned_files(expected, actual);
831    }
832
833    #[test]
834    fn repartition_same_num_partitions() {
835        // "Rebalance" files across partitions
836        let source_partitions = vec![
837            FileGroup::new(vec![pfile("a", 40)]),
838            FileGroup::new(vec![pfile("b", 60)]),
839        ];
840
841        let actual = FileGroupPartitioner::new()
842            .with_target_partitions(2)
843            .with_repartition_file_min_size(10)
844            .repartition_file_groups(&source_partitions);
845
846        let expected = Some(vec![
847            FileGroup::new(vec![
848                pfile("a", 40).with_range(0, 40),
849                pfile("b", 60).with_range(0, 10),
850            ]),
851            FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
852        ]);
853        assert_partitioned_files(expected, actual);
854    }
855
856    #[test]
857    fn repartition_no_action_min_size() {
858        // No action due to target_partition_size
859        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
860
861        let actual = FileGroupPartitioner::new()
862            .with_target_partitions(65)
863            .with_repartition_file_min_size(500)
864            .repartition_file_groups(&single_partition);
865
866        assert_partitioned_files(None, actual)
867    }
868
869    #[test]
870    fn repartition_no_action_zero_files() {
871        // No action due to no files
872        let empty_partition = vec![];
873
874        let partitioner = FileGroupPartitioner::new()
875            .with_target_partitions(65)
876            .with_repartition_file_min_size(500);
877
878        assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
879    }
880
881    #[test]
882    fn repartition_ordered_no_action_too_few_partitions() {
883        // No action as there are no new groups to redistribute to
884        let input_partitions = vec![
885            FileGroup::new(vec![pfile("a", 100)]),
886            FileGroup::new(vec![pfile("b", 200)]),
887        ];
888
889        let actual = FileGroupPartitioner::new()
890            .with_preserve_order_within_groups(true)
891            .with_target_partitions(2)
892            .with_repartition_file_min_size(10)
893            .repartition_file_groups(&input_partitions);
894
895        assert_partitioned_files(None, actual)
896    }
897
898    #[test]
899    fn repartition_ordered_no_action_file_too_small() {
900        // No action as there are no new groups to redistribute to
901        let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
902
903        let actual = FileGroupPartitioner::new()
904            .with_preserve_order_within_groups(true)
905            .with_target_partitions(2)
906            // file is too small to repartition
907            .with_repartition_file_min_size(1000)
908            .repartition_file_groups(&single_partition);
909
910        assert_partitioned_files(None, actual)
911    }
912
913    #[test]
914    fn repartition_ordered_one_large_file() {
915        // "Rebalance" the single large file across partitions
916        let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
917
918        let actual = FileGroupPartitioner::new()
919            .with_preserve_order_within_groups(true)
920            .with_target_partitions(3)
921            .with_repartition_file_min_size(10)
922            .repartition_file_groups(&source_partitions);
923
924        let expected = Some(vec![
925            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
926            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
927            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
928        ]);
929        assert_partitioned_files(expected, actual);
930    }
931
932    #[test]
933    fn repartition_ordered_one_large_file_with_range() {
934        // "Rebalance" the single large file across partitions
935        let source_partitions =
936            vec![FileGroup::new(vec![pfile("a", 100).with_range(0, 100)])];
937
938        let actual = FileGroupPartitioner::new()
939            .with_preserve_order_within_groups(true)
940            .with_target_partitions(3)
941            .with_repartition_file_min_size(10)
942            .repartition_file_groups(&source_partitions);
943
944        let expected = Some(vec![
945            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
946            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
947            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
948        ]);
949        assert_partitioned_files(expected, actual);
950    }
951
952    #[test]
953    fn repartition_ordered_one_large_one_small_file() {
954        // "Rebalance" the single large file across empty partitions, but can't split
955        // small file
956        let source_partitions = vec![
957            FileGroup::new(vec![pfile("a", 100)]),
958            FileGroup::new(vec![pfile("b", 30)]),
959        ];
960
961        let actual = FileGroupPartitioner::new()
962            .with_preserve_order_within_groups(true)
963            .with_target_partitions(4)
964            .with_repartition_file_min_size(10)
965            .repartition_file_groups(&source_partitions);
966
967        let expected = Some(vec![
968            // scan first third of "a"
969            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
970            // only b in this group (can't do this)
971            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
972            // second third of "a"
973            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
974            // final third of "a"
975            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
976        ]);
977        assert_partitioned_files(expected, actual);
978    }
979
980    #[test]
981    fn repartition_ordered_one_large_one_small_file_with_full_range() {
982        // "Rebalance" the single large file across empty partitions, but can't split
983        // small file
984        let source_partitions = vec![
985            FileGroup::new(vec![pfile("a", 100).with_range(0, 100)]),
986            FileGroup::new(vec![pfile("b", 30)]),
987        ];
988
989        let actual = FileGroupPartitioner::new()
990            .with_preserve_order_within_groups(true)
991            .with_target_partitions(4)
992            .with_repartition_file_min_size(10)
993            .repartition_file_groups(&source_partitions);
994
995        let expected = Some(vec![
996            // scan first third of "a"
997            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
998            // only b in this group (can't do this)
999            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
1000            // second third of "a"
1001            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
1002            // final third of "a"
1003            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
1004        ]);
1005        assert_partitioned_files(expected, actual);
1006    }
1007
1008    #[test]
1009    fn repartition_ordered_one_large_one_small_file_with_split_range() {
1010        // "Rebalance" the single large file across empty partitions, but can't split
1011        // small file
1012        let source_partitions = vec![
1013            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1014            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1015            FileGroup::new(vec![pfile("b", 30)]),
1016        ];
1017
1018        let actual = FileGroupPartitioner::new()
1019            .with_preserve_order_within_groups(true)
1020            .with_target_partitions(4)
1021            .with_repartition_file_min_size(10)
1022            .repartition_file_groups(&source_partitions);
1023
1024        let expected = Some(vec![
1025            // scan first half of first "a"
1026            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
1027            // second "a" fully (not split)
1028            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1029            // only b in this group (can't do this)
1030            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
1031            // second half of first "a"
1032            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
1033        ]);
1034        assert_partitioned_files(expected, actual);
1035    }
1036
1037    #[test]
1038    fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
1039        // "Rebalance" the single large file across empty partitions, but can't split
1040        // small file
1041        let source_partitions = vec![
1042            FileGroup::new(vec![pfile("a", 100).with_range(20, 80)]),
1043            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
1044        ];
1045
1046        let actual = FileGroupPartitioner::new()
1047            .with_preserve_order_within_groups(true)
1048            .with_target_partitions(4)
1049            .with_repartition_file_min_size(10)
1050            .repartition_file_groups(&source_partitions);
1051
1052        let expected = Some(vec![
1053            // scan first third of "a"
1054            FileGroup::new(vec![pfile("a", 100).with_range(20, 40)]),
1055            // only b in this group (can't split this)
1056            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
1057            // second third of "a"
1058            FileGroup::new(vec![pfile("a", 100).with_range(40, 60)]),
1059            // final third of "a"
1060            FileGroup::new(vec![pfile("a", 100).with_range(60, 80)]),
1061        ]);
1062        assert_partitioned_files(expected, actual);
1063    }
1064
1065    #[test]
1066    fn repartition_ordered_two_large_files() {
1067        // "Rebalance" two large files across empty partitions, but can't mix them
1068        let source_partitions = vec![
1069            FileGroup::new(vec![pfile("a", 100)]),
1070            FileGroup::new(vec![pfile("b", 100)]),
1071        ];
1072
1073        let actual = FileGroupPartitioner::new()
1074            .with_preserve_order_within_groups(true)
1075            .with_target_partitions(4)
1076            .with_repartition_file_min_size(10)
1077            .repartition_file_groups(&source_partitions);
1078
1079        let expected = Some(vec![
1080            // scan first half of "a"
1081            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1082            // scan first half of "b"
1083            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
1084            // second half of "a"
1085            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1086            // second half of "b"
1087            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
1088        ]);
1089        assert_partitioned_files(expected, actual);
1090    }
1091
1092    #[test]
1093    fn repartition_ordered_two_large_one_small_files() {
1094        // "Rebalance" two large files and one small file across empty partitions
1095        let source_partitions = vec![
1096            FileGroup::new(vec![pfile("a", 100)]),
1097            FileGroup::new(vec![pfile("b", 100)]),
1098            FileGroup::new(vec![pfile("c", 30)]),
1099        ];
1100
1101        let partitioner = FileGroupPartitioner::new()
1102            .with_preserve_order_within_groups(true)
1103            .with_repartition_file_min_size(10);
1104
1105        // with 4 partitions, can only split the first large file "a"
1106        let actual = partitioner
1107            .with_target_partitions(4)
1108            .repartition_file_groups(&source_partitions);
1109
1110        let expected = Some(vec![
1111            // scan first half of "a"
1112            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1113            // All of "b"
1114            FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
1115            // All of "c"
1116            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
1117            // second half of "a"
1118            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1119        ]);
1120        assert_partitioned_files(expected, actual);
1121
1122        // With 5 partitions, we can split both "a" and "b", but they can't be intermixed
1123        let actual = partitioner
1124            .with_target_partitions(5)
1125            .repartition_file_groups(&source_partitions);
1126
1127        let expected = Some(vec![
1128            // scan first half of "a"
1129            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1130            // scan first half of "b"
1131            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
1132            // All of "c"
1133            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
1134            // second half of "a"
1135            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1136            // second half of "b"
1137            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
1138        ]);
1139        assert_partitioned_files(expected, actual);
1140    }
1141
1142    #[test]
1143    fn repartition_ordered_one_large_one_small_existing_empty() {
1144        // "Rebalance" files using existing empty partition
1145        let source_partitions = vec![
1146            FileGroup::new(vec![pfile("a", 100)]),
1147            FileGroup::default(),
1148            FileGroup::new(vec![pfile("b", 40)]),
1149            FileGroup::default(),
1150        ];
1151
1152        let actual = FileGroupPartitioner::new()
1153            .with_preserve_order_within_groups(true)
1154            .with_target_partitions(5)
1155            .with_repartition_file_min_size(10)
1156            .repartition_file_groups(&source_partitions);
1157
1158        // Of the three available groups (2 original empty and 1 new from the
1159        // target partitions), assign two to "a" and one to "b"
1160        let expected = Some(vec![
1161            // Scan of "a" across three groups
1162            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
1163            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
1164            // scan first half of "b"
1165            FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
1166            // final third of "a"
1167            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
1168            // second half of "b"
1169            FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
1170        ]);
1171        assert_partitioned_files(expected, actual);
1172    }
1173    #[test]
1174    fn repartition_ordered_existing_group_multiple_files() {
1175        // groups with multiple files in a group can not be changed, but can divide others
1176        let source_partitions = vec![
1177            // two files in an existing partition
1178            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
1179            FileGroup::new(vec![pfile("c", 40)]),
1180        ];
1181
1182        let actual = FileGroupPartitioner::new()
1183            .with_preserve_order_within_groups(true)
1184            .with_target_partitions(3)
1185            .with_repartition_file_min_size(10)
1186            .repartition_file_groups(&source_partitions);
1187
1188        // Of the three available groups (2 original empty and 1 new from the
1189        // target partitions), assign two to "a" and one to "b"
1190        let expected = Some(vec![
1191            // don't try and rearrange files in the existing partition
1192            // assuming that the caller had a good reason to put them that way.
1193            // (it is technically possible to split off ranges from the files if desired)
1194            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
1195            // first half of "c"
1196            FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
1197            // second half of "c"
1198            FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
1199        ]);
1200        assert_partitioned_files(expected, actual);
1201    }
1202
1203    /// Asserts that the two groups of [`PartitionedFile`] are the same
1204    /// (PartitionedFile doesn't implement PartialEq)
1205    fn assert_partitioned_files(
1206        expected: Option<Vec<FileGroup>>,
1207        actual: Option<Vec<FileGroup>>,
1208    ) {
1209        match (expected, actual) {
1210            (None, None) => {}
1211            (Some(_), None) => panic!("Expected Some, got None"),
1212            (None, Some(_)) => panic!("Expected None, got Some"),
1213            (Some(expected), Some(actual)) => {
1214                let expected_string = format!("{expected:#?}");
1215                let actual_string = format!("{actual:#?}");
1216                assert_eq!(expected_string, actual_string);
1217            }
1218        }
1219    }
1220
1221    /// returns a partitioned file with the specified path and size
1222    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
1223        PartitionedFile::new(path, file_size)
1224    }
1225
1226    /// Creates a file with partition value with a static size of 10.
1227    fn pfile_with_pv(path: &str, pv: &str) -> PartitionedFile {
1228        let mut file = pfile(path, 10);
1229        file.partition_values = vec![ScalarValue::from(pv)];
1230        file
1231    }
1232
1233    /// repartition the file groups both with and without preserving order
1234    /// asserting they return the same value and returns that value
1235    fn repartition_test(
1236        partitioner: FileGroupPartitioner,
1237        file_groups: Vec<FileGroup>,
1238    ) -> Option<Vec<FileGroup>> {
1239        let repartitioned = partitioner.repartition_file_groups(&file_groups);
1240
1241        let repartitioned_preserving_sort = partitioner
1242            .with_preserve_order_within_groups(true)
1243            .repartition_file_groups(&file_groups);
1244
1245        assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
1246        repartitioned
1247    }
1248
1249    #[test]
1250    fn test_group_by_partition_values_edge_cases() {
1251        // Edge cases: empty and zero target
1252        assert!(FileGroup::default().group_by_partition_values(4).is_empty());
1253        assert!(
1254            FileGroup::new(vec![pfile("a", 100)])
1255                .group_by_partition_values(0)
1256                .is_empty()
1257        );
1258    }
1259
1260    #[test]
1261    fn test_group_by_partition_values_less_groups_than_target() {
1262        // File a and b have partition value p1.
1263        // File c has partition value p2.
1264        // Grouping by partition value should not redistribute any files since the number of partition
1265        // values <= max_target_partitions.
1266        let fg = FileGroup::new(vec![
1267            pfile_with_pv("a", "p1"),
1268            pfile_with_pv("b", "p1"),
1269            pfile_with_pv("c", "p2"),
1270        ]);
1271        let groups = fg.group_by_partition_values(4);
1272        assert_eq!(groups.len(), 2);
1273        assert_eq!(groups[0].len(), 2);
1274        assert_eq!(groups[1].len(), 1);
1275    }
1276
1277    #[test]
1278    fn test_group_by_partition_values_more_groups_than_target() {
1279        // Each file has a single partition value. The number of partition values > max_target_partitions, so
1280        // they should be round-robin distributed into groups.
1281        let fg = FileGroup::new(vec![
1282            pfile_with_pv("a", "p1"),
1283            pfile_with_pv("b", "p2"),
1284            pfile_with_pv("c", "p3"),
1285            pfile_with_pv("d", "p4"),
1286            pfile_with_pv("e", "p5"),
1287        ]);
1288        let groups = fg.group_by_partition_values(3);
1289        assert_eq!(groups.len(), 3);
1290        assert_eq!(groups[0].len(), 2);
1291        assert_eq!(groups[1].len(), 2);
1292        assert_eq!(groups[2].len(), 1);
1293    }
1294}