// chkpt_core/store/pack.rs
use crate::error::{ChkpttError, Result};
use crate::store::blob::hash_content;
use memmap2::Mmap;
use std::collections::HashMap;
use std::io::{BufWriter, Cursor, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use tempfile::NamedTempFile;
8
// On-disk format constants for pack-<hash>.dat / pack-<hash>.idx files.
const PACK_MAGIC: &[u8; 4] = b"CHKP";
const PACK_VERSION: u32 = 1;
const IDX_ENTRY_SIZE: usize = 32 + 8 + 8; // one .idx record: hash + offset + size
const HEADER_SIZE: u64 = 12; // .dat header: MAGIC(4) + VERSION(4) + COUNT(4)
13
/// In-memory index entry for a pack.
///
/// Mirrors one 48-byte `.idx` record (see `IDX_ENTRY_SIZE`).
#[derive(Debug, Clone)]
struct IndexEntry {
    hash: [u8; 32], // raw content hash bytes
    offset: u64,    // byte offset of the entry record within the .dat file
    size: u64,      // compressed payload size in bytes (excludes the 40-byte entry header)
}
21
/// Resolved position of one object inside an open `PackSet`:
/// which reader holds it, plus the entry's offset/size within that pack.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PackLocation {
    pub(crate) reader_index: usize, // index into PackSet::readers
    pub(crate) offset: u64,         // entry offset within the pack's .dat file
    pub(crate) size: u64,           // compressed payload size in bytes
}
28
/// Streaming writer that builds a `pack-<hash>.dat` / `pack-<hash>.idx` pair.
///
/// Entries are appended to a temp file in `packs_dir`; `finish()` stamps the
/// real header, names the pack after a blake3 hash of its contents, and
/// persists both files.
pub struct PackWriter {
    writer: BufWriter<NamedTempFile>, // buffered .dat output (temp file until finish)
    hasher: blake3::Hasher,           // incremental hash of all entries (header folded in last)
    idx_entries: Vec<IndexEntry>,     // accumulated index records, sorted at finish
    offset: u64,                      // write position of the next entry in the .dat file
    packs_dir: PathBuf,               // destination directory for the final pack files
}
36
37impl PackWriter {
38    pub fn new(packs_dir: &Path) -> Result<Self> {
39        std::fs::create_dir_all(packs_dir)?;
40        let dat_tmp = NamedTempFile::new_in(packs_dir)?;
41        let mut writer = BufWriter::with_capacity(256 * 1024, dat_tmp);
42        // Write 12-byte placeholder header (will be overwritten in finish)
43        let placeholder = [0u8; HEADER_SIZE as usize];
44        writer.write_all(&placeholder)?;
45        // Start incremental hasher — header will be re-hashed in finish()
46        let hasher = blake3::Hasher::new();
47        Ok(Self {
48            writer,
49            hasher,
50            idx_entries: Vec::new(),
51            offset: HEADER_SIZE,
52            packs_dir: packs_dir.to_path_buf(),
53        })
54    }
55
56    pub fn add(&mut self, content: &[u8]) -> Result<String> {
57        let hash_hex = hash_content(content);
58        let hash = hex_to_bytes(&hash_hex)?;
59        let compressed = zstd::encode_all(content, 1)?;
60        self.add_pre_compressed_bytes(hash, compressed)?;
61        Ok(hash_hex)
62    }
63
64    pub fn add_pre_compressed(&mut self, hash_hex: String, compressed: Vec<u8>) -> Result<()> {
65        let hash = hex_to_bytes(&hash_hex)?;
66        self.add_pre_compressed_bytes(hash, compressed)
67    }
68
69    pub fn add_pre_compressed_bytes(&mut self, hash: [u8; 32], compressed: Vec<u8>) -> Result<()> {
70        let compressed_len = compressed.len() as u64;
71
72        // Write entry to BufWriter: hash(32) + compressed_len(8) + data(N)
73        self.writer.write_all(&hash)?;
74        self.writer.write_all(&compressed_len.to_le_bytes())?;
75        self.writer.write_all(&compressed)?;
76
77        // Incremental hash of entry data
78        self.hasher.update(&hash);
79        self.hasher.update(&compressed_len.to_le_bytes());
80        self.hasher.update(&compressed);
81
82        self.idx_entries.push(IndexEntry {
83            hash,
84            offset: self.offset,
85            size: compressed_len,
86        });
87        self.offset += 32 + 8 + compressed_len;
88        Ok(())
89    }
90
91    pub fn is_empty(&self) -> bool {
92        self.idx_entries.is_empty()
93    }
94
95    /// Finalize: write real header, persist .dat, write .idx.
96    /// Returns pack hash.
97    pub fn finish(mut self) -> Result<String> {
98        if self.idx_entries.is_empty() {
99            return Err(ChkpttError::Other("No entries to pack".into()));
100        }
101
102        // Flush BufWriter and get the underlying file
103        self.writer.flush()?;
104        let mut dat_tmp = self.writer.into_inner().map_err(|e| e.into_error())?;
105
106        let count = self.idx_entries.len() as u32;
107
108        // Write real header
109        let mut header = [0u8; HEADER_SIZE as usize];
110        header[0..4].copy_from_slice(PACK_MAGIC);
111        header[4..8].copy_from_slice(&PACK_VERSION.to_le_bytes());
112        header[8..12].copy_from_slice(&count.to_le_bytes());
113
114        dat_tmp.seek(SeekFrom::Start(0))?;
115        dat_tmp.write_all(&header)?;
116        dat_tmp.flush()?;
117
118        // Finalize hash: include header in the hash
119        self.hasher.update(&header);
120        let pack_hash = self.hasher.finalize().to_hex()[..16].to_string();
121
122        // Persist .dat file
123        let dat_path = self.packs_dir.join(format!("pack-{}.dat", pack_hash));
124        if let Err(error) = dat_tmp.persist_noclobber(&dat_path) {
125            if error.error.kind() != std::io::ErrorKind::AlreadyExists {
126                return Err(ChkpttError::Other(error.error.to_string()));
127            }
128        }
129
130        // Sort idx entries by hash for binary search
131        self.idx_entries
132            .sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
133
134        // Write .idx file
135        let idx_path = self.packs_dir.join(format!("pack-{}.idx", pack_hash));
136        let mut idx_buf: Vec<u8> = Vec::with_capacity(self.idx_entries.len() * IDX_ENTRY_SIZE);
137        for entry in &self.idx_entries {
138            idx_buf.extend_from_slice(&entry.hash);
139            idx_buf.extend_from_slice(&entry.offset.to_le_bytes());
140            idx_buf.extend_from_slice(&entry.size.to_le_bytes());
141        }
142        std::fs::write(&idx_path, &idx_buf)?;
143
144        Ok(pack_hash)
145    }
146}
147
/// Read-only, mmap-backed view of one pack (.dat + .idx pair).
pub struct PackReader {
    _dat_file: std::fs::File, // kept alive for the lifetime of `dat` mmap
    dat: Mmap,
    _idx_file: std::fs::File, // kept alive for the lifetime of `idx` mmap
    idx: Mmap,
    entry_count: usize, // number of IDX_ENTRY_SIZE records in `idx`
}
155
156impl PackReader {
157    pub fn open(packs_dir: &Path, pack_hash: &str) -> Result<Self> {
158        let dat_path = packs_dir.join(format!("pack-{}.dat", pack_hash));
159        let idx_path = packs_dir.join(format!("pack-{}.idx", pack_hash));
160
161        let dat_file = std::fs::File::open(&dat_path)?;
162        let idx_file = std::fs::File::open(&idx_path)?;
163
164        // SAFETY: The file handles are kept alive alongside the mmaps.
165        let dat = unsafe { Mmap::map(&dat_file)? };
166        let idx = unsafe { Mmap::map(&idx_file)? };
167
168        let entry_count = idx.len() / IDX_ENTRY_SIZE;
169
170        Ok(Self {
171            _dat_file: dat_file,
172            dat,
173            _idx_file: idx_file,
174            idx,
175            entry_count,
176        })
177    }
178
179    /// Extract an IndexEntry from the mmap'd idx at a given index position.
180    fn idx_entry(&self, index: usize) -> IndexEntry {
181        let pos = index * IDX_ENTRY_SIZE;
182        let mut hash = [0u8; 32];
183        hash.copy_from_slice(&self.idx[pos..pos + 32]);
184        let offset = u64::from_le_bytes(self.idx[pos + 32..pos + 40].try_into().unwrap());
185        let size = u64::from_le_bytes(self.idx[pos + 40..pos + 48].try_into().unwrap());
186        IndexEntry { hash, offset, size }
187    }
188
189    /// Binary search for hash in the mmap'd index.
190    fn find_bytes(&self, hash_bytes: &[u8; 32]) -> Option<IndexEntry> {
191        let mut lo = 0usize;
192        let mut hi = self.entry_count;
193        while lo < hi {
194            let mid = lo + (hi - lo) / 2;
195            let mid_hash = &self.idx[mid * IDX_ENTRY_SIZE..mid * IDX_ENTRY_SIZE + 32];
196            match mid_hash.cmp(&hash_bytes[..]) {
197                std::cmp::Ordering::Equal => return Some(self.idx_entry(mid)),
198                std::cmp::Ordering::Less => lo = mid + 1,
199                std::cmp::Ordering::Greater => hi = mid,
200            }
201        }
202        None
203    }
204
205    fn find(&self, hash_hex: &str) -> Option<IndexEntry> {
206        let hash_bytes = hex_to_bytes(hash_hex).ok()?;
207        self.find_bytes(&hash_bytes)
208    }
209
210    pub fn contains_bytes(&self, hash: &[u8; 32]) -> bool {
211        self.find_bytes(hash).is_some()
212    }
213
214    fn compressed_bytes(&self, offset: u64, size: u64) -> Option<&[u8]> {
215        let data_start = (offset as usize).checked_add(32 + 8)?; // skip hash + compressed_size
216        let data_end = data_start.checked_add(size as usize)?;
217        if data_end > self.dat.len() {
218            return None;
219        }
220        Some(&self.dat[data_start..data_end])
221    }
222
223    fn copy_at<W: Write>(&self, offset: u64, size: u64, mut writer: W) -> Result<()> {
224        let compressed = self.compressed_bytes(offset, size).ok_or_else(|| {
225            ChkpttError::StoreCorrupted("Pack entry points outside pack data".into())
226        })?;
227        zstd::stream::copy_decode(Cursor::new(compressed), &mut writer)?;
228        Ok(())
229    }
230
231    /// Read and decompress an object. Returns None if not found.
232    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
233        let entry = self.find(hash_hex)?;
234        let mut decompressed = Vec::new();
235        self.copy_at(entry.offset, entry.size, &mut decompressed)
236            .ok()?;
237        Some(decompressed)
238    }
239
240    /// Read and decompress an object. Error if not found.
241    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
242        self.try_read(hash_hex)
243            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
244    }
245}
246
/// A collection of open `PackReader`s, searched in open order.
pub struct PackSet {
    readers: Vec<PackReader>,
    reader_indices: HashMap<String, usize>, // pack hash -> index into `readers`
}
251
252impl PackSet {
253    pub fn open_all(packs_dir: &Path) -> Result<Self> {
254        let pack_hashes = list_packs(packs_dir)?;
255        Self::open_selected(packs_dir, &pack_hashes)
256    }
257
258    pub fn open_selected(packs_dir: &Path, pack_hashes: &[String]) -> Result<Self> {
259        let mut readers = Vec::with_capacity(pack_hashes.len());
260        let mut reader_indices = HashMap::with_capacity(pack_hashes.len());
261        for pack_hash in pack_hashes {
262            let reader_index = readers.len();
263            readers.push(PackReader::open(packs_dir, pack_hash)?);
264            reader_indices.insert(pack_hash.clone(), reader_index);
265        }
266        Ok(Self {
267            readers,
268            reader_indices,
269        })
270    }
271
272    pub fn empty() -> Self {
273        Self {
274            readers: Vec::new(),
275            reader_indices: HashMap::new(),
276        }
277    }
278
279    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
280        let location = self.locate(hash_hex)?;
281        let mut decompressed = Vec::new();
282        self.copy_to_writer(&location, &mut decompressed).ok()?;
283        Some(decompressed)
284    }
285
286    pub fn contains_bytes(&self, hash: &[u8; 32]) -> bool {
287        self.readers
288            .iter()
289            .any(|reader| reader.contains_bytes(hash))
290    }
291
292    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
293        self.try_read(hash_hex)
294            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
295    }
296
297    pub(crate) fn locate(&self, hash_hex: &str) -> Option<PackLocation> {
298        self.readers
299            .iter()
300            .enumerate()
301            .find_map(|(reader_index, reader)| {
302                reader.find(hash_hex).map(|entry| PackLocation {
303                    reader_index,
304                    offset: entry.offset,
305                    size: entry.size,
306                })
307            })
308    }
309
310    pub(crate) fn locate_in_pack_bytes(
311        &self,
312        pack_hash: &str,
313        hash: &[u8; 32],
314    ) -> Option<PackLocation> {
315        let reader_index = *self.reader_indices.get(pack_hash)?;
316        let reader = self.readers.get(reader_index)?;
317        reader.find_bytes(hash).map(|entry| PackLocation {
318            reader_index,
319            offset: entry.offset,
320            size: entry.size,
321        })
322    }
323
324    pub(crate) fn copy_to_writer<W: Write>(
325        &self,
326        location: &PackLocation,
327        writer: W,
328    ) -> Result<()> {
329        let reader = self.readers.get(location.reader_index).ok_or_else(|| {
330            ChkpttError::StoreCorrupted(format!(
331                "Pack reader index {} is out of range",
332                location.reader_index
333            ))
334        })?;
335        reader.copy_at(location.offset, location.size, writer)
336    }
337}
338
339/// List all pack hashes in a directory.
340pub fn list_packs(packs_dir: &Path) -> Result<Vec<String>> {
341    let mut packs = Vec::new();
342    let entries = match std::fs::read_dir(packs_dir) {
343        Ok(entries) => entries,
344        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(packs),
345        Err(error) => return Err(error.into()),
346    };
347    for entry in entries {
348        let entry = entry?;
349        let name = entry.file_name();
350        let name = name.to_string_lossy();
351        if name.starts_with("pack-") && name.ends_with(".dat") {
352            let hash = name
353                .strip_prefix("pack-")
354                .unwrap()
355                .strip_suffix(".dat")
356                .unwrap();
357            packs.push(hash.to_owned());
358        }
359    }
360    Ok(packs)
361}
362
363fn hex_to_bytes(hex: &str) -> Result<[u8; 32]> {
364    let mut bytes = [0u8; 32];
365    if hex.len() != 64 {
366        return Err(ChkpttError::Other(format!(
367            "Invalid hash length: {}",
368            hex.len()
369        )));
370    }
371    for i in 0..32 {
372        bytes[i] = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16)
373            .map_err(|_| ChkpttError::Other("Invalid hex".into()))?;
374    }
375    Ok(bytes)
376}