1use crate::radix_sort::RadixSort;
2use crate::simd_compare::SIMDCompare;
3use crate::zero_copy::{Line, MappedFile};
4use rayon::prelude::*;
5use std::cmp::Ordering;
6use std::fs::File;
9use std::io::{self, BufRead, BufReader, BufWriter, Write};
10use std::path::{Path, PathBuf};
11use tempfile::TempDir;
12
/// External (out-of-core) line sorter.
///
/// Inputs no larger than `max_chunk_size` bytes are sorted in memory; larger
/// inputs are split into sorted chunk files inside `temp_dir`, which are then
/// merged into the output file.
pub struct ExternalSort {
    /// Memory budget per chunk, in bytes (`new` derives it from a MiB count).
    max_chunk_size: usize,
    /// Enables rayon's parallel sorts for sufficiently large inputs.
    parallel: bool,
    /// Allows numeric sorts to take the integer-key ("radix") fast path.
    use_radix: bool,
    /// Scratch directory holding the intermediate chunk files; a
    /// `tempfile::TempDir` removes the directory when it is dropped.
    temp_dir: TempDir,
}
24
25impl ExternalSort {
26 pub fn new(
28 max_memory_mb: usize,
29 parallel: bool,
30 use_radix: bool,
31 temp_dir_path: Option<&str>,
32 ) -> io::Result<Self> {
33 let max_chunk_size = max_memory_mb * 1024 * 1024; let temp_dir = if let Some(path) = temp_dir_path {
37 tempfile::tempdir_in(path)?
38 } else if let Ok(tmpdir) = std::env::var("TMPDIR") {
39 tempfile::tempdir_in(tmpdir)?
40 } else {
41 tempfile::tempdir()?
42 };
43
44 Ok(Self {
45 max_chunk_size,
46 parallel,
47 use_radix,
48 temp_dir,
49 })
50 }
51
52 pub fn sort_file(
54 &self,
55 input_path: &Path,
56 output_path: &Path,
57 numeric: bool,
58 unique: bool,
59 ) -> io::Result<()> {
60 let file_size = std::fs::metadata(input_path)?.len() as usize;
62
63 if file_size <= self.max_chunk_size {
64 return self.sort_in_memory(input_path, output_path, numeric, unique);
66 }
67
68 let chunk_files = self.create_sorted_chunks(input_path, numeric)?;
70
71 self.merge_sorted_chunks(&chunk_files, output_path, numeric, unique)?;
73
74 Ok(())
75 }
76
77 fn sort_in_memory(
79 &self,
80 input_path: &Path,
81 output_path: &Path,
82 numeric: bool,
83 unique: bool,
84 ) -> io::Result<()> {
85 let mapped_file = MappedFile::new(input_path)?;
86 let lines = mapped_file.lines();
87
88 let mut simple_lines: Vec<Line> = lines.to_vec();
89
90 if numeric && self.use_radix {
91 let radix_sorter = RadixSort::new(self.parallel);
92 radix_sorter.sort_numeric_lines(&mut simple_lines);
93 } else if self.parallel && simple_lines.len() > 10000 {
94 if numeric {
95 simple_lines.par_sort_unstable_by(|a, b| a.compare_numeric(b));
96 } else {
97 simple_lines.par_sort_unstable_by(|a, b| a.compare_lexicographic(b));
98 }
99 } else if numeric {
100 simple_lines.sort_unstable_by(|a, b| a.compare_numeric(b));
101 } else {
102 simple_lines.sort_unstable_by(|a, b| a.compare_lexicographic(b));
103 }
104
105 if unique {
107 simple_lines.dedup_by(|a, b| unsafe { a.as_bytes() == b.as_bytes() });
108 }
109
110 let mut output = BufWriter::new(File::create(output_path)?);
112 for line in &simple_lines {
113 unsafe {
114 output.write_all(line.as_bytes())?;
115 output.write_all(b"\n")?;
116 }
117 }
118 output.flush()?;
119
120 Ok(())
121 }
122
123 fn create_sorted_chunks(&self, input_path: &Path, numeric: bool) -> io::Result<Vec<PathBuf>> {
125 let file = File::open(input_path)?;
126 let mut reader = BufReader::new(file);
127 let mut chunk_files = Vec::new();
128 let mut chunk_number = 0;
129
130 loop {
131 let (lines, eof) = self.read_chunk_lines(&mut reader)?;
133 if lines.is_empty() {
134 break;
135 }
136
137 let sorted_lines = self.sort_chunk(lines, numeric)?;
139
140 let chunk_path = self.write_chunk_to_file(&sorted_lines, chunk_number)?;
142 chunk_files.push(chunk_path);
143 chunk_number += 1;
144
145 if eof {
146 break;
147 }
148 }
149
150 Ok(chunk_files)
151 }
152
153 fn read_chunk_lines(&self, reader: &mut BufReader<File>) -> io::Result<(Vec<String>, bool)> {
155 let mut lines = Vec::new();
156 let mut total_size = 0;
157 let mut line = String::new();
158
159 lines.reserve(self.max_chunk_size / 20); while total_size < self.max_chunk_size {
163 line.clear();
164 let bytes_read = reader.read_line(&mut line)?;
165
166 if bytes_read == 0 {
167 return Ok((lines, true));
169 }
170
171 if line.ends_with('\n') {
173 line.pop();
174 if line.ends_with('\r') {
175 line.pop();
176 }
177 }
178
179 total_size += line.len();
180 lines.push(std::mem::take(&mut line));
181 }
182
183 Ok((lines, false))
184 }
185
186 fn sort_chunk(&self, mut lines: Vec<String>, numeric: bool) -> io::Result<Vec<String>> {
188 const LARGE_CHUNK_THRESHOLD: usize = 50_000;
190
191 if numeric && self.use_radix && self.is_all_simple_integers(&lines) {
192 self.radix_sort_strings(&mut lines)?;
194 } else {
195 if self.parallel && lines.len() > LARGE_CHUNK_THRESHOLD {
197 if numeric {
199 lines.par_sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
200 } else {
201 lines.par_sort_unstable_by(|a, b| {
202 SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
203 });
204 }
205 } else if lines.len() > 10_000 {
206 if numeric {
208 lines.par_sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
209 } else {
210 lines.par_sort_unstable_by(|a, b| {
211 SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
212 });
213 }
214 } else {
215 if numeric {
217 lines.sort_unstable_by(|a, b| self.compare_numeric_strings(a, b));
218 } else {
219 lines.sort_unstable_by(|a, b| {
220 SIMDCompare::compare_bytes_simd(a.as_bytes(), b.as_bytes())
221 });
222 }
223 }
224 }
225
226 Ok(lines)
227 }
228
229 fn is_all_simple_integers(&self, lines: &[String]) -> bool {
231 let sample_size = lines.len().min(100);
233 lines[..sample_size].iter().all(|line| {
234 SIMDCompare::is_all_digits_simd(line.as_bytes())
235 || (line.starts_with('-') && SIMDCompare::is_all_digits_simd(&line.as_bytes()[1..]))
236 })
237 }
238
239 fn radix_sort_strings(&self, lines: &mut [String]) -> io::Result<()> {
241 let mut values: Vec<(i64, usize)> = lines
243 .iter()
244 .enumerate()
245 .map(|(idx, line)| {
246 let value = line.parse::<i64>().unwrap_or(0);
247 (value, idx)
248 })
249 .collect();
250
251 if self.parallel {
253 values.par_sort_unstable_by_key(|(value, _)| *value);
254 } else {
255 values.sort_unstable_by_key(|(value, _)| *value);
256 }
257
258 let permutation: Vec<usize> = values.into_iter().map(|(_, idx)| idx).collect();
261
262 let mut sorted = Vec::with_capacity(lines.len());
264 for _ in 0..lines.len() {
265 sorted.push(String::new());
266 }
267
268 for (new_idx, &old_idx) in permutation.iter().enumerate() {
269 sorted[new_idx] = std::mem::take(&mut lines[old_idx]);
270 }
271
272 for (i, line) in sorted.into_iter().enumerate() {
274 lines[i] = line;
275 }
276
277 Ok(())
278 }
279
280 fn compare_numeric_strings(&self, a: &str, b: &str) -> Ordering {
282 if let (Ok(a_num), Ok(b_num)) = (a.parse::<i64>(), b.parse::<i64>()) {
284 return a_num.cmp(&b_num);
285 }
286
287 self.compare_numeric_bytes(a.as_bytes(), b.as_bytes())
289 }
290
291 fn compare_numeric_bytes(&self, a: &[u8], b: &[u8]) -> Ordering {
293 let a = self.skip_whitespace(a);
295 let b = self.skip_whitespace(b);
296
297 match (a.is_empty(), b.is_empty()) {
299 (true, true) => return Ordering::Equal,
300 (true, false) => return Ordering::Less,
301 (false, true) => return Ordering::Greater,
302 _ => {}
303 }
304
305 let (a_negative, a_digits) = self.extract_sign(a);
307 let (b_negative, b_digits) = self.extract_sign(b);
308
309 match (a_negative, b_negative) {
311 (false, true) => return Ordering::Greater,
312 (true, false) => return Ordering::Less,
313 _ => {}
314 }
315
316 let magnitude_cmp = self.compare_magnitude(a_digits, b_digits);
318
319 if a_negative {
320 magnitude_cmp.reverse()
321 } else {
322 magnitude_cmp
323 }
324 }
325
326 fn skip_whitespace<'a>(&self, bytes: &'a [u8]) -> &'a [u8] {
327 let start = bytes
328 .iter()
329 .position(|&b| !b.is_ascii_whitespace())
330 .unwrap_or(bytes.len());
331 &bytes[start..]
332 }
333
334 fn extract_sign<'a>(&self, bytes: &'a [u8]) -> (bool, &'a [u8]) {
335 if bytes.starts_with(b"-") {
336 (true, &bytes[1..])
337 } else if bytes.starts_with(b"+") {
338 (false, &bytes[1..])
339 } else {
340 (false, bytes)
341 }
342 }
343
344 fn compare_magnitude(&self, a: &[u8], b: &[u8]) -> Ordering {
345 let a = self.skip_leading_zeros(a);
347 let b = self.skip_leading_zeros(b);
348
349 match a.len().cmp(&b.len()) {
351 Ordering::Equal => a.cmp(b), other => other,
353 }
354 }
355
356 fn skip_leading_zeros<'a>(&self, bytes: &'a [u8]) -> &'a [u8] {
357 let start = bytes.iter().position(|&b| b != b'0').unwrap_or(bytes.len());
358 if start == bytes.len() {
359 b"0" } else {
361 &bytes[start..]
362 }
363 }
364
365 fn write_chunk_to_file(&self, lines: &[String], chunk_number: usize) -> io::Result<PathBuf> {
367 let chunk_path = self
368 .temp_dir
369 .path()
370 .join(format!("chunk_{chunk_number:06}.txt"));
371 let mut writer = BufWriter::new(File::create(&chunk_path)?);
372
373 for line in lines {
374 writeln!(writer, "{line}")?;
375 }
376 writer.flush()?;
377
378 Ok(chunk_path)
379 }
380
381 fn merge_sorted_chunks(
383 &self,
384 chunk_files: &[PathBuf],
385 output_path: &Path,
386 _numeric: bool,
387 unique: bool,
388 ) -> io::Result<()> {
389 use std::cmp::Reverse;
390 use std::collections::BinaryHeap;
391
392 if chunk_files.is_empty() {
393 return Ok(());
394 }
395
396 if chunk_files.len() == 1 {
397 std::fs::copy(&chunk_files[0], output_path)?;
399 return Ok(());
400 }
401
402 let mut readers: Vec<BufReader<File>> = chunk_files
404 .iter()
405 .map(|path| File::open(path).map(BufReader::new))
406 .collect::<Result<Vec<_>, _>>()?;
407
408 let mut output = BufWriter::new(File::create(output_path)?);
409
410 #[derive(Debug)]
412 struct MergeItem {
413 line: String,
414 reader_index: usize,
415 }
416
417 impl PartialEq for MergeItem {
418 fn eq(&self, other: &Self) -> bool {
419 self.line == other.line
420 }
421 }
422
423 impl Eq for MergeItem {}
424
425 impl PartialOrd for MergeItem {
426 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
427 Some(self.cmp(other))
428 }
429 }
430
431 impl Ord for MergeItem {
432 fn cmp(&self, other: &Self) -> Ordering {
433 self.line.cmp(&other.line).reverse()
435 }
436 }
437
438 impl MergeItem {
439 #[allow(dead_code)]
440 fn compare_numeric(&self, other: &str) -> Ordering {
441 if let (Ok(a), Ok(b)) = (self.line.parse::<i64>(), other.parse::<i64>()) {
443 return a.cmp(&b);
444 }
445 self.line.cmp(&other.to_string())
447 }
448 }
449
450 let mut heap: BinaryHeap<Reverse<MergeItem>> = BinaryHeap::new();
451
452 for (idx, reader) in readers.iter_mut().enumerate() {
454 let mut line = String::new();
455 if reader.read_line(&mut line)? > 0 {
456 if line.ends_with('\n') {
457 line.pop();
458 }
459 heap.push(Reverse(MergeItem {
460 line,
461 reader_index: idx,
462 }));
463 }
464 }
465
466 let mut last_line: Option<String> = None;
468 while let Some(Reverse(item)) = heap.pop() {
469 if unique {
471 if let Some(ref prev) = last_line {
472 if prev == &item.line {
473 let reader_idx = item.reader_index;
475 let mut line = String::new();
476 if readers[reader_idx].read_line(&mut line)? > 0 {
477 if line.ends_with('\n') {
478 line.pop();
479 }
480 heap.push(Reverse(MergeItem {
481 line,
482 reader_index: reader_idx,
483 }));
484 }
485 continue;
486 }
487 }
488 last_line = Some(item.line.clone());
489 }
490
491 writeln!(output, "{}", item.line)?;
492
493 let reader_idx = item.reader_index;
495 let mut line = String::new();
496 if readers[reader_idx].read_line(&mut line)? > 0 {
497 if line.ends_with('\n') {
498 line.pop();
499 }
500 heap.push(Reverse(MergeItem {
501 line,
502 reader_index: reader_idx,
503 }));
504 }
505 }
506
507 output.flush()?;
508 Ok(())
509 }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// A small numeric file takes the in-memory path and comes back sorted,
    /// with duplicates kept and every line newline-terminated.
    #[test]
    fn test_external_sort_small_file() -> io::Result<()> {
        let workdir = TempDir::new()?;
        let input = workdir.path().join("input.txt");
        let output = workdir.path().join("output.txt");

        fs::write(&input, "3\n1\n4\n1\n5\n9\n2\n6\n")?;

        // 1 MiB budget, sequential, radix enabled, default temp location;
        // numeric ordering without de-duplication.
        let sorter = ExternalSort::new(1, false, true, None)?;
        sorter.sort_file(&input, &output, true, false)?;

        let sorted = fs::read_to_string(&output)?;
        assert_eq!(sorted, "1\n1\n2\n3\n4\n5\n6\n9\n");

        Ok(())
    }
}
537}