gistools/data_store/external_sort/
mod.rs1mod merge_sorted_chunks;
2mod sort_chunk;
3
4use super::file::KEY_STORE_LENGTH;
5use merge_sorted_chunks::*;
6use sort_chunk::*;
7use std::{
8 env, format,
9 fs::{self, File},
10 io::{BufReader, BufWriter, Write, copy},
11 path::Path,
12 string::{String, ToString},
13 sync::{Arc, Mutex},
14 thread::{self, available_parallelism},
15 vec,
16 vec::Vec,
17};
18
19pub fn external_sort(
21 inputs: &[&str], output: &str, max_heap: Option<usize>, thread_count: Option<usize>, tmp_dir: Option<&str>, ) {
27 let max_heap = max_heap.unwrap_or(100_000);
28 let thread_count = thread_count.unwrap_or(1);
29 let binding = tmpdir();
30 let tmp_dir = tmp_dir.unwrap_or(&binding);
31 let sizes = get_sizes(inputs);
33 let chunks = build_chunks(&sizes, tmp_dir, max_heap as u64);
35 let mut sorted_files: Vec<String> = vec![];
37 if thread_count == 1 || chunks.len() <= 10 {
38 for chunk in chunks {
39 sorted_files.push(sort_chunk(chunk));
40 }
41 } else {
42 sorted_files = sort_chunks_with_threads(chunks, thread_count);
43 }
44 merge_sorted_chunks(&sorted_files, output, max_heap);
46 merge_values(output, sizes);
47 for file in sorted_files {
49 fs::remove_file(file).unwrap();
50 }
51}
52
/// Size information for one input store, as gathered by `get_sizes`.
#[derive(Clone, Debug)]
pub struct FileSize {
    /// Final path component of `input`, or `"Unknown"` when it has none.
    name: String,
    /// The input path prefix exactly as given (without `.keys` / `.values`).
    input: String,
    /// Byte length of `{input}.keys`, or 0 if its metadata could not be read.
    key_size: u64,
    /// Byte length of `{input}.values`, or 0 if its metadata could not be read.
    value_size: u64,
    /// Sum of the `value_size` of every input listed before this one.
    value_offset: u64,
}
67
68fn get_sizes(inputs: &[&str]) -> Vec<FileSize> {
70 let mut sizes: Vec<FileSize> = vec![];
71 let mut value_offset = 0;
72
73 for input in inputs {
74 let key_size = file_size(&format!("{input}.keys"));
75 let value_size = file_size(&format!("{input}.values"));
76 let name = Path::new(input)
77 .file_name() .and_then(|name| name.to_str()) .unwrap_or("Unknown")
80 .to_string();
81 sizes.push(FileSize { name, input: input.to_string(), key_size, value_size, value_offset });
82 value_offset += value_size;
83 }
84
85 sizes
86}
87
/// Returns the length of `file` in bytes.
///
/// Yields `0` when its metadata cannot be read, for example because the file
/// does not exist.
fn file_size(file: &str) -> u64 {
    fs::metadata(file).map_or(0, |meta| meta.len())
}
95
/// One unit of sorting work: a byte range of a single `.keys` file.
#[derive(Debug)]
pub struct SortChunk {
    /// Display name of the originating input (copied from its `FileSize`).
    name: String,
    /// Path of the `.keys` file this chunk reads from.
    input: String,
    /// Directory handed to `sort_chunk`; presumably where it writes the sorted chunk.
    out_dir: String,
    /// First byte (inclusive) of the range within `input`.
    start: u64,
    /// Last byte (exclusive) of the range within `input`.
    end: u64,
    /// Where the originating input's values begin in the concatenated values file.
    value_offset: u64,
}
106
107fn build_chunks(file_sizes: &[FileSize], out_dir: &str, max_heap: u64) -> Vec<SortChunk> {
109 let mut chunks: Vec<SortChunk> = vec![];
110
111 for file_size in file_sizes {
112 let FileSize { name, input, key_size, value_offset, .. } = file_size;
113 let mut start = 0;
114 while start < *key_size {
115 let end = (start + max_heap * KEY_STORE_LENGTH).min(*key_size);
116 chunks.push(SortChunk {
117 name: name.clone(),
118 input: format!("{input}.keys"),
119 out_dir: out_dir.to_string(),
120 start,
121 end,
122 value_offset: *value_offset,
123 });
124 start += max_heap * KEY_STORE_LENGTH;
125 }
126 }
127
128 chunks
129}
130
131fn sort_chunks_with_threads(chunks: Vec<SortChunk>, tc: usize) -> Vec<String> {
133 let parallelism = available_parallelism().map(|n| n.get()).unwrap_or(1);
134 let thread_count = usize::min(tc, parallelism);
135
136 let chunks = Arc::new(Mutex::new(chunks));
139 let sorted_files = Arc::new(Mutex::new(Vec::new()));
140
141 let mut handles = Vec::with_capacity(thread_count);
142 for _ in 0..thread_count {
143 let chunks = Arc::clone(&chunks);
144 let sorted_files = Arc::clone(&sorted_files);
145
146 let handle = thread::spawn(move || {
147 while let Some(chunk) = chunks.lock().unwrap().pop() {
148 let result = sort_chunk(chunk);
149 sorted_files.lock().unwrap().push(result);
150 }
151 });
152
153 handles.push(handle);
154 }
155 for handle in handles {
156 let _ = handle.join();
157 }
158
159 Arc::try_unwrap(sorted_files).unwrap().into_inner().unwrap()
160}
161
162fn merge_values(output: &str, sizes: Vec<FileSize>) {
164 if sizes.len() <= 1 {
165 return;
166 }
167 let mut sorted_sizes = sizes.clone(); sorted_sizes.sort_by(|a, b| a.value_offset.cmp(&b.value_offset));
169
170 let values: Vec<String> = sorted_sizes
171 .into_iter() .filter(|c| c.input != output && c.value_size > 0)
173 .map(|c| c.input)
174 .collect();
175
176 if values.is_empty() {
177 return;
178 }
179
180 let output = File::create(format!("{output}.values")).unwrap();
181 let mut writer = BufWriter::new(output);
182
183 for value in values {
184 let input = File::open(format!("{value}.values")).unwrap();
185 let mut reader = BufReader::new(input);
186
187 copy(&mut reader, &mut writer).unwrap();
189 }
190
191 writer.flush().unwrap();
192}
193
/// Returns the platform's temporary directory as an owned string.
///
/// # Panics
/// Panics if the temporary directory path is not valid UTF-8.
fn tmpdir() -> String {
    let dir = env::temp_dir();
    let utf8 = dir.to_str().unwrap();
    String::from(utf8)
}
197
198