datafusion_datasource/
file_groups.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
19
20use crate::{FileRange, PartitionedFile};
21use arrow::compute::SortOptions;
22use datafusion_common::Statistics;
23use datafusion_common::utils::compare_rows;
24use itertools::Itertools;
25use std::cmp::{Ordering, min};
26use std::collections::{BinaryHeap, HashMap};
27use std::iter::repeat_with;
28use std::mem;
29use std::ops::{Deref, DerefMut, Index, IndexMut};
30use std::sync::Arc;
31
/// Repartition input files into `target_partitions` partitions, if the total file size is
/// at least `repartition_file_min_size`
///
/// This partitions evenly by file byte range, and does not have any knowledge
/// of how data is laid out in specific files. The specific `FileOpener`
/// implementations are responsible for the actual partitioning for each data
/// source type (e.g. the `CsvOpener` will read the lines that overlap with its
/// byte range and handle boundaries to ensure all lines are read exactly once).
40///
41/// # Example
42///
43/// For example, if there are two files `A` and `B` that we wish to read with 4
44/// partitions (with 4 threads) they will be divided as follows:
45///
46/// ```text
47///                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
48///                                      ┌─────────────────┐
49///                                    │ │                 │ │
50///                                      │     File A      │
51///                                    │ │  Range: 0-2MB   │ │
52///                                      │                 │
53///                                    │ └─────────────────┘ │
54///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
55/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
56/// │                 │                  ┌─────────────────┐
57/// │                 │                │ │                 │ │
58/// │                 │                  │     File A      │
59/// │                 │                │ │   Range 2-4MB   │ │
60/// │                 │                  │                 │
61/// │                 │                │ └─────────────────┘ │
62/// │  File A (7MB)   │   ────────▶     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
63/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
64/// │                 │                  ┌─────────────────┐
65/// │                 │                │ │                 │ │
66/// │                 │                  │     File A      │
67/// │                 │                │ │  Range: 4-6MB   │ │
68/// │                 │                  │                 │
69/// │                 │                │ └─────────────────┘ │
70/// └─────────────────┘                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
72/// │  File B (1MB)   │                  ┌─────────────────┐
73/// │                 │                │ │     File A      │ │
74/// └─────────────────┘                  │  Range: 6-7MB   │
75///                                    │ └─────────────────┘ │
76///                                      ┌─────────────────┐
77///                                    │ │  File B (1MB)   │ │
78///                                      │                 │
79///                                    │ └─────────────────┘ │
80///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
81///
82///                                    If target_partitions = 4,
83///                                      divides into 4 groups
84/// ```
85///
86/// # Maintaining Order
87///
88/// Within each group files are read sequentially. Thus, if the overall order of
89/// tuples must be preserved, multiple files can not be mixed in the same group.
90///
91/// In this case, the code will split the largest files evenly into any
92/// available empty groups, but the overall distribution may not be as even
93/// as if the order did not need to be preserved.
94///
95/// ```text
96///                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
97///                                      ┌─────────────────┐
98///                                    │ │                 │ │
99///                                      │     File A      │
100///                                    │ │  Range: 0-2MB   │ │
101///                                      │                 │
102/// ┌─────────────────┐                │ └─────────────────┘ │
103/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
104/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
105/// │                 │                  ┌─────────────────┐
106/// │                 │                │ │                 │ │
107/// │                 │                  │     File A      │
108/// │                 │                │ │   Range 2-4MB   │ │
109/// │  File A (6MB)   │   ────────▶      │                 │
110/// │    (ordered)    │                │ └─────────────────┘ │
111/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
112/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
113/// │                 │                  ┌─────────────────┐
114/// │                 │                │ │                 │ │
115/// │                 │                  │     File A      │
116/// │                 │                │ │  Range: 4-6MB   │ │
117/// └─────────────────┘                  │                 │
118/// ┌─────────────────┐                │ └─────────────────┘ │
119/// │  File B (1MB)   │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
120/// │    (ordered)    │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
121/// └─────────────────┘                  ┌─────────────────┐
122///                                    │ │  File B (1MB)   │ │
123///                                      │                 │
124///                                    │ └─────────────────┘ │
125///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
126///
127///                                    If target_partitions = 4,
128///                                      divides into 4 groups
129/// ```
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
    /// How many partitions should be created
    target_partitions: usize,
    /// Size threshold for repartitioning: when splitting evenly by size, the
    /// combined effective size of all input files must be at least this
    /// value, otherwise no repartitioning happens.
    repartition_file_min_size: usize,
    /// Whether the order when reading the files must be preserved
    preserve_order_within_groups: bool,
}
139
140impl Default for FileGroupPartitioner {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
146impl FileGroupPartitioner {
147    /// Creates a new [`FileGroupPartitioner`] with default values:
148    /// 1. `target_partitions = 1`
149    /// 2. `repartition_file_min_size = 10MB`
150    /// 3. `preserve_order_within_groups = false`
151    pub fn new() -> Self {
152        Self {
153            target_partitions: 1,
154            repartition_file_min_size: 10 * 1024 * 1024,
155            preserve_order_within_groups: false,
156        }
157    }
158
159    /// Set the target partitions
160    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
161        self.target_partitions = target_partitions;
162        self
163    }
164
165    /// Set the minimum size at which to repartition a file
166    pub fn with_repartition_file_min_size(
167        mut self,
168        repartition_file_min_size: usize,
169    ) -> Self {
170        self.repartition_file_min_size = repartition_file_min_size;
171        self
172    }
173
174    /// Set whether the order of tuples within a file must be preserved
175    pub fn with_preserve_order_within_groups(
176        mut self,
177        preserve_order_within_groups: bool,
178    ) -> Self {
179        self.preserve_order_within_groups = preserve_order_within_groups;
180        self
181    }
182
183    /// Repartition input files according to the settings on this [`FileGroupPartitioner`].
184    ///
185    /// If no repartitioning is needed or possible, return `None`.
186    pub fn repartition_file_groups(
187        &self,
188        file_groups: &[FileGroup],
189    ) -> Option<Vec<FileGroup>> {
190        if file_groups.is_empty() {
191            return None;
192        }
193
194        //  special case when order must be preserved
195        if self.preserve_order_within_groups {
196            self.repartition_preserving_order(file_groups)
197        } else {
198            self.repartition_evenly_by_size(file_groups)
199        }
200    }
201
202    /// Evenly repartition files across partitions by size, ignoring any
203    /// existing grouping / ordering
204    fn repartition_evenly_by_size(
205        &self,
206        file_groups: &[FileGroup],
207    ) -> Option<Vec<FileGroup>> {
208        let target_partitions = self.target_partitions;
209        let repartition_file_min_size = self.repartition_file_min_size;
210        let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
211
212        let total_size = flattened_files
213            .iter()
214            .map(|f| f.effective_size())
215            .sum::<u64>();
216        if total_size < (repartition_file_min_size as u64) || total_size == 0 {
217            return None;
218        }
219
220        let target_partition_size = total_size.div_ceil(target_partitions as u64);
221
222        let current_partition_index: usize = 0;
223        let current_partition_size: u64 = 0;
224
225        // Partition byte range evenly for all `PartitionedFile`s
226        let repartitioned_files = flattened_files
227            .into_iter()
228            .scan(
229                (current_partition_index, current_partition_size),
230                |(current_partition_index, current_partition_size), source_file| {
231                    let mut produced_files = vec![];
232                    let (mut range_start, file_end) = source_file.range();
233                    while range_start < file_end {
234                        let range_end = min(
235                            range_start
236                                + (target_partition_size - *current_partition_size),
237                            file_end,
238                        );
239
240                        let mut produced_file = source_file.clone();
241                        produced_file.range = Some(FileRange {
242                            start: range_start as i64,
243                            end: range_end as i64,
244                        });
245                        produced_files.push((*current_partition_index, produced_file));
246
247                        if *current_partition_size + (range_end - range_start)
248                            >= target_partition_size
249                        {
250                            *current_partition_index += 1;
251                            *current_partition_size = 0;
252                        } else {
253                            *current_partition_size += range_end - range_start;
254                        }
255                        range_start = range_end;
256                    }
257                    Some(produced_files)
258                },
259            )
260            .flatten()
261            .chunk_by(|(partition_idx, _)| *partition_idx)
262            .into_iter()
263            .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
264            .collect_vec();
265
266        Some(repartitioned_files)
267    }
268
    /// Repartition while preserving the order of tuples within each group
    ///
    /// Groups holding more than one file are left as they are. Groups holding
    /// exactly one file are candidates for splitting: the groups needed to
    /// reach `target_partitions` are handed out one at a time to whichever
    /// candidate currently has the largest per-group range, and each
    /// candidate file is then cut into contiguous byte ranges, one per group
    /// it was given.
    ///
    /// Returns `None` if there are already at least `target_partitions`
    /// groups, or if no group consists of exactly one file. A single group
    /// with a single file is delegated to
    /// [`Self::repartition_evenly_by_size`].
    fn repartition_preserving_order(
        &self,
        file_groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        // Can't repartition and preserve order if there are already at least
        // as many groups as partitions
        if file_groups.len() >= self.target_partitions {
            return None;
        }
        let num_new_groups = self.target_partitions - file_groups.len();

        // If there is only a single file
        if file_groups.len() == 1 && file_groups[0].len() == 1 {
            return self.repartition_evenly_by_size(file_groups);
        }

        // Find which files could be split (single file groups)
        let mut heap: BinaryHeap<_> = file_groups
            .iter()
            .enumerate()
            .filter_map(|(group_index, group)| {
                // ignore groups that do not have exactly 1 file
                if group.len() == 1 {
                    Some(ToRepartition {
                        source_index: group_index,
                        file_size: group[0].effective_size(),
                        // each candidate starts out occupying only its own group
                        new_groups: vec![group_index],
                    })
                } else {
                    None
                }
            })
            .map(CompareByRangeSize)
            .collect();

        // No files can be redistributed
        if heap.is_empty() {
            return None;
        }

        // Add new empty groups to which we will redistribute ranges of existing files
        let mut file_groups: Vec<_> = file_groups
            .iter()
            .cloned()
            .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
            .collect();

        // Divide up empty groups
        for (group_index, group) in file_groups.iter().enumerate() {
            if !group.is_empty() {
                continue;
            }
            // Pick the file that has the largest ranges to read so far.
            // The heap is known to be non-empty here: it was checked above and
            // every pop is followed by a push.
            let mut largest_group = heap.pop().unwrap();
            largest_group.new_groups.push(group_index);
            heap.push(largest_group);
        }

        // Distribute files to their newly assigned groups
        while let Some(to_repartition) = heap.pop() {
            // file size divided (rounding down) by the number of assigned groups
            let range_size = to_repartition.range_size();
            let ToRepartition {
                source_index,
                file_size: _,
                new_groups,
            } = to_repartition.into_inner();
            assert_eq!(file_groups[source_index].len(), 1);
            // Take the file out of its source group; the source group is
            // listed in `new_groups` and receives a range of the file below
            let original_file = file_groups[source_index].pop().unwrap();

            let last_group = new_groups.len() - 1;
            let (mut range_start, file_end) = original_file.range();
            let mut range_end = range_start + range_size;
            for (i, group_index) in new_groups.into_iter().enumerate() {
                let target_group = &mut file_groups[group_index];
                assert!(target_group.is_empty());

                // adjust last range to include the entire file
                // (absorbs the remainder of the integer division)
                if i == last_group {
                    range_end = file_end;
                }
                target_group.push(
                    original_file
                        .clone()
                        .with_range(range_start as i64, range_end as i64),
                );
                range_start = range_end;
                range_end += range_size;
            }
        }

        Some(file_groups)
    }
363}
364
/// Represents a group of partitioned files that'll be processed by a single thread.
/// Maintains optional statistics across all files in the group.
///
/// # Statistics
///
/// The group-level statistics (returned by [`FileGroup::file_statistics`] when it is
/// called with `None`) contain merged statistics from all files
/// in the group for the **full table schema** (file columns + partition columns).
///
/// Partition column statistics are derived from the individual file partition values:
/// - `min` = minimum partition value across all files in the group
/// - `max` = maximum partition value across all files in the group
/// - `null_count` = 0 (partition values are never null)
///
/// This allows query optimizers to prune entire file groups based on partition bounds.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// The files in this group
    files: Vec<PartitionedFile>,
    /// Optional statistics for the data across all files in the group.
    ///
    /// These statistics cover the full table schema: file columns plus partition columns.
    /// Partition column statistics are merged from individual [`PartitionedFile::statistics`],
    /// which compute exact values from [`PartitionedFile::partition_values`].
    statistics: Option<Arc<Statistics>>,
}
390
391impl FileGroup {
    /// Creates a new FileGroup from a vector of PartitionedFile objects.
    ///
    /// The group starts out without group-level statistics.
    pub fn new(files: Vec<PartitionedFile>) -> Self {
        Self {
            files,
            statistics: None,
        }
    }
399
    /// Returns the number of files in this group
    pub fn len(&self) -> usize {
        self.files.len()
    }
404
405    /// Set the statistics for this group
406    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
407        self.statistics = Some(statistics);
408        self
409    }
410
411    /// Returns a slice of the files in this group
412    pub fn files(&self) -> &[PartitionedFile] {
413        &self.files
414    }
415
    /// Returns an iterator over references to the files in this group
    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
        self.files.iter()
    }
