kv_par_merge_sort/
merge.rs1use crate::SortedChunkFiles;
2
3use bytemuck::{bytes_of, bytes_of_mut, Pod};
4use core::cmp::Reverse;
5use std::collections::BinaryHeap;
6use std::fs::File;
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::Path;
9use tempfile::tempfile_in;
10
/// One mebibyte (2^20 bytes). Used as the buffer capacity for every chunk
/// reader and merged-output writer created during a merge.
const ONE_MIB: usize = 1024 * 1024;
12
13pub fn merge_chunks_into_tempfiles<K, V>(
14 chunks: Vec<SortedChunkFiles>,
15 tmp_dir_path: &Path,
16) -> Result<SortedChunkFiles, io::Error>
17where
18 K: Ord + Pod,
19 V: Pod,
20{
21 merge_chunks::<K, V>(
22 chunks,
23 tempfile_in(tmp_dir_path)?,
24 tempfile_in(tmp_dir_path)?,
25 )
26}
27
28pub fn merge_chunks<K, V>(
30 chunks: Vec<SortedChunkFiles>,
31 merged_key_file: File,
32 merged_value_file: File,
33) -> Result<SortedChunkFiles, io::Error>
34where
35 K: Ord + Pod,
36 V: Pod,
37{
38 log::info!("Running merge of {} persisted chunks", chunks.len());
39
40 let span = tracing::info_span!("merge_persisted_chunks");
41 let _guard = span.enter();
42
43 let sum_entries = chunks.iter().map(|chunk| chunk.num_entries).sum();
44
45 let mut readers: Vec<_> = chunks
47 .into_iter()
48 .map(|chunk| {
49 (
50 BufReader::with_capacity(ONE_MIB, chunk.key_file),
51 BufReader::with_capacity(ONE_MIB, chunk.value_file),
52 )
53 })
54 .collect();
55
56 let mut key_writer = BufWriter::with_capacity(ONE_MIB, merged_key_file);
57 let mut value_writer = BufWriter::with_capacity(ONE_MIB, merged_value_file);
58
59 let mut key_heap = BinaryHeap::with_capacity(readers.len());
61 for (i, (key_reader, _)) in readers.iter_mut().enumerate() {
62 let mut key = K::zeroed();
63 assert!(read_element(key_reader, &mut key)?);
64 key_heap.push((Reverse(key), i));
65 }
66
67 while let Some((key, chunk_index)) = key_heap.pop() {
68 let (key_reader, value_reader) = &mut readers[chunk_index];
69 write_element(&mut key_writer, &key.0)?;
70 let mut value = V::zeroed();
71 assert!(read_element(value_reader, &mut value)?);
72 write_element(&mut value_writer, &value)?;
73 let mut next_key = K::zeroed();
74 if read_element(key_reader, &mut next_key)? {
75 key_heap.push((Reverse(next_key), chunk_index));
76 }
77 }
78
79 SortedChunkFiles::new(
80 key_writer.into_inner()?,
81 value_writer.into_inner()?,
82 sum_entries,
83 )
84}
85
86fn write_element<T>(writer: &mut BufWriter<File>, value: &T) -> Result<(), io::Error>
87where
88 T: Pod,
89{
90 writer.write_all(bytes_of(value))
91}
92
93fn read_element<T>(reader: &mut BufReader<File>, value: &mut T) -> Result<bool, io::Error>
94where
95 T: Pod,
96{
97 if let Err(e) = reader.read_exact(bytes_of_mut(value)) {
98 if e.kind() == io::ErrorKind::UnexpectedEof {
99 Ok(false)
100 } else {
101 Err(e)
102 }
103 } else {
104 Ok(true)
105 }
106}