datafusion_datasource/
file_groups.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
19
20use crate::{FileRange, PartitionedFile};
21use datafusion_common::Statistics;
22use itertools::Itertools;
23use std::cmp::{min, Ordering};
24use std::collections::BinaryHeap;
25use std::iter::repeat_with;
26use std::mem;
27use std::ops::{Deref, DerefMut, Index, IndexMut};
28use std::sync::Arc;
29
/// Repartition input files into `target_partitions` partitions, if the total file size
/// is at least `repartition_file_min_size`
///
/// This partitions evenly by file byte range, and does not have any knowledge
/// of how data is laid out in specific files. The specific `FileOpener`s are
/// responsible for the actual partitioning on a specific data source type (e.g.
/// the `CsvOpener` will read the lines that overlap with the byte range as well
/// as handle boundaries to ensure all lines will be read exactly once).
38///
39/// # Example
40///
41/// For example, if there are two files `A` and `B` that we wish to read with 4
42/// partitions (with 4 threads) they will be divided as follows:
43///
44/// ```text
45///                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
46///                                      ┌─────────────────┐
47///                                    │ │                 │ │
48///                                      │     File A      │
49///                                    │ │  Range: 0-2MB   │ │
50///                                      │                 │
51///                                    │ └─────────────────┘ │
52///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
53/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
54/// │                 │                  ┌─────────────────┐
55/// │                 │                │ │                 │ │
56/// │                 │                  │     File A      │
57/// │                 │                │ │   Range 2-4MB   │ │
58/// │                 │                  │                 │
59/// │                 │                │ └─────────────────┘ │
60/// │  File A (7MB)   │   ────────▶     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
61/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
62/// │                 │                  ┌─────────────────┐
63/// │                 │                │ │                 │ │
64/// │                 │                  │     File A      │
65/// │                 │                │ │  Range: 4-6MB   │ │
66/// │                 │                  │                 │
67/// │                 │                │ └─────────────────┘ │
68/// └─────────────────┘                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
69/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
70/// │  File B (1MB)   │                  ┌─────────────────┐
71/// │                 │                │ │     File A      │ │
72/// └─────────────────┘                  │  Range: 6-7MB   │
73///                                    │ └─────────────────┘ │
74///                                      ┌─────────────────┐
75///                                    │ │  File B (1MB)   │ │
76///                                      │                 │
77///                                    │ └─────────────────┘ │
78///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
79///
80///                                    If target_partitions = 4,
81///                                      divides into 4 groups
82/// ```
83///
84/// # Maintaining Order
85///
86/// Within each group files are read sequentially. Thus, if the overall order of
87/// tuples must be preserved, multiple files can not be mixed in the same group.
88///
89/// In this case, the code will split the largest files evenly into any
90/// available empty groups, but the overall distribution may not be as even
91/// as if the order did not need to be preserved.
92///
93/// ```text
94///                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
95///                                      ┌─────────────────┐
96///                                    │ │                 │ │
97///                                      │     File A      │
98///                                    │ │  Range: 0-2MB   │ │
99///                                      │                 │
100/// ┌─────────────────┐                │ └─────────────────┘ │
101/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
102/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
103/// │                 │                  ┌─────────────────┐
104/// │                 │                │ │                 │ │
105/// │                 │                  │     File A      │
106/// │                 │                │ │   Range 2-4MB   │ │
107/// │  File A (6MB)   │   ────────▶      │                 │
108/// │    (ordered)    │                │ └─────────────────┘ │
109/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
110/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
111/// │                 │                  ┌─────────────────┐
112/// │                 │                │ │                 │ │
113/// │                 │                  │     File A      │
114/// │                 │                │ │  Range: 4-6MB   │ │
115/// └─────────────────┘                  │                 │
116/// ┌─────────────────┐                │ └─────────────────┘ │
117/// │  File B (1MB)   │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
118/// │    (ordered)    │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
119/// └─────────────────┘                  ┌─────────────────┐
120///                                    │ │  File B (1MB)   │ │
121///                                      │                 │
122///                                    │ └─────────────────┘ │
123///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
124///
125///                                    If target_partitions = 4,
126///                                      divides into 4 groups
127/// ```
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
    /// How many partitions (file groups) the repartitioning aims to produce
    target_partitions: usize,
    /// Size threshold: when splitting evenly by size, inputs whose combined
    /// file size is below this value are not repartitioned.
    repartition_file_min_size: usize,
    /// If the order when reading the files must be preserved. When `true`
    /// the order-preserving strategy is used instead of the even-by-size one.
    preserve_order_within_groups: bool,
}
137
138impl Default for FileGroupPartitioner {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl FileGroupPartitioner {
145    /// Creates a new [`FileGroupPartitioner`] with default values:
146    /// 1. `target_partitions = 1`
147    /// 2. `repartition_file_min_size = 10MB`
148    /// 3. `preserve_order_within_groups = false`
149    pub fn new() -> Self {
150        Self {
151            target_partitions: 1,
152            repartition_file_min_size: 10 * 1024 * 1024,
153            preserve_order_within_groups: false,
154        }
155    }
156
157    /// Set the target partitions
158    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
159        self.target_partitions = target_partitions;
160        self
161    }
162
163    /// Set the minimum size at which to repartition a file
164    pub fn with_repartition_file_min_size(
165        mut self,
166        repartition_file_min_size: usize,
167    ) -> Self {
168        self.repartition_file_min_size = repartition_file_min_size;
169        self
170    }
171
172    /// Set whether the order of tuples within a file must be preserved
173    pub fn with_preserve_order_within_groups(
174        mut self,
175        preserve_order_within_groups: bool,
176    ) -> Self {
177        self.preserve_order_within_groups = preserve_order_within_groups;
178        self
179    }
180
181    /// Repartition input files according to the settings on this [`FileGroupPartitioner`].
182    ///
183    /// If no repartitioning is needed or possible, return `None`.
184    pub fn repartition_file_groups(
185        &self,
186        file_groups: &[FileGroup],
187    ) -> Option<Vec<FileGroup>> {
188        if file_groups.is_empty() {
189            return None;
190        }
191
192        // Perform redistribution only in case all files should be read from beginning to end
193        let has_ranges = file_groups
194            .iter()
195            .flat_map(FileGroup::iter)
196            .any(|f| f.range.is_some());
197        if has_ranges {
198            return None;
199        }
200
201        //  special case when order must be preserved
202        if self.preserve_order_within_groups {
203            self.repartition_preserving_order(file_groups)
204        } else {
205            self.repartition_evenly_by_size(file_groups)
206        }
207    }
208
209    /// Evenly repartition files across partitions by size, ignoring any
210    /// existing grouping / ordering
211    fn repartition_evenly_by_size(
212        &self,
213        file_groups: &[FileGroup],
214    ) -> Option<Vec<FileGroup>> {
215        let target_partitions = self.target_partitions;
216        let repartition_file_min_size = self.repartition_file_min_size;
217        let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
218
219        let total_size = flattened_files
220            .iter()
221            .map(|f| f.object_meta.size as i64)
222            .sum::<i64>();
223        if total_size < (repartition_file_min_size as i64) || total_size == 0 {
224            return None;
225        }
226
227        let target_partition_size =
228            (total_size as u64).div_ceil(target_partitions as u64);
229
230        let current_partition_index: usize = 0;
231        let current_partition_size: u64 = 0;
232
233        // Partition byte range evenly for all `PartitionedFile`s
234        let repartitioned_files = flattened_files
235            .into_iter()
236            .scan(
237                (current_partition_index, current_partition_size),
238                |state, source_file| {
239                    let mut produced_files = vec![];
240                    let mut range_start = 0;
241                    while range_start < source_file.object_meta.size {
242                        let range_end = min(
243                            range_start + (target_partition_size - state.1),
244                            source_file.object_meta.size,
245                        );
246
247                        let mut produced_file = source_file.clone();
248                        produced_file.range = Some(FileRange {
249                            start: range_start as i64,
250                            end: range_end as i64,
251                        });
252                        produced_files.push((state.0, produced_file));
253
254                        if state.1 + (range_end - range_start) >= target_partition_size {
255                            state.0 += 1;
256                            state.1 = 0;
257                        } else {
258                            state.1 += range_end - range_start;
259                        }
260                        range_start = range_end;
261                    }
262                    Some(produced_files)
263                },
264            )
265            .flatten()
266            .chunk_by(|(partition_idx, _)| *partition_idx)
267            .into_iter()
268            .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
269            .collect_vec();
270
271        Some(repartitioned_files)
272    }
273
274    /// Redistribute file groups across size preserving order
275    fn repartition_preserving_order(
276        &self,
277        file_groups: &[FileGroup],
278    ) -> Option<Vec<FileGroup>> {
279        // Can't repartition and preserve order if there are more groups
280        // than partitions
281        if file_groups.len() >= self.target_partitions {
282            return None;
283        }
284        let num_new_groups = self.target_partitions - file_groups.len();
285
286        // If there is only a single file
287        if file_groups.len() == 1 && file_groups[0].len() == 1 {
288            return self.repartition_evenly_by_size(file_groups);
289        }
290
291        // Find which files could be split (single file groups)
292        let mut heap: BinaryHeap<_> = file_groups
293            .iter()
294            .enumerate()
295            .filter_map(|(group_index, group)| {
296                // ignore groups that do not have exactly 1 file
297                if group.len() == 1 {
298                    Some(ToRepartition {
299                        source_index: group_index,
300                        file_size: group[0].object_meta.size,
301                        new_groups: vec![group_index],
302                    })
303                } else {
304                    None
305                }
306            })
307            .map(CompareByRangeSize)
308            .collect();
309
310        // No files can be redistributed
311        if heap.is_empty() {
312            return None;
313        }
314
315        // Add new empty groups to which we will redistribute ranges of existing files
316        // Add new empty groups to which we will redistribute ranges of existing files
317        let mut file_groups: Vec<_> = file_groups
318            .iter()
319            .cloned()
320            .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
321            .collect();
322
323        // Divide up empty groups
324        for (group_index, group) in file_groups.iter().enumerate() {
325            if !group.is_empty() {
326                continue;
327            }
328            // Pick the file that has the largest ranges to read so far
329            let mut largest_group = heap.pop().unwrap();
330            largest_group.new_groups.push(group_index);
331            heap.push(largest_group);
332        }
333
334        // Distribute files to their newly assigned groups
335        while let Some(to_repartition) = heap.pop() {
336            let range_size = to_repartition.range_size() as i64;
337            let ToRepartition {
338                source_index,
339                file_size,
340                new_groups,
341            } = to_repartition.into_inner();
342            assert_eq!(file_groups[source_index].len(), 1);
343            let original_file = file_groups[source_index].pop().unwrap();
344
345            let last_group = new_groups.len() - 1;
346            let mut range_start: i64 = 0;
347            let mut range_end: i64 = range_size;
348            for (i, group_index) in new_groups.into_iter().enumerate() {
349                let target_group = &mut file_groups[group_index];
350                assert!(target_group.is_empty());
351
352                // adjust last range to include the entire file
353                if i == last_group {
354                    range_end = file_size as i64;
355                }
356                target_group
357                    .push(original_file.clone().with_range(range_start, range_end));
358                range_start = range_end;
359                range_end += range_size;
360            }
361        }
362
363        Some(file_groups)
364    }
365}
366
/// Represents a group of partitioned files that'll be processed by a single thread.
/// Maintains optional statistics across all files in the group.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// The files in this group
    files: Vec<PartitionedFile>,
    /// Optional statistics for the data across all files in the group;
    /// `None` when no group-level statistics have been attached
    statistics: Option<Arc<Statistics>>,
}
376
377impl FileGroup {
378    /// Creates a new FileGroup from a vector of PartitionedFile objects
379    pub fn new(files: Vec<PartitionedFile>) -> Self {
380        Self {
381            files,
382            statistics: None,
383        }
384    }
385
386    /// Returns the number of files in this group
387    pub fn len(&self) -> usize {
388        self.files.len()
389    }
390
391    /// Set the statistics for this group
392    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
393        self.statistics = Some(statistics);
394        self
395    }
396
397    /// Returns a slice of the files in this group
398    pub fn files(&self) -> &[PartitionedFile] {
399        &self.files
400    }
401
402    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
403        self.files.iter()
404    }
405
406    pub fn into_inner(self) -> Vec<PartitionedFile> {
407        self.files
408    }
409
410    pub fn is_empty(&self) -> bool {
411        self.files.is_empty()
412    }
413
414    /// Removes the last element from the files vector and returns it, or None if empty
415    pub fn pop(&mut self) -> Option<PartitionedFile> {
416        self.files.pop()
417    }
418
419    /// Adds a file to the group
420    pub fn push(&mut self, file: PartitionedFile) {
421        self.files.push(file);
422    }
423
424    /// Get the specific file statistics for the given index
425    /// If the index is None, return the `FileGroup` statistics
426    pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
427        if let Some(index) = index {
428            self.files.get(index).and_then(|f| f.statistics.as_deref())
429        } else {
430            self.statistics.as_deref()
431        }
432    }
433
434    /// Get the mutable reference to the statistics for this group
435    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
436        self.statistics.as_mut().map(Arc::make_mut)
437    }
438
439    /// Partition the list of files into `n` groups
440    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
441        if self.is_empty() {
442            return vec![];
443        }
444
445        // ObjectStore::list does not guarantee any consistent order and for some
446        // implementations such as LocalFileSystem, it may be inconsistent. Thus
447        // Sort files by path to ensure consistent plans when run more than once.
448        self.files.sort_by(|a, b| a.path().cmp(b.path()));
449
450        // effectively this is div with rounding up instead of truncating
451        let chunk_size = self.len().div_ceil(n);
452        let mut chunks = Vec::with_capacity(n);
453        let mut current_chunk = Vec::with_capacity(chunk_size);
454        for file in self.files.drain(..) {
455            current_chunk.push(file);
456            if current_chunk.len() == chunk_size {
457                let full_chunk = FileGroup::new(mem::replace(
458                    &mut current_chunk,
459                    Vec::with_capacity(chunk_size),
460                ));
461                chunks.push(full_chunk);
462            }
463        }
464
465        if !current_chunk.is_empty() {
466            chunks.push(FileGroup::new(current_chunk))
467        }
468
469        chunks
470    }
471}
472
473impl Index<usize> for FileGroup {
474    type Output = PartitionedFile;
475
476    fn index(&self, index: usize) -> &Self::Output {
477        &self.files[index]
478    }
479}
480
481impl IndexMut<usize> for FileGroup {
482    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
483        &mut self.files[index]
484    }
485}
486
487impl FromIterator<PartitionedFile> for FileGroup {
488    fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
489        let files = iter.into_iter().collect();
490        FileGroup::new(files)
491    }
492}
493
494impl From<Vec<PartitionedFile>> for FileGroup {
495    fn from(files: Vec<PartitionedFile>) -> Self {
496        FileGroup::new(files)
497    }
498}
499
500impl Default for FileGroup {
501    fn default() -> Self {
502        Self::new(Vec::new())
503    }
504}
505
/// Bookkeeping for how one individual file is going to be repartitioned
#[derive(Debug, Clone)]
struct ToRepartition {
    /// index of the group the original file is taken from
    source_index: usize,
    /// size of the original file
    file_size: u64,
    /// indexes of the group(s) the file will be spread over (`source_index` included)
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each file range once the file is divided among its new groups
    /// (integer division of the file size by the number of groups)
    fn range_size(&self) -> u64 {
        let num_groups = self.new_groups.len() as u64;
        self.file_size / num_groups
    }
}
523
524struct CompareByRangeSize(ToRepartition);
525impl CompareByRangeSize {
526    fn into_inner(self) -> ToRepartition {
527        self.0
528    }
529}
530impl Ord for CompareByRangeSize {
531    fn cmp(&self, other: &Self) -> Ordering {
532        self.0.range_size().cmp(&other.0.range_size())
533    }
534}
535impl PartialOrd for CompareByRangeSize {
536    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
537        Some(self.cmp(other))
538    }
539}
540impl PartialEq for CompareByRangeSize {
541    fn eq(&self, other: &Self) -> bool {
542        // PartialEq must be consistent with PartialOrd
543        self.cmp(other) == Ordering::Equal
544    }
545}
546impl Eq for CompareByRangeSize {}
547impl Deref for CompareByRangeSize {
548    type Target = ToRepartition;
549    fn deref(&self) -> &Self::Target {
550        &self.0
551    }
552}
553impl DerefMut for CompareByRangeSize {
554    fn deref_mut(&mut self) -> &mut Self::Target {
555        &mut self.0
556    }
557}
558
559#[cfg(test)]
560mod test {
561    use super::*;
562
563    /// Empty file won't get partitioned
564    #[test]
565    fn repartition_empty_file_only() {
566        let partitioned_file_empty = pfile("empty", 0);
567        let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
568
569        let partitioned_files = FileGroupPartitioner::new()
570            .with_target_partitions(4)
571            .with_repartition_file_min_size(0)
572            .repartition_file_groups(&file_group);
573
574        assert_partitioned_files(None, partitioned_files);
575    }
576
577    /// Repartition when there is a empty file in file groups
578    #[test]
579    fn repartition_empty_files() {
580        let pfile_a = pfile("a", 10);
581        let pfile_b = pfile("b", 10);
582        let pfile_empty = pfile("empty", 0);
583
584        let empty_first = vec![
585            FileGroup::new(vec![pfile_empty.clone()]),
586            FileGroup::new(vec![pfile_a.clone()]),
587            FileGroup::new(vec![pfile_b.clone()]),
588        ];
589        let empty_middle = vec![
590            FileGroup::new(vec![pfile_a.clone()]),
591            FileGroup::new(vec![pfile_empty.clone()]),
592            FileGroup::new(vec![pfile_b.clone()]),
593        ];
594        let empty_last = vec![
595            FileGroup::new(vec![pfile_a]),
596            FileGroup::new(vec![pfile_b]),
597            FileGroup::new(vec![pfile_empty]),
598        ];
599
600        // Repartition file groups into x partitions
601        let expected_2 = vec![
602            FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
603            FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
604        ];
605        let expected_3 = vec![
606            FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
607            FileGroup::new(vec![
608                pfile("a", 10).with_range(7, 10),
609                pfile("b", 10).with_range(0, 4),
610            ]),
611            FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
612        ];
613
614        let file_groups_tests = [empty_first, empty_middle, empty_last];
615
616        for fg in file_groups_tests {
617            let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
618            for (n_partition, expected) in all_expected {
619                let actual = FileGroupPartitioner::new()
620                    .with_target_partitions(n_partition)
621                    .with_repartition_file_min_size(10)
622                    .repartition_file_groups(&fg);
623
624                assert_partitioned_files(Some(expected), actual);
625            }
626        }
627    }
628
629    #[test]
630    fn repartition_single_file() {
631        // Single file, single partition into multiple partitions
632        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
633
634        let actual = FileGroupPartitioner::new()
635            .with_target_partitions(4)
636            .with_repartition_file_min_size(10)
637            .repartition_file_groups(&single_partition);
638
639        let expected = Some(vec![
640            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
641            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
642            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
643            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
644        ]);
645        assert_partitioned_files(expected, actual);
646    }
647
648    #[test]
649    fn repartition_too_much_partitions() {
650        // Single file, single partition into 96 partitions
651        let partitioned_file = pfile("a", 8);
652        let single_partition = vec![FileGroup::new(vec![partitioned_file])];
653
654        let actual = FileGroupPartitioner::new()
655            .with_target_partitions(96)
656            .with_repartition_file_min_size(5)
657            .repartition_file_groups(&single_partition);
658
659        let expected = Some(vec![
660            FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
661            FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
662            FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
663            FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
664            FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
665            FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
666            FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
667            FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
668        ]);
669
670        assert_partitioned_files(expected, actual);
671    }
672
673    #[test]
674    fn repartition_multiple_partitions() {
675        // Multiple files in single partition after redistribution
676        let source_partitions = vec![
677            FileGroup::new(vec![pfile("a", 40)]),
678            FileGroup::new(vec![pfile("b", 60)]),
679        ];
680
681        let actual = FileGroupPartitioner::new()
682            .with_target_partitions(3)
683            .with_repartition_file_min_size(10)
684            .repartition_file_groups(&source_partitions);
685
686        let expected = Some(vec![
687            FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
688            FileGroup::new(vec![
689                pfile("a", 40).with_range(34, 40),
690                pfile("b", 60).with_range(0, 28),
691            ]),
692            FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
693        ]);
694        assert_partitioned_files(expected, actual);
695    }
696
697    #[test]
698    fn repartition_same_num_partitions() {
699        // "Rebalance" files across partitions
700        let source_partitions = vec![
701            FileGroup::new(vec![pfile("a", 40)]),
702            FileGroup::new(vec![pfile("b", 60)]),
703        ];
704
705        let actual = FileGroupPartitioner::new()
706            .with_target_partitions(2)
707            .with_repartition_file_min_size(10)
708            .repartition_file_groups(&source_partitions);
709
710        let expected = Some(vec![
711            FileGroup::new(vec![
712                pfile("a", 40).with_range(0, 40),
713                pfile("b", 60).with_range(0, 10),
714            ]),
715            FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
716        ]);
717        assert_partitioned_files(expected, actual);
718    }
719
720    #[test]
721    fn repartition_no_action_ranges() {
722        // No action due to Some(range) in second file
723        let source_partitions = vec![
724            FileGroup::new(vec![pfile("a", 123)]),
725            FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
726        ];
727
728        let actual = FileGroupPartitioner::new()
729            .with_target_partitions(65)
730            .with_repartition_file_min_size(10)
731            .repartition_file_groups(&source_partitions);
732
733        assert_partitioned_files(None, actual)
734    }
735
736    #[test]
737    fn repartition_no_action_min_size() {
738        // No action due to target_partition_size
739        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
740
741        let actual = FileGroupPartitioner::new()
742            .with_target_partitions(65)
743            .with_repartition_file_min_size(500)
744            .repartition_file_groups(&single_partition);
745
746        assert_partitioned_files(None, actual)
747    }
748
749    #[test]
750    fn repartition_no_action_zero_files() {
751        // No action due to no files
752        let empty_partition = vec![];
753
754        let partitioner = FileGroupPartitioner::new()
755            .with_target_partitions(65)
756            .with_repartition_file_min_size(500);
757
758        assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
759    }
760
761    #[test]
762    fn repartition_ordered_no_action_too_few_partitions() {
763        // No action as there are no new groups to redistribute to
764        let input_partitions = vec![
765            FileGroup::new(vec![pfile("a", 100)]),
766            FileGroup::new(vec![pfile("b", 200)]),
767        ];
768
769        let actual = FileGroupPartitioner::new()
770            .with_preserve_order_within_groups(true)
771            .with_target_partitions(2)
772            .with_repartition_file_min_size(10)
773            .repartition_file_groups(&input_partitions);
774
775        assert_partitioned_files(None, actual)
776    }
777
778    #[test]
779    fn repartition_ordered_no_action_file_too_small() {
780        // No action as there are no new groups to redistribute to
781        let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
782
783        let actual = FileGroupPartitioner::new()
784            .with_preserve_order_within_groups(true)
785            .with_target_partitions(2)
786            // file is too small to repartition
787            .with_repartition_file_min_size(1000)
788            .repartition_file_groups(&single_partition);
789
790        assert_partitioned_files(None, actual)
791    }
792
793    #[test]
794    fn repartition_ordered_one_large_file() {
795        // "Rebalance" the single large file across partitions
796        let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
797
798        let actual = FileGroupPartitioner::new()
799            .with_preserve_order_within_groups(true)
800            .with_target_partitions(3)
801            .with_repartition_file_min_size(10)
802            .repartition_file_groups(&source_partitions);
803
804        let expected = Some(vec![
805            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
806            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
807            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
808        ]);
809        assert_partitioned_files(expected, actual);
810    }
811
812    #[test]
813    fn repartition_ordered_one_large_one_small_file() {
814        // "Rebalance" the single large file across empty partitions, but can't split
815        // small file
816        let source_partitions = vec![
817            FileGroup::new(vec![pfile("a", 100)]),
818            FileGroup::new(vec![pfile("b", 30)]),
819        ];
820
821        let actual = FileGroupPartitioner::new()
822            .with_preserve_order_within_groups(true)
823            .with_target_partitions(4)
824            .with_repartition_file_min_size(10)
825            .repartition_file_groups(&source_partitions);
826
827        let expected = Some(vec![
828            // scan first third of "a"
829            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
830            // only b in this group (can't do this)
831            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
832            // second third of "a"
833            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
834            // final third of "a"
835            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
836        ]);
837        assert_partitioned_files(expected, actual);
838    }
839
840    #[test]
841    fn repartition_ordered_two_large_files() {
842        // "Rebalance" two large files across empty partitions, but can't mix them
843        let source_partitions = vec![
844            FileGroup::new(vec![pfile("a", 100)]),
845            FileGroup::new(vec![pfile("b", 100)]),
846        ];
847
848        let actual = FileGroupPartitioner::new()
849            .with_preserve_order_within_groups(true)
850            .with_target_partitions(4)
851            .with_repartition_file_min_size(10)
852            .repartition_file_groups(&source_partitions);
853
854        let expected = Some(vec![
855            // scan first half of "a"
856            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
857            // scan first half of "b"
858            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
859            // second half of "a"
860            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
861            // second half of "b"
862            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
863        ]);
864        assert_partitioned_files(expected, actual);
865    }
866
867    #[test]
868    fn repartition_ordered_two_large_one_small_files() {
869        // "Rebalance" two large files and one small file across empty partitions
870        let source_partitions = vec![
871            FileGroup::new(vec![pfile("a", 100)]),
872            FileGroup::new(vec![pfile("b", 100)]),
873            FileGroup::new(vec![pfile("c", 30)]),
874        ];
875
876        let partitioner = FileGroupPartitioner::new()
877            .with_preserve_order_within_groups(true)
878            .with_repartition_file_min_size(10);
879
880        // with 4 partitions, can only split the first large file "a"
881        let actual = partitioner
882            .with_target_partitions(4)
883            .repartition_file_groups(&source_partitions);
884
885        let expected = Some(vec![
886            // scan first half of "a"
887            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
888            // All of "b"
889            FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
890            // All of "c"
891            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
892            // second half of "a"
893            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
894        ]);
895        assert_partitioned_files(expected, actual);
896
897        // With 5 partitions, we can split both "a" and "b", but they can't be intermixed
898        let actual = partitioner
899            .with_target_partitions(5)
900            .repartition_file_groups(&source_partitions);
901
902        let expected = Some(vec![
903            // scan first half of "a"
904            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
905            // scan first half of "b"
906            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
907            // All of "c"
908            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
909            // second half of "a"
910            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
911            // second half of "b"
912            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
913        ]);
914        assert_partitioned_files(expected, actual);
915    }
916
917    #[test]
918    fn repartition_ordered_one_large_one_small_existing_empty() {
919        // "Rebalance" files using existing empty partition
920        let source_partitions = vec![
921            FileGroup::new(vec![pfile("a", 100)]),
922            FileGroup::default(),
923            FileGroup::new(vec![pfile("b", 40)]),
924            FileGroup::default(),
925        ];
926
927        let actual = FileGroupPartitioner::new()
928            .with_preserve_order_within_groups(true)
929            .with_target_partitions(5)
930            .with_repartition_file_min_size(10)
931            .repartition_file_groups(&source_partitions);
932
933        // Of the three available groups (2 original empty and 1 new from the
934        // target partitions), assign two to "a" and one to "b"
935        let expected = Some(vec![
936            // Scan of "a" across three groups
937            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
938            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
939            // scan first half of "b"
940            FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
941            // final third of "a"
942            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
943            // second half of "b"
944            FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
945        ]);
946        assert_partitioned_files(expected, actual);
947    }
948    #[test]
949    fn repartition_ordered_existing_group_multiple_files() {
950        // groups with multiple files in a group can not be changed, but can divide others
951        let source_partitions = vec![
952            // two files in an existing partition
953            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
954            FileGroup::new(vec![pfile("c", 40)]),
955        ];
956
957        let actual = FileGroupPartitioner::new()
958            .with_preserve_order_within_groups(true)
959            .with_target_partitions(3)
960            .with_repartition_file_min_size(10)
961            .repartition_file_groups(&source_partitions);
962
963        // Of the three available groups (2 original empty and 1 new from the
964        // target partitions), assign two to "a" and one to "b"
965        let expected = Some(vec![
966            // don't try and rearrange files in the existing partition
967            // assuming that the caller had a good reason to put them that way.
968            // (it is technically possible to split off ranges from the files if desired)
969            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
970            // first half of "c"
971            FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
972            // second half of "c"
973            FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
974        ]);
975        assert_partitioned_files(expected, actual);
976    }
977
978    /// Asserts that the two groups of [`PartitionedFile`] are the same
979    /// (PartitionedFile doesn't implement PartialEq)
980    fn assert_partitioned_files(
981        expected: Option<Vec<FileGroup>>,
982        actual: Option<Vec<FileGroup>>,
983    ) {
984        match (expected, actual) {
985            (None, None) => {}
986            (Some(_), None) => panic!("Expected Some, got None"),
987            (None, Some(_)) => panic!("Expected None, got Some"),
988            (Some(expected), Some(actual)) => {
989                let expected_string = format!("{expected:#?}");
990                let actual_string = format!("{actual:#?}");
991                assert_eq!(expected_string, actual_string);
992            }
993        }
994    }
995
996    /// returns a partitioned file with the specified path and size
997    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
998        PartitionedFile::new(path, file_size)
999    }
1000
1001    /// repartition the file groups both with and without preserving order
1002    /// asserting they return the same value and returns that value
1003    fn repartition_test(
1004        partitioner: FileGroupPartitioner,
1005        file_groups: Vec<FileGroup>,
1006    ) -> Option<Vec<FileGroup>> {
1007        let repartitioned = partitioner.repartition_file_groups(&file_groups);
1008
1009        let repartitioned_preserving_sort = partitioner
1010            .with_preserve_order_within_groups(true)
1011            .repartition_file_groups(&file_groups);
1012
1013        assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
1014        repartitioned
1015    }
1016}