419
    /// Consumes the group and returns its files; any group-level statistics
    /// are dropped
    pub fn into_inner(self) -> Vec<PartitionedFile> {
        self.files
    }
423
    /// Returns `true` if this group contains no files
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }
427
    /// Removes the last element from the files vector and returns it, or None if empty.
    ///
    /// The group-level statistics, if any, are left untouched.
    pub fn pop(&mut self) -> Option<PartitionedFile> {
        self.files.pop()
    }
432
    /// Adds a file to the end of the group.
    ///
    /// The group-level statistics, if any, are left untouched.
    pub fn push(&mut self, partitioned_file: PartitionedFile) {
        self.files.push(partitioned_file);
    }
437
438    /// Get the specific file statistics for the given index
439    /// If the index is None, return the `FileGroup` statistics
440    pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
441        if let Some(index) = index {
442            self.files.get(index).and_then(|f| f.statistics.as_deref())
443        } else {
444            self.statistics.as_deref()
445        }
446    }
447
448    /// Get the mutable reference to the statistics for this group
449    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
450        self.statistics.as_mut().map(Arc::make_mut)
451    }
452
453    /// Partition the list of files into `n` groups
454    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
455        if self.is_empty() {
456            return vec![];
457        }
458
459        // ObjectStore::list does not guarantee any consistent order and for some
460        // implementations such as LocalFileSystem, it may be inconsistent. Thus
461        // Sort files by path to ensure consistent plans when run more than once.
462        self.files.sort_by(|a, b| a.path().cmp(b.path()));
463
464        // effectively this is div with rounding up instead of truncating
465        let chunk_size = self.len().div_ceil(n);
466        let mut chunks = Vec::with_capacity(n);
467        let mut current_chunk = Vec::with_capacity(chunk_size);
468        for file in self.files.drain(..) {
469            current_chunk.push(file);
470            if current_chunk.len() == chunk_size {
471                let full_chunk = FileGroup::new(mem::replace(
472                    &mut current_chunk,
473                    Vec::with_capacity(chunk_size),
474                ));
475                chunks.push(full_chunk);
476            }
477        }
478
479        if !current_chunk.is_empty() {
480            chunks.push(FileGroup::new(current_chunk))
481        }
482
483        chunks
484    }
485
486    /// Groups files by their partition values, ensuring all files with same
487    /// partition values are in the same group.
488    ///
489    /// Note: May return fewer groups than `max_target_partitions` when the
490    /// number of unique partition values is less than the target.
491    pub fn group_by_partition_values(
492        self,
493        max_target_partitions: usize,
494    ) -> Vec<FileGroup> {
495        if self.is_empty() || max_target_partitions == 0 {
496            return vec![];
497        }
498
499        let mut partition_groups: HashMap<
500            Vec<datafusion_common::ScalarValue>,
501            Vec<PartitionedFile>,
502        > = HashMap::new();
503
504        for file in self.files {
505            partition_groups
506                .entry(file.partition_values.clone())
507                .or_default()
508                .push(file);
509        }
510
511        let num_unique_partitions = partition_groups.len();
512
513        // Sort for deterministic bucket assignment across query executions.
514        let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
515        let sort_options =
516            vec![
517                SortOptions::default();
518                sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
519            ];
520        sorted_partitions.sort_by(|a, b| {
521            compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
522        });
523
524        if num_unique_partitions <= max_target_partitions {
525            sorted_partitions
526                .into_iter()
527                .map(|(_, files)| FileGroup::new(files))
528                .collect()
529        } else {
530            // Merge into max_target_partitions buckets using round-robin.
531            // This maintains grouping by partition value as we are merging groups which already
532            // contain all values for a partition key.
533            let mut target_groups = vec![vec![]; max_target_partitions];
534
535            for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
536                let bucket = idx % max_target_partitions;
537                target_groups[bucket].extend(files);
538            }
539
540            target_groups.into_iter().map(FileGroup::new).collect()
541        }
542    }
543}
544
impl Index<usize> for FileGroup {
    type Output = PartitionedFile;

