datafusion_datasource/
file_groups.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
19
20use crate::{FileRange, PartitionedFile};
21use itertools::Itertools;
22use std::cmp::min;
23use std::collections::BinaryHeap;
24use std::iter::repeat_with;
25
/// Splits input files into `target_partitions` partitions, provided the combined
/// size of all files is at least `repartition_file_min_size`.
///
/// Files are divided evenly by byte range only; this type has no knowledge of
/// how data is laid out inside any particular file. Each data-source-specific
/// `FileOpener` is responsible for honoring those byte ranges (for example, the
/// `CsvOpener` reads every line that overlaps its byte range and handles the
/// boundaries so that each line is read exactly once).
///
/// # Example
///
/// Reading two files `A` and `B` with 4 partitions (i.e. 4 threads) divides
/// them as follows:
///
/// ```text
///                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///                                      ┌─────────────────┐
///                                    │ │                 │ │
///                                      │     File A      │
///                                    │ │  Range: 0-2MB   │ │
///                                      │                 │
///                                    │ └─────────────────┘ │
///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │                 │                  ┌─────────────────┐
/// │                 │                │ │                 │ │
/// │                 │                  │     File A      │
/// │                 │                │ │   Range 2-4MB   │ │
/// │                 │                  │                 │
/// │                 │                │ └─────────────────┘ │
/// │  File A (7MB)   │   ────────▶     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │                 │                  ┌─────────────────┐
/// │                 │                │ │                 │ │
/// │                 │                  │     File A      │
/// │                 │                │ │  Range: 4-6MB   │ │
/// │                 │                  │                 │
/// │                 │                │ └─────────────────┘ │
/// └─────────────────┘                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// ┌─────────────────┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │  File B (1MB)   │                  ┌─────────────────┐
/// │                 │                │ │     File A      │ │
/// └─────────────────┘                  │  Range: 6-7MB   │
///                                    │ └─────────────────┘ │
///                                      ┌─────────────────┐
///                                    │ │  File B (1MB)   │ │
///                                      │                 │
///                                    │ └─────────────────┘ │
///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
///                                    If target_partitions = 4,
///                                      divides into 4 groups
/// ```
///
/// # Maintaining Order
///
/// Files within a group are read one after another, so if the overall order of
/// rows must be preserved, different files cannot be mixed within one group.
///
/// In that case the largest files are split evenly across any available empty
/// groups, but the resulting distribution may not be as even as it would be if
/// order did not need to be preserved.
///
/// ```text
///                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///                                      ┌─────────────────┐
///                                    │ │                 │ │
///                                      │     File A      │
///                                    │ │  Range: 0-2MB   │ │
///                                      │                 │
/// ┌─────────────────┐                │ └─────────────────┘ │
/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │                 │                  ┌─────────────────┐
/// │                 │                │ │                 │ │
/// │                 │                  │     File A      │
/// │                 │                │ │   Range 2-4MB   │ │
/// │  File A (6MB)   │   ────────▶      │                 │
/// │    (ordered)    │                │ └─────────────────┘ │
/// │                 │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │                 │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │                 │                  ┌─────────────────┐
/// │                 │                │ │                 │ │
/// │                 │                  │     File A      │
/// │                 │                │ │  Range: 4-6MB   │ │
/// └─────────────────┘                  │                 │
/// ┌─────────────────┐                │ └─────────────────┘ │
/// │  File B (1MB)   │                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │    (ordered)    │                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// └─────────────────┘                  ┌─────────────────┐
///                                    │ │  File B (1MB)   │ │
///                                      │                 │
///                                    │ └─────────────────┘ │
///                                     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
///                                    If target_partitions = 4,
///                                      divides into 4 groups
/// ```
#[derive(Clone, Copy, Debug)]
pub struct FileGroupPartitioner {
    /// Number of partitions the files should be divided into.
    target_partitions: usize,
    /// Size threshold, in bytes, below which files are not repartitioned.
    repartition_file_min_size: usize,
    /// Whether the order in which files are read must be kept intact.
    preserve_order_within_groups: bool,
}
133
134impl Default for FileGroupPartitioner {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140impl FileGroupPartitioner {
141    /// Creates a new [`FileGroupPartitioner`] with default values:
142    /// 1. `target_partitions = 1`
143    /// 2. `repartition_file_min_size = 10MB`
144    /// 3. `preserve_order_within_groups = false`
145    pub fn new() -> Self {
146        Self {
147            target_partitions: 1,
148            repartition_file_min_size: 10 * 1024 * 1024,
149            preserve_order_within_groups: false,
150        }
151    }
152
153    /// Set the target partitions
154    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
155        self.target_partitions = target_partitions;
156        self
157    }
158
159    /// Set the minimum size at which to repartition a file
160    pub fn with_repartition_file_min_size(
161        mut self,
162        repartition_file_min_size: usize,
163    ) -> Self {
164        self.repartition_file_min_size = repartition_file_min_size;
165        self
166    }
167
168    /// Set whether the order of tuples within a file must be preserved
169    pub fn with_preserve_order_within_groups(
170        mut self,
171        preserve_order_within_groups: bool,
172    ) -> Self {
173        self.preserve_order_within_groups = preserve_order_within_groups;
174        self
175    }
176
177    /// Repartition input files according to the settings on this [`FileGroupPartitioner`].
178    ///
179    /// If no repartitioning is needed or possible, return `None`.
180    pub fn repartition_file_groups(
181        &self,
182        file_groups: &[Vec<PartitionedFile>],
183    ) -> Option<Vec<Vec<PartitionedFile>>> {
184        if file_groups.is_empty() {
185            return None;
186        }
187
188        // Perform redistribution only in case all files should be read from beginning to end
189        let has_ranges = file_groups.iter().flatten().any(|f| f.range.is_some());
190        if has_ranges {
191            return None;
192        }
193
194        //  special case when order must be preserved
195        if self.preserve_order_within_groups {
196            self.repartition_preserving_order(file_groups)
197        } else {
198            self.repartition_evenly_by_size(file_groups)
199        }
200    }
201
202    /// Evenly repartition files across partitions by size, ignoring any
203    /// existing grouping / ordering
204    fn repartition_evenly_by_size(
205        &self,
206        file_groups: &[Vec<PartitionedFile>],
207    ) -> Option<Vec<Vec<PartitionedFile>>> {
208        let target_partitions = self.target_partitions;
209        let repartition_file_min_size = self.repartition_file_min_size;
210        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
211
212        let total_size = flattened_files
213            .iter()
214            .map(|f| f.object_meta.size as i64)
215            .sum::<i64>();
216        if total_size < (repartition_file_min_size as i64) || total_size == 0 {
217            return None;
218        }
219
220        let target_partition_size = (total_size as usize).div_ceil(target_partitions);
221
222        let current_partition_index: usize = 0;
223        let current_partition_size: usize = 0;
224
225        // Partition byte range evenly for all `PartitionedFile`s
226        let repartitioned_files = flattened_files
227            .into_iter()
228            .scan(
229                (current_partition_index, current_partition_size),
230                |state, source_file| {
231                    let mut produced_files = vec![];
232                    let mut range_start = 0;
233                    while range_start < source_file.object_meta.size {
234                        let range_end = min(
235                            range_start + (target_partition_size - state.1),
236                            source_file.object_meta.size,
237                        );
238
239                        let mut produced_file = source_file.clone();
240                        produced_file.range = Some(FileRange {
241                            start: range_start as i64,
242                            end: range_end as i64,
243                        });
244                        produced_files.push((state.0, produced_file));
245
246                        if state.1 + (range_end - range_start) >= target_partition_size {
247                            state.0 += 1;
248                            state.1 = 0;
249                        } else {
250                            state.1 += range_end - range_start;
251                        }
252                        range_start = range_end;
253                    }
254                    Some(produced_files)
255                },
256            )
257            .flatten()
258            .chunk_by(|(partition_idx, _)| *partition_idx)
259            .into_iter()
260            .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
261            .collect_vec();
262
263        Some(repartitioned_files)
264    }
265
266    /// Redistribute file groups across size preserving order
267    fn repartition_preserving_order(
268        &self,
269        file_groups: &[Vec<PartitionedFile>],
270    ) -> Option<Vec<Vec<PartitionedFile>>> {
271        // Can't repartition and preserve order if there are more groups
272        // than partitions
273        if file_groups.len() >= self.target_partitions {
274            return None;
275        }
276        let num_new_groups = self.target_partitions - file_groups.len();
277
278        // If there is only a single file
279        if file_groups.len() == 1 && file_groups[0].len() == 1 {
280            return self.repartition_evenly_by_size(file_groups);
281        }
282
283        // Find which files could be split (single file groups)
284        let mut heap: BinaryHeap<_> = file_groups
285            .iter()
286            .enumerate()
287            .filter_map(|(group_index, group)| {
288                // ignore groups that do not have exactly 1 file
289                if group.len() == 1 {
290                    Some(ToRepartition {
291                        source_index: group_index,
292                        file_size: group[0].object_meta.size,
293                        new_groups: vec![group_index],
294                    })
295                } else {
296                    None
297                }
298            })
299            .collect();
300
301        // No files can be redistributed
302        if heap.is_empty() {
303            return None;
304        }
305
306        // Add new empty groups to which we will redistribute ranges of existing files
307        let mut file_groups: Vec<_> = file_groups
308            .iter()
309            .cloned()
310            .chain(repeat_with(Vec::new).take(num_new_groups))
311            .collect();
312
313        // Divide up empty groups
314        for (group_index, group) in file_groups.iter().enumerate() {
315            if !group.is_empty() {
316                continue;
317            }
318            // Pick the file that has the largest ranges to read so far
319            let mut largest_group = heap.pop().unwrap();
320            largest_group.new_groups.push(group_index);
321            heap.push(largest_group);
322        }
323
324        // Distribute files to their newly assigned groups
325        while let Some(to_repartition) = heap.pop() {
326            let range_size = to_repartition.range_size() as i64;
327            let ToRepartition {
328                source_index,
329                file_size,
330                new_groups,
331            } = to_repartition;
332            assert_eq!(file_groups[source_index].len(), 1);
333            let original_file = file_groups[source_index].pop().unwrap();
334
335            let last_group = new_groups.len() - 1;
336            let mut range_start: i64 = 0;
337            let mut range_end: i64 = range_size;
338            for (i, group_index) in new_groups.into_iter().enumerate() {
339                let target_group = &mut file_groups[group_index];
340                assert!(target_group.is_empty());
341
342                // adjust last range to include the entire file
343                if i == last_group {
344                    range_end = file_size as i64;
345                }
346                target_group
347                    .push(original_file.clone().with_range(range_start, range_end));
348                range_start = range_end;
349                range_end += range_size;
350            }
351        }
352
353        Some(file_groups)
354    }
355}
356
/// Tracks how an individual file will be repartitioned
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToRepartition {
    /// the index from which the original file will be taken
    source_index: usize,
    /// the size of the original file
    file_size: usize,
    /// indexes of which group(s) will this be distributed to (including `source_index`)
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// How big will each file range be when this file is read in its new groups?
    ///
    /// `new_groups` must be non-empty; otherwise this divides by zero.
    fn range_size(&self) -> usize {
        self.file_size / self.new_groups.len()
    }
}

