Skip to main content

objects/store/pack/
pack_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Pack reader for extracting objects from packfiles.
3
4use std::path::Path;
5
6use bytes::Bytes;
7
8use super::{
9    ObjectType, PackObjectId, PackObjectRecord, decompress_pack_payload, has_zstd_magic,
10    pack_container_spec, pack_index::PackIndex, varint, verify_container,
11};
12use crate::{
13    object::ContentHash,
14    store::{Result, StoreError},
15};
16
/// Largest resolved output a delta entry may declare; an alias of
/// `crate::delta::MAX_DELTA_OUTPUT_SIZE`. `read_delta_record` rejects
/// records whose declared size exceeds it.
const MAX_PACK_DELTA_OUTPUT_SIZE: usize = crate::delta::MAX_DELTA_OUTPUT_SIZE;
/// Deepest delta-base recursion `read_delta_record` accepts before
/// rejecting the record as invalid.
const MAX_DELTA_CHAIN_DEPTH: usize = 50;
19
20/// Pack reader for extracting objects.
21///
22/// `data` is a refcounted [`Bytes`] view of the pack file. For
23/// uncompressed entries we hand back a zero-copy `Bytes::slice` into
24/// this buffer — no per-blob memcpy, no per-blob allocation. Mmap-
25/// backed `Bytes` (via [`Bytes::from_owner`] on the
26/// `memmap2::Mmap`) survives across reads without copying the
27/// whole pack into the heap.
28enum PackData<'a> {
29    Borrowed(&'a [u8]),
30    Owned(Bytes),
31}
32
33impl<'a> PackData<'a> {
34    fn as_slice(&self) -> &[u8] {
35        match self {
36            Self::Borrowed(data) => data,
37            Self::Owned(data) => data,
38        }
39    }
40
41    fn slice(&self, range: std::ops::Range<usize>) -> Bytes {
42        match self {
43            Self::Borrowed(data) => Bytes::copy_from_slice(&data[range]),
44            Self::Owned(data) => data.slice(range),
45        }
46    }
47}
48
49pub struct PackReader<'a> {
50    data: PackData<'a>,
51    index: PackIndex,
52    content_end: usize,
53}
54
55impl PackReader<'static> {
56    /// Open a pack file. mmap-backed when the pack is large enough
57    /// to benefit (the same threshold the loose-blob path uses for
58    /// its own mmap decision); read-into-heap otherwise.
59    pub fn open(pack_path: &Path, index_path: &Path) -> Result<Self> {
60        let pack_bytes = crate::store::fs::read_file_bytes_for_pack(pack_path)?;
61        let index_data = std::fs::read(index_path)?;
62        let (_, _, content_end) = verify_container(&pack_bytes, pack_container_spec())?;
63        let index = PackIndex::from_bytes(&index_data)?;
64        Ok(Self {
65            data: PackData::Owned(pack_bytes),
66            index,
67            content_end,
68        })
69    }
70
71    pub fn from_bytes(pack_data: impl Into<Bytes>, index_data: impl AsRef<[u8]>) -> Result<Self> {
72        let pack_data = pack_data.into();
73        let (_, _, content_end) = verify_container(&pack_data, pack_container_spec())?;
74        let index = PackIndex::from_bytes(index_data.as_ref())?;
75        Ok(Self {
76            data: PackData::Owned(pack_data),
77            index,
78            content_end,
79        })
80    }
81}
82
83impl<'a> PackReader<'a> {
84    pub fn from_slice(pack_data: &'a [u8], index_data: impl AsRef<[u8]>) -> Result<Self> {
85        let (_, _, content_end) = verify_container(pack_data, pack_container_spec())?;
86        let index = PackIndex::from_bytes(index_data.as_ref())?;
87        Ok(Self {
88            data: PackData::Borrowed(pack_data),
89            index,
90            content_end,
91        })
92    }
93
94    /// List all object ids in this pack.
95    pub fn list_ids(&self) -> Vec<PackObjectId> {
96        self.index.ids()
97    }
98
99    pub fn list_hashes(&self) -> Vec<ContentHash> {
100        self.list_ids()
101            .into_iter()
102            .filter_map(|id| match id {
103                PackObjectId::Hash(hash) => Some(hash),
104                PackObjectId::ChangeId(_) => None,
105            })
106            .collect()
107    }
108
109    pub fn has_object(&self, id: &PackObjectId) -> bool {
110        self.index.find(id).is_some()
111    }
112
113    /// Get an object from the pack.
114    ///
115    /// Verifies that the tagged id at the indexed offset matches
116    /// `id` before returning. A stale `.idx` file (e.g., overwritten
117    /// in place after a pack rebuild) can otherwise route a request
118    /// for hash `A` to a record physically located at hash `B`'s
119    /// offset — same shape, different content, no error signal.
120    /// This cheap 32-byte id comparison catches that without paying
121    /// a full content-hash recompute on every read; corruption
122    /// strictly *inside* the record body is a separate failure mode
123    /// surfaced via the consumer-side hash verify (see
124    /// `FsStore::loose_blob_path` for the blob equivalent).
125    pub fn get_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
126        let offset = match self.index.find(id) {
127            Some(offset) => checked_index_offset(offset)?,
128            None => return Ok(None),
129        };
130
131        let record = self.read_record_at_depth(offset, 0)?;
132        verify_record_id_matches(id, &record.id)?;
133        Ok(Some((record.obj_type, record.data)))
134    }
135
136    pub fn get_hashed_object(&self, hash: &ContentHash) -> Result<Option<(ObjectType, Vec<u8>)>> {
137        self.get_object(&PackObjectId::Hash(*hash))
138    }
139
140    /// Zero-copy fast path: when the entry is non-delta and stored
141    /// uncompressed, returns `Bytes::slice` into the pack's
142    /// (mmap-backed) buffer — no allocation, no memcpy. Compressed
143    /// or delta entries fall back to `get_object` and wrap the
144    /// resulting `Vec<u8>` in a `Bytes` (one Arc, no body copy).
145    ///
146    /// Use this from the hot read path. The 10 MB benchmark gap
147    /// between the mount and vanilla FS at the 1 MB+ tier is the
148    /// per-blob memcpy this method eliminates.
149    pub fn get_object_bytes(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Bytes)>> {
150        let Some(offset) = self.index.find(id) else {
151            return Ok(None);
152        };
153        let offset = checked_index_offset(offset)?;
154        if offset >= self.content_end {
155            return Err(StoreError::InvalidObject(
156                "Entry offset out of bounds".to_string(),
157            ));
158        }
159
160        // Verify the tagged id at the indexed offset matches the
161        // requested id — guards against stale-index misrouting (see
162        // `get_object` for the long-form rationale). 32-byte
163        // compare; cheaper than the size+varint decode that follows.
164        let (record_id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
165        verify_record_id_matches(id, &record_id)?;
166        let header_start = checked_index_add(offset, id_len, "record header start")?;
167        let (obj_type, uncompressed_size, type_len) =
168            varint::decode_type_and_size(self.content_from(header_start)?).ok_or_else(|| {
169                StoreError::InvalidObject("Truncated type+size varint".to_string())
170            })?;
171        let uncompressed_size = checked_decoded_size("uncompressed_size", uncompressed_size)?;
172        let varint_start = checked_index_add(header_start, type_len, "compressed_size start")?;
173        let (compressed_size, comp_len) = varint::decode_varint(self.content_from(varint_start)?)
174            .ok_or_else(truncated_compressed_size_varint)?;
175        let compressed_size = checked_decoded_size("compressed_size", compressed_size)?;
176
177        // Fast path: non-delta entry stored uncompressed. The most
178        // common shape for snapshot-time packs (the builder skips
179        // the delta search for unrelated blobs).
180        if obj_type != ObjectType::Delta && compressed_size == uncompressed_size {
181            let data_start = checked_index_add(varint_start, comp_len, "entry data start")?;
182            let data_end = checked_data_end(data_start, compressed_size, self.content_end)?;
183            return Ok(Some((obj_type, self.data.slice(data_start..data_end))));
184        }
185
186        // Slow path: defer to the full record reader (it handles
187        // decompression + delta chains) and Bytes-wrap the Vec.
188        // Bytes::from(Vec) is a single Arc allocation, no body copy.
189        let record = self.read_record_at_depth(offset, 0)?;
190        Ok(Some((record.obj_type, Bytes::from(record.data))))
191    }
192
193    pub fn get_hashed_object_bytes(
194        &self,
195        hash: &ContentHash,
196    ) -> Result<Option<(ObjectType, Bytes)>> {
197        self.get_object_bytes(&PackObjectId::Hash(*hash))
198    }
199
200    /// Read just the type+size header for an object without
201    /// decompressing its payload. Returns `Ok(None)` when the object
202    /// isn't in this pack.
203    ///
204    /// For non-delta entries this is one varint decode at the indexed
205    /// offset — much cheaper than `get_object`. Delta entries fall
206    /// back to a full read because their *resolved* size requires
207    /// chasing the base; in practice deltas are rare in the directory
208    /// listing hot path so the fallback is acceptable.
209    pub fn get_hashed_object_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
210        let id = PackObjectId::Hash(*hash);
211        let Some(offset) = self.index.find(&id) else {
212            return Ok(None);
213        };
214        let offset = checked_index_offset(offset)?;
215        if offset >= self.content_end {
216            return Err(StoreError::InvalidObject(
217                "Entry offset out of bounds".to_string(),
218            ));
219        }
220        let (record_id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
221        verify_record_id_matches(&id, &record_id)?;
222        let header_start = checked_index_add(offset, id_len, "record header start")?;
223        let (obj_type, uncompressed_size, _type_len) = super::varint::decode_type_and_size(
224            self.content_from(header_start)?,
225        )
226        .ok_or_else(|| StoreError::InvalidObject("Truncated type+size varint".to_string()))?;
227        if obj_type == ObjectType::Delta {
228            // Delta entries record the *resolved* output size in the
229            // type+size varint already (see `read_record_at_depth`'s
230            // size-mismatch check), so we can still return without
231            // decompressing the payload.
232            return Ok(Some(uncompressed_size));
233        }
234        Ok(Some(uncompressed_size))
235    }
236
237    fn read_record_at_depth(&self, offset: usize, depth: usize) -> Result<PackObjectRecord> {
238        if offset >= self.content_end {
239            return Err(StoreError::InvalidObject(
240                "Entry offset out of bounds".to_string(),
241            ));
242        }
243
244        let (id, id_len) = PackObjectId::decode_tagged(self.content_from(offset)?)?;
245        let header_start = checked_index_add(offset, id_len, "record header start")?;
246
247        let (obj_type, uncompressed_size, type_len) =
248            varint::decode_type_and_size(self.content_from(header_start)?).ok_or_else(|| {
249                StoreError::InvalidObject("Truncated type+size varint".to_string())
250            })?;
251        let uncompressed_size = checked_decoded_size("uncompressed_size", uncompressed_size)?;
252
253        let varint_start = checked_index_add(header_start, type_len, "compressed_size start")?;
254        let (compressed_size, comp_len) = varint::decode_varint(self.content_from(varint_start)?)
255            .ok_or_else(truncated_compressed_size_varint)?;
256        let compressed_size = checked_decoded_size("compressed_size", compressed_size)?;
257
258        let mut data_start = checked_index_add(varint_start, comp_len, "entry data start")?;
259
260        // Delta entries carry a tagged base id in pack v2.
261        let base_id = if obj_type == ObjectType::Delta {
262            let (base_id, base_len) = PackObjectId::decode_tagged(self.content_from(data_start)?)?;
263            data_start = checked_index_add(data_start, base_len, "delta data start")?;
264            Some(base_id)
265        } else {
266            None
267        };
268
269        let data_end = checked_data_end(data_start, compressed_size, self.content_end)?;
270
271        let stored_data = &self.data.as_slice()[data_start..data_end];
272
273        // Raw zstd (no wrapper). For non-delta entries, decompress
274        // if sizes differ. For delta entries, the stored data IS the delta
275        // payload (possibly zstd-compressed); check for zstd magic.
276        let decompressed = if obj_type == ObjectType::Delta {
277            if has_zstd_magic(stored_data) {
278                decompress_pack_payload(stored_data, 0)?
279            } else {
280                stored_data.to_vec()
281            }
282        } else if compressed_size != uncompressed_size {
283            decompress_pack_payload(stored_data, uncompressed_size)?
284        } else {
285            stored_data.to_vec()
286        };
287
288        let (resolved_type, final_data) = if obj_type == ObjectType::Delta {
289            self.read_delta_record(base_id, &decompressed, uncompressed_size, depth)?
290        } else {
291            (obj_type, decompressed)
292        };
293
294        if final_data.len() != uncompressed_size {
295            return Err(StoreError::InvalidObject(format!(
296                "Size mismatch: expected {}, got {}",
297                uncompressed_size,
298                final_data.len()
299            )));
300        }
301
302        Ok(PackObjectRecord {
303            id,
304            obj_type: resolved_type,
305            data: final_data,
306            delta_base: None,
307            path_hint: None,
308        })
309    }
310
311    fn read_delta_record(
312        &self,
313        base_id: Option<PackObjectId>,
314        delta: &[u8],
315        uncompressed_size: usize,
316        depth: usize,
317    ) -> Result<(ObjectType, Vec<u8>)> {
318        if depth > MAX_DELTA_CHAIN_DEPTH {
319            return Err(StoreError::InvalidObject(format!(
320                "Delta chain depth {} exceeds max {}",
321                depth, MAX_DELTA_CHAIN_DEPTH
322            )));
323        }
324
325        if uncompressed_size > MAX_PACK_DELTA_OUTPUT_SIZE {
326            return Err(StoreError::InvalidObject(format!(
327                "Delta output size {} exceeds max {}",
328                uncompressed_size, MAX_PACK_DELTA_OUTPUT_SIZE
329            )));
330        }
331
332        let base_hash = Self::require_delta_base_hash(base_id)?;
333        let base_offset = self
334            .index
335            .find(&PackObjectId::Hash(base_hash))
336            .ok_or_else(|| StoreError::NotFound(base_hash.to_string()))?;
337        let base_offset = checked_index_offset(base_offset)?;
338        let base_record = self.read_record_at_depth(base_offset, depth + 1)?;
339        let base_type = base_record.obj_type;
340        let base_data = base_record.data;
341
342        let decoded = crate::delta::DeltaDecoder::decode(&base_data, delta, uncompressed_size)
343            .map_err(|error| StoreError::InvalidObject(format!("Delta decode failed: {error}")))?;
344
345        Ok((base_type, decoded))
346    }
347
348    fn require_delta_base_hash(base_id: Option<PackObjectId>) -> Result<ContentHash> {
349        match base_id {
350            Some(PackObjectId::Hash(hash)) => Ok(hash),
351            Some(PackObjectId::ChangeId(_)) => Err(StoreError::InvalidObject(
352                "pack delta base must be hash-backed content".into(),
353            )),
354            None => Err(StoreError::InvalidObject(
355                "pack object type is Delta but base hash is missing".into(),
356            )),
357        }
358    }
359
360    fn content_from(&self, offset: usize) -> Result<&[u8]> {
361        if offset > self.content_end {
362            return Err(StoreError::InvalidObject(
363                "Entry header out of bounds".to_string(),
364            ));
365        }
366        Ok(&self.data.as_slice()[offset..self.content_end])
367    }
368}
369
370fn checked_index_offset(offset: u64) -> Result<usize> {
371    usize::try_from(offset)
372        .map_err(|_| StoreError::InvalidObject("Entry offset exceeds platform limits".to_string()))
373}
374
375fn checked_decoded_size(field: &str, size: u64) -> Result<usize> {
376    let size = usize::try_from(size).map_err(|_| {
377        StoreError::InvalidObject(format!("Decoded {field} exceeds platform limits"))
378    })?;
379    if field == "uncompressed_size" && size > super::shared::MAX_PACK_OBJECT_OUTPUT_SIZE {
380        return Err(StoreError::InvalidObject(format!(
381            "Pack object output size {size} exceeds max {}",
382            super::shared::MAX_PACK_OBJECT_OUTPUT_SIZE
383        )));
384    }
385    Ok(size)
386}
387
388fn checked_index_add(start: usize, len: usize, field: &str) -> Result<usize> {
389    start.checked_add(len).ok_or_else(|| {
390        StoreError::InvalidObject(format!("{field} offset overflows platform limits"))
391    })
392}
393
394fn checked_data_end(
395    data_start: usize,
396    compressed_size: usize,
397    content_end: usize,
398) -> Result<usize> {
399    let data_end = data_start.checked_add(compressed_size).ok_or_else(|| {
400        StoreError::InvalidObject("Entry data range overflows platform limits".to_string())
401    })?;
402    if data_end > content_end {
403        return Err(StoreError::InvalidObject(
404            "Entry data out of bounds".to_string(),
405        ));
406    }
407    Ok(data_end)
408}
409
410fn truncated_compressed_size_varint() -> StoreError {
411    StoreError::InvalidObject("Truncated compressed_size varint".to_string())
412}
413
414/// Reject a record whose tagged id at the indexed offset doesn't
415/// match the id the caller asked for. The pack format stores its
416/// records `[tagged_id, type+size, compressed_size, payload]` so the
417/// tagged id is the cheapest available authenticator of "we landed
418/// on the right record"; a stale or hand-edited `.idx` that points
419/// at the *wrong* record produces a mismatch here and we surface it
420/// as a real error instead of silently routing the caller to whatever
421/// bytes happened to be at the bad offset.
422fn verify_record_id_matches(requested: &PackObjectId, found: &PackObjectId) -> Result<()> {
423    if requested == found {
424        return Ok(());
425    }
426    Err(StoreError::InvalidObject(format!(
427        "pack index routed lookup for {requested:?} to record tagged {found:?} \
428         — index is stale or corrupt; the loose-store path will re-promote on \
429         the next read"
430    )))
431}
432
#[cfg(test)]
mod tests {
    use super::{PackObjectId, PackReader, verify_record_id_matches};
    use crate::{object::ContentHash, store::StoreError};