    /// Returns the file at position `index` within the group.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn index(&self, index: usize) -> &Self::Output {
        &self.files[index]
    }
}
552
impl IndexMut<usize> for FileGroup {
    /// Returns a mutable reference to the file at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        &mut self.files[index]
    }
}
558
559impl FromIterator<PartitionedFile> for FileGroup {
560    fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
561        let files = iter.into_iter().collect();
562        FileGroup::new(files)
563    }
564}
565
566impl From<Vec<PartitionedFile>> for FileGroup {
567    fn from(files: Vec<PartitionedFile>) -> Self {
568        FileGroup::new(files)
569    }
570}
571
572impl Default for FileGroup {
573    fn default() -> Self {
574        Self::new(Vec::new())
575    }
576}
577
/// Tracks how an individual file will be repartitioned
#[derive(Debug, Clone)]
struct ToRepartition {
    /// index of the group the original file is taken from
    source_index: usize,
    /// size of the original file
    file_size: u64,
    /// indexes of the group(s) this file will be spread over (including `source_index`)
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each range once the file is spread over its new groups:
    /// the file size divided (rounding down) by the number of groups.
    fn range_size(&self) -> u64 {
        let group_count = self.new_groups.len() as u64;
        self.file_size / group_count
    }
}
595
596struct CompareByRangeSize(ToRepartition);
597impl CompareByRangeSize {
598    fn into_inner(self) -> ToRepartition {
599        self.0
600    }
601}
602impl Ord for CompareByRangeSize {
603    fn cmp(&self, other: &Self) -> Ordering {
604        self.0.range_size().cmp(&other.0.range_size())
605    }
606}
607impl PartialOrd for CompareByRangeSize {
608    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
609        Some(self.cmp(other))
610    }
611}
612impl PartialEq for CompareByRangeSize {
613    fn eq(&self, other: &Self) -> bool {
614        // PartialEq must be consistent with PartialOrd
615        self.cmp(other) == Ordering::Equal
616    }
617}
618impl Eq for CompareByRangeSize {}
619impl Deref for CompareByRangeSize {
620    type Target = ToRepartition;
621    fn deref(&self) -> &Self::Target {
622        &self.0
623    }
624}
625impl DerefMut for CompareByRangeSize {
626    fn deref_mut(&mut self) -> &mut Self::Target {
627        &mut self.0
628    }
629}
630
631#[cfg(test)]
632mod test {
633    use super::*;
634    use datafusion_common::ScalarValue;
635
636    /// Empty file won't get partitioned
637    #[test]
638    fn repartition_empty_file_only() {
639        let partitioned_file_empty = pfile("empty", 0);
640        let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
641
642        let partitioned_files = FileGroupPartitioner::new()
643            .with_target_partitions(4)
644            .with_repartition_file_min_size(0)
645            .repartition_file_groups(&file_group);
646
647        assert_partitioned_files(None, partitioned_files);
648    }
649
650    /// Repartition when there is a empty file in file groups
651    #[test]
652    fn repartition_empty_files() {
653        let pfile_a = pfile("a", 10);
654        let pfile_b = pfile("b", 10);
655        let pfile_empty = pfile("empty", 0);
656
657        let empty_first = vec![
658            FileGroup::new(vec![pfile_empty.clone()]),
659            FileGroup::new(vec![pfile_a.clone()]),
660            FileGroup::new(vec![pfile_b.clone()]),
661        ];
662        let empty_middle = vec![
663            FileGroup::new(vec![pfile_a.clone()]),
664            FileGroup::new(vec![pfile_empty.clone()]),
665            FileGroup::new(vec![pfile_b.clone()]),
666        ];
667        let empty_last = vec![
668            FileGroup::new(vec![pfile_a]),
669            FileGroup::new(vec![pfile_b]),
670            FileGroup::new(vec![pfile_empty]),
671        ];
672
673        // Repartition file groups into x partitions
674        let expected_2 = vec![
675            FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
676            FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
677        ];
678        let expected_3 = vec![
679            FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
680            FileGroup::new(vec![
681                pfile("a", 10).with_range(7, 10),
682                pfile("b", 10).with_range(0, 4),
683            ]),
684            FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
685        ];
686
687        let file_groups_tests = [empty_first, empty_middle, empty_last];
688
689        for fg in file_groups_tests {
690            let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
691            for (n_partition, expected) in all_expected {
692                let actual = FileGroupPartitioner::new()
693                    .with_target_partitions(n_partition)
694                    .with_repartition_file_min_size(10)
695                    .repartition_file_groups(&fg);
696
697                assert_partitioned_files(Some(expected), actual);
698            }
699        }
700    }
701
702    #[test]
703    fn repartition_single_file() {
704        // Single file, single partition into multiple partitions
705        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
706
707        let actual = FileGroupPartitioner::new()
708            .with_target_partitions(4)
709            .with_repartition_file_min_size(10)
710            .repartition_file_groups(&single_partition);
711
712        let expected = Some(vec![
713            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
714            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
715            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
716            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
717        ]);
718        assert_partitioned_files(expected, actual);
719    }
720
721    #[test]
722    fn repartition_single_file_with_range() {
723        // Single file, single partition into multiple partitions
724        let single_partition =
725            vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];
726
727        let actual = FileGroupPartitioner::new()
728            .with_target_partitions(4)
729            .with_repartition_file_min_size(10)
730            .repartition_file_groups(&single_partition);
731
732        let expected = Some(vec![
733            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
734            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
735            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
736            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
737        ]);
738        assert_partitioned_files(expected, actual);
739    }
740
741    #[test]
742    fn repartition_single_file_with_incomplete_range() {
743        // Single file, single partition into multiple partitions
744        let single_partition =
745            vec![FileGroup::new(vec![pfile("a", 123).with_range(10, 100)])];
746
747        let actual = FileGroupPartitioner::new()
748            .with_target_partitions(4)
749            .with_repartition_file_min_size(10)
750            .repartition_file_groups(&single_partition);
751
752        let expected = Some(vec![
753            FileGroup::new(vec![pfile("a", 123).with_range(10, 33)]),
754            FileGroup::new(vec![pfile("a", 123).with_range(33, 56)]),
755            FileGroup::new(vec![pfile("a", 123).with_range(56, 79)]),
756            FileGroup::new(vec![pfile("a", 123).with_range(79, 100)]),
757        ]);
758        assert_partitioned_files(expected, actual);
759    }
760
761    #[test]
762    fn repartition_single_file_duplicated_with_range() {
763        // Single file, two partitions into multiple partitions
764        let single_partition = vec![FileGroup::new(vec![
765            pfile("a", 100).with_range(0, 50),
766            pfile("a", 100).with_range(50, 100),
767        ])];
768
769        let actual = FileGroupPartitioner::new()
770            .with_target_partitions(4)
771            .with_repartition_file_min_size(10)
772            .repartition_file_groups(&single_partition);
773
774        let expected = Some(vec![
775            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
776            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
777            FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
778            FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
779        ]);
780        assert_partitioned_files(expected, actual);
781    }
782
783    #[test]
784    fn repartition_too_much_partitions() {
785        // Single file, single partition into 96 partitions
786        let partitioned_file = pfile("a", 8);
787        let single_partition = vec![FileGroup::new(vec![partitioned_file])];
788
789        let actual = FileGroupPartitioner::new()
790            .with_target_partitions(96)
791            .with_repartition_file_min_size(5)
792            .repartition_file_groups(&single_partition);
793
794        let expected = Some(vec![
795            FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
796            FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
797            FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
798            FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
799            FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
800            FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
801            FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
802            FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
803        ]);
804
805        assert_partitioned_files(expected, actual);
806    }
807
808    #[test]
809    fn repartition_multiple_partitions() {
810        // Multiple files in single partition after redistribution
811        let source_partitions = vec![
812            FileGroup::new(vec![pfile("a", 40)]),
813            FileGroup::new(vec![pfile("b", 60)]),
814        ];
815
816        let actual = FileGroupPartitioner::new()
817            .with_target_partitions(3)
818            .with_repartition_file_min_size(10)
819            .repartition_file_groups(&source_partitions);
820
821        let expected = Some(vec![
822            FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
823            FileGroup::new(vec![
824                pfile("a", 40).with_range(34, 40),
825                pfile("b", 60).with_range(0, 28),
826            ]),
827            FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
828        ]);
829        assert_partitioned_files(expected, actual);
830    }
831
832    #[test]
833    fn repartition_same_num_partitions() {
834        // "Rebalance" files across partitions
835        let source_partitions = vec![
836            FileGroup::new(vec![pfile("a", 40)]),
837            FileGroup::new(vec![pfile("b", 60)]),
838        ];
839
840        let actual = FileGroupPartitioner::new()
841            .with_target_partitions(2)
842            .with_repartition_file_min_size(10)
843            .repartition_file_groups(&source_partitions);
844
845        let expected = Some(vec![
846            FileGroup::new(vec![
847                pfile("a", 40).with_range(0, 40),
848                pfile("b", 60).with_range(0, 10),
849            ]),
850            FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
851        ]);
852        assert_partitioned_files(expected, actual);
853    }
854
855    #[test]
856    fn repartition_no_action_min_size() {
857        // No action due to target_partition_size
858        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
859
860        let actual = FileGroupPartitioner::new()
861            .with_target_partitions(65)
862            .with_repartition_file_min_size(500)
863            .repartition_file_groups(&single_partition);
864
865        assert_partitioned_files(None, actual)
866    }
867
868    #[test]
869    fn repartition_no_action_zero_files() {
870        // No action due to no files
871        let empty_partition = vec![];
872
873        let partitioner = FileGroupPartitioner::new()
874            .with_target_partitions(65)
875            .with_repartition_file_min_size(500);
876
877        assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
878    }
879
880    #[test]
881    fn repartition_ordered_no_action_too_few_partitions() {
882        // No action as there are no new groups to redistribute to
883        let input_partitions = vec![
884            FileGroup::new(vec![pfile("a", 100)]),
885            FileGroup::new(vec![pfile("b", 200)]),
886        ];
887
888        let actual = FileGroupPartitioner::new()
889            .with_preserve_order_within_groups(true)
890            .with_target_partitions(2)
891            .with_repartition_file_min_size(10)
892            .repartition_file_groups(&input_partitions);
893
894        assert_partitioned_files(None, actual)
895    }
896
897    #[test]
898    fn repartition_ordered_no_action_file_too_small() {
899        // No action as there are no new groups to redistribute to
900        let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
901
902        let actual = FileGroupPartitioner::new()
903            .with_preserve_order_within_groups(true)
904            .with_target_partitions(2)
905            // file is too small to repartition
906            .with_repartition_file_min_size(1000)
907            .repartition_file_groups(&single_partition);
908
909        assert_partitioned_files(None, actual)
910    }
911
912    #[test]
913    fn repartition_ordered_one_large_file() {
914        // "Rebalance" the single large file across partitions
915        let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
916
917        let actual = FileGroupPartitioner::new()
918            .with_preserve_order_within_groups(true)
919            .with_target_partitions(3)
920            .with_repartition_file_min_size(10)
921            .repartition_file_groups(&source_partitions);
922
923        let expected = Some(vec![
924            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
925            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
926            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
927        ]);
928        assert_partitioned_files(expected, actual);
929    }
930
931    #[test]
932    fn repartition_ordered_one_large_file_with_range() {
933        // "Rebalance" the single large file across partitions
934        let source_partitions =
935            vec![FileGroup::new(vec![pfile("a", 100).with_range(0, 100)])];
936
937        let actual = FileGroupPartitioner::new()
938            .with_preserve_order_within_groups(true)
939            .with_target_partitions(3)
940            .with_repartition_file_min_size(10)
941            .repartition_file_groups(&source_partitions);
942
943        let expected = Some(vec![
944            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
945            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
946            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
947        ]);
948        assert_partitioned_files(expected, actual);
949    }
950
951    #[test]
952    fn repartition_ordered_one_large_one_small_file() {
953        // "Rebalance" the single large file across empty partitions, but can't split
954        // small file
955        let source_partitions = vec![
956            FileGroup::new(vec![pfile("a", 100)]),
957            FileGroup::new(vec![pfile("b", 30)]),
958        ];
959
960        let actual = FileGroupPartitioner::new()
961            .with_preserve_order_within_groups(true)
962            .with_target_partitions(4)
963            .with_repartition_file_min_size(10)
964            .repartition_file_groups(&source_partitions);
965
966        let expected = Some(vec![
967            // scan first third of "a"
968            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
969            // only b in this group (can't do this)
970            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
971            // second third of "a"
972            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
973            // final third of "a"
974            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
975        ]);
976        assert_partitioned_files(expected, actual);
977    }
978
979    #[test]
980    fn repartition_ordered_one_large_one_small_file_with_full_range() {
981        // "Rebalance" the single large file across empty partitions, but can't split
982        // small file
983        let source_partitions = vec![
984            FileGroup::new(vec![pfile("a", 100).with_range(0, 100)]),
985            FileGroup::new(vec![pfile("b", 30)]),
986        ];
987
988        let actual = FileGroupPartitioner::new()
989            .with_preserve_order_within_groups(true)
990            .with_target_partitions(4)
991            .with_repartition_file_min_size(10)
992            .repartition_file_groups(&source_partitions);
993
994        let expected = Some(vec![
995            // scan first third of "a"
996            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
997            // only b in this group (can't do this)
998            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
999            // second third of "a"
1000            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
1001            // final third of "a"
1002            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
1003        ]);
1004        assert_partitioned_files(expected, actual);
1005    }
1006
1007    #[test]
1008    fn repartition_ordered_one_large_one_small_file_with_split_range() {
1009        // "Rebalance" the single large file across empty partitions, but can't split
1010        // small file
1011        let source_partitions = vec![
1012            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1013            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1014            FileGroup::new(vec![pfile("b", 30)]),
1015        ];
1016
1017        let actual = FileGroupPartitioner::new()
1018            .with_preserve_order_within_groups(true)
1019            .with_target_partitions(4)
1020            .with_repartition_file_min_size(10)
1021            .repartition_file_groups(&source_partitions);
1022
1023        let expected = Some(vec![
1024            // scan first half of first "a"
1025            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
1026            // second "a" fully (not split)
1027            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1028            // only b in this group (can't do this)
1029            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
1030            // second half of first "a"
1031            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
1032        ]);
1033        assert_partitioned_files(expected, actual);
1034    }
1035
1036    #[test]
1037    fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
1038        // "Rebalance" the single large file across empty partitions, but can't split
1039        // small file
1040        let source_partitions = vec![
1041            FileGroup::new(vec![pfile("a", 100).with_range(20, 80)]),
1042            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
1043        ];
1044
1045        let actual = FileGroupPartitioner::new()
1046            .with_preserve_order_within_groups(true)
1047            .with_target_partitions(4)
1048            .with_repartition_file_min_size(10)
1049            .repartition_file_groups(&source_partitions);
1050
1051        let expected = Some(vec![
1052            // scan first third of "a"
1053            FileGroup::new(vec![pfile("a", 100).with_range(20, 40)]),
1054            // only b in this group (can't split this)
1055            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
1056            // second third of "a"
1057            FileGroup::new(vec![pfile("a", 100).with_range(40, 60)]),
1058            // final third of "a"
1059            FileGroup::new(vec![pfile("a", 100).with_range(60, 80)]),
1060        ]);
1061        assert_partitioned_files(expected, actual);
1062    }
1063
1064    #[test]
1065    fn repartition_ordered_two_large_files() {
1066        // "Rebalance" two large files across empty partitions, but can't mix them
1067        let source_partitions = vec![
1068            FileGroup::new(vec![pfile("a", 100)]),
1069            FileGroup::new(vec![pfile("b", 100)]),
1070        ];
1071
1072        let actual = FileGroupPartitioner::new()
1073            .with_preserve_order_within_groups(true)
1074            .with_target_partitions(4)
1075            .with_repartition_file_min_size(10)
1076            .repartition_file_groups(&source_partitions);
1077
1078        let expected = Some(vec![
1079            // scan first half of "a"
1080            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1081            // scan first half of "b"
1082            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
1083            // second half of "a"
1084            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1085            // second half of "b"
1086            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
1087        ]);
1088        assert_partitioned_files(expected, actual);
1089    }
1090
1091    #[test]
1092    fn repartition_ordered_two_large_one_small_files() {
1093        // "Rebalance" two large files and one small file across empty partitions
1094        let source_partitions = vec![
1095            FileGroup::new(vec![pfile("a", 100)]),
1096            FileGroup::new(vec![pfile("b", 100)]),
1097            FileGroup::new(vec![pfile("c", 30)]),
1098        ];
1099
1100        let partitioner = FileGroupPartitioner::new()
1101            .with_preserve_order_within_groups(true)
1102            .with_repartition_file_min_size(10);
1103
1104        // with 4 partitions, can only split the first large file "a"
1105        let actual = partitioner
1106            .with_target_partitions(4)
1107            .repartition_file_groups(&source_partitions);
1108
1109        let expected = Some(vec![
1110            // scan first half of "a"
1111            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1112            // All of "b"
1113            FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
1114            // All of "c"
1115            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
1116            // second half of "a"
1117            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1118        ]);
1119        assert_partitioned_files(expected, actual);
1120
1121        // With 5 partitions, we can split both "a" and "b", but they can't be intermixed
1122        let actual = partitioner
1123            .with_target_partitions(5)
1124            .repartition_file_groups(&source_partitions);
1125
1126        let expected = Some(vec![
1127            // scan first half of "a"
1128            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
1129            // scan first half of "b"
1130            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
1131            // All of "c"
1132            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
1133            // second half of "a"
1134            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
1135            // second half of "b"
1136            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
1137        ]);
1138        assert_partitioned_files(expected, actual);
1139    }
1140
1141    #[test]
1142    fn repartition_ordered_one_large_one_small_existing_empty() {
1143        // "Rebalance" files using existing empty partition
1144        let source_partitions = vec![
1145            FileGroup::new(vec![pfile("a", 100)]),
1146            FileGroup::default(),
1147            FileGroup::new(vec![pfile("b", 40)]),
1148            FileGroup::default(),
1149        ];
1150
1151        let actual = FileGroupPartitioner::new()
1152            .with_preserve_order_within_groups(true)
1153            .with_target_partitions(5)
1154            .with_repartition_file_min_size(10)
1155            .repartition_file_groups(&source_partitions);
1156
1157        // Of the three available groups (2 original empty and 1 new from the
1158        // target partitions), assign two to "a" and one to "b"
1159        let expected = Some(vec![
1160            // Scan of "a" across three groups
1161            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
1162            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
1163            // scan first half of "b"
1164            FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
1165            // final third of "a"
1166            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
1167            // second half of "b"
1168            FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
1169        ]);
1170        assert_partitioned_files(expected, actual);
1171    }
1172    #[test]
1173    fn repartition_ordered_existing_group_multiple_files() {
1174        // groups with multiple files in a group can not be changed, but can divide others
1175        let source_partitions = vec![
1176            // two files in an existing partition
1177            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
1178            FileGroup::new(vec![pfile("c", 40)]),
1179        ];
1180
1181        let actual = FileGroupPartitioner::new()
1182            .with_preserve_order_within_groups(true)
1183            .with_target_partitions(3)
1184            .with_repartition_file_min_size(10)
1185            .repartition_file_groups(&source_partitions);
1186
1187        // Of the three available groups (2 original empty and 1 new from the
1188        // target partitions), assign two to "a" and one to "b"
1189        let expected = Some(vec![
1190            // don't try and rearrange files in the existing partition
1191            // assuming that the caller had a good reason to put them that way.
1192            // (it is technically possible to split off ranges from the files if desired)
1193            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
1194            // first half of "c"
1195            FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
1196            // second half of "c"
1197            FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
1198        ]);
1199        assert_partitioned_files(expected, actual);
1200    }
1201
1202    /// Asserts that the two groups of [`PartitionedFile`] are the same
1203    /// (PartitionedFile doesn't implement PartialEq)
1204    fn assert_partitioned_files(
1205        expected: Option<Vec<FileGroup>>,
1206        actual: Option<Vec<FileGroup>>,
1207    ) {
1208        match (expected, actual) {
1209            (None, None) => {}
1210            (Some(_), None) => panic!("Expected Some, got None"),
1211            (None, Some(_)) => panic!("Expected None, got Some"),
1212            (Some(expected), Some(actual)) => {
1213                let expected_string = format!("{expected:#?}");
1214                let actual_string = format!("{actual:#?}");
1215                assert_eq!(expected_string, actual_string);
1216            }
1217        }
1218    }
1219
1220    /// returns a partitioned file with the specified path and size
1221    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
1222        PartitionedFile::new(path, file_size)
1223    }
1224
1225    /// Creates a file with partition value with a static size of 10.
1226    fn pfile_with_pv(path: &str, pv: &str) -> PartitionedFile {
1227        let mut file = pfile(path, 10);
1228        file.partition_values = vec![ScalarValue::from(pv)];
1229        file
1230    }
1231
1232    /// repartition the file groups both with and without preserving order
1233    /// asserting they return the same value and returns that value
1234    fn repartition_test(
1235        partitioner: FileGroupPartitioner,
1236        file_groups: Vec<FileGroup>,
1237    ) -> Option<Vec<FileGroup>> {
1238        let repartitioned = partitioner.repartition_file_groups(&file_groups);
1239
1240        let repartitioned_preserving_sort = partitioner
1241            .with_preserve_order_within_groups(true)
1242            .repartition_file_groups(&file_groups);
1243
1244        assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
1245        repartitioned
1246    }
1247
1248    #[test]
1249    fn test_group_by_partition_values_edge_cases() {
1250        // Edge cases: empty and zero target
1251        assert!(FileGroup::default().group_by_partition_values(4).is_empty());
1252        assert!(
1253            FileGroup::new(vec![pfile("a", 100)])
1254                .group_by_partition_values(0)
1255                .is_empty()
1256        );
1257    }
1258
1259    #[test]
1260    fn test_group_by_partition_values_less_groups_than_target() {
1261        // File a and b have partition value p1.
1262        // File c has partition value p2.
1263        // Grouping by partition value should not redistribute any files since the number of partition
1264        // values <= max_target_partitions.
1265        let fg = FileGroup::new(vec![
1266            pfile_with_pv("a", "p1"),
1267            pfile_with_pv("b", "p1"),
1268            pfile_with_pv("c", "p2"),
1269        ]);
1270        let groups = fg.group_by_partition_values(4);
1271        assert_eq!(groups.len(), 2);
1272        assert_eq!(groups[0].len(), 2);
1273        assert_eq!(groups[1].len(), 1);
1274    }
1275
1276    #[test]
1277    fn test_group_by_partition_values_more_groups_than_target() {
1278        // Each file has a single partition value. The number of partition values > max_target_partitions, so
1279        // they should be round-robin distributed into groups.
1280        let fg = FileGroup::new(vec![
1281            pfile_with_pv("a", "p1"),
1282            pfile_with_pv("b", "p2"),
1283            pfile_with_pv("c", "p3"),
1284            pfile_with_pv("d", "p4"),
1285            pfile_with_pv("e", "p5"),
1286        ]);
1287        let groups = fg.group_by_partition_values(3);
1288        assert_eq!(groups.len(), 3);
1289        assert_eq!(groups[0].len(), 2);
1290        assert_eq!(groups[1].len(), 2);
1291        assert_eq!(groups[2].len(), 1);
1292    }
1293}