gistools/data_store/external_sort/
mod.rs

1mod merge_sorted_chunks;
2mod sort_chunk;
3
4use super::file::KEY_STORE_LENGTH;
5use merge_sorted_chunks::*;
6use sort_chunk::*;
7use std::{
8    env, format,
9    fs::{self, File},
10    io::{BufReader, BufWriter, Write, copy},
11    path::Path,
12    string::{String, ToString},
13    sync::{Arc, Mutex},
14    thread::{self, available_parallelism},
15    vec,
16    vec::Vec,
17};
18
19/// Sorts an array using external-sorting.
20pub fn external_sort(
21    inputs: &[&str], /* a list of input files without their extensions. e.g. './file1', './file2', './file3' */
22    output: &str,    // output folder to place the sorted keys
23    max_heap: Option<usize>, // max instance of the parsed entity in memory
24    thread_count: Option<usize>, // number of workers
25    tmp_dir: Option<&str>, // temporary directory to store intermediate sorted files
26) {
27    let max_heap = max_heap.unwrap_or(100_000);
28    let thread_count = thread_count.unwrap_or(1);
29    let binding = tmpdir();
30    let tmp_dir = tmp_dir.unwrap_or(&binding);
31    // 1) Get the size of the input
32    let sizes = get_sizes(inputs);
33    // 2) Build chunk list
34    let chunks = build_chunks(&sizes, tmp_dir, max_heap as u64);
35    // 3) Sort chunks - using either workers or single threaded
36    let mut sorted_files: Vec<String> = vec![];
37    if thread_count == 1 || chunks.len() <= 10 {
38        for chunk in chunks {
39            sorted_files.push(sort_chunk(chunk));
40        }
41    } else {
42        sorted_files = sort_chunks_with_threads(chunks, thread_count);
43    }
44    // 4) Merge chunks
45    merge_sorted_chunks(&sorted_files, output, max_heap);
46    merge_values(output, sizes);
47    // 5) Cleanup
48    for file in sorted_files {
49        fs::remove_file(file).unwrap();
50    }
51}
52
/// A file name and its sizes on disk.
#[derive(Debug, Clone)]
pub struct FileSize {
    /// Base file name of the input (final path component; `"Unknown"` if unavailable)
    name: String,
    /// Full input path, without extension (there could be multiple input files to sort)
    input: String,
    /// Total size in bytes of the key store (`<input>.keys`)
    key_size: u64,
    /// Total size in bytes of the item store (`<input>.values`)
    value_size: u64,
    /// Running total of the preceding inputs' value sizes; used to order
    /// this input's values within the merged value store
    value_offset: u64,
}
67
68/// Get the sizes of the inputs
69fn get_sizes(inputs: &[&str]) -> Vec<FileSize> {
70    let mut sizes: Vec<FileSize> = vec![];
71    let mut value_offset = 0;
72
73    for input in inputs {
74        let key_size = file_size(&format!("{input}.keys"));
75        let value_size = file_size(&format!("{input}.values"));
76        let name = Path::new(input)
77            .file_name() // Get file name as OsStr
78            .and_then(|name| name.to_str()) // Convert OsStr to &str
79            .unwrap_or("Unknown")
80            .to_string();
81        sizes.push(FileSize { name, input: input.to_string(), key_size, value_size, value_offset });
82        value_offset += value_size;
83    }
84
85    sizes
86}
87
/// Size of `file` in bytes, or 0 when the file is missing or unreadable.
fn file_size(file: &str) -> u64 {
    fs::metadata(file).map(|metadata| metadata.len()).unwrap_or(0)
}
95
/// A chunk of a file to be sorted
#[derive(Debug)]
pub struct SortChunk {
    /// Base name of the input (there could be multiple input files to sort)
    name: String,
    /// Path of the key store to read (`<input>.keys`)
    input: String,
    /// Directory the sorted chunk is written into (consumed by `sort_chunk` — see sort_chunk.rs)
    out_dir: String,
    /// Starting byte offset of this chunk within the key store
    start: u64,
    /// Ending byte offset of this chunk, clamped to the key-store size
    end: u64,
    /// Byte offset of this input's values within the merged value store
    value_offset: u64,
}
106
107/// Build chunks from all files. Chunks help describe a portion of work to complete
108fn build_chunks(file_sizes: &[FileSize], out_dir: &str, max_heap: u64) -> Vec<SortChunk> {
109    let mut chunks: Vec<SortChunk> = vec![];
110
111    for file_size in file_sizes {
112        let FileSize { name, input, key_size, value_offset, .. } = file_size;
113        let mut start = 0;
114        while start < *key_size {
115            let end = (start + max_heap * KEY_STORE_LENGTH).min(*key_size);
116            chunks.push(SortChunk {
117                name: name.clone(),
118                input: format!("{input}.keys"),
119                out_dir: out_dir.to_string(),
120                start,
121                end,
122                value_offset: *value_offset,
123            });
124            start += max_heap * KEY_STORE_LENGTH;
125        }
126    }
127
128    chunks
129}
130
131/// Sorts a list of chunks using threads
132fn sort_chunks_with_threads(chunks: Vec<SortChunk>, tc: usize) -> Vec<String> {
133    let parallelism = available_parallelism().map(|n| n.get()).unwrap_or(1);
134    let thread_count = usize::min(tc, parallelism);
135
136    // Create threads that take from the `chunks` vector until its empty and call sort_chunk.
137    // Store the resulant string from sort_chunk in the `sorted_files` vector.
138    let chunks = Arc::new(Mutex::new(chunks));
139    let sorted_files = Arc::new(Mutex::new(Vec::new()));
140
141    let mut handles = Vec::with_capacity(thread_count);
142    for _ in 0..thread_count {
143        let chunks = Arc::clone(&chunks);
144        let sorted_files = Arc::clone(&sorted_files);
145
146        let handle = thread::spawn(move || {
147            while let Some(chunk) = chunks.lock().unwrap().pop() {
148                let result = sort_chunk(chunk);
149                sorted_files.lock().unwrap().push(result);
150            }
151        });
152
153        handles.push(handle);
154    }
155    for handle in handles {
156        let _ = handle.join();
157    }
158
159    Arc::try_unwrap(sorted_files).unwrap().into_inner().unwrap()
160}
161
162/// merge the values files since the sorted key indexes have been merged as well.
163fn merge_values(output: &str, sizes: Vec<FileSize>) {
164    if sizes.len() <= 1 {
165        return;
166    }
167    let mut sorted_sizes = sizes.clone(); // Clone if you need to keep original order
168    sorted_sizes.sort_by(|a, b| a.value_offset.cmp(&b.value_offset));
169
170    let values: Vec<String> = sorted_sizes
171        .into_iter() // Now consume the sorted vector
172        .filter(|c| c.input != output && c.value_size > 0)
173        .map(|c| c.input)
174        .collect();
175
176    if values.is_empty() {
177        return;
178    }
179
180    let output = File::create(format!("{output}.values")).unwrap();
181    let mut writer = BufWriter::new(output);
182
183    for value in values {
184        let input = File::open(format!("{value}.values")).unwrap();
185        let mut reader = BufReader::new(input);
186
187        // write into output
188        copy(&mut reader, &mut writer).unwrap();
189    }
190
191    writer.flush().unwrap();
192}
193
/// The system temporary directory as an owned `String`.
///
/// # Panics
/// Panics if the temp-dir path is not valid UTF-8.
fn tmpdir() -> String {
    env::temp_dir()
        .to_str()
        .expect("temp dir path is not valid UTF-8")
        .to_string()
}
197
198// #[cfg(test)]
199// #[coverage(off)]
200// #[cfg_attr(feature = "nightly", coverage(off))]
201// mod tests {
202//     use super::*;
203
204//     #[test]
205//     fn test_sort_single_thread() {
206//         #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
207//         struct TestKey {
208//             a: f64,
209//         }
210
211//         let mut kv_store = KV::<u64, TestKey>::new(None);
212//     }
213// }
214
215// test('sort - single threaded', async () => {
216//     const dir = tmp.dirSync({ prefix: 'externalSort_single' });
217//     const name = `${dir.name}/sort-single-threaded`;
218//     const store = new S2FileStore<{ a: number }>(name);
219
220//     store.set(0, { a: 1 });
221//     store.set(1, { a: 2 });
222//     store.set(5_005, { a: 3 });
223//     store.set(22, { a: 4 });
224//     store.set(22, { a: 5 });
225//     store.set(22, { a: 6 });
226
227//     store.close();
228
229//     await externalSort([name], name);
230
231//     const storeSorted = new S2FileStore<{ a: number }>(name, { isSorted: true });
232//     const data = await Array.fromAsync(storeSorted.entries());
233
234//     expect(data).toStrictEqual([
235//       { key: 0n, value: { a: 1 } },
236//       { key: 1n, value: { a: 2 } },
237//       { key: 22n, value: { a: 4 } },
238//       { key: 22n, value: { a: 5 } },
239//       { key: 22n, value: { a: 6 } },
240//       { key: 5_005n, value: { a: 3 } },
241//     ]);
242//   });