Skip to main content

chkpt_core/store/
pack.rs

1use crate::error::{ChkpttError, Result};
2use crate::store::blob::{hash_content, hex_to_bytes};
3use memmap2::Mmap;
4use serde::{Deserialize, Serialize};
5use std::collections::{BTreeSet, HashMap};
6use std::fs::File;
7use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
8use std::path::{Path, PathBuf};
9use tempfile::NamedTempFile;
10
/// Magic bytes identifying a pack .dat file.
const PACK_MAGIC: &[u8; 4] = b"CHKL";
/// Fixed size of one record in a pack .idx file.
const IDX_ENTRY_SIZE: usize = 16 + 8 + 8; // hash + offset + size
/// Size of the .dat file header written by `PackWriter::finish`.
const HEADER_SIZE: u64 = 8; // MAGIC(4) + COUNT(4)
/// Copy-buffer size used when splitting a .dat file into part files.
const PART_READ_BUFFER_SIZE: usize = 256 * 1024;
15
/// Options controlling how `PackWriter::finish_with_options` finalizes a pack.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PackFinishOptions {
    /// When `Some(n)`, split the finished .dat file into contiguous parts of
    /// at most `n` bytes plus a JSON manifest; `None` keeps a single .dat.
    pub chunk_bytes: Option<u64>,
}
20
/// On-disk JSON manifest (`pack-<hash>.dat.parts.json`) describing how a
/// pack's .dat content was split into part files.
#[derive(Debug, Serialize, Deserialize)]
struct PackPartsManifest {
    // Manifest format version; only version 1 is accepted by the reader.
    version: u32,
    // Hex hash of the pack this manifest belongs to.
    pack_hash: String,
    // Total size in bytes of the original (unsplit) .dat file.
    dat_size: u64,
    // Maximum part size that was used when splitting.
    chunk_bytes: u64,
    // Contiguous parts, in ascending offset order.
    parts: Vec<PackPartManifestEntry>,
}
29
/// One part file referenced by `PackPartsManifest`.
#[derive(Debug, Serialize, Deserialize)]
struct PackPartManifestEntry {
    // Bare file name of the part, relative to the packs directory.
    path: String,
    // Byte offset of this part within the original .dat file.
    offset: u64,
    // Size of the part file in bytes.
    size: u64,
}
36
/// In-memory index entry for a pack.
#[derive(Debug, Clone)]
struct IndexEntry {
    // 128-bit content hash of the stored object.
    hash: [u8; 16],
    // Byte offset of the entry record (hash + len + data) in the .dat file.
    offset: u64,
    // Size of the compressed payload only (excludes the 24-byte record header).
    size: u64,
}
44
/// Location of an object within a `PackSet`: which reader holds it and where
/// its entry record starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PackLocation {
    // Index into the owning `PackSet`'s reader list.
    pub(crate) reader_index: usize,
    // Byte offset of the entry record in the pack data.
    pub(crate) offset: u64,
    // Compressed payload size in bytes.
    pub(crate) size: u64,
}
51
/// Streaming writer that appends compressed objects into a new pack file.
pub struct PackWriter {
    // Buffered output over a temp file created in the packs directory.
    writer: BufWriter<NamedTempFile>,
    // Incremental xxh3-128 hash of everything written (header hashed last).
    hasher: xxhash_rust::xxh3::Xxh3,
    // Index entries accumulated for the .idx file.
    idx_entries: Vec<IndexEntry>,
    // Byte offset in the .dat file where the next entry record will start.
    offset: u64,
    // Directory where the finished pack files are persisted.
    packs_dir: PathBuf,
}
59
impl PackWriter {
    /// Create a writer that stages pack data in a temp file inside
    /// `packs_dir` (created if missing), so the final persist is a rename
    /// within the same filesystem.
    pub fn new(packs_dir: &Path) -> Result<Self> {
        std::fs::create_dir_all(packs_dir)?;
        let dat_tmp = NamedTempFile::new_in(packs_dir)?;
        let mut writer = BufWriter::with_capacity(256 * 1024, dat_tmp);
        // Write 8-byte placeholder header (will be overwritten in finish)
        let placeholder = [0u8; HEADER_SIZE as usize];
        writer.write_all(&placeholder)?;
        // Start incremental hasher — header will be re-hashed in finish()
        let hasher = xxhash_rust::xxh3::Xxh3::new();
        Ok(Self {
            writer,
            hasher,
            idx_entries: Vec::new(),
            // The first entry record lands immediately after the header.
            offset: HEADER_SIZE,
            packs_dir: packs_dir.to_path_buf(),
        })
    }

