kv_par_merge_sort/
merge.rs

1use crate::SortedChunkFiles;
2
3use bytemuck::{bytes_of, bytes_of_mut, Pod};
4use core::cmp::Reverse;
5use std::collections::BinaryHeap;
6use std::fs::File;
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::Path;
9use tempfile::tempfile_in;
10
/// One mebibyte (1024 * 1024 bytes); used as the capacity of every buffered
/// reader and writer created in this module.
const ONE_MIB: usize = 1024 * 1024;
12
13pub fn merge_chunks_into_tempfiles<K, V>(
14    chunks: Vec<SortedChunkFiles>,
15    tmp_dir_path: &Path,
16) -> Result<SortedChunkFiles, io::Error>
17where
18    K: Ord + Pod,
19    V: Pod,
20{
21    merge_chunks::<K, V>(
22        chunks,
23        tempfile_in(tmp_dir_path)?,
24        tempfile_in(tmp_dir_path)?,
25    )
26}
27
28/// Merges any number of [`SortedChunkFiles`]
29pub fn merge_chunks<K, V>(
30    chunks: Vec<SortedChunkFiles>,
31    merged_key_file: File,
32    merged_value_file: File,
33) -> Result<SortedChunkFiles, io::Error>
34where
35    K: Ord + Pod,
36    V: Pod,
37{
38    log::info!("Running merge of {} persisted chunks", chunks.len());
39
40    let span = tracing::info_span!("merge_persisted_chunks");
41    let _guard = span.enter();
42
43    let sum_entries = chunks.iter().map(|chunk| chunk.num_entries).sum();
44
45    // Merge the files without reading their entire contents into memory.
46    let mut readers: Vec<_> = chunks
47        .into_iter()
48        .map(|chunk| {
49            (
50                BufReader::with_capacity(ONE_MIB, chunk.key_file),
51                BufReader::with_capacity(ONE_MIB, chunk.value_file),
52            )
53        })
54        .collect();
55
56    let mut key_writer = BufWriter::with_capacity(ONE_MIB, merged_key_file);
57    let mut value_writer = BufWriter::with_capacity(ONE_MIB, merged_value_file);
58
59    // Initialize the first key from each file. This asserts that each chunk is not empty.
60    let mut key_heap = BinaryHeap::with_capacity(readers.len());
61    for (i, (key_reader, _)) in readers.iter_mut().enumerate() {
62        let mut key = K::zeroed();
63        assert!(read_element(key_reader, &mut key)?);
64        key_heap.push((Reverse(key), i));
65    }
66
67    while let Some((key, chunk_index)) = key_heap.pop() {
68        let (key_reader, value_reader) = &mut readers[chunk_index];
69        write_element(&mut key_writer, &key.0)?;
70        let mut value = V::zeroed();
71        assert!(read_element(value_reader, &mut value)?);
72        write_element(&mut value_writer, &value)?;
73        let mut next_key = K::zeroed();
74        if read_element(key_reader, &mut next_key)? {
75            key_heap.push((Reverse(next_key), chunk_index));
76        }
77    }
78
79    SortedChunkFiles::new(
80        key_writer.into_inner()?,
81        value_writer.into_inner()?,
82        sum_entries,
83    )
84}
85
86fn write_element<T>(writer: &mut BufWriter<File>, value: &T) -> Result<(), io::Error>
87where
88    T: Pod,
89{
90    writer.write_all(bytes_of(value))
91}
92
93fn read_element<T>(reader: &mut BufReader<File>, value: &mut T) -> Result<bool, io::Error>
94where
95    T: Pod,
96{
97    if let Err(e) = reader.read_exact(bytes_of_mut(value)) {
98        if e.kind() == io::ErrorKind::UnexpectedEof {
99            Ok(false)
100        } else {
101            Err(e)
102        }
103    } else {
104        Ok(true)
105    }
106}