mod merge_sorted_chunks;
mod sort_chunk;
use super::file::KEY_STORE_LENGTH;
use merge_sorted_chunks::*;
use sort_chunk::*;
use std::{
env, format,
fs::{self, File},
io::{BufReader, BufWriter, Write, copy},
path::Path,
string::{String, ToString},
sync::{Arc, Mutex},
thread::{self, available_parallelism},
vec,
vec::Vec,
};
pub fn external_sort(
inputs: &[&str],
output: &str, max_heap: Option<usize>, thread_count: Option<usize>, tmp_dir: Option<&str>, ) {
let max_heap = max_heap.unwrap_or(100_000);
let thread_count = thread_count.unwrap_or(1);
let binding = tmpdir();
let tmp_dir = tmp_dir.unwrap_or(&binding);
let sizes = get_sizes(inputs);
let chunks = build_chunks(&sizes, tmp_dir, max_heap as u64);
let mut sorted_files: Vec<String> = vec![];
if thread_count == 1 || chunks.len() <= 10 {
for chunk in chunks {
sorted_files.push(sort_chunk(chunk));
}
} else {
sorted_files = sort_chunks_with_threads(chunks, thread_count);
}
merge_sorted_chunks(&sorted_files, output, max_heap);
merge_values(output, sizes);
for file in sorted_files {
fs::remove_file(file).unwrap();
}
}
/// Per-input bookkeeping: the sizes of an input's `.keys`/`.values`
/// companion files and where its values land in the merged output.
#[derive(Debug, Clone)]
pub struct FileSize {
    /// Base file name of the input (used to label its chunks).
    name: String,
    /// Input path as given, without the `.keys`/`.values` suffix.
    input: String,
    /// Size in bytes of `<input>.keys` (0 if the file is missing).
    key_size: u64,
    /// Size in bytes of `<input>.values` (0 if the file is missing).
    value_size: u64,
    /// Byte offset of this input's values in the concatenated output —
    /// the running sum of preceding inputs' `value_size`s.
    value_offset: u64,
}
/// Measures each input's `.keys`/`.values` files and assigns every input a
/// cumulative byte offset for its values in the eventual merged output.
fn get_sizes(inputs: &[&str]) -> Vec<FileSize> {
    // Running offset of the next input's values region.
    let mut value_offset = 0;
    inputs
        .iter()
        .map(|input| {
            let key_size = file_size(&format!("{input}.keys"));
            let value_size = file_size(&format!("{input}.values"));
            // Label the entry with the bare file name; fall back to a
            // placeholder when the path has no UTF-8 file name component.
            let name = Path::new(input)
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("Unknown")
                .to_string();
            let entry = FileSize {
                name,
                input: input.to_string(),
                key_size,
                value_size,
                value_offset,
            };
            value_offset += value_size;
            entry
        })
        .collect()
}
/// Returns the size of `file` in bytes, or 0 when the file cannot be
/// stat-ed (missing file, permission error, ...).
fn file_size(file: &str) -> u64 {
    fs::metadata(file).map(|meta| meta.len()).unwrap_or(0)
}
/// One unit of sorting work: a byte range of an input's `.keys` file to be
/// sorted into a temporary file under `out_dir`.
#[derive(Debug)]
pub struct SortChunk {
    /// Label of the originating input; full path of the `.keys` file to read.
    name: String, input: String,
    /// Directory that receives the sorted chunk file.
    out_dir: String,
    /// Byte range `[start, end)` of the `.keys` file covered by this chunk.
    start: u64,
    end: u64,
    /// Byte offset of the originating input's values in the merged output.
    value_offset: u64,
}
/// Splits every input's `.keys` file into chunks of at most
/// `max_heap` keys (`max_heap * KEY_STORE_LENGTH` bytes) each.
fn build_chunks(file_sizes: &[FileSize], out_dir: &str, max_heap: u64) -> Vec<SortChunk> {
    // Size in bytes of one chunk; invariant across the loop.
    let stride = max_heap * KEY_STORE_LENGTH;
    let mut chunks = Vec::new();
    for size in file_sizes {
        let keys_path = format!("{}.keys", size.input);
        let mut start = 0;
        while start < size.key_size {
            chunks.push(SortChunk {
                name: size.name.clone(),
                input: keys_path.clone(),
                out_dir: out_dir.to_string(),
                start,
                // The final chunk is clamped to the end of the file.
                end: (start + stride).min(size.key_size),
                value_offset: size.value_offset,
            });
            start += stride;
        }
    }
    chunks
}
/// Sorts `chunks` on up to `tc` worker threads (capped at the machine's
/// available parallelism) and returns the paths of the sorted chunk files.
/// Result order is nondeterministic; callers must not rely on it.
fn sort_chunks_with_threads(chunks: Vec<SortChunk>, tc: usize) -> Vec<String> {
    let parallelism = available_parallelism().map(|n| n.get()).unwrap_or(1);
    let thread_count = usize::min(tc, parallelism);
    // Shared work queue and shared result list.
    let chunks = Arc::new(Mutex::new(chunks));
    let sorted_files = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::with_capacity(thread_count);
    for _ in 0..thread_count {
        let chunks = Arc::clone(&chunks);
        let sorted_files = Arc::clone(&sorted_files);
        let handle = thread::spawn(move || loop {
            // Pop in its own statement so the MutexGuard is dropped before
            // sort_chunk runs. With `while let Some(c) = m.lock()...pop()`
            // the guard temporary lives for the whole loop body, so the
            // queue lock was held during sorting and the workers serialized.
            let next = chunks.lock().unwrap().pop();
            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };
            let result = sort_chunk(chunk);
            sorted_files.lock().unwrap().push(result);
        });
        handles.push(handle);
    }
    for handle in handles {
        let _ = handle.join();
    }
    // All workers have exited, so both Arcs are unique again.
    Arc::try_unwrap(sorted_files).unwrap().into_inner().unwrap()
}
/// Concatenates the inputs' `.values` files into `<output>.values`, in
/// ascending `value_offset` order so the bytes line up with the offsets
/// recorded in the merged key file.
///
/// A single input needs no merging (its values file is already the whole
/// output); an input equal to `output` is skipped, as are inputs with no
/// value bytes. Panics on I/O failure, consistent with the rest of this
/// module.
fn merge_values(output: &str, sizes: Vec<FileSize>) {
    if sizes.len() <= 1 {
        return;
    }
    // `sizes` is owned here — sort it in place instead of cloning the
    // whole vector first (the previous `sizes.clone()` was pure waste).
    let mut sorted_sizes = sizes;
    sorted_sizes.sort_by_key(|s| s.value_offset);
    let values: Vec<String> = sorted_sizes
        .into_iter()
        .filter(|c| c.input != output && c.value_size > 0)
        .map(|c| c.input)
        .collect();
    if values.is_empty() {
        return;
    }
    let out_file = File::create(format!("{output}.values")).unwrap();
    let mut writer = BufWriter::new(out_file);
    for value in values {
        let input = File::open(format!("{value}.values")).unwrap();
        let mut reader = BufReader::new(input);
        copy(&mut reader, &mut writer).unwrap();
    }
    // BufWriter's Drop swallows flush errors — flush explicitly.
    writer.flush().unwrap();
}
/// Returns the OS temp directory as a `String`.
///
/// Uses `to_string_lossy` so a temp-dir path that is not valid UTF-8 is
/// transliterated instead of panicking (the previous `to_str().unwrap()`
/// would panic on such platforms).
fn tmpdir() -> String {
    env::temp_dir().to_string_lossy().into_owned()
}