// chkpt_core/store/pack.rs
use std::collections::HashMap;
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use memmap2::Mmap;
use tempfile::NamedTempFile;

use crate::error::{ChkpttError, Result};
use crate::store::blob::{hash_content, hex_to_bytes};
8
9const PACK_MAGIC: &[u8; 4] = b"CHKL";
10const IDX_ENTRY_SIZE: usize = 16 + 8 + 8; // hash + offset + size
11const HEADER_SIZE: u64 = 8; // MAGIC(4) + COUNT(4)
12
13/// In-memory index entry for a pack.
14#[derive(Debug, Clone)]
15struct IndexEntry {
16    hash: [u8; 16],
17    offset: u64,
18    size: u64,
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub struct PackLocation {
23    pub(crate) reader_index: usize,
24    pub(crate) offset: u64,
25    pub(crate) size: u64,
26}
27
28pub struct PackWriter {
29    writer: BufWriter<NamedTempFile>,
30    hasher: xxhash_rust::xxh3::Xxh3,
31    idx_entries: Vec<IndexEntry>,
32    offset: u64,
33    packs_dir: PathBuf,
34}
35
36impl PackWriter {
37    pub fn new(packs_dir: &Path) -> Result<Self> {
38        std::fs::create_dir_all(packs_dir)?;
39        let dat_tmp = NamedTempFile::new_in(packs_dir)?;
40        let mut writer = BufWriter::with_capacity(256 * 1024, dat_tmp);
41        // Write 8-byte placeholder header (will be overwritten in finish)
42        let placeholder = [0u8; HEADER_SIZE as usize];
43        writer.write_all(&placeholder)?;
44        // Start incremental hasher — header will be re-hashed in finish()
45        let hasher = xxhash_rust::xxh3::Xxh3::new();
46        Ok(Self {
47            writer,
48            hasher,
49            idx_entries: Vec::new(),
50            offset: HEADER_SIZE,
51            packs_dir: packs_dir.to_path_buf(),
52        })
53    }
54
55    pub fn add(&mut self, content: &[u8]) -> Result<String> {
56        let hash_hex = hash_content(content);
57        let hash = hex_to_bytes(&hash_hex)?;
58        let compressed = {
59            use lz4_flex::frame::FrameEncoder;
60            let mut encoder = FrameEncoder::new(Vec::new());
61            std::io::Write::write_all(&mut encoder, content).unwrap();
62            encoder.finish().unwrap()
63        };
64        self.add_pre_compressed_bytes(hash, compressed)?;
65        Ok(hash_hex)
66    }
67
68    pub fn add_pre_compressed(&mut self, hash_hex: String, compressed: Vec<u8>) -> Result<()> {
69        let hash = hex_to_bytes(&hash_hex)?;
70        self.add_pre_compressed_bytes(hash, compressed)
71    }
72
73    pub fn add_pre_compressed_bytes(&mut self, hash: [u8; 16], compressed: Vec<u8>) -> Result<()> {
74        let compressed_len = compressed.len() as u64;
75
76        // Write entry to BufWriter: hash(16) + compressed_len(8) + data(N)
77        self.writer.write_all(&hash)?;
78        self.writer.write_all(&compressed_len.to_le_bytes())?;
79        self.writer.write_all(&compressed)?;
80
81        // Incremental hash of entry data
82        self.hasher.update(&hash);
83        self.hasher.update(&compressed_len.to_le_bytes());
84        self.hasher.update(&compressed);
85
86        self.idx_entries.push(IndexEntry {
87            hash,
88            offset: self.offset,
89            size: compressed_len,
90        });
91        self.offset += 16 + 8 + compressed_len;
92        Ok(())
93    }
94
95    pub fn is_empty(&self) -> bool {
96        self.idx_entries.is_empty()
97    }
98
99    /// Finalize: write real header, persist .dat, write .idx.
100    /// Returns pack hash.
101    pub fn finish(mut self) -> Result<String> {
102        if self.idx_entries.is_empty() {
103            return Err(ChkpttError::Other("No entries to pack".into()));
104        }
105
106        // Flush BufWriter and get the underlying file
107        self.writer.flush()?;
108        let mut dat_tmp = self.writer.into_inner().map_err(|e| e.into_error())?;
109
110        let count = self.idx_entries.len() as u32;
111
112        // Write real header
113        let mut header = [0u8; HEADER_SIZE as usize];
114        header[0..4].copy_from_slice(PACK_MAGIC);
115        header[4..8].copy_from_slice(&count.to_le_bytes());
116
117        dat_tmp.seek(SeekFrom::Start(0))?;
118        dat_tmp.write_all(&header)?;
119        dat_tmp.flush()?;
120
121        // Finalize hash: include header in the hash
122        self.hasher.update(&header);
123        let pack_hash = format!("{:032x}", self.hasher.digest128())[..16].to_string();
124
125        // Persist .dat file
126        let dat_path = self.packs_dir.join(format!("pack-{}.dat", pack_hash));
127        if let Err(error) = dat_tmp.persist_noclobber(&dat_path) {
128            if error.error.kind() != std::io::ErrorKind::AlreadyExists {
129                return Err(ChkpttError::Other(error.error.to_string()));
130            }
131        }
132
133        // Sort idx entries by hash for binary search
134        self.idx_entries
135            .sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
136
137        // Write .idx file
138        let idx_path = self.packs_dir.join(format!("pack-{}.idx", pack_hash));
139        let mut idx_buf: Vec<u8> = Vec::with_capacity(self.idx_entries.len() * IDX_ENTRY_SIZE);
140        for entry in &self.idx_entries {
141            idx_buf.extend_from_slice(&entry.hash);
142            idx_buf.extend_from_slice(&entry.offset.to_le_bytes());
143            idx_buf.extend_from_slice(&entry.size.to_le_bytes());
144        }
145        std::fs::write(&idx_path, &idx_buf)?;
146
147        Ok(pack_hash)
148    }
149}
150
151pub struct PackReader {
152    _dat_file: std::fs::File,
153    dat: Mmap,
154    _idx_file: std::fs::File,
155    idx: Mmap,
156    entry_count: usize,
157}
158
159impl PackReader {
160    pub fn open(packs_dir: &Path, pack_hash: &str) -> Result<Self> {
161        let dat_path = packs_dir.join(format!("pack-{}.dat", pack_hash));
162        let idx_path = packs_dir.join(format!("pack-{}.idx", pack_hash));
163
164        let dat_file = std::fs::File::open(&dat_path)?;
165        let idx_file = std::fs::File::open(&idx_path)?;
166
167        // SAFETY: The file handles are kept alive alongside the mmaps.
168        let dat = unsafe { Mmap::map(&dat_file)? };
169        let idx = unsafe { Mmap::map(&idx_file)? };
170
171        // Hint kernel about expected access patterns.
172        // .dat uses WillNeed (not Sequential) because parallel restore workers
173        // read different regions of the same mmap concurrently — Sequential
174        // causes aggressive page reclaim that hurts other threads.
175        // .idx is binary-searched so Random is appropriate.
176        #[cfg(unix)]
177        {
178            let _ = dat.advise(memmap2::Advice::WillNeed);
179            let _ = idx.advise(memmap2::Advice::Random);
180        }
181
182        let entry_count = idx.len() / IDX_ENTRY_SIZE;
183
184        Ok(Self {
185            _dat_file: dat_file,
186            dat,
187            _idx_file: idx_file,
188            idx,
189            entry_count,
190        })
191    }
192
193    /// Extract an IndexEntry from the mmap'd idx at a given index position.
194    fn idx_entry(&self, index: usize) -> IndexEntry {
195        let pos = index * IDX_ENTRY_SIZE;
196        let mut hash = [0u8; 16];
197        hash.copy_from_slice(&self.idx[pos..pos + 16]);
198        let offset = u64::from_le_bytes(self.idx[pos + 16..pos + 24].try_into().unwrap());
199        let size = u64::from_le_bytes(self.idx[pos + 24..pos + 32].try_into().unwrap());
200        IndexEntry { hash, offset, size }
201    }
202
203    /// Binary search for hash in the mmap'd index.
204    fn find_bytes(&self, hash_bytes: &[u8; 16]) -> Option<IndexEntry> {
205        let mut lo = 0usize;
206        let mut hi = self.entry_count;
207        while lo < hi {
208            let mid = lo + (hi - lo) / 2;
209            let mid_hash = &self.idx[mid * IDX_ENTRY_SIZE..mid * IDX_ENTRY_SIZE + 16];
210            match mid_hash.cmp(&hash_bytes[..]) {
211                std::cmp::Ordering::Equal => return Some(self.idx_entry(mid)),
212                std::cmp::Ordering::Less => lo = mid + 1,
213                std::cmp::Ordering::Greater => hi = mid,
214            }
215        }
216        None
217    }
218
219    fn find(&self, hash_hex: &str) -> Option<IndexEntry> {
220        let hash_bytes = hex_to_bytes(hash_hex).ok()?;
221        self.find_bytes(&hash_bytes)
222    }
223
224    pub fn contains_bytes(&self, hash: &[u8; 16]) -> bool {
225        self.find_bytes(hash).is_some()
226    }
227
228    fn compressed_bytes(&self, offset: u64, size: u64) -> Option<&[u8]> {
229        let data_start = (offset as usize).checked_add(16 + 8)?; // skip hash + compressed_size
230        let data_end = data_start.checked_add(size as usize)?;
231        if data_end > self.dat.len() {
232            return None;
233        }
234        Some(&self.dat[data_start..data_end])
235    }
236
237    fn copy_at<W: Write>(&self, offset: u64, size: u64, mut writer: W) -> Result<()> {
238        let compressed = self.compressed_bytes(offset, size).ok_or_else(|| {
239            ChkpttError::StoreCorrupted("Pack entry points outside pack data".into())
240        })?;
241        let mut decoder = lz4_flex::frame::FrameDecoder::new(compressed);
242        std::io::copy(&mut decoder, &mut writer).map_err(|e| {
243            if e.kind() == std::io::ErrorKind::InvalidData {
244                ChkpttError::StoreCorrupted(format!("LZ4 decompression failed: {}", e))
245            } else {
246                ChkpttError::Io(e)
247            }
248        })?;
249        Ok(())
250    }
251
252    /// Read and decompress an object. Returns None if not found.
253    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
254        let entry = self.find(hash_hex)?;
255        let mut decompressed = Vec::new();
256        self.copy_at(entry.offset, entry.size, &mut decompressed)
257            .ok()?;
258        Some(decompressed)
259    }
260
261    /// Read and decompress an object. Error if not found.
262    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
263        self.try_read(hash_hex)
264            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
265    }
266}
267
268pub struct PackSet {
269    readers: Vec<PackReader>,
270    reader_indices: HashMap<String, usize>,
271}
272
273impl PackSet {
274    pub fn open_all(packs_dir: &Path) -> Result<Self> {
275        let pack_hashes = list_packs(packs_dir)?;
276        Self::open_selected(packs_dir, &pack_hashes)
277    }
278
279    pub fn open_selected(packs_dir: &Path, pack_hashes: &[String]) -> Result<Self> {
280        let mut readers = Vec::with_capacity(pack_hashes.len());
281        let mut reader_indices = HashMap::with_capacity(pack_hashes.len());
282        for pack_hash in pack_hashes {
283            let reader_index = readers.len();
284            readers.push(PackReader::open(packs_dir, pack_hash)?);
285            reader_indices.insert(pack_hash.clone(), reader_index);
286        }
287        Ok(Self {
288            readers,
289            reader_indices,
290        })
291    }
292
293    pub fn empty() -> Self {
294        Self {
295            readers: Vec::new(),
296            reader_indices: HashMap::new(),
297        }
298    }
299
300    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
301        let location = self.locate(hash_hex)?;
302        let mut decompressed = Vec::new();
303        self.copy_to_writer(&location, &mut decompressed).ok()?;
304        Some(decompressed)
305    }
306
307    pub fn contains_bytes(&self, hash: &[u8; 16]) -> bool {
308        self.readers
309            .iter()
310            .any(|reader| reader.contains_bytes(hash))
311    }
312
313    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
314        self.try_read(hash_hex)
315            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
316    }
317
318    pub(crate) fn locate(&self, hash_hex: &str) -> Option<PackLocation> {
319        self.readers
320            .iter()
321            .enumerate()
322            .find_map(|(reader_index, reader)| {
323                reader.find(hash_hex).map(|entry| PackLocation {
324                    reader_index,
325                    offset: entry.offset,
326                    size: entry.size,
327                })
328            })
329    }
330
331    pub fn locate_bytes(&self, hash: &[u8; 16]) -> Option<PackLocation> {
332        self.readers
333            .iter()
334            .enumerate()
335            .find_map(|(reader_index, reader)| {
336                reader.find_bytes(hash).map(|entry| PackLocation {
337                    reader_index,
338                    offset: entry.offset,
339                    size: entry.size,
340                })
341            })
342    }
343
344    pub(crate) fn locate_in_pack_bytes(
345        &self,
346        pack_hash: &str,
347        hash: &[u8; 16],
348    ) -> Option<PackLocation> {
349        let reader_index = *self.reader_indices.get(pack_hash)?;
350        let reader = self.readers.get(reader_index)?;
351        reader.find_bytes(hash).map(|entry| PackLocation {
352            reader_index,
353            offset: entry.offset,
354            size: entry.size,
355        })
356    }
357
358    pub(crate) fn copy_to_writer<W: Write>(
359        &self,
360        location: &PackLocation,
361        writer: W,
362    ) -> Result<()> {
363        let reader = self.readers.get(location.reader_index).ok_or_else(|| {
364            ChkpttError::StoreCorrupted(format!(
365                "Pack reader index {} is out of range",
366                location.reader_index
367            ))
368        })?;
369        reader.copy_at(location.offset, location.size, writer)
370    }
371}
372
373/// List all pack hashes in a directory.
374pub fn list_packs(packs_dir: &Path) -> Result<Vec<String>> {
375    let mut packs = Vec::new();
376    let entries = match std::fs::read_dir(packs_dir) {
377        Ok(entries) => entries,
378        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(packs),
379        Err(error) => return Err(error.into()),
380    };
381    for entry in entries {
382        let entry = entry?;
383        let name = entry.file_name();
384        let name = name.to_string_lossy();
385        if name.starts_with("pack-") && name.ends_with(".dat") {
386            let hash = name
387                .strip_prefix("pack-")
388                .unwrap()
389                .strip_suffix(".dat")
390                .unwrap();
391            packs.push(hash.to_owned());
392        }
393    }
394    Ok(packs)
395}