Skip to main content

objects/store/pack/
pack_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Pack reader for extracting objects from packfiles.
3
4use std::path::Path;
5
6use bytes::Bytes;
7use heddle_format::delta::{DeltaDecoder, MAX_DELTA_OUTPUT_SIZE};
8
9use super::{
10    ObjectType, PackObjectId, PackObjectRecord, decompress_pack_payload, has_zstd_magic,
11    pack_container_spec, pack_index::PackIndex, varint, verify_container,
12};
13use crate::{
14    object::ContentHash,
15    store::{Result, StoreError},
16};
17
18const MAX_PACK_DELTA_OUTPUT_SIZE: usize = MAX_DELTA_OUTPUT_SIZE;
19const MAX_DELTA_CHAIN_DEPTH: usize = 50;
20
/// Backing storage for the raw bytes of one pack file.
///
/// * `Borrowed` wraps a caller-provided slice. Ranges requested through
///   `PackData::slice` are copied into a fresh [`Bytes`].
/// * `Owned` holds a refcounted [`Bytes`] view of the pack file. For
///   uncompressed entries we hand back a zero-copy `Bytes::slice` into
///   this buffer — no per-blob memcpy, no per-blob allocation. Mmap-
///   backed `Bytes` (via [`Bytes::from_owner`] on the `memmap2::Mmap`)
///   survives across reads without copying the whole pack into the heap.
enum PackData<'a> {
    Borrowed(&'a [u8]),
    Owned(Bytes),
}
33
34impl<'a> PackData<'a> {
35    fn as_slice(&self) -> &[u8] {
36        match self {
37            Self::Borrowed(data) => data,
38            Self::Owned(data) => data,
39        }
40    }
41
42    fn slice(&self, range: std::ops::Range<usize>) -> Bytes {
43        match self {
44            Self::Borrowed(data) => Bytes::copy_from_slice(&data[range]),
45            Self::Owned(data) => data.slice(range),
46        }
47    }
48}
49
/// Pack reader for extracting objects from a single packfile.
///
/// Combines the raw pack bytes with the parsed index that maps object
/// ids to record offsets inside those bytes.
pub struct PackReader<'a> {
    /// Raw pack bytes, either borrowed from the caller or owned.
    data: PackData<'a>,
    /// Parsed pack index; `find` maps an object id to its record offset.
    index: PackIndex,
    /// Third value returned by `verify_container`. Record reads treat any
    /// offset at or past it as out of bounds.
    content_end: usize,
}
55
56impl PackReader<'static> {
57    /// Open a pack file. mmap-backed when the pack is large enough
58    /// to benefit (the same threshold the loose-blob path uses for
59    /// its own mmap decision); read-into-heap otherwise.
60    pub fn open(pack_path: &Path, index_path: &Path) -> Result<Self> {
61        let pack_bytes = crate::store::fs::read_file_bytes_for_pack(pack_path)?;
62        let index_data = std::fs::read(index_path)?;
63        let (_, _, content_end) = verify_container(&pack_bytes, pack_container_spec())?;
64        let index = PackIndex::from_bytes(&index_data)?;
65        Ok(Self {
66            data: PackData::Owned(pack_bytes),
67            index,
68            content_end,
69        })
70    }
71
72    pub fn from_bytes(pack_data: impl Into<Bytes>, index_data: impl AsRef<[u8]>) -> Result<Self> {
73        let pack_data = pack_data.into();
74        let (_, _, content_end) = verify_container(&pack_data, pack_container_spec())?;
75        let index = PackIndex::from_bytes(index_data.as_ref())?;
76        Ok(Self {
77            data: PackData::Owned(pack_data),
78            index,
79            content_end,
80        })
81    }
82}
83
84impl<'a> PackReader<'a> {
85    pub fn from_slice(pack_data: &'a [u8], index_data: impl AsRef<[u8]>) -> Result<Self> {
86        let (_, _, content_end) = verify_container(pack_data, pack_container_spec())?;
87        let index = PackIndex::from_bytes(index_data.as_ref())?;
88        Ok(Self {
89            data: PackData::Borrowed(pack_data),
90            index,
91            content_end,
92        })
93    }
94
95    /// List all object ids in this pack.
96    pub fn list_ids(&self) -> Vec<PackObjectId> {
97        self.index.ids()
98    }
99
100    pub fn list_hashes(&self) -> Vec<ContentHash> {
101        self.list_ids()
102            .into_iter()
103            .filter_map(|id| match id {
104                PackObjectId::Hash(hash) => Some(hash),
105                PackObjectId::ChangeId(_) => None,
106            })
107            .collect()
108    }
109
110    pub fn has_object(&self, id: &PackObjectId) -> bool {
111        self.index.find(id).is_some()
112    }
113
114    /// Get an object from the pack.
115    ///
116    /// Verifies that the tagged id at the indexed offset matches
117    /// `id` before returning. A stale `.idx` file (e.g., overwritten
118    /// in place after a pack rebuild) can otherwise route a request
119    /// for hash `A` to a record physically located at hash `B`'s
120    /// offset — same shape, different content, no error signal.
121    /// This cheap 32-byte id comparison catches that without paying
122    /// a full content-hash recompute on every read; corruption
123    /// strictly *inside* the record body is a separate failure mode
124    /// surfaced via the consumer-side hash verify (see
125    /// `FsStore::loose_blob_path` for the blob equivalent).
126    pub fn get_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
127        let offset = match self.index.find(id) {
128            Some(offset) => checked_index_offset(offset)?,
129            None => return Ok(None),
130        };
131
132        let record = self.read_record_at_depth(offset, 0)?;
133        verify_record_id_matches(id, &record.id)?;
134        Ok(Some((record.obj_type, record.data)))
135    }
136
137    pub fn get_hashed_object(&self, hash: &ContentHash) -> Result<Option<(ObjectType, Vec<u8>)>> {
138        self.get_object(&PackObjectId::Hash(*hash))
139    }
140
141    /// Zero-copy fast path: when the entry is non-delta and stored
142    /// uncompressed, returns `Bytes::slice` into the pack's
143    /// (mmap-backed) buffer — no allocation, no memcpy. Compressed
144    /// or delta entries fall back to `get_object` and wrap the
145    /// resulting `Vec<u8>` in a `Bytes` (one Arc, no body copy).
146    ///
147    /// Use this from the hot read path. The 10 MB benchmark gap
148    /// between the mount and vanilla FS at the 1 MB+ tier is the
149    /// per-blob memcpy this method eliminates.
150    pub fn get_object_bytes(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Bytes)>> {
151        let Some(offset) = self.index.find(id) else {
152            return Ok(None);
153        };
154        let offset = checked_index_offset(offset)?;
155        if offset >= self.content_end {
156            return Err(StoreError::InvalidObject(
157                "Entry offset out of bounds".to_string(),
158            ));
159        }
160
161        // Verify the tagged id at the indexed offset matches the
162        // requested id — guards against stale-index misrouting (see
163        // `get_object` for the long-form rationale). 32-byte
164        // compare; cheaper than the size+varint decode that follows.
165        let (record_id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
166        verify_record_id_matches(id, &record_id)?;
167        let header_start = checked_index_add(offset, id_len, "record header start")?;
168        let (obj_type, uncompressed_size, type_len) =
169            varint::decode_type_and_size(self.content_from(header_start)?).ok_or_else(|| {
170                StoreError::InvalidObject("Truncated type+size varint".to_string())
171            })?;
172        let uncompressed_size = checked_decoded_size("uncompressed_size", uncompressed_size)?;
173        let varint_start = checked_index_add(header_start, type_len, "compressed_size start")?;
174        let (compressed_size, comp_len) = varint::decode_varint(self.content_from(varint_start)?)
175            .ok_or_else(truncated_compressed_size_varint)?;
176        let compressed_size = checked_decoded_size("compressed_size", compressed_size)?;
177
178        // Fast path: non-delta entry stored uncompressed. The most
179        // common shape for snapshot-time packs (the builder skips
180        // the delta search for unrelated blobs).
181        if obj_type != ObjectType::Delta && compressed_size == uncompressed_size {
182            let data_start = checked_index_add(varint_start, comp_len, "entry data start")?;
183            let data_end = checked_data_end(data_start, compressed_size, self.content_end)?;
184            return Ok(Some((obj_type, self.data.slice(data_start..data_end))));
185        }
186
187        // Slow path: defer to the full record reader (it handles
188        // decompression + delta chains) and Bytes-wrap the Vec.
189        // Bytes::from(Vec) is a single Arc allocation, no body copy.
190        let record = self.read_record_at_depth(offset, 0)?;
191        Ok(Some((record.obj_type, Bytes::from(record.data))))
192    }
193
194    pub fn get_hashed_object_bytes(
195        &self,
196        hash: &ContentHash,
197    ) -> Result<Option<(ObjectType, Bytes)>> {
198        self.get_object_bytes(&PackObjectId::Hash(*hash))
199    }
200
201    /// Read just the type+size header for an object without
202    /// decompressing its payload. Returns `Ok(None)` when the object
203    /// isn't in this pack.
204    ///
205    /// For non-delta entries this is one varint decode at the indexed
206    /// offset — much cheaper than `get_object`. Delta entries fall
207    /// back to a full read because their *resolved* size requires
208    /// chasing the base; in practice deltas are rare in the directory
209    /// listing hot path so the fallback is acceptable.
210    pub fn get_hashed_object_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
211        let id = PackObjectId::Hash(*hash);
212        let Some(offset) = self.index.find(&id) else {
213            return Ok(None);
214        };
215        let offset = checked_index_offset(offset)?;
216        if offset >= self.content_end {
217            return Err(StoreError::InvalidObject(
218                "Entry offset out of bounds".to_string(),
219            ));
220        }
221        let (record_id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
222        verify_record_id_matches(&id, &record_id)?;
223        let header_start = checked_index_add(offset, id_len, "record header start")?;
224        let (obj_type, uncompressed_size, _type_len) = super::varint::decode_type_and_size(
225            self.content_from(header_start)?,
226        )
227        .ok_or_else(|| StoreError::InvalidObject("Truncated type+size varint".to_string()))?;
228        if obj_type == ObjectType::Delta {
229            // Delta entries record the *resolved* output size in the
230            // type+size varint already (see `read_record_at_depth`'s
231            // size-mismatch check), so we can still return without
232            // decompressing the payload.
233            return Ok(Some(uncompressed_size));
234        }
235        Ok(Some(uncompressed_size))
236    }
237
238    fn read_record_at_depth(&self, offset: usize, depth: usize) -> Result<PackObjectRecord> {
239        if offset >= self.content_end {
240            return Err(StoreError::InvalidObject(
241                "Entry offset out of bounds".to_string(),
242            ));
243        }
244
245        let (id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
246        let header_start = checked_index_add(offset, id_len, "record header start")?;
247
248        let (obj_type, uncompressed_size, type_len) =
249            varint::decode_type_and_size(self.content_from(header_start)?).ok_or_else(|| {
250                StoreError::InvalidObject("Truncated type+size varint".to_string())
251            })?;
252        let uncompressed_size = checked_decoded_size("uncompressed_size", uncompressed_size)?;
253
254        let varint_start = checked_index_add(header_start, type_len, "compressed_size start")?;
255        let (compressed_size, comp_len) = varint::decode_varint(self.content_from(varint_start)?)
256            .ok_or_else(truncated_compressed_size_varint)?;
257        let compressed_size = checked_decoded_size("compressed_size", compressed_size)?;
258
259        let mut data_start = checked_index_add(varint_start, comp_len, "entry data start")?;
260
261        // Delta entries carry a tagged base id in pack v2.
262        let base_id = if obj_type == ObjectType::Delta {
263            let (base_id, base_len) = PackObjectId::decode_tagged(self.content_from(data_start)?)?;
264            data_start = checked_index_add(data_start, base_len, "delta data start")?;
265            Some(base_id)
266        } else {
267            None
268        };
269
270        let data_end = checked_data_end(data_start, compressed_size, self.content_end)?;
271
272        let stored_data = &self.data.as_slice()[data_start..data_end];
273
274        // Raw zstd (no wrapper). For non-delta entries, decompress
275        // if sizes differ. For delta entries, the stored data IS the delta
276        // payload (possibly zstd-compressed); check for zstd magic.
277        let decompressed = if obj_type == ObjectType::Delta {
278            if has_zstd_magic(stored_data) {
279                decompress_pack_payload(stored_data, 0)?
280            } else {
281                stored_data.to_vec()
282            }
283        } else if compressed_size != uncompressed_size {
284            decompress_pack_payload(stored_data, uncompressed_size)?
285        } else {
286            stored_data.to_vec()
287        };
288
289        let (resolved_type, final_data) = if obj_type == ObjectType::Delta {
290            self.read_delta_record(base_id, &decompressed, uncompressed_size, depth)?
291        } else {
292            (obj_type, decompressed)
293        };
294
295        if final_data.len() != uncompressed_size {
296            return Err(StoreError::InvalidObject(format!(
297                "Size mismatch: expected {}, got {}",
298                uncompressed_size,
299                final_data.len()
300            )));
301        }
302
303        Ok(PackObjectRecord {
304            id,
305            obj_type: resolved_type,
306            data: final_data,
307            delta_base: None,
308            path_hint: None,
309        })
310    }
311
312    fn read_delta_record(
313        &self,
314        base_id: Option<PackObjectId>,
315        delta: &[u8],
316        uncompressed_size: usize,
317        depth: usize,
318    ) -> Result<(ObjectType, Vec<u8>)> {
319        if depth > MAX_DELTA_CHAIN_DEPTH {
320            return Err(StoreError::InvalidObject(format!(
321                "Delta chain depth {} exceeds max {}",
322                depth, MAX_DELTA_CHAIN_DEPTH
323            )));
324        }
325
326        if uncompressed_size > MAX_PACK_DELTA_OUTPUT_SIZE {
327            return Err(StoreError::InvalidObject(format!(
328                "Delta output size {} exceeds max {}",
329                uncompressed_size, MAX_PACK_DELTA_OUTPUT_SIZE
330            )));
331        }
332
333        let base_hash = Self::require_delta_base_hash(base_id)?;
334        let base_offset = self
335            .index
336            .find(&PackObjectId::Hash(base_hash))
337            .ok_or_else(|| StoreError::NotFound(base_hash.to_string()))?;
338        let base_offset = checked_index_offset(base_offset)?;
339        let base_record = self.read_record_at_depth(base_offset, depth + 1)?;
340        let base_type = base_record.obj_type;
341        let base_data = base_record.data;
342
343        let decoded = DeltaDecoder::decode(&base_data, delta, uncompressed_size)
344            .map_err(|error| StoreError::InvalidObject(format!("Delta decode failed: {error}")))?;
345
346        Ok((base_type, decoded))
347    }
348
349    fn require_delta_base_hash(base_id: Option<PackObjectId>) -> Result<ContentHash> {
350        match base_id {
351            Some(PackObjectId::Hash(hash)) => Ok(hash),
352            Some(PackObjectId::ChangeId(_)) => Err(StoreError::InvalidObject(
353                "pack delta base must be hash-backed content".into(),
354            )),
355            None => Err(StoreError::InvalidObject(
356                "pack object type is Delta but base hash is missing".into(),
357            )),
358        }
359    }
360
361    fn content_from(&self, offset: usize) -> Result<&[u8]> {
362        if offset > self.content_end {
363            return Err(StoreError::InvalidObject(
364                "Entry header out of bounds".to_string(),
365            ));
366        }
367        Ok(&self.data.as_slice()[offset..self.content_end])
368    }
369}
370
371fn checked_index_offset(offset: u64) -> Result<usize> {
372    usize::try_from(offset)
373        .map_err(|_| StoreError::InvalidObject("Entry offset exceeds platform limits".to_string()))
374}
375
376fn checked_decoded_size(field: &str, size: u64) -> Result<usize> {
377    let size = usize::try_from(size).map_err(|_| {
378        StoreError::InvalidObject(format!("Decoded {field} exceeds platform limits"))
379    })?;
380    if field == "uncompressed_size" && size > super::shared::MAX_PACK_OBJECT_OUTPUT_SIZE {
381        return Err(StoreError::InvalidObject(format!(
382            "Pack object output size {size} exceeds max {}",
383            super::shared::MAX_PACK_OBJECT_OUTPUT_SIZE
384        )));
385    }
386    Ok(size)
387}
388
389fn checked_index_add(start: usize, len: usize, field: &str) -> Result<usize> {
390    start.checked_add(len).ok_or_else(|| {
391        StoreError::InvalidObject(format!("{field} offset overflows platform limits"))
392    })
393}
394
395fn checked_data_end(
396    data_start: usize,
397    compressed_size: usize,
398    content_end: usize,
399) -> Result<usize> {
400    let data_end = data_start.checked_add(compressed_size).ok_or_else(|| {
401        StoreError::InvalidObject("Entry data range overflows platform limits".to_string())
402    })?;
403    if data_end > content_end {
404        return Err(StoreError::InvalidObject(
405            "Entry data out of bounds".to_string(),
406        ));
407    }
408    Ok(data_end)
409}
410
411fn truncated_compressed_size_varint() -> StoreError {
412    StoreError::InvalidObject("Truncated compressed_size varint".to_string())
413}
414
415/// Reject a record whose tagged id at the indexed offset doesn't
416/// match the id the caller asked for. The pack format stores its
417/// records `[tagged_id, type+size, compressed_size, payload]` so the
418/// tagged id is the cheapest available authenticator of "we landed
419/// on the right record"; a stale or hand-edited `.idx` that points
420/// at the *wrong* record produces a mismatch here and we surface it
421/// as a real error instead of silently routing the caller to whatever
422/// bytes happened to be at the bad offset.
423fn verify_record_id_matches(requested: &PackObjectId, found: &PackObjectId) -> Result<()> {
424    if requested == found {
425        return Ok(());
426    }
427    Err(StoreError::InvalidObject(format!(
428        "pack index routed lookup for {requested:?} to record tagged {found:?} \
429         — index is stale or corrupt; the loose-store path will re-promote on \
430         the next read"
431    )))
432}
433
#[cfg(test)]
mod tests {
    use super::{PackObjectId, PackReader, verify_record_id_matches};
    use crate::{object::ContentHash, store::StoreError};

    /// A delta entry without a recorded base id must be rejected with the
    /// exact "base hash is missing" diagnostic.
    #[test]
    fn test_require_delta_base_hash_rejects_missing_hash() {
        let outcome = PackReader::require_delta_base_hash(None);
        let error = outcome.expect_err("missing hash should fail");

        let is_expected = matches!(
            error,
            StoreError::InvalidObject(message)
                if message == "pack object type is Delta but base hash is missing"
        );
        assert!(is_expected);
    }

    /// Comparing an id with itself verifies cleanly.
    #[test]
    fn verify_record_id_matches_accepts_identical_ids() {
        let id = PackObjectId::Hash(ContentHash::from_bytes([7u8; 32]));
        let outcome = verify_record_id_matches(&id, &id);
        outcome.expect("matching ids must verify");
    }

    /// Two different hashes must produce the stale-index diagnostic.
    #[test]
    fn verify_record_id_matches_rejects_mismatched_ids() {
        let asked = PackObjectId::Hash(ContentHash::from_bytes([7u8; 32]));
        let found = PackObjectId::Hash(ContentHash::from_bytes([8u8; 32]));

        let error = verify_record_id_matches(&asked, &found)
            .expect_err("mismatched record id must error rather than silently route");

        let has_diagnostic = matches!(
            &error,
            StoreError::InvalidObject(message) if message.contains("stale or corrupt")
        );
        assert!(
            has_diagnostic,
            "stale-index mismatch must surface as InvalidObject with the diagnostic phrase, got: {error:?}",
        );
    }
}