// gnu_sort/external_sort.rs

//! External sorting implementation for very large datasets.
//!
//! Uses divide-and-conquer with disk-based temporary files to handle datasets larger than RAM.

use crate::radix_sort::RadixSort;
use crate::simd_compare::SIMDCompare;
use crate::zero_copy::{Line, MappedFile};
use rayon::prelude::*;
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use tempfile::TempDir;
12
13/// External sorter for handling very large datasets efficiently
14pub struct ExternalSort {
15    /// Maximum chunk size in memory (bytes)
16    max_chunk_size: usize,
17    /// Whether to use parallel processing
18    parallel: bool,
19    /// Whether to use radix sort for numeric data
20    use_radix: bool,
21    /// Temporary directory for chunk files
22    temp_dir: TempDir,
23}
24
25impl ExternalSort {
26    /// Create new external sorter with memory limit
27    pub fn new(
28        max_memory_mb: usize,
29        parallel: bool,
30        use_radix: bool,
31        temp_dir_path: Option<&str>,
32    ) -> io::Result<Self> {
33        let max_chunk_size = max_memory_mb * 1024 * 1024; // Convert MB to bytes
34
35        // Create temp directory in specified location or use default
36        let temp_dir = if let Some(path) = temp_dir_path {
37            tempfile::tempdir_in(path)?
38        } else if let Ok(tmpdir) = std::env::var("TMPDIR") {
39            tempfile::tempdir_in(tmpdir)?
40        } else {
41            tempfile::tempdir()?
42        };
43
44        Ok(Self {
45            max_chunk_size,
46            parallel,
47            use_radix,
48            temp_dir,
49        })
50    }
51
52    /// Main external sort entry point
53    pub fn sort_file(
54        &self,
55        input_path: &Path,
56        output_path: &Path,
57        numeric: bool,
58        unique: bool,
59    ) -> io::Result<()> {
60        // Step 1: Estimate file size and determine strategy
61        let file_size = std::fs::metadata(input_path)?.len() as usize;
62
63        if file_size <= self.max_chunk_size {
64            // File fits in memory - use in-memory sorting
65            return self.sort_in_memory(input_path, output_path, numeric, unique);
66        }
67
68        // Step 2: Split file into sorted chunks
69        let chunk_files = self.create_sorted_chunks(input_path, numeric)?;
70
71        // Step 3: Merge sorted chunks
72        self.merge_sorted_chunks(&chunk_files, output_path, numeric, unique)?;
73
74        Ok(())
75    }
76
77    /// Sort file that fits entirely in memory
78    fn sort_in_memory(
79        &self,
80        input_path: &Path,
81        output_path: &Path,
82        numeric: bool,
83        unique: bool,
84    ) -> io::Result<()> {
85        let mapped_file = MappedFile::new(input_path)?;
86        let lines = mapped_file.lines();
87
88        let mut simple_lines: Vec<Line> = lines.to_vec();
89
90        if numeric && self.use_radix {
91            let radix_sorter = RadixSort::new(self.parallel);
92            radix_sorter.sort_numeric_lines(&mut simple_lines);
93        } else if self.parallel && simple_lines.len() > 10000 {
94            if numeric {
95                simple_lines.par_sort_unstable_by(|a, b| a.compare_numeric(b));
96            } else {
97                simple_lines.par_sort_unstable_by(|a, b| a.compare_lexicographic(b));
98            }
99        } else if numeric {
100            simple_lines.sort_unstable_by(|a, b| a.compare_numeric(b));
101        } else {
102            simple_lines.sort_unstable_by(|a, b| a.compare_lexicographic(b));
103        }
104
105        // Remove duplicates if unique mode
106        if unique {
107            simple_lines.dedup_by(|a, b| unsafe { a.as_bytes() == b.as_bytes() });
108        }
109
110        // Write sorted output
111        let mut output = BufWriter::new(File::create(output_path)?);
112        for line in &simple_lines {
113            unsafe {
114                output.write_all(line.as_bytes())?;
115                output.write_all(b"\n")?;
116            }
117        }
118        output.flush()?;
119
120        Ok(())
121    }
122
123    /// Create sorted chunks from large input file
124    fn create_sorted_chunks(&self, input_path: &Path, numeric: bool) -> io::Result<Vec<PathBuf>> {
125        let file = File::open(input_path)?;
126        let mut reader = BufReader::new(file);
127        let mut chunk_files = Vec::new();
128        let mut chunk_number = 0;
129
130        loop {
131            // Read chunk of lines that fits in memory
132            let (lines, eof) = self.read_chunk_lines(&mut reader)?;
133            if lines.is_empty() {
134                break;
135            }
136
137            // Sort the chunk
138            let sorted_lines = self.sort_chunk(lines, numeric)?;
139
140            // Write sorted chunk to temporary file
141            let chunk_path = self.write_chunk_to_file(&sorted_lines, chunk_number)?;
142            chunk_files.push(chunk_path);
143            chunk_number += 1;
144
145            if eof {
146                break;
147            }
148        }
149
150        Ok(chunk_files)
151    }
152
153    /// Read a chunk of lines that fits in memory (optimized for large files)
154    fn read_chunk_lines(&self, reader: &mut BufReader<File>) -> io::Result<(Vec<String>, bool)> {
155        let mut lines = Vec::new();
156        let mut total_size = 0;
157        let mut line = String::new();
158
159        // Pre-allocate capacity for better performance
160        lines.reserve(self.max_chunk_size / 20); // Estimate ~20 chars per line
161
162        while total_size < self.max_chunk_size {
163            line.clear();
164            let bytes_read = reader.read_line(&mut line)?;
165
166            if bytes_read == 0 {
167                // EOF reached
168                return Ok((lines, true));
169            }
170
171            // Remove trailing newline
172            if line.ends_with('\n') {
173                line.pop();
174                if line.ends_with('\r') {
175                    line.pop();
176                }
177            }
178
179            total_size += line.len();
180            lines.push(std::mem::take(&mut line));
181        }
182
183        Ok((lines, false))
184    }
185
186    /// Sort a chunk using optimized algorithms for large data  
187    fn sort_chunk(&self, mut lines: Vec<String>, numeric: bool) -> io::Result<Vec<String>> {
188        // For large chunks, always prefer parallel sorting
189        const LARGE_CHUNK_THRESHOLD: usize = 50_000;
190
191        if numeric && self.use_radix && self.is_all_simple_integers(&lines) {
192            // Use radix sort for simple integers
193            self.radix_sort_strings(&mut lines)?;
194        } else {
195            // Use optimized comparison-based sort
196            if self.parallel && lines.len() > LARGE_CHUNK_THRESHOLD {
197                // For very large chunks, use parallel sort
198                if numeric {
199                    lines.par_sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
200                } else {
201                    lines.par_sort_unstable_by(|a, b| {
202                        SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
203                    });
204                }
205            } else if lines.len() > 10_000 {
206                // Medium chunks - parallel but less aggressive
207                if numeric {
208                    lines.par_sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
209                } else {
210                    lines.par_sort_unstable_by(|a, b| {
211                        SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
212                    });
213                }
214            } else {
215                // Small chunks - sequential
216                if numeric {
217                    lines.sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
218                } else {
219                    lines.sort_unstable_by(|a, b| {
220                        SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
221                    });
222                }
223            }
224        }
225
226        Ok(lines)
227    }
228
229    /// Check if all strings are simple integers
230    fn is_all_simple_integers(&self, lines: &[String]) -> bool {
231        // Sample first 100 lines to determine if all are simple integers
232        let sample_size = lines.len().min(100);
233        lines[..sample_size].iter().all(|line| {
234            SIMDCompare::is_all_digits_simd(line.as_bytes())
235                || (line.starts_with('-') && SIMDCompare::is_all_digits_simd(&line.as_bytes()[1..]))
236        })
237    }
238
239    /// Radix sort for string integers
240    fn radix_sort_strings(&self, lines: &mut [String]) -> io::Result<()> {
241        // Convert to (value, index) pairs
242        let mut values: Vec<(i64, usize)> = lines
243            .iter()
244            .enumerate()
245            .map(|(idx, line)| {
246                let value = line.parse::<i64>().unwrap_or(0);
247                (value, idx)
248            })
249            .collect();
250
251        // Sort by value
252        if self.parallel {
253            values.par_sort_unstable_by_key(|(value, _)| *value);
254        } else {
255            values.sort_unstable_by_key(|(value, _)| *value);
256        }
257
258        // Reconstruct lines in sorted order
259        // Create a permutation vector
260        let permutation: Vec<usize> = values.into_iter().map(|(_, idx)| idx).collect();
261
262        // Apply permutation efficiently without unnecessary cloning
263        let mut sorted = Vec::with_capacity(lines.len());
264        for _ in 0..lines.len() {
265            sorted.push(String::new());
266        }
267
268        for (new_idx, &old_idx) in permutation.iter().enumerate() {
269            sorted[new_idx] = std::mem::take(&mut lines[old_idx]);
270        }
271
272        // Replace original with sorted
273        for (i, line) in sorted.into_iter().enumerate() {
274            lines[i] = line;
275        }
276
277        Ok(())
278    }
279
280    /// Compare numeric strings efficiently
281    fn compare_numeric_strings(&self, a: &str, b: &str) -> Ordering {
282        // Fast path for simple integers
283        if let (Ok(a_num), Ok(b_num)) = (a.parse::<i64>(), b.parse::<i64>()) {
284            return a_num.cmp(&b_num);
285        }
286
287        // Fall back to byte-level numeric comparison
288        self.compare_numeric_bytes(a.as_bytes(), b.as_bytes())
289    }
290
291    /// Byte-level numeric comparison
292    fn compare_numeric_bytes(&self, a: &[u8], b: &[u8]) -> Ordering {
293        // Skip leading whitespace
294        let a = self.skip_whitespace(a);
295        let b = self.skip_whitespace(b);
296
297        // Handle empty strings
298        match (a.is_empty(), b.is_empty()) {
299            (true, true) => return Ordering::Equal,
300            (true, false) => return Ordering::Less,
301            (false, true) => return Ordering::Greater,
302            _ => {}
303        }
304
305        // Extract signs
306        let (a_negative, a_digits) = self.extract_sign(a);
307        let (b_negative, b_digits) = self.extract_sign(b);
308
309        // Compare signs
310        match (a_negative, b_negative) {
311            (false, true) => return Ordering::Greater,
312            (true, false) => return Ordering::Less,
313            _ => {}
314        }
315
316        // Compare magnitudes
317        let magnitude_cmp = self.compare_magnitude(a_digits, b_digits);
318
319        if a_negative {
320            magnitude_cmp.reverse()
321        } else {
322            magnitude_cmp
323        }
324    }
325
326    fn skip_whitespace<'a>(&self, bytes: &'a [u8]) -> &'a [u8] {
327        let start = bytes
328            .iter()
329            .position(|&b| !b.is_ascii_whitespace())
330            .unwrap_or(bytes.len());
331        &bytes[start..]
332    }
333
334    fn extract_sign<'a>(&self, bytes: &'a [u8]) -> (bool, &'a [u8]) {
335        if bytes.starts_with(b"-") {
336            (true, &bytes[1..])
337        } else if bytes.starts_with(b"+") {
338            (false, &bytes[1..])
339        } else {
340            (false, bytes)
341        }
342    }
343
344    fn compare_magnitude(&self, a: &[u8], b: &[u8]) -> Ordering {
345        // Remove leading zeros
346        let a = self.skip_leading_zeros(a);
347        let b = self.skip_leading_zeros(b);
348
349        // Compare lengths first (longer number is bigger)
350        match a.len().cmp(&b.len()) {
351            Ordering::Equal => a.cmp(b), // Same length, compare lexicographically
352            other => other,
353        }
354    }
355
356    fn skip_leading_zeros<'a>(&self, bytes: &'a [u8]) -> &'a [u8] {
357        let start = bytes.iter().position(|&b| b != b'0').unwrap_or(bytes.len());
358        if start == bytes.len() {
359            b"0" // All zeros, return single zero
360        } else {
361            &bytes[start..]
362        }
363    }
364
365    /// Write sorted chunk to temporary file
366    fn write_chunk_to_file(&self, lines: &[String], chunk_number: usize) -> io::Result<PathBuf> {
367        let chunk_path = self
368            .temp_dir
369            .path()
370            .join(format!("chunk_{chunk_number:06}.txt"));
371        let mut writer = BufWriter::new(File::create(&chunk_path)?);
372
373        for line in lines {
374            writeln!(writer, "{line}")?;
375        }
376        writer.flush()?;
377
378        Ok(chunk_path)
379    }
380
381    /// Merge sorted chunks using k-way merge
382    fn merge_sorted_chunks(
383        &self,
384        chunk_files: &[PathBuf],
385        output_path: &Path,
386        _numeric: bool,
387        unique: bool,
388    ) -> io::Result<()> {
389        use std::cmp::Reverse;
390        use std::collections::BinaryHeap;
391
392        if chunk_files.is_empty() {
393            return Ok(());
394        }
395
396        if chunk_files.len() == 1 {
397            // Single chunk, just copy it
398            std::fs::copy(&chunk_files[0], output_path)?;
399            return Ok(());
400        }
401
402        // Open all chunk files
403        let mut readers: Vec<BufReader<File>> = chunk_files
404            .iter()
405            .map(|path| File::open(path).map(BufReader::new))
406            .collect::<Result<Vec<_>, _>>()?;
407
408        let mut output = BufWriter::new(File::create(output_path)?);
409
410        // Priority queue for k-way merge
411        #[derive(Debug)]
412        struct MergeItem {
413            line: String,
414            reader_index: usize,
415        }
416
417        impl PartialEq for MergeItem {
418            fn eq(&self, other: &Self) -> bool {
419                self.line == other.line
420            }
421        }
422
423        impl Eq for MergeItem {}
424
425        impl PartialOrd for MergeItem {
426            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
427                Some(self.cmp(other))
428            }
429        }
430
431        impl Ord for MergeItem {
432            fn cmp(&self, other: &Self) -> Ordering {
433                // Simple lexicographic comparison (reversed for min-heap)
434                self.line.cmp(&other.line).reverse()
435            }
436        }
437
438        impl MergeItem {
439            #[allow(dead_code)]
440            fn compare_numeric(&self, other: &str) -> Ordering {
441                // Fast path for simple integers
442                if let (Ok(a), Ok(b)) = (self.line.parse::<i64>(), other.parse::<i64>()) {
443                    return a.cmp(&b);
444                }
445                // Fall back to string comparison
446                self.line.cmp(&other.to_string())
447            }
448        }
449
450        let mut heap: BinaryHeap<Reverse<MergeItem>> = BinaryHeap::new();
451
452        // Initialize heap with first line from each reader
453        for (idx, reader) in readers.iter_mut().enumerate() {
454            let mut line = String::new();
455            if reader.read_line(&mut line)? > 0 {
456                if line.ends_with('\n') {
457                    line.pop();
458                }
459                heap.push(Reverse(MergeItem {
460                    line,
461                    reader_index: idx,
462                }));
463            }
464        }
465
466        // Merge process
467        let mut last_line: Option<String> = None;
468        while let Some(Reverse(item)) = heap.pop() {
469            // If unique mode, skip duplicates
470            if unique {
471                if let Some(ref prev) = last_line {
472                    if prev == &item.line {
473                        // Skip duplicate, but still read next line from same reader
474                        let reader_idx = item.reader_index;
475                        let mut line = String::new();
476                        if readers[reader_idx].read_line(&mut line)? > 0 {
477                            if line.ends_with('\n') {
478                                line.pop();
479                            }
480                            heap.push(Reverse(MergeItem {
481                                line,
482                                reader_index: reader_idx,
483                            }));
484                        }
485                        continue;
486                    }
487                }
488                last_line = Some(item.line.clone());
489            }
490
491            writeln!(output, "{}", item.line)?;
492
493            // Read next line from the same reader
494            let reader_idx = item.reader_index;
495            let mut line = String::new();
496            if readers[reader_idx].read_line(&mut line)? > 0 {
497                if line.ends_with('\n') {
498                    line.pop();
499                }
500                heap.push(Reverse(MergeItem {
501                    line,
502                    reader_index: reader_idx,
503                }));
504            }
505        }
506
507        output.flush()?;
508        Ok(())
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515    use std::fs;
516    use tempfile::TempDir;
517
518    #[test]
519    fn test_external_sort_small_file() -> io::Result<()> {
520        let temp_dir = TempDir::new()?;
521        let input_file = temp_dir.path().join("input.txt");
522        let output_file = temp_dir.path().join("output.txt");
523
524        // Create test input
525        fs::write(&input_file, "3\n1\n4\n1\n5\n9\n2\n6\n")?;
526
527        // Sort with external sorter
528        let sorter = ExternalSort::new(1, false, true, None)?; // 1MB limit
529        sorter.sort_file(&input_file, &output_file, true, false)?;
530
531        // Verify output
532        let output_content = fs::read_to_string(&output_file)?;
533        assert_eq!(output_content, "1\n1\n2\n3\n4\n5\n6\n9\n");
534
535        Ok(())
536    }
537}