    /// Hash and LZ4-compress `content`, append it as a pack entry, and
    /// return the content's hex hash.
    pub fn add(&mut self, content: &[u8]) -> Result<String> {
        let hash_hex = hash_content(content);
        let hash = hex_to_bytes(&hash_hex)?;
        let compressed = {
            use lz4_flex::frame::FrameEncoder;
            let mut encoder = FrameEncoder::new(Vec::new());
            // Encoding into an in-memory Vec does not produce I/O errors,
            // so these unwraps cannot fire in practice.
            std::io::Write::write_all(&mut encoder, content).unwrap();
            encoder.finish().unwrap()
        };
        self.add_pre_compressed_bytes(hash, compressed)?;
        Ok(hash_hex)
    }

    /// Append an entry whose payload is already LZ4-compressed. `hash_hex`
    /// is expected to be the hash of the *uncompressed* content, matching
    /// what `add` computes via `hash_content`.
    pub fn add_pre_compressed(&mut self, hash_hex: String, compressed: Vec<u8>) -> Result<()> {
        let hash = hex_to_bytes(&hash_hex)?;
        self.add_pre_compressed_bytes(hash, compressed)
    }

    /// Append an entry given the raw 16-byte hash and pre-compressed payload.
    pub fn add_pre_compressed_bytes(&mut self, hash: [u8; 16], compressed: Vec<u8>) -> Result<()> {
        let compressed_len = compressed.len() as u64;

        // Write entry to BufWriter: hash(16) + compressed_len(8) + data(N)
        self.writer.write_all(&hash)?;
        self.writer.write_all(&compressed_len.to_le_bytes())?;
        self.writer.write_all(&compressed)?;

        // Incremental hash of entry data
        self.hasher.update(&hash);
        self.hasher.update(&compressed_len.to_le_bytes());
        self.hasher.update(&compressed);

        // The index records where the entry record starts; `size` covers
        // only the compressed payload, not the 24-byte record header.
        self.idx_entries.push(IndexEntry {
            hash,
            offset: self.offset,
            size: compressed_len,
        });
        self.offset += 16 + 8 + compressed_len;
        Ok(())
    }

    /// True when no entries have been added yet.
    pub fn is_empty(&self) -> bool {
        self.idx_entries.is_empty()
    }

    /// Finalize: write real header, persist .dat, write .idx.
    /// Returns pack hash.
    pub fn finish(self) -> Result<String> {
        self.finish_with_options(PackFinishOptions::default())
    }