impl PartialOrd for ToRepartition {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Order based on individual range size, so that a max-[`std::collections::BinaryHeap`]
/// yields the file whose ranges are currently the largest.
///
/// Ties are broken explicitly: a lower `source_index` compares as *greater* (and is
/// therefore popped first), followed by the remaining fields. This makes the order
/// total and consistent with the derived `PartialEq`/`Eq`, as the `Ord` contract
/// requires, instead of leaving tie resolution to heap internals.
impl Ord for ToRepartition {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.range_size()
            .cmp(&other.range_size())
            // on equal range sizes, prefer the earlier source group
            .then_with(|| other.source_index.cmp(&self.source_index))
            .then_with(|| self.file_size.cmp(&other.file_size))
            .then_with(|| self.new_groups.cmp(&other.new_groups))
    }
}
387
#[cfg(test)]
mod test {
    use super::*;

    /// Empty file won't get partitioned
    #[test]
    fn repartition_empty_file_only() {
        let partitioned_file_empty = pfile("empty", 0);
        let file_group = vec![vec![partitioned_file_empty]];

        let partitioned_files = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(0)
            .repartition_file_groups(&file_group);

        assert_partitioned_files(None, partitioned_files);
    }

    /// Repartition when there is an empty file in file groups
    #[test]
    fn repartition_empty_files() {
        let pfile_a = pfile("a", 10);
        let pfile_b = pfile("b", 10);
        let pfile_empty = pfile("empty", 0);

        let empty_first = vec![
            vec![pfile_empty.clone()],
            vec![pfile_a.clone()],
            vec![pfile_b.clone()],
        ];
        let empty_middle = vec![
            vec![pfile_a.clone()],
            vec![pfile_empty.clone()],
            vec![pfile_b.clone()],
        ];
        let empty_last = vec![vec![pfile_a], vec![pfile_b], vec![pfile_empty]];

        // Repartition file groups into x partitions
        let expected_2 = vec![
            vec![pfile("a", 10).with_range(0, 10)],
            vec![pfile("b", 10).with_range(0, 10)],
        ];
        let expected_3 = vec![
            vec![pfile("a", 10).with_range(0, 7)],
            vec![
                pfile("a", 10).with_range(7, 10),
                pfile("b", 10).with_range(0, 4),
            ],
            vec![pfile("b", 10).with_range(4, 10)],
        ];

        let file_groups_tests = [empty_first, empty_middle, empty_last];

        for fg in file_groups_tests {
            let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
            for (n_partition, expected) in all_expected {
                let actual = FileGroupPartitioner::new()
                    .with_target_partitions(n_partition)
                    .with_repartition_file_min_size(10)
                    .repartition_file_groups(&fg);

                assert_partitioned_files(Some(expected), actual);
            }
        }
    }