    /// Hash-backed id whose 32 bytes all equal `fill`.
    fn hash_id(fill: u8) -> PackObjectId {
        PackObjectId::Hash(ContentHash::from_bytes([fill; 32]))
    }

    /// A delta record without a base id must be rejected with the
    /// dedicated "missing base hash" message.
    #[test]
    fn test_require_delta_base_hash_rejects_missing_hash() {
        let error =
            PackReader::require_delta_base_hash(None).expect_err("missing hash should fail");

        let is_expected = match error {
            StoreError::InvalidObject(message) => {
                message == "pack object type is Delta but base hash is missing"
            }
            _ => false,
        };
        assert!(is_expected);
    }

    /// Comparing an id against itself passes verification.
    #[test]
    fn verify_record_id_matches_accepts_identical_ids() {
        let id = hash_id(7);
        verify_record_id_matches(&id, &id).expect("matching ids must verify");
    }

    /// Two different ids produce the stale-index diagnostic.
    #[test]
    fn verify_record_id_matches_rejects_mismatched_ids() {
        let asked = hash_id(7);
        let found = hash_id(8);
        let error = verify_record_id_matches(&asked, &found)
            .expect_err("mismatched record id must error rather than silently route");

        let has_diagnostic = match &error {
            StoreError::InvalidObject(message) => message.contains("stale or corrupt"),
            _ => false,
        };
        assert!(
            has_diagnostic,
            "stale-index mismatch must surface as InvalidObject with the diagnostic phrase, got: {error:?}",
        );
    }
}