    /// Finalize the pack and optionally split the resulting .dat file into
    /// contiguous part files. The existing unchunked finish path is the default.
    ///
    /// # Errors
    /// Fails on an empty pack, a zero chunk size, or any I/O error while
    /// flushing, persisting, or splitting.
    pub fn finish_with_options(mut self, options: PackFinishOptions) -> Result<String> {
        if self.idx_entries.is_empty() {
            return Err(ChkpttError::Other("No entries to pack".into()));
        }
        // Reject a zero chunk size up front, before anything is persisted.
        if options.chunk_bytes == Some(0) {
            return Err(ChkpttError::Other(
                "pack chunk size must be greater than zero".into(),
            ));
        }

        // Flush BufWriter and get the underlying file
        self.writer.flush()?;
        let mut dat_tmp = self.writer.into_inner().map_err(|e| e.into_error())?;

        let count = self.idx_entries.len() as u32;

        // Write real header
        let mut header = [0u8; HEADER_SIZE as usize];
        header[0..4].copy_from_slice(PACK_MAGIC);
        header[4..8].copy_from_slice(&count.to_le_bytes());

        dat_tmp.seek(SeekFrom::Start(0))?;
        dat_tmp.write_all(&header)?;
        dat_tmp.flush()?;

        // Finalize hash: include header in the hash. Note the header bytes
        // are hashed *after* the entries, so the pack hash covers
        // entries-then-header order rather than on-disk byte order.
        self.hasher.update(&header);
        // Pack hash = first 16 hex chars of the 128-bit xxh3 digest.
        let pack_hash = format!("{:032x}", self.hasher.digest128())[..16].to_string();

        // Persist .dat file
        let dat_path = self.packs_dir.join(format!("pack-{}.dat", pack_hash));
        if let Err(error) = dat_tmp.persist_noclobber(&dat_path) {
            // AlreadyExists means a pack with the same content hash is
            // already on disk, so this write is treated as a success.
            if error.error.kind() != std::io::ErrorKind::AlreadyExists {
                return Err(ChkpttError::Other(error.error.to_string()));
            }
        }

        // Sort idx entries by hash for binary search
        self.idx_entries
            .sort_unstable_by(|a, b| a.hash.cmp(&b.hash));

        // Write .idx file
        let idx_path = self.packs_dir.join(format!("pack-{}.idx", pack_hash));
        let mut idx_buf: Vec<u8> = Vec::with_capacity(self.idx_entries.len() * IDX_ENTRY_SIZE);
        for entry in &self.idx_entries {
            idx_buf.extend_from_slice(&entry.hash);
            idx_buf.extend_from_slice(&entry.offset.to_le_bytes());
            idx_buf.extend_from_slice(&entry.size.to_le_bytes());
        }
        std::fs::write(&idx_path, &idx_buf)?;

        if let Some(chunk_bytes) = options.chunk_bytes {
            split_pack_dat_file(&self.packs_dir, &pack_hash, chunk_bytes)?;
        }

        Ok(pack_hash)
    }
}
189
190fn split_pack_dat_file(packs_dir: &Path, pack_hash: &str, chunk_bytes: u64) -> Result<()> {
191    if chunk_bytes == 0 {
192        return Err(ChkpttError::Other(
193            "pack chunk size must be greater than zero".into(),
194        ));
195    }
196
197    let dat_path = pack_dat_path(packs_dir, pack_hash);
198    let dat_file = File::open(&dat_path)?;
199    let dat_size = dat_file.metadata()?.len();
200    let mut reader = BufReader::with_capacity(PART_READ_BUFFER_SIZE, dat_file);
201    let mut buffer = vec![0u8; (chunk_bytes as usize).min(PART_READ_BUFFER_SIZE).max(1)];
202
203    let mut offset = 0u64;
204    let mut part_index = 0usize;
205    let mut parts = Vec::new();
206
207    while offset < dat_size {
208        let part_file_name = pack_part_file_name(pack_hash, part_index);
209        let part_path = packs_dir.join(&part_file_name);
210        let mut part_tmp = NamedTempFile::new_in(packs_dir)?;
211        let mut remaining = (dat_size - offset).min(chunk_bytes);
212        let part_offset = offset;
213
214        while remaining > 0 {
215            let read_len = (remaining as usize).min(buffer.len());
216            reader.read_exact(&mut buffer[..read_len])?;
217            part_tmp.write_all(&buffer[..read_len])?;
218            remaining -= read_len as u64;
219            offset += read_len as u64;
220        }
221        part_tmp.flush()?;
222        part_tmp
223            .persist(&part_path)
224            .map_err(|error| ChkpttError::Other(error.error.to_string()))?;
225
226        parts.push(PackPartManifestEntry {
227            path: part_file_name,
228            offset: part_offset,
229            size: offset - part_offset,
230        });
231        part_index += 1;
232    }
233
234    let manifest = PackPartsManifest {
235        version: 1,
236        pack_hash: pack_hash.to_string(),
237        dat_size,
238        chunk_bytes,
239        parts,
240    };
241    let manifest_path = pack_parts_manifest_path(packs_dir, pack_hash);
242    let mut manifest_tmp = NamedTempFile::new_in(packs_dir)?;
243    serde_json::to_writer(&mut manifest_tmp, &manifest)
244        .map_err(|error| ChkpttError::Other(error.to_string()))?;
245    manifest_tmp.write_all(b"\n")?;
246    manifest_tmp.flush()?;
247    manifest_tmp
248        .persist(&manifest_path)
249        .map_err(|error| ChkpttError::Other(error.error.to_string()))?;
250
251    std::fs::remove_file(&dat_path)?;
252    Ok(())
253}
254
/// Backing storage for a pack's .dat content.
enum PackData {
    /// A single mmap'd `pack-<hash>.dat` file; the handle is kept alive
    /// alongside the map (safety requirement of the mmap).
    SingleFile { _dat_file: File, dat: Mmap },
    /// The .dat content reassembled from several mmap'd part files.
    Chunked(ChunkedPackData),
}
259
/// Mmap'd part files that together reassemble a chunked pack's .dat content.
struct ChunkedPackData {
    // Total logical size of the original (unsplit) .dat file.
    dat_size: u64,
    // Keeps the part file handles alive for the mmaps in `parts`.
    _part_files: Vec<File>,
    // Parts in ascending, contiguous offset order (validated on open).
    parts: Vec<PackPartData>,
}
265
/// One mmap'd part of a chunked pack.
struct PackPartData {
    // Offset of this part within the logical .dat file.
    offset: u64,
    // Size of this part in bytes.
    size: u64,
    // Memory map over the part file's contents.
    dat: Mmap,
}
271
/// `Read` adapter that streams a byte range of the logical .dat file out of
/// the mmap'd parts of a chunked pack.
struct ChunkedRangeReader<'a> {
    // Parts in contiguous, ascending offset order.
    parts: &'a [PackPartData],
    // Index of the part expected to contain `position`.
    part_index: usize,
    // Current absolute position within the logical .dat file.
    position: u64,
    // Absolute end (exclusive) of the requested range.
    end: u64,
}
278
impl<'a> ChunkedRangeReader<'a> {
    /// Create a reader over `size` bytes starting at logical offset `offset`.
    ///
    /// `partition_point` counts every part that ends at or before `offset`,
    /// so `part_index` starts at the first part that could contain it.
    fn new(parts: &'a [PackPartData], offset: u64, size: u64) -> Self {
        let part_index = parts.partition_point(|part| offset >= part.offset + part.size);
        Self {
            parts,
            part_index,
            position: offset,
            end: offset + size,
        }
    }
}
290
impl Read for ChunkedRangeReader<'_> {
    /// Copy bytes starting at the current position. A single call never
    /// crosses a part boundary; callers such as `std::io::copy` loop until
    /// the range is drained (signalled by `Ok(0)`).
    fn read(&mut self, output: &mut [u8]) -> std::io::Result<usize> {
        // Range exhausted (or zero-length read requested): report EOF.
        if output.is_empty() || self.position >= self.end {
            return Ok(0);
        }

        while self.part_index < self.parts.len() {
            let part = &self.parts[self.part_index];
            let part_end = part.offset + part.size;
            // Skip parts that end at or before the current position.
            if self.position >= part_end {
                self.part_index += 1;
                continue;
            }
            // Parts are supposed to be contiguous; a hole means the
            // on-disk data no longer matches its manifest.
            if self.position < part.offset {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "gap in chunked pack data",
                ));
            }

            let in_part_offset = (self.position - part.offset) as usize;
            let available_in_part = (part.size as usize).saturating_sub(in_part_offset);
            let remaining_in_range = (self.end - self.position) as usize;
            let to_copy = output.len().min(available_in_part).min(remaining_in_range);
            // Defensive: with the checks above `to_copy` should always be
            // non-zero here; advance rather than spin if it ever is zero.
            if to_copy == 0 {
                self.part_index += 1;
                continue;
            }

            output[..to_copy].copy_from_slice(&part.dat[in_part_offset..in_part_offset + to_copy]);
            self.position += to_copy as u64;
            return Ok(to_copy);
        }

        // Ran out of parts before reaching `end`: the data is truncated.
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "chunked pack data ended before requested range",
        ))
    }
}
331
/// Read-only view of a finished pack: mmap'd data plus the mmap'd sorted index.
pub struct PackReader {
    // The pack payload (a single .dat file or chunked parts).
    data: PackData,
    // Keeps the index file handle alive for the mmap below.
    _idx_file: File,
    // Memory map over the .idx file (fixed-size entries sorted by hash).
    idx: Mmap,
    // Number of whole entries in the index.
    entry_count: usize,
}
338
339impl PackReader {
340    pub fn open(packs_dir: &Path, pack_hash: &str) -> Result<Self> {
341        let dat_path = pack_dat_path(packs_dir, pack_hash);
342        let idx_path = pack_idx_path(packs_dir, pack_hash);
343
344        let data = match File::open(&dat_path) {
345            Ok(dat_file) => {
346                // SAFETY: The file handles are kept alive alongside the mmaps.
347                let dat = unsafe { Mmap::map(&dat_file)? };
348                PackData::SingleFile {
349                    _dat_file: dat_file,
350                    dat,
351                }
352            }
353            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
354                PackData::Chunked(open_chunked_pack_data(packs_dir, pack_hash)?)
355            }
356            Err(error) => return Err(error.into()),
357        };
358        let idx_file = File::open(&idx_path)?;
359
360        // SAFETY: The file handle is kept alive alongside the mmap.
361        let idx = unsafe { Mmap::map(&idx_file)? };
362
363        // Hint kernel about expected access patterns.
364        // .dat uses WillNeed (not Sequential) because parallel restore workers
365        // read different regions of the same mmap concurrently — Sequential
366        // causes aggressive page reclaim that hurts other threads.
367        // .idx is binary-searched so Random is appropriate.
368        #[cfg(unix)]
369        {
370            match &data {
371                PackData::SingleFile { dat, .. } => {
372                    let _ = dat.advise(memmap2::Advice::WillNeed);
373                }
374                PackData::Chunked(chunked) => {
375                    for part in &chunked.parts {
376                        let _ = part.dat.advise(memmap2::Advice::WillNeed);
377                    }
378                }
379            }
380            let _ = idx.advise(memmap2::Advice::Random);
381        }
382
383        let entry_count = idx.len() / IDX_ENTRY_SIZE;
384
385        Ok(Self {
386            data,
387            _idx_file: idx_file,
388            idx,
389            entry_count,
390        })
391    }
392
393    /// Extract an IndexEntry from the mmap'd idx at a given index position.
394    fn idx_entry(&self, index: usize) -> IndexEntry {
395        let pos = index * IDX_ENTRY_SIZE;
396        let mut hash = [0u8; 16];
397        hash.copy_from_slice(&self.idx[pos..pos + 16]);
398        let offset = u64::from_le_bytes(self.idx[pos + 16..pos + 24].try_into().unwrap());
399        let size = u64::from_le_bytes(self.idx[pos + 24..pos + 32].try_into().unwrap());
400        IndexEntry { hash, offset, size }
401    }
402
403    /// Binary search for hash in the mmap'd index.
404    fn find_bytes(&self, hash_bytes: &[u8; 16]) -> Option<IndexEntry> {
405        let mut lo = 0usize;
406        let mut hi = self.entry_count;
407        while lo < hi {
408            let mid = lo + (hi - lo) / 2;
409            let mid_hash = &self.idx[mid * IDX_ENTRY_SIZE..mid * IDX_ENTRY_SIZE + 16];
410            match mid_hash.cmp(&hash_bytes[..]) {
411                std::cmp::Ordering::Equal => return Some(self.idx_entry(mid)),
412                std::cmp::Ordering::Less => lo = mid + 1,
413                std::cmp::Ordering::Greater => hi = mid,
414            }
415        }
416        None
417    }
418
419    fn find(&self, hash_hex: &str) -> Option<IndexEntry> {
420        let hash_bytes = hex_to_bytes(hash_hex).ok()?;
421        self.find_bytes(&hash_bytes)
422    }
423
424    pub fn contains_bytes(&self, hash: &[u8; 16]) -> bool {
425        self.find_bytes(hash).is_some()
426    }
427
428    fn single_file_compressed_bytes(dat: &[u8], offset: u64, size: u64) -> Option<&[u8]> {
429        let data_start = (offset as usize).checked_add(16 + 8)?; // skip hash + compressed_size
430        let data_end = data_start.checked_add(size as usize)?;
431        if data_end > dat.len() {
432            return None;
433        }
434        Some(&dat[data_start..data_end])
435    }
436
437    fn copy_at<W: Write>(&self, offset: u64, size: u64, mut writer: W) -> Result<()> {
438        match &self.data {
439            PackData::SingleFile { dat, .. } => {
440                let compressed =
441                    Self::single_file_compressed_bytes(dat, offset, size).ok_or_else(|| {
442                        ChkpttError::StoreCorrupted("Pack entry points outside pack data".into())
443                    })?;
444                copy_lz4_to_writer(compressed, &mut writer)?;
445            }
446            PackData::Chunked(chunked) => {
447                let data_start = offset.checked_add(16 + 8).ok_or_else(|| {
448                    ChkpttError::StoreCorrupted("Pack entry offset overflows".into())
449                })?;
450                let data_end = data_start.checked_add(size).ok_or_else(|| {
451                    ChkpttError::StoreCorrupted("Pack entry size overflows".into())
452                })?;
453                if data_end > chunked.dat_size {
454                    return Err(ChkpttError::StoreCorrupted(
455                        "Pack entry points outside chunked pack data".into(),
456                    ));
457                }
458                let compressed = ChunkedRangeReader::new(&chunked.parts, data_start, size);
459                copy_lz4_to_writer(compressed, &mut writer)?;
460            }
461        }
462        Ok(())
463    }
464
465    /// Read and decompress an object. Returns None if not found.
466    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
467        let entry = self.find(hash_hex)?;
468        let mut decompressed = Vec::new();
469        self.copy_at(entry.offset, entry.size, &mut decompressed)
470            .ok()?;
471        Some(decompressed)
472    }
473
474    /// Read and decompress an object. Error if not found.
475    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
476        self.try_read(hash_hex)
477            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
478    }
479}
480
/// A collection of open `PackReader`s searched together for objects.
pub struct PackSet {
    // Readers in open order; `PackLocation::reader_index` indexes this Vec.
    readers: Vec<PackReader>,
    // Maps pack hash -> position in `readers` for per-pack lookups.
    reader_indices: HashMap<String, usize>,
}
485
486impl PackSet {
487    pub fn open_all(packs_dir: &Path) -> Result<Self> {
488        let pack_hashes = list_packs(packs_dir)?;
489        Self::open_selected(packs_dir, &pack_hashes)
490    }
491
492    pub fn open_selected(packs_dir: &Path, pack_hashes: &[String]) -> Result<Self> {
493        let mut readers = Vec::with_capacity(pack_hashes.len());
494        let mut reader_indices = HashMap::with_capacity(pack_hashes.len());
495        for pack_hash in pack_hashes {
496            let reader_index = readers.len();
497            readers.push(PackReader::open(packs_dir, pack_hash)?);
498            reader_indices.insert(pack_hash.clone(), reader_index);
499        }
500        Ok(Self {
501            readers,
502            reader_indices,
503        })
504    }
505
506    pub fn empty() -> Self {
507        Self {
508            readers: Vec::new(),
509            reader_indices: HashMap::new(),
510        }
511    }
512
513    pub fn try_read(&self, hash_hex: &str) -> Option<Vec<u8>> {
514        let location = self.locate(hash_hex)?;
515        let mut decompressed = Vec::new();
516        self.copy_to_writer(&location, &mut decompressed).ok()?;
517        Some(decompressed)
518    }
519
520    pub fn contains_bytes(&self, hash: &[u8; 16]) -> bool {
521        self.readers
522            .iter()
523            .any(|reader| reader.contains_bytes(hash))
524    }
525
526    pub fn read(&self, hash_hex: &str) -> Result<Vec<u8>> {
527        self.try_read(hash_hex)
528            .ok_or_else(|| ChkpttError::ObjectNotFound(hash_hex.to_string()))
529    }
530
531    pub(crate) fn locate(&self, hash_hex: &str) -> Option<PackLocation> {
532        self.readers
533            .iter()
534            .enumerate()
535            .find_map(|(reader_index, reader)| {
536                reader.find(hash_hex).map(|entry| PackLocation {
537                    reader_index,
538                    offset: entry.offset,
539                    size: entry.size,
540                })
541            })
542    }
543
544    pub fn locate_bytes(&self, hash: &[u8; 16]) -> Option<PackLocation> {
545        self.readers
546            .iter()
547            .enumerate()
548            .find_map(|(reader_index, reader)| {
549                reader.find_bytes(hash).map(|entry| PackLocation {
550                    reader_index,
551                    offset: entry.offset,
552                    size: entry.size,
553                })
554            })
555    }
556
557    pub(crate) fn locate_in_pack_bytes(
558        &self,
559        pack_hash: &str,
560        hash: &[u8; 16],
561    ) -> Option<PackLocation> {
562        let reader_index = *self.reader_indices.get(pack_hash)?;
563        let reader = self.readers.get(reader_index)?;
564        reader.find_bytes(hash).map(|entry| PackLocation {
565            reader_index,
566            offset: entry.offset,
567            size: entry.size,
568        })
569    }
570
571    pub(crate) fn copy_to_writer<W: Write>(
572        &self,
573        location: &PackLocation,
574        writer: W,
575    ) -> Result<()> {
576        let reader = self.readers.get(location.reader_index).ok_or_else(|| {
577            ChkpttError::StoreCorrupted(format!(
578                "Pack reader index {} is out of range",
579                location.reader_index
580            ))
581        })?;
582        reader.copy_at(location.offset, location.size, writer)
583    }
584}
585
/// Path of a pack's single-file data: `<packs_dir>/pack-<hash>.dat`.
fn pack_dat_path(packs_dir: &Path, pack_hash: &str) -> PathBuf {
    let file_name = ["pack-", pack_hash, ".dat"].concat();
    packs_dir.join(file_name)
}
589
/// Path of a pack's index file: `<packs_dir>/pack-<hash>.idx`.
fn pack_idx_path(packs_dir: &Path, pack_hash: &str) -> PathBuf {
    let file_name = ["pack-", pack_hash, ".idx"].concat();
    packs_dir.join(file_name)
}
593
/// Path of a chunked pack's manifest: `<packs_dir>/pack-<hash>.dat.parts.json`.
fn pack_parts_manifest_path(packs_dir: &Path, pack_hash: &str) -> PathBuf {
    let file_name = ["pack-", pack_hash, ".dat.parts.json"].concat();
    packs_dir.join(file_name)
}
597
/// File name of part `part_index` of a chunked pack. The zero-padded index
/// keeps lexicographic and numeric ordering aligned.
fn pack_part_file_name(pack_hash: &str, part_index: usize) -> String {
    format!(
        "pack-{hash}.dat.part-{index:06}",
        hash = pack_hash,
        index = part_index
    )
}
601
602fn copy_lz4_to_writer<R: Read, W: Write>(compressed: R, mut writer: W) -> Result<()> {
603    let mut decoder = lz4_flex::frame::FrameDecoder::new(compressed);
604    std::io::copy(&mut decoder, &mut writer).map_err(|e| {
605        if e.kind() == std::io::ErrorKind::InvalidData {
606            ChkpttError::StoreCorrupted(format!("LZ4 decompression failed: {}", e))
607        } else {
608            ChkpttError::Io(e)
609        }
610    })?;
611    Ok(())
612}
613
/// Open the chunked on-disk layout for `pack_hash`: parse and validate the
/// parts manifest, then mmap every part file listed in it.
///
/// Validation rejects: unknown manifest versions, a mismatched pack hash,
/// a data size smaller than the pack header, a zero chunk size, empty or
/// non-contiguous parts, part paths that are not bare file names, and part
/// files whose on-disk size disagrees with the manifest.
fn open_chunked_pack_data(packs_dir: &Path, pack_hash: &str) -> Result<ChunkedPackData> {
    let manifest_path = pack_parts_manifest_path(packs_dir, pack_hash);
    let manifest_file = File::open(&manifest_path)?;
    let manifest: PackPartsManifest = serde_json::from_reader(BufReader::new(manifest_file))
        .map_err(|error| {
            ChkpttError::StoreCorrupted(format!(
                "Pack parts manifest {} is invalid JSON: {}",
                manifest_path.display(),
                error
            ))
        })?;

    if manifest.version != 1 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "Unsupported pack parts manifest version {}",
            manifest.version
        )));
    }
    if manifest.pack_hash != pack_hash {
        return Err(ChkpttError::StoreCorrupted(format!(
            "Pack parts manifest hash {} does not match requested pack {}",
            manifest.pack_hash, pack_hash
        )));
    }
    // Every valid .dat starts with an 8-byte header, so anything smaller
    // cannot be a real pack.
    if manifest.dat_size < HEADER_SIZE {
        return Err(ChkpttError::StoreCorrupted(format!(
            "Pack parts manifest data size {} is smaller than pack header",
            manifest.dat_size
        )));
    }
    if manifest.chunk_bytes == 0 {
        return Err(ChkpttError::StoreCorrupted(
            "Pack parts manifest has a zero chunk size".into(),
        ));
    }

    // Walk the parts in order, checking that they tile the .dat range with
    // no gaps or overlaps: each part must start exactly at `next_offset`.
    let mut next_offset = 0u64;
    let mut part_files = Vec::with_capacity(manifest.parts.len());
    let mut parts = Vec::with_capacity(manifest.parts.len());
    for part in manifest.parts {
        if part.offset != next_offset {
            return Err(ChkpttError::StoreCorrupted(format!(
                "Pack parts manifest has a gap before offset {}",
                next_offset
            )));
        }
        if part.size == 0 {
            return Err(ChkpttError::StoreCorrupted(
                "Pack parts manifest contains an empty part".into(),
            ));
        }

        // Only bare file names are accepted — this prevents a crafted
        // manifest from referencing files outside the packs directory.
        let relative_path = Path::new(&part.path);
        if relative_path.is_absolute() || relative_path.components().count() != 1 {
            return Err(ChkpttError::StoreCorrupted(format!(
                "Pack parts manifest contains an invalid part path: {}",
                part.path
            )));
        }

        let part_path = packs_dir.join(relative_path);
        let part_file = File::open(&part_path)?;
        let actual_size = part_file.metadata()?.len();
        if actual_size != part.size {
            return Err(ChkpttError::StoreCorrupted(format!(
                "Pack part {} has size {}, expected {}",
                part_path.display(),
                actual_size,
                part.size
            )));
        }

        // SAFETY: The part file handles are kept alive in ChunkedPackData.
        let dat = unsafe { Mmap::map(&part_file)? };
        parts.push(PackPartData {
            offset: part.offset,
            size: part.size,
            dat,
        });
        part_files.push(part_file);
        next_offset += part.size;
    }

    // The parts together must cover exactly the advertised .dat size.
    if next_offset != manifest.dat_size {
        return Err(ChkpttError::StoreCorrupted(format!(
            "Pack parts manifest covers {} bytes, expected {}",
            next_offset, manifest.dat_size
        )));
    }

    Ok(ChunkedPackData {
        dat_size: manifest.dat_size,
        _part_files: part_files,
        parts,
    })
}
710
711/// List all pack hashes in a directory.
712pub fn list_packs(packs_dir: &Path) -> Result<Vec<String>> {
713    let mut packs = BTreeSet::new();
714    let entries = match std::fs::read_dir(packs_dir) {
715        Ok(entries) => entries,
716        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
717        Err(error) => return Err(error.into()),
718    };
719    for entry in entries {
720        let entry = entry?;
721        let name = entry.file_name();
722        let name = name.to_string_lossy();
723        if let Some(hash) = name
724            .strip_prefix("pack-")
725            .and_then(|name| name.strip_suffix(".dat"))
726        {
727            packs.insert(hash.to_owned());
728        } else if let Some(hash) = name
729            .strip_prefix("pack-")
730            .and_then(|name| name.strip_suffix(".dat.parts.json"))
731        {
732            packs.insert(hash.to_owned());
733        }
734    }
735    Ok(packs.into_iter().collect())
736}
737
738pub(crate) fn remove_pack_files(packs_dir: &Path, pack_hash: &str) -> Result<()> {
739    remove_file_if_exists(pack_dat_path(packs_dir, pack_hash))?;
740    remove_file_if_exists(pack_idx_path(packs_dir, pack_hash))?;
741    remove_file_if_exists(pack_parts_manifest_path(packs_dir, pack_hash))?;
742
743    let part_prefix = format!("pack-{}.dat.part-", pack_hash);
744    let entries = match std::fs::read_dir(packs_dir) {
745        Ok(entries) => entries,
746        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
747        Err(error) => return Err(error.into()),
748    };
749    for entry in entries {
750        let entry = entry?;
751        let name = entry.file_name();
752        if name.to_string_lossy().starts_with(&part_prefix) {
753            remove_file_if_exists(entry.path())?;
754        }
755    }
756
757    Ok(())
758}
759
760fn remove_file_if_exists(path: PathBuf) -> Result<()> {
761    match std::fs::remove_file(path) {
762        Ok(()) => Ok(()),
763        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
764        Err(error) => Err(error.into()),
765    }
766}