    #[test]
    fn repartition_single_file() {
        // Single file, single partition into multiple partitions
        let single_partition = vec![vec![pfile("a", 123)]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            vec![pfile("a", 123).with_range(0, 31)],
            vec![pfile("a", 123).with_range(31, 62)],
            vec![pfile("a", 123).with_range(62, 93)],
            vec![pfile("a", 123).with_range(93, 123)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_too_much_partitions() {
        // Single file, single partition into 96 partitions
        let partitioned_file = pfile("a", 8);
        let single_partition = vec![vec![partitioned_file]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(96)
            .with_repartition_file_min_size(5)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            vec![pfile("a", 8).with_range(0, 1)],
            vec![pfile("a", 8).with_range(1, 2)],
            vec![pfile("a", 8).with_range(2, 3)],
            vec![pfile("a", 8).with_range(3, 4)],
            vec![pfile("a", 8).with_range(4, 5)],
            vec![pfile("a", 8).with_range(5, 6)],
            vec![pfile("a", 8).with_range(6, 7)],
            vec![pfile("a", 8).with_range(7, 8)],
        ]);

        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_multiple_partitions() {
        // Multiple files in single partition after redistribution
        let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            vec![pfile("a", 40).with_range(0, 34)],
            vec![
                pfile("a", 40).with_range(34, 40),
                pfile("b", 60).with_range(0, 28),
            ],
            vec![pfile("b", 60).with_range(28, 60)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_same_num_partitions() {
        // "Rebalance" files across partitions
        let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(2)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            vec![
                pfile("a", 40).with_range(0, 40),
                pfile("b", 60).with_range(0, 10),
            ],
            vec![pfile("b", 60).with_range(10, 60)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_no_action_ranges() {
        // No action due to Some(range) in second file
        let source_partitions = vec![
            vec![pfile("a", 123)],
            vec![pfile("b", 144).with_range(1, 50)],
        ];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        assert_partitioned_files(None, actual)
    }

    #[test]
    fn repartition_no_action_min_size() {
        // No action: the total file size (123) is below repartition_file_min_size (500)
        let single_partition = vec![vec![pfile("a", 123)]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500)
            .repartition_file_groups(&single_partition);

        assert_partitioned_files(None, actual)
    }

    #[test]
    fn repartition_no_action_zero_files() {
        // No action due to no files
        let empty_partition = vec![];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);

        assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
    }

    #[test]
    fn repartition_ordered_no_action_too_few_partitions() {
        // No action as there are no new groups to redistribute to
        let input_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 200)]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(2)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&input_partitions);

        assert_partitioned_files(None, actual)
    }

    #[test]
    fn repartition_ordered_no_action_file_too_small() {
        // No action as the file is smaller than repartition_file_min_size
        let single_partition = vec![vec![pfile("a", 100)]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(2)
            // file is too small to repartition
            .with_repartition_file_min_size(1000)
            .repartition_file_groups(&single_partition);

        assert_partitioned_files(None, actual)
    }

    #[test]
    fn repartition_ordered_one_large_file() {
        // "Rebalance" the single large file across partitions
        let source_partitions = vec![vec![pfile("a", 100)]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            vec![pfile("a", 100).with_range(0, 34)],
            vec![pfile("a", 100).with_range(34, 68)],
            vec![pfile("a", 100).with_range(68, 100)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file() {
        // "Rebalance" the single large file across empty partitions, but can't split
        // small file
        let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 30)]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            // scan first third of "a"
            vec![pfile("a", 100).with_range(0, 33)],
            // all of "b": both new groups went to the larger file "a"
            vec![pfile("b", 30).with_range(0, 30)],
            // second third of "a"
            vec![pfile("a", 100).with_range(33, 66)],
            // final third of "a"
            vec![pfile("a", 100).with_range(66, 100)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_ordered_two_large_files() {
        // "Rebalance" two large files across empty partitions, but can't mix them
        let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 100)]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            // scan first half of "a"
            vec![pfile("a", 100).with_range(0, 50)],
            // scan first half of "b"
            vec![pfile("b", 100).with_range(0, 50)],
            // second half of "a"
            vec![pfile("a", 100).with_range(50, 100)],
            // second half of "b"
            vec![pfile("b", 100).with_range(50, 100)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_ordered_two_large_one_small_files() {
        // "Rebalance" two large files and one small file across empty partitions
        let source_partitions = vec![
            vec![pfile("a", 100)],
            vec![pfile("b", 100)],
            vec![pfile("c", 30)],
        ];

        let partitioner = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_repartition_file_min_size(10);

        // with 4 partitions, can only split the first large file "a"
        let actual = partitioner
            .with_target_partitions(4)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            // scan first half of "a"
            vec![pfile("a", 100).with_range(0, 50)],
            // All of "b"
            vec![pfile("b", 100).with_range(0, 100)],
            // All of "c"
            vec![pfile("c", 30).with_range(0, 30)],
            // second half of "a"
            vec![pfile("a", 100).with_range(50, 100)],
        ]);
        assert_partitioned_files(expected, actual);

        // With 5 partitions, we can split both "a" and "b", but they can't be intermixed
        let actual = partitioner
            .with_target_partitions(5)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            // scan first half of "a"
            vec![pfile("a", 100).with_range(0, 50)],
            // scan first half of "b"
            vec![pfile("b", 100).with_range(0, 50)],
            // All of "c"
            vec![pfile("c", 30).with_range(0, 30)],
            // second half of "a"
            vec![pfile("a", 100).with_range(50, 100)],
            // second half of "b"
            vec![pfile("b", 100).with_range(50, 100)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_ordered_one_large_one_small_existing_empty() {
        // "Rebalance" files using existing empty partition
        let source_partitions =
            vec![vec![pfile("a", 100)], vec![], vec![pfile("b", 40)], vec![]];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(5)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        // Of the three available groups (2 original empty and 1 new from the
        // target partitions), assign two to "a" and one to "b"
        let expected = Some(vec![
            // Scan of "a" across three groups
            vec![pfile("a", 100).with_range(0, 33)],
            vec![pfile("a", 100).with_range(33, 66)],
            // scan first half of "b"
            vec![pfile("b", 40).with_range(0, 20)],
            // final third of "a"
            vec![pfile("a", 100).with_range(66, 100)],
            // second half of "b"
            vec![pfile("b", 40).with_range(20, 40)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    #[test]
    fn repartition_ordered_existing_group_multiple_files() {
        // groups with multiple files in a group can not be changed, but can divide others
        let source_partitions = vec![
            // two files in an existing partition
            vec![pfile("a", 100), pfile("b", 100)],
            vec![pfile("c", 40)],
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        // The single new group (from target_partitions = 3) goes to "c", the
        // only file that sits alone in its group and can therefore be split
        let expected = Some(vec![
            // don't try and rearrange files in the existing partition
            // assuming that the caller had a good reason to put them that way.
            // (it is technically possible to split off ranges from the files if desired)
            vec![pfile("a", 100), pfile("b", 100)],
            // first half of "c"
            vec![pfile("c", 40).with_range(0, 20)],
            // second half of "c"
            vec![pfile("c", 40).with_range(20, 40)],
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Asserts that the two groups of [`PartitionedFile`] are the same
    /// (PartitionedFile doesn't implement PartialEq, so their `Debug`
    /// representations are compared instead)
    fn assert_partitioned_files(
        expected: Option<Vec<Vec<PartitionedFile>>>,
        actual: Option<Vec<Vec<PartitionedFile>>>,
    ) {
        match (expected, actual) {
            (None, None) => {}
            (Some(_), None) => panic!("Expected Some, got None"),
            (None, Some(_)) => panic!("Expected None, got Some"),
            (Some(expected), Some(actual)) => {
                let expected_string = format!("{expected:#?}");
                let actual_string = format!("{actual:#?}");
                assert_eq!(expected_string, actual_string);
            }
        }
    }

    /// returns a partitioned file with the specified path and size
    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
        PartitionedFile::new(path, file_size)
    }

    /// Repartitions `file_groups` both with and without preserving order,
    /// asserts that both return the same result, and returns that result
    fn repartition_test(
        partitioner: FileGroupPartitioner,
        file_groups: Vec<Vec<PartitionedFile>>,
    ) -> Option<Vec<Vec<PartitionedFile>>> {
        let repartitioned = partitioner.repartition_file_groups(&file_groups);

        let repartitioned_preserving_sort = partitioner
            .with_preserve_order_within_groups(true)
            .repartition_file_groups(&file_groups);

        assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
        repartitioned
    }
}