use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use gix_pack::data::entry::Header as EntryHeader;
use tracing::{debug, warn};
use crate::git::RefName;
use crate::object_store::{ObjectStore, ObjectStoreError};
use crate::remote::Remote;
use crate::url::StorageEngine;
use super::PackchainError;
use super::keys::{pack_idx_key, pack_key};
use super::manifest::{load_chain, load_path_index};
use super::retry::{
PACK_MISSING_MAX_RETRIES, PACK_MISSING_RETRY_BACKOFFS, chain_references_pack_key,
};
use super::schema::{ChainManifest, ChainSegment, PathNode, Sha40};
/// Delta-chain depth limit enforced by `decode_entry` while reconstructing one object.
pub const MAX_DELTA_DEPTH: u32 = 50;
/// Default byte budget of a [`PackIndexCache`] (64 MiB).
pub const DEFAULT_CACHE_CAPACITY_BYTES: u64 = 64 << 20;
/// Upper bound on the length of any single ranged read against a pack (1 GiB).
const MAX_RANGE_BYTES: u64 = 1 << 30;
/// Upper bound on an entry's announced inflated size and on a delta's result size (1 GiB).
const MAX_DECOMPRESSED_BYTES: u64 = 1 << 30;
/// How many times `inflate_with_retry` may double a too-short range before giving up.
const MAX_RANGE_EXPANSIONS: u32 = 6;
/// Byte-bounded, least-recently-used cache of parsed pack `.idx` files.
///
/// All mutable state sits behind one mutex; entries are shared out as `Arc`s.
pub struct PackIndexCache {
    /// Maximum total of `CachedIndex::bytes` kept resident.
    capacity_bytes: u64,
    inner: Mutex<CacheInner>,
}

/// Mutex-protected state of a [`PackIndexCache`].
struct CacheInner {
    /// Resident entries.
    map: HashMap<CacheKey, Arc<CachedIndex>>,
    /// Recency queue: the front is evicted first, lookups move a key to the back.
    order: VecDeque<CacheKey>,
    /// Sum of `bytes` over every entry currently in `map`.
    total_bytes: u64,
}

/// `(store prefix, pack content sha)`; the prefix is `""` when the remote has none.
type CacheKey = (String, Sha40);

/// A parsed `.idx` file together with data derived from it.
struct CachedIndex {
    file: gix_pack::index::File<Vec<u8>>,
    /// Every pack offset listed in the index, as returned by `File::sorted_offsets`.
    sorted_offsets: Vec<u64>,
    /// Bytes charged against the cache capacity for this entry.
    bytes: u64,
}
impl PackIndexCache {
    /// Creates an empty cache that keeps at most `capacity_bytes` of index data resident.
    #[must_use]
    pub fn new(capacity_bytes: u64) -> Self {
        let state = CacheInner {
            map: HashMap::new(),
            order: VecDeque::new(),
            total_bytes: 0,
        };
        Self {
            inner: Mutex::new(state),
            capacity_bytes,
        }
    }

    /// Bytes currently charged against the capacity.
    #[must_use]
    pub fn resident_bytes(&self) -> u64 {
        self.lock().total_bytes
    }

    /// Number of resident entries.
    #[must_use]
    pub fn len(&self) -> usize {
        self.lock().map.len()
    }

    /// Whether no entries are resident.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.lock().map.is_empty()
    }

    /// Acquires the state mutex; panics if a previous holder panicked.
    fn lock(&self) -> std::sync::MutexGuard<'_, CacheInner> {
        self.inner.lock().expect("cache mutex poisoned")
    }

    /// Looks up `key` and, on a hit, marks it as most recently used.
    fn get(&self, key: &CacheKey) -> Option<Arc<CachedIndex>> {
        let mut guard = self.lock();
        let hit = Arc::clone(guard.map.get(key)?);
        remove_from_order(&mut guard.order, key);
        guard.order.push_back(key.clone());
        Some(hit)
    }

    /// Stores `value` under `key`, evicting least-recently-used entries to make room.
    ///
    /// Any previous entry for `key` is dropped first. A value larger than the
    /// whole capacity is not retained.
    fn insert(&self, key: CacheKey, value: Arc<CachedIndex>) {
        let mut guard = self.lock();
        let incoming = value.bytes;

        // Forget the old entry so its bytes are not counted twice.
        if let Some(previous) = guard.map.remove(&key) {
            guard.total_bytes = guard.total_bytes.saturating_sub(previous.bytes);
            remove_from_order(&mut guard.order, &key);
        }

        if incoming > self.capacity_bytes {
            return;
        }

        // Evict from the cold end of the queue until the newcomer fits.
        while guard.total_bytes + incoming > self.capacity_bytes {
            let Some(victim) = guard.order.pop_front() else {
                break;
            };
            let freed = guard.map.remove(&victim).map_or(0, |evicted| evicted.bytes);
            guard.total_bytes = guard.total_bytes.saturating_sub(freed);
        }

        guard.total_bytes += incoming;
        guard.order.push_back(key.clone());
        guard.map.insert(key, value);
    }
}
/// Removes the first occurrence of `key` from the recency queue; no-op if absent.
fn remove_from_order(order: &mut VecDeque<CacheKey>, key: &CacheKey) {
    let Some(index) = order.iter().position(|candidate| candidate == key) else {
        return;
    };
    order.remove(index);
}
impl Default for PackIndexCache {
fn default() -> Self {
Self::new(DEFAULT_CACHE_CAPACITY_BYTES)
}
}
impl std::fmt::Debug for PackIndexCache {
    /// Shows the capacity and a snapshot of occupancy; cached indexes are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let resident = self.resident_bytes();
        let entries = self.len();
        let mut view = f.debug_struct("PackIndexCache");
        view.field("capacity_bytes", &self.capacity_bytes);
        view.field("resident_bytes", &resident);
        view.field("entries", &entries);
        view.finish_non_exhaustive()
    }
}
/// Reads the blob at `path` in the tree recorded for `ref_name` on a packchain remote.
///
/// Steps:
/// 1. Load the ref's chain manifest and path index, and require both to name the same tip.
/// 2. Walk the path index to the blob's sha.
/// 3. Search the chain's packs for that object, retrying if a concurrent GC removed a pack.
///
/// # Errors
/// - `WrongEngine` if `remote` is not a packchain remote.
/// - `MalformedPath` / `InvalidRefName` for invalid inputs.
/// - `ChainAbsent` / `PathIndexAbsent` when the ref has no manifest or path index.
/// - `TransientChainPathIndexMismatch` when the two manifests disagree on the tip.
/// - `PathNotFound` / `PathNotABlob` from the path walk.
/// - `BlobNotInChain` if the object is in no pack or resolves to a non-blob.
/// - Store / pack / decode errors from reading the object.
pub async fn read_blob(
    remote: &Remote,
    ref_name: &str,
    path: &str,
    cache: &PackIndexCache,
) -> Result<Bytes, PackchainError> {
    if remote.engine() != StorageEngine::Packchain {
        return Err(PackchainError::WrongEngine {
            found: remote.engine(),
        });
    }
    let segments = parse_path(path)?;
    let remote_ref = match RefName::new(ref_name) {
        Ok(name) => name,
        Err(_) => {
            return Err(PackchainError::InvalidRefName {
                name: ref_name.to_owned(),
            });
        }
    };
    let store = remote.store();
    let prefix_opt = if remote.prefix().is_empty() {
        None
    } else {
        Some(remote.prefix())
    };

    let chain = match load_chain(store, prefix_opt, &remote_ref).await? {
        Some(chain) => chain,
        None => {
            return Err(PackchainError::ChainAbsent {
                ref_name: ref_name.to_owned(),
            });
        }
    };
    let path_index = match load_path_index(store, prefix_opt, &remote_ref).await? {
        Some(index) => index,
        None => {
            return Err(PackchainError::PathIndexAbsent {
                ref_name: ref_name.to_owned(),
            });
        }
    };
    // The two manifests are written separately; a mismatch means we raced a writer.
    if path_index.tip != chain.tip {
        return Err(PackchainError::TransientChainPathIndexMismatch {
            ref_name: ref_name.to_owned(),
            chain_tip: chain.tip.as_str().to_owned(),
            path_index_tip: path_index.tip.as_str().to_owned(),
        });
    }

    let blob_sha = walk_path(&path_index.tree, &segments, ref_name, path)?;
    debug!(
        ref_name = %ref_name,
        path = %path,
        blob = %blob_sha.as_str(),
        segments = chain.segments.len(),
        "read_blob: resolved path to blob, scanning chain"
    );
    let blob_oid = sha40_to_object_id(&blob_sha);
    let outcome = read_with_pack_missing_retries(
        store,
        prefix_opt,
        &remote_ref,
        ref_name,
        chain,
        &blob_oid,
        cache,
    )
    .await;

    // Report "not in chain" with the caller's path rather than the inner empty one.
    let not_in_chain = || PackchainError::BlobNotInChain {
        sha: blob_sha.as_str().to_owned(),
        path: path.to_owned(),
    };
    match outcome {
        Ok(object) if object.kind == ObjectKind::Blob => Ok(Bytes::from(object.payload)),
        Ok(_) => Err(not_in_chain()),
        Err(PackchainError::BlobNotInChain { sha, .. }) if sha == blob_sha.as_str() => {
            Err(not_in_chain())
        }
        Err(other) => Err(other),
    }
}
/// Reads `blob_oid` from `initial_chain`, re-reading the chain manifest when a pack vanishes.
///
/// A `PackMissing` error is retried only when the freshly loaded chain no longer
/// references the missing key, which indicates a concurrent GC rewrote the chain.
/// If the reloaded chain still references the key, the pack is genuinely gone and
/// the error is returned. Retries sleep per `PACK_MISSING_RETRY_BACKOFFS`.
///
/// # Errors
/// - `PackMissing` when the current chain still references the missing key.
/// - `ChainAbsent` if the ref's chain disappears while retrying.
/// - `ConcurrentGcRetriesExhausted` after `PACK_MISSING_MAX_RETRIES` retries.
/// - Any other error from the underlying read, unchanged.
async fn read_with_pack_missing_retries(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    ref_name: &str,
    initial_chain: ChainManifest,
    blob_oid: &gix_hash::ObjectId,
    cache: &PackIndexCache,
) -> Result<ResolvedObject, PackchainError> {
    let mut current_chain = initial_chain;
    let mut attempt: u32 = 0;
    loop {
        // Each attempt gets a fresh delta-depth budget.
        let mut depth = 0u32;
        let result = read_object_from_chain(
            store,
            prefix,
            &current_chain.segments,
            blob_oid,
            cache,
            &mut depth,
        )
        .await;
        let missing_key = match result {
            Ok(resolved) => return Ok(resolved),
            Err(PackchainError::PackMissing { key }) => key,
            Err(e) => return Err(e),
        };
        let reloaded = load_chain(store, prefix, remote_ref)
            .await?
            .ok_or_else(|| PackchainError::ChainAbsent {
                ref_name: ref_name.to_owned(),
            })?;
        if chain_references_pack_key(&reloaded, prefix, &missing_key)? {
            return Err(PackchainError::PackMissing { key: missing_key });
        }
        if attempt >= PACK_MISSING_MAX_RETRIES {
            warn!(
                ref_name = %ref_name,
                last_missing_key = %missing_key,
                attempts = attempt,
                "read_blob: exhausted pack-missing retries against concurrent GC"
            );
            return Err(PackchainError::ConcurrentGcRetriesExhausted {
                last_missing_key: missing_key,
                attempts: attempt,
            });
        }
        debug!(
            ref_name = %ref_name,
            missing_key = %missing_key,
            attempt = attempt,
            "read_blob: PackMissing on chain no longer references the pack — retrying after GC race"
        );
        // The retry budget and the backoff table are independent constants; fall
        // back to the last backoff (or no delay) instead of panicking on an
        // out-of-bounds index if they ever disagree.
        let backoff = PACK_MISSING_RETRY_BACKOFFS
            .get(attempt as usize)
            .or_else(|| PACK_MISSING_RETRY_BACKOFFS.last())
            .copied()
            .unwrap_or_default();
        tokio::time::sleep(backoff).await;
        attempt += 1;
        current_chain = reloaded;
    }
}
/// A fully reconstructed git object.
#[derive(Debug)]
struct ResolvedObject {
    /// Object contents with any deltas already applied.
    payload: Vec<u8>,
    /// Object type; for a deltified entry this is inherited from its base.
    kind: ObjectKind,
}

/// The non-delta git object types a pack entry can resolve to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ObjectKind {
    Commit,
    Tree,
    Blob,
    Tag,
}
/// Splits a repository-relative path into its `/`-separated components.
///
/// # Errors
/// `MalformedPath` if the path is empty or absolute, or if any component is
/// empty, `.` or `..`. The first offending component decides the reason.
fn parse_path(path: &str) -> Result<Vec<&str>, PackchainError> {
    let malformed = |reason: &'static str| PackchainError::MalformedPath {
        path: path.to_owned(),
        reason,
    };
    if path.is_empty() {
        return Err(malformed("empty path"));
    }
    if path.starts_with('/') {
        return Err(malformed("absolute paths are not allowed"));
    }
    let segments: Vec<&str> = path.split('/').collect();
    let first_problem = segments.iter().find_map(|segment| match *segment {
        "" => Some("empty segment (consecutive or trailing slash)"),
        ".." => Some("`..` segments are not allowed"),
        "." => Some("`.` segments are not allowed"),
        _ => None,
    });
    match first_problem {
        Some(reason) => Err(malformed(reason)),
        None => Ok(segments),
    }
}
/// Follows `segments` through the path-index tree and returns the sha of the final blob.
///
/// # Errors
/// - `PathNotFound` if a component is missing or an intermediate component is a blob.
/// - `PathNotABlob` if the final component is a tree.
///
/// # Panics
/// If `segments` is empty (`parse_path` never returns an empty list).
fn walk_path(
    root: &BTreeMap<String, PathNode>,
    segments: &[&str],
    ref_name: &str,
    path: &str,
) -> Result<Sha40, PackchainError> {
    let not_found = || PackchainError::PathNotFound {
        ref_name: ref_name.to_owned(),
        path: path.to_owned(),
    };
    let (leaf, parents) = segments
        .split_last()
        .expect("parse_path guarantees at least one segment");
    // Descend through every directory component; anything but a subtree is a miss.
    let directory = parents
        .iter()
        .try_fold(root, |node, name| match node.get(*name) {
            Some(PathNode::Tree(children)) => Ok(children),
            _ => Err(not_found()),
        })?;
    match directory.get(*leaf) {
        Some(PathNode::Blob(sha)) => Ok(sha.clone()),
        Some(PathNode::Tree(_)) => Err(PackchainError::PathNotABlob {
            path: path.to_owned(),
        }),
        None => Err(not_found()),
    }
}
/// Converts a `Sha40` into a gix object id.
///
/// Relies on the `Sha40` invariant (40 lowercase hex characters); the `expect`
/// documents that invariant rather than guarding a reachable failure.
fn sha40_to_object_id(sha: &Sha40) -> gix_hash::ObjectId {
    let hex = sha.as_str().as_bytes();
    gix_hash::ObjectId::from_hex(hex).expect("Sha40 is always 40 lowercase hex by construction")
}
/// Decodes `target_oid` from the first chain segment whose pack index lists it.
///
/// Segments are probed in order. Each probed segment's `.idx` is loaded through
/// `cache`, so a missing index aborts the search with `PackMissing` even if a
/// later segment holds the object. `depth` is shared with `decode_entry`.
///
/// # Errors
/// `BlobNotInChain` (with an empty `path`) if no segment lists the object;
/// otherwise any index-load, fetch, or decode error.
async fn read_object_from_chain(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    segments: &[ChainSegment],
    target_oid: &gix_hash::ObjectId,
    cache: &PackIndexCache,
    depth: &mut u32,
) -> Result<ResolvedObject, PackchainError> {
    for segment in segments {
        let content_sha = super::keys::segment_pack_sha(segment)?;
        let idx = load_index(store, prefix, &content_sha, cache).await?;
        if let Some(entry_index) = idx.file.lookup(target_oid) {
            let pack_offset = idx.file.pack_offset_at_index(entry_index);
            let bytes = fetch_entry_bytes(store, prefix, &content_sha, pack_offset, &idx).await?;
            // Boxed because decode_entry may recurse back into this function (ref-deltas).
            return Box::pin(decode_entry(
                store,
                prefix,
                segments,
                &content_sha,
                pack_offset,
                &bytes,
                cache,
                depth,
            ))
            .await;
        }
    }
    Err(PackchainError::BlobNotInChain {
        sha: target_oid.to_string(),
        path: String::new(),
    })
}
async fn load_index(
store: &dyn ObjectStore,
prefix: Option<&str>,
content_sha: &Sha40,
cache: &PackIndexCache,
) -> Result<Arc<CachedIndex>, PackchainError> {
let key = (prefix.unwrap_or("").to_owned(), content_sha.clone());
if let Some(hit) = cache.get(&key) {
return Ok(hit);
}
let idx_key = pack_idx_key(prefix, content_sha);
let idx_bytes = match store.get_bytes(&idx_key).await {
Ok(b) => b,
Err(ObjectStoreError::NotFound(_)) => {
return Err(PackchainError::PackMissing { key: idx_key });
}
Err(e) => return Err(PackchainError::Store(e)),
};
let owned: Vec<u8> = idx_bytes.to_vec();
let owned_len = owned.len() as u64;
let path = std::path::PathBuf::from(idx_key);
let file =
gix_pack::index::File::from_data(owned, path, gix_hash::Kind::Sha1).map_err(|e| {
PackchainError::MalformedPackEntry {
offset: 0,
reason: format!("idx parse: {e}"),
}
})?;
let sorted_offsets = file.sorted_offsets();
let offsets_bytes = (sorted_offsets.len() as u64).saturating_mul(8);
let cached = Arc::new(CachedIndex {
file,
sorted_offsets,
bytes: owned_len.saturating_add(offsets_bytes),
});
cache.insert(key, Arc::clone(&cached));
Ok(cached)
}
/// Fetches the raw bytes (entry header plus compressed data) of the pack entry at `pack_offset`.
///
/// The range ends at the next entry's offset according to the index. For the last
/// entry in the pack, it ends at the pack object's size, found via `head`.
///
/// # Errors
/// - `PackMissing` (carrying the pack key) if the pack object does not exist.
/// - `MalformedPackEntry` if the offset lies at/after the pack's end or the range
///   exceeds `MAX_RANGE_BYTES`.
/// - `Store` for other store failures.
async fn fetch_entry_bytes(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    content_sha: &Sha40,
    pack_offset: u64,
    idx: &CachedIndex,
) -> Result<Bytes, PackchainError> {
    let pack = pack_key(prefix, content_sha);
    let end = if let Some(end) = next_entry_offset(&idx.sorted_offsets, pack_offset) {
        end
    } else {
        // Last entry in the pack: bound the range by the pack's total size.
        let meta = match store.head(&pack).await {
            Ok(m) => m,
            Err(ObjectStoreError::NotFound(_)) => {
                return Err(PackchainError::PackMissing { key: pack });
            }
            Err(e) => return Err(PackchainError::Store(e)),
        };
        if pack_offset >= meta.size {
            return Err(PackchainError::MalformedPackEntry {
                offset: pack_offset,
                reason: "entry offset beyond pack EOF".to_owned(),
            });
        }
        meta.size
    };
    let span = end.saturating_sub(pack_offset);
    if span > MAX_RANGE_BYTES {
        return Err(PackchainError::MalformedPackEntry {
            offset: pack_offset,
            reason: format!("entry range {span} bytes exceeds {MAX_RANGE_BYTES}-byte cap"),
        });
    }
    match store.get_bytes_range(&pack, pack_offset..end).await {
        Ok(b) => Ok(b),
        Err(ObjectStoreError::NotFound(_)) => Err(PackchainError::PackMissing { key: pack }),
        Err(e) => Err(PackchainError::Store(e)),
    }
}

/// Returns the smallest offset in `sorted_offsets` that is strictly greater than `pack_offset`.
///
/// `sorted_offsets` is ascending (it comes from `File::sorted_offsets`), so a
/// binary search replaces a linear scan that cost O(n) per entry fetch on large packs.
fn next_entry_offset(sorted_offsets: &[u64], pack_offset: u64) -> Option<u64> {
    let first_after = sorted_offsets.partition_point(|&offset| offset <= pack_offset);
    sorted_offsets.get(first_after).copied()
}
#[allow(clippy::too_many_arguments)]
async fn decode_entry(
store: &dyn ObjectStore,
prefix: Option<&str>,
chain: &[ChainSegment],
content_sha: &Sha40,
pack_offset: u64,
raw: &[u8],
cache: &PackIndexCache,
depth: &mut u32,
) -> Result<ResolvedObject, PackchainError> {
if *depth > MAX_DELTA_DEPTH {
return Err(PackchainError::DeltaTooDeep {
max: MAX_DELTA_DEPTH,
});
}
*depth += 1;
let entry =
gix_pack::data::Entry::from_bytes(raw, pack_offset, gix_hash::Kind::Sha1.len_in_bytes())
.map_err(|e| PackchainError::MalformedPackEntry {
offset: pack_offset,
reason: e.to_string(),
})?;
let header_size: usize = usize::try_from(entry.data_offset - pack_offset).map_err(|_| {
PackchainError::MalformedPackEntry {
offset: pack_offset,
reason: "entry header size exceeds usize".to_owned(),
}
})?;
if entry.decompressed_size > MAX_DECOMPRESSED_BYTES {
return Err(PackchainError::MalformedPackEntry {
offset: pack_offset,
reason: format!(
"decompressed object size {} exceeds {}-byte cap",
entry.decompressed_size, MAX_DECOMPRESSED_BYTES
),
});
}
let decompressed_size: usize = usize::try_from(entry.decompressed_size).map_err(|_| {
PackchainError::MalformedPackEntry {
offset: pack_offset,
reason: "decompressed object size exceeds usize".to_owned(),
}
})?;
let inflated = inflate_with_retry(
store,
prefix,
content_sha,
pack_offset,
raw,
header_size,
decompressed_size,
)
.await?;
match entry.header {
EntryHeader::Blob => Ok(ResolvedObject {
payload: inflated,
kind: ObjectKind::Blob,
}),
EntryHeader::Commit => Ok(ResolvedObject {
payload: inflated,
kind: ObjectKind::Commit,
}),
EntryHeader::Tree => Ok(ResolvedObject {
payload: inflated,
kind: ObjectKind::Tree,
}),
EntryHeader::Tag => Ok(ResolvedObject {
payload: inflated,
kind: ObjectKind::Tag,
}),
EntryHeader::OfsDelta { base_distance } => {
let base_offset = pack_offset.checked_sub(base_distance).ok_or(
PackchainError::MalformedPackEntry {
offset: pack_offset,
reason: "ofs-delta base distance underflows pack offset".to_owned(),
},
)?;
let idx = load_index(store, prefix, content_sha, cache).await?;
let base_bytes =
fetch_entry_bytes(store, prefix, content_sha, base_offset, &idx).await?;
let base = Box::pin(decode_entry(
store,
prefix,
chain,
content_sha,
base_offset,
&base_bytes,
cache,
depth,
))
.await?;
apply_delta(&base, &inflated)
}
EntryHeader::RefDelta { base_id } => {
let base = Box::pin(read_object_from_chain(
store, prefix, chain, &base_id, cache, depth,
))
.await?;
apply_delta(&base, &inflated)
}
}
}
/// Inflates the zlib stream that follows the `header_size`-byte entry header in `raw`.
///
/// If the stream is incomplete, the range starting at `pack_offset` is re-fetched
/// at double its previous length (capped at `MAX_RANGE_BYTES`). This happens at
/// most `MAX_RANGE_EXPANSIONS` times.
///
/// # Errors
/// - `MalformedPackEntry` when expansions are exhausted, the range cap is reached,
///   or the store reports the expanded range as unsatisfiable.
/// - `PackMissing` if the pack disappears while re-fetching; `Store` for other store errors.
/// - `Decompress` if the data is not a valid zlib stream of the announced size.
async fn inflate_with_retry(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    content_sha: &Sha40,
    pack_offset: u64,
    raw: &[u8],
    header_size: usize,
    decompressed_size: usize,
) -> Result<Vec<u8>, PackchainError> {
    // Bytes from the latest expanded fetch; `None` means "still using `raw`".
    let mut current_buffer: Option<Bytes> = None;
    // Exclusive end (absolute pack offset) of the range currently held.
    let mut current_end = pack_offset.saturating_add(raw.len() as u64);
    let mut expansions = 0u32;
    loop {
        let window: &[u8] = current_buffer.as_deref().unwrap_or(raw);
        let compressed = &window[header_size..];
        match inflate_to(compressed, decompressed_size) {
            Ok(v) => return Ok(v),
            Err(InflateOutcome::NeedMoreInput) => {
                if expansions >= MAX_RANGE_EXPANSIONS {
                    return Err(PackchainError::MalformedPackEntry {
                        offset: pack_offset,
                        reason: "ran out of compressed bytes after maximum range expansion"
                            .to_owned(),
                    });
                }
                let held = current_end - pack_offset;
                let next_size = (held * 2).min(MAX_RANGE_BYTES);
                if next_size <= held {
                    return Err(PackchainError::MalformedPackEntry {
                        offset: pack_offset,
                        reason: "range expansion hit safety cap".to_owned(),
                    });
                }
                let new_end = pack_offset + next_size;
                let pack = pack_key(prefix, content_sha);
                let bytes = match store.get_bytes_range(&pack, pack_offset..new_end).await {
                    Ok(b) => b,
                    Err(ObjectStoreError::NotFound(_)) => {
                        return Err(PackchainError::PackMissing { key: pack });
                    }
                    Err(ObjectStoreError::RangeNotSatisfiable { .. }) => {
                        return Err(PackchainError::MalformedPackEntry {
                            offset: pack_offset,
                            reason: "zlib stream truncated at pack EOF".to_owned(),
                        });
                    }
                    Err(e) => return Err(PackchainError::Store(e)),
                };
                current_buffer = Some(bytes);
                current_end = new_end;
                expansions += 1;
            }
            Err(InflateOutcome::Failed) => {
                return Err(PackchainError::Decompress {
                    offset: pack_offset,
                });
            }
        }
    }
}
/// Inflates `input` in one shot into a buffer of exactly `announced_size` bytes.
///
/// Succeeds only if zlib reports end-of-stream and exactly `announced_size`
/// bytes were produced. A call that ends without end-of-stream yields
/// `NeedMoreInput`; a zlib error or size mismatch yields `Failed`.
fn inflate_to(input: &[u8], announced_size: usize) -> Result<Vec<u8>, InflateOutcome> {
    use gix::features::zlib::{FlushDecompress, Status};
    let mut inflater = gix::features::zlib::Decompress::new();
    let mut buffer = vec![0u8; announced_size];
    let status = match inflater.decompress(input, &mut buffer, FlushDecompress::Finish) {
        Ok(status) => status,
        Err(_) => return Err(InflateOutcome::Failed),
    };
    if !matches!(status, Status::StreamEnd) {
        return Err(InflateOutcome::NeedMoreInput);
    }
    let produced = usize::try_from(inflater.total_out()).map_err(|_| InflateOutcome::Failed)?;
    if produced == announced_size {
        Ok(buffer)
    } else {
        Err(InflateOutcome::Failed)
    }
}
/// Why a one-shot `inflate_to` call did not yield an object.
enum InflateOutcome {
    /// zlib returned an error, or the stream ended with the wrong number of bytes.
    Failed,
    /// zlib did not signal end-of-stream; `inflate_with_retry` retries with a longer range.
    NeedMoreInput,
}
/// Applies a git delta instruction stream to `base`, producing an object of the base's kind.
///
/// Delta layout:
/// - a source-size varint, then a destination-size varint;
/// - then opcodes: high bit set = copy from the base, `0` = reserved (rejected),
///   any other value = insert that many literal bytes from the delta.
///
/// # Errors
/// `MalformedDelta` if:
/// - a size header is truncated;
/// - the source size differs from `base`;
/// - the destination size exceeds `MAX_DECOMPRESSED_BYTES` or `usize`;
/// - an opcode is reserved or malformed;
/// - the output length does not equal the announced destination size.
fn apply_delta(base: &ResolvedObject, delta: &[u8]) -> Result<ResolvedObject, PackchainError> {
    let (src_size, src_len) = read_size_varint(delta, 0).ok_or(PackchainError::MalformedDelta {
        reason: "truncated source size header",
    })?;
    let (dst_size, dst_len) =
        read_size_varint(delta, src_len).ok_or(PackchainError::MalformedDelta {
            reason: "truncated destination size header",
        })?;
    let mut cursor = src_len + dst_len;

    if src_size != base.payload.len() as u64 {
        return Err(PackchainError::MalformedDelta {
            reason: "delta source size does not match base object size",
        });
    }
    if dst_size > MAX_DECOMPRESSED_BYTES {
        return Err(PackchainError::MalformedDelta {
            reason: "delta destination size exceeds 1 GiB cap",
        });
    }
    let expected_len = usize::try_from(dst_size).map_err(|_| PackchainError::MalformedDelta {
        reason: "delta destination size exceeds usize",
    })?;

    let mut out = Vec::with_capacity(expected_len);
    while let Some(&opcode) = delta.get(cursor) {
        cursor += 1;
        match opcode {
            0 => {
                return Err(PackchainError::MalformedDelta {
                    reason: "reserved zero opcode",
                });
            }
            0x80..=0xff => {
                apply_delta_copy_op(opcode, delta, &mut cursor, &base.payload, &mut out)?;
            }
            _ => apply_delta_insert_op(opcode, delta, &mut cursor, &mut out)?,
        }
        // Stop as soon as the output overshoots rather than after the whole stream.
        if out.len() > expected_len {
            return Err(PackchainError::MalformedDelta {
                reason: "produced object exceeds announced destination size",
            });
        }
    }

    if out.len() as u64 != dst_size {
        return Err(PackchainError::MalformedDelta {
            reason: "produced object does not match announced destination size",
        });
    }
    Ok(ResolvedObject {
        payload: out,
        kind: base.kind,
    })
}
/// Reads a sparse little-endian operand of up to `bits` bytes from `delta`.
///
/// Bit `i` of `bitmask` says whether byte `i` of the operand is present in the
/// stream; absent bytes are zero. `cursor` advances past every byte consumed.
///
/// # Errors
/// `MalformedDelta { reason: truncated_reason }` if a present byte is missing.
fn read_packed_operand(
    delta: &[u8],
    cursor: &mut usize,
    bitmask: u8,
    bits: u8,
    truncated_reason: &'static str,
) -> Result<u32, PackchainError> {
    (0..bits)
        .filter(|&shift| bitmask & (1 << shift) != 0)
        .try_fold(0u32, |acc, shift| -> Result<u32, PackchainError> {
            let Some(&byte) = delta.get(*cursor) else {
                return Err(PackchainError::MalformedDelta {
                    reason: truncated_reason,
                });
            };
            *cursor += 1;
            Ok(acc | (u32::from(byte) << (u32::from(shift) * 8)))
        })
}
/// Copy length used when a copy opcode encodes a size of zero (git's `0x10000` rule).
const GIT_DELTA_DEFAULT_COPY_SIZE: u32 = 0x1_0000;

/// Executes one delta copy opcode by appending a span of `base` to `out`.
///
/// The low four bits of `op` select which offset bytes follow in the stream.
/// Bits 4–6 select which size bytes follow. A decoded size of zero means
/// `GIT_DELTA_DEFAULT_COPY_SIZE`.
///
/// # Errors
/// `MalformedDelta` if operand bytes are truncated or the span falls outside `base`.
fn apply_delta_copy_op(
    op: u8,
    delta: &[u8],
    cursor: &mut usize,
    base: &[u8],
    out: &mut Vec<u8>,
) -> Result<(), PackchainError> {
    let offset = read_packed_operand(delta, cursor, op, 4, "truncated delta copy offset")?;
    let size = match read_packed_operand(delta, cursor, op >> 4, 3, "truncated delta copy size")? {
        0 => GIT_DELTA_DEFAULT_COPY_SIZE,
        explicit => explicit,
    };
    let start = offset as usize;
    let Some(end) = start.checked_add(size as usize) else {
        return Err(PackchainError::MalformedDelta {
            reason: "copy span overflow",
        });
    };
    let Some(span) = base.get(start..end) else {
        return Err(PackchainError::MalformedDelta {
            reason: "copy span exceeds base object",
        });
    };
    out.extend_from_slice(span);
    Ok(())
}
/// Executes one delta insert opcode by appending the next `op` literal bytes of `delta` to `out`.
///
/// # Errors
/// `MalformedDelta` if fewer than `op` bytes remain after `cursor`.
fn apply_delta_insert_op(
    op: u8,
    delta: &[u8],
    cursor: &mut usize,
    out: &mut Vec<u8>,
) -> Result<(), PackchainError> {
    let start = *cursor;
    let Some(end) = start.checked_add(usize::from(op)) else {
        return Err(PackchainError::MalformedDelta {
            reason: "insert span overflow",
        });
    };
    let Some(literal) = delta.get(start..end) else {
        return Err(PackchainError::MalformedDelta {
            reason: "insert span exceeds delta payload",
        });
    };
    out.extend_from_slice(literal);
    *cursor = end;
    Ok(())
}
/// Decodes a git delta size varint starting at `data[cursor]`.
///
/// Encoding: 7 value bits per byte, least-significant group first; the high bit
/// marks that another byte follows.
///
/// Returns `(value, bytes_consumed)`. Returns `None` if:
/// - the input ends mid-varint, or `cursor` is past the end;
/// - the encoding runs past 64 bits;
/// - the encoded value does not fit in a `u64`.
fn read_size_varint(data: &[u8], cursor: usize) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift = 0u32;
    for (index, &byte) in data.get(cursor..)?.iter().enumerate() {
        let group = u64::from(byte & 0x7f);
        let shifted = group.checked_shl(shift)?;
        // `checked_shl` only rejects shift amounts >= 64; it silently drops bits
        // pushed past bit 63. Reject such overflowing encodings instead of
        // returning a truncated size.
        if shifted >> shift != group {
            return None;
        }
        value |= shifted;
        if byte & 0x80 == 0 {
            return Some((value, index + 1));
        }
        shift += 7;
        if shift >= 64 {
            return None;
        }
    }
    None
}
#[cfg(test)]
mod tests {
use super::*;
fn sha40(s: &str) -> Sha40 {
Sha40::try_new(s).expect("test fixture sha is valid")
}
#[test]
fn parse_path_rejects_empty() {
let err = parse_path("").unwrap_err();
assert!(matches!(err, PackchainError::MalformedPath { .. }));
}
#[test]
fn parse_path_rejects_absolute() {
let err = parse_path("/etc/passwd").unwrap_err();
let PackchainError::MalformedPath { reason, .. } = err else {
panic!("expected MalformedPath");
};
assert!(reason.contains("absolute"));
}
#[test]
fn parse_path_rejects_dotdot() {
let err = parse_path("src/../etc").unwrap_err();
assert!(matches!(err, PackchainError::MalformedPath { .. }));
}
#[test]
fn parse_path_rejects_dot() {
let err = parse_path("./src").unwrap_err();
assert!(matches!(err, PackchainError::MalformedPath { .. }));
}
#[test]
fn parse_path_rejects_double_slash() {
let err = parse_path("src//main.rs").unwrap_err();
assert!(matches!(err, PackchainError::MalformedPath { .. }));
}
#[test]
fn parse_path_rejects_trailing_slash() {
let err = parse_path("src/main.rs/").unwrap_err();
assert!(matches!(err, PackchainError::MalformedPath { .. }));
}
#[test]
fn parse_path_accepts_nested() {
let segs = parse_path("src/lib/mod.rs").unwrap();
assert_eq!(segs, vec!["src", "lib", "mod.rs"]);
}
#[test]
fn parse_path_accepts_single_segment() {
let segs = parse_path("Cargo.toml").unwrap();
assert_eq!(segs, vec!["Cargo.toml"]);
}
const SHA_A: &str = "0123456789abcdef0123456789abcdef01234567";
const SHA_B: &str = "fedcba9876543210fedcba9876543210fedcba98";
const SHA_C: &str = "1111111111111111111111111111111111111111";
#[test]
fn walk_path_finds_top_level_blob() {
let mut tree = BTreeMap::new();
tree.insert("Cargo.toml".to_owned(), PathNode::Blob(sha40(SHA_A)));
let segs = parse_path("Cargo.toml").unwrap();
let result = walk_path(&tree, &segs, "refs/heads/main", "Cargo.toml").unwrap();
assert_eq!(result.as_str(), SHA_A);
}
#[test]
fn walk_path_descends_subtree() {
let mut subtree = BTreeMap::new();
subtree.insert("main.rs".to_owned(), PathNode::Blob(sha40(SHA_A)));
let mut tree = BTreeMap::new();
tree.insert("src".to_owned(), PathNode::Tree(subtree));
let segs = parse_path("src/main.rs").unwrap();
let result = walk_path(&tree, &segs, "refs/heads/main", "src/main.rs").unwrap();
assert_eq!(result.as_str(), SHA_A);
}
#[test]
fn walk_path_missing_returns_path_not_found() {
let mut tree = BTreeMap::new();
tree.insert("Cargo.toml".to_owned(), PathNode::Blob(sha40(SHA_A)));
let segs = parse_path("missing.txt").unwrap();
let err = walk_path(&tree, &segs, "refs/heads/main", "missing.txt").unwrap_err();
assert!(matches!(err, PackchainError::PathNotFound { .. }));
}
#[test]
fn walk_path_directory_returns_path_not_a_blob() {
let mut subtree = BTreeMap::new();
subtree.insert("main.rs".to_owned(), PathNode::Blob(sha40(SHA_A)));
let mut tree = BTreeMap::new();
tree.insert("src".to_owned(), PathNode::Tree(subtree));
let segs = parse_path("src").unwrap();
let err = walk_path(&tree, &segs, "refs/heads/main", "src").unwrap_err();
assert!(matches!(err, PackchainError::PathNotABlob { .. }));
}
#[test]
fn walk_path_through_blob_returns_not_found() {
let mut tree = BTreeMap::new();
tree.insert("Cargo.toml".to_owned(), PathNode::Blob(sha40(SHA_A)));
let segs = parse_path("Cargo.toml/extra").unwrap();
let err = walk_path(&tree, &segs, "refs/heads/main", "Cargo.toml/extra").unwrap_err();
assert!(matches!(err, PackchainError::PathNotFound { .. }));
}
#[test]
fn read_size_varint_single_byte() {
let (v, n) = read_size_varint(&[0x05], 0).unwrap();
assert_eq!(v, 5);
assert_eq!(n, 1);
}
#[test]
fn read_size_varint_multi_byte() {
let (v, n) = read_size_varint(&[0x83, 0x02], 0).unwrap();
assert_eq!(v, 259);
assert_eq!(n, 2);
}
#[test]
fn read_size_varint_truncated() {
assert!(read_size_varint(&[0x80], 0).is_none());
}
#[test]
fn cache_default_starts_empty() {
let cache = PackIndexCache::default();
assert_eq!(cache.len(), 0);
assert!(cache.is_empty());
assert_eq!(cache.resident_bytes(), 0);
}
#[test]
fn cache_default_enforces_64mib_capacity() {
let cache = PackIndexCache::default();
cache.insert(
("p".into(), sha40(SHA_A)),
Arc::new(make_dummy_index(DEFAULT_CACHE_CAPACITY_BYTES + 1)),
);
assert_eq!(cache.len(), 0, "entry over 64 MiB must be rejected");
cache.insert(
("p".into(), sha40(SHA_B)),
Arc::new(make_dummy_index(DEFAULT_CACHE_CAPACITY_BYTES)),
);
assert_eq!(cache.len(), 1, "entry at 64 MiB must be accepted");
}
#[test]
fn cache_explicit_capacity_zero_disables_caching() {
let cache = PackIndexCache::new(0);
let dummy = make_dummy_index(1_024);
cache.insert(("p".into(), sha40(SHA_A)), Arc::new(dummy));
assert_eq!(cache.len(), 0);
}
#[test]
fn cache_evicts_lru_when_over_capacity() {
let cache = PackIndexCache::new(3_000);
cache.insert(
("p".into(), sha40(SHA_A)),
Arc::new(make_dummy_index(1_000)),
);
cache.insert(
("p".into(), sha40(SHA_B)),
Arc::new(make_dummy_index(1_000)),
);
cache.insert(
("p".into(), sha40(SHA_C)),
Arc::new(make_dummy_index(1_000)),
);
assert_eq!(cache.len(), 3);
assert_eq!(cache.resident_bytes(), 3_000);
let _ = cache.get(&("p".into(), sha40(SHA_A)));
cache.insert(
(
"p".into(),
sha40("dddddddddddddddddddddddddddddddddddddddd"),
),
Arc::new(make_dummy_index(1_000)),
);
assert_eq!(cache.len(), 3);
assert!(cache.get(&("p".into(), sha40(SHA_A))).is_some());
assert!(cache.get(&("p".into(), sha40(SHA_B))).is_none());
}
#[test]
fn cache_repeated_inserts_replace_accounting() {
let cache = PackIndexCache::new(10_000);
let key: CacheKey = ("p".into(), sha40(SHA_A));
cache.insert(key.clone(), Arc::new(make_dummy_index(1_000)));
cache.insert(key.clone(), Arc::new(make_dummy_index(2_500)));
assert_eq!(cache.len(), 1);
assert_eq!(cache.resident_bytes(), 2_500);
}
fn make_dummy_index(bytes: u64) -> CachedIndex {
let mut data = Vec::with_capacity(8 + 256 * 4 + 40);
data.extend_from_slice(b"\xfftOc"); data.extend_from_slice(&2u32.to_be_bytes()); for _ in 0..256 {
data.extend_from_slice(&0u32.to_be_bytes()); }
data.extend_from_slice(&[0u8; 20]); data.extend_from_slice(&[0u8; 20]); let file = gix_pack::index::File::from_data(
data,
std::path::PathBuf::from("dummy.idx"),
gix_hash::Kind::Sha1,
)
.expect("hand-crafted minimal v2 idx parses");
CachedIndex {
file,
sorted_offsets: Vec::new(),
bytes,
}
}
#[test]
fn sha40_to_object_id_roundtrips() {
let sha = sha40(SHA_A);
let oid = sha40_to_object_id(&sha);
assert_eq!(oid.to_string(), SHA_A);
}
fn base_blob(payload: &[u8]) -> ResolvedObject {
ResolvedObject {
payload: payload.to_vec(),
kind: ObjectKind::Blob,
}
}
fn varint(mut value: u64) -> Vec<u8> {
let mut out = Vec::new();
loop {
let byte = (value & 0x7f) as u8;
value >>= 7;
if value == 0 {
out.push(byte);
return out;
}
out.push(byte | 0x80);
}
}
#[test]
fn apply_delta_insert_only_round_trips() {
let base = base_blob(b"");
let literal = b"Hello, packchain!";
let mut delta = Vec::new();
delta.extend_from_slice(&varint(0)); delta.extend_from_slice(&varint(literal.len() as u64)); delta.push(u8::try_from(literal.len()).expect("test literal fits in 7 bits"));
delta.extend_from_slice(literal);
let out = apply_delta(&base, &delta).expect("insert-only delta applies");
assert_eq!(out.payload, literal);
assert_eq!(out.kind, ObjectKind::Blob);
}
#[test]
fn apply_delta_copy_only_round_trips() {
let base = base_blob(b"abcdefghij");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(10)); delta.extend_from_slice(&varint(5)); delta.push(0b1001_0001);
delta.push(0); delta.push(5); let out = apply_delta(&base, &delta).expect("copy-only delta applies");
assert_eq!(out.payload, b"abcde");
}
#[test]
fn apply_delta_mixed_copy_and_insert_round_trips() {
let base = base_blob(b"HELLO!?");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(7)); delta.extend_from_slice(&varint(11)); delta.push(0b1001_0001);
delta.push(0);
delta.push(5);
let literal = b" world";
delta.push(u8::try_from(literal.len()).expect("test literal fits in 7 bits"));
delta.extend_from_slice(literal);
let out = apply_delta(&base, &delta).expect("mixed delta applies");
assert_eq!(out.payload, b"HELLO world");
}
#[test]
fn apply_delta_preserves_base_kind() {
let base = ResolvedObject {
payload: b"x".to_vec(),
kind: ObjectKind::Tree,
};
let mut delta = Vec::new();
delta.extend_from_slice(&varint(1));
delta.extend_from_slice(&varint(1));
delta.push(0b1001_0001);
delta.push(0);
delta.push(1);
let out = apply_delta(&base, &delta).expect("kind-preserving delta applies");
assert_eq!(out.kind, ObjectKind::Tree);
}
#[test]
fn apply_delta_rejects_source_size_mismatch() {
let base = base_blob(b"x");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(99));
delta.extend_from_slice(&varint(1));
delta.push(1);
delta.push(b'y');
let err = apply_delta(&base, &delta).expect_err("size mismatch must fail");
assert!(
matches!(err, PackchainError::MalformedDelta { reason } if reason.contains("source size")),
"expected MalformedDelta source-size mismatch, got {err:?}",
);
}
#[test]
fn apply_delta_rejects_copy_past_base_end() {
let base = base_blob(b"abcd");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(4));
delta.extend_from_slice(&varint(5));
delta.push(0b1001_0001);
delta.push(3); delta.push(5); let err = apply_delta(&base, &delta).expect_err("out-of-range copy must fail");
assert!(
matches!(err, PackchainError::MalformedDelta { reason } if reason.contains("copy span")),
"expected MalformedDelta copy-span error, got {err:?}",
);
}
#[test]
fn apply_delta_rejects_dst_size_over_cap() {
let base = base_blob(b"");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(0));
delta.extend_from_slice(&varint(MAX_DECOMPRESSED_BYTES + 1));
let err = apply_delta(&base, &delta).expect_err("oversize dst must fail");
assert!(
matches!(err, PackchainError::MalformedDelta { reason } if reason.contains("1 GiB cap")),
"expected MalformedDelta cap error, got {err:?}",
);
}
#[test]
fn apply_delta_rejects_reserved_zero_opcode() {
let base = base_blob(b"");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(0));
delta.extend_from_slice(&varint(0));
delta.push(0); let err = apply_delta(&base, &delta).expect_err("reserved opcode must fail");
assert!(
matches!(err, PackchainError::MalformedDelta { reason } if reason.contains("zero opcode")),
"expected MalformedDelta reserved-opcode error, got {err:?}",
);
}
#[test]
fn apply_delta_copy_size_zero_substitutes_default() {
let base = base_blob(b"x");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(1));
delta.extend_from_slice(&varint(2)); delta.push(0b1000_0001);
delta.push(0); let err = apply_delta(&base, &delta)
.expect_err("default-size substitution must fail bounds check");
assert!(
matches!(&err, PackchainError::MalformedDelta { reason } if reason.contains("copy span exceeds base")),
"expected copy-span-exceeds-base (proves default size was substituted), got {err:?}",
);
}
#[test]
fn apply_delta_rejects_dst_size_undershoot() {
let base = base_blob(b"abcdef");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(6));
delta.extend_from_slice(&varint(10)); delta.push(0b1001_0001);
delta.push(0);
delta.push(3);
let err = apply_delta(&base, &delta).expect_err("undershoot must fail");
assert!(
matches!(err, PackchainError::MalformedDelta { reason } if reason.contains("destination size")),
"expected MalformedDelta undershoot error, got {err:?}",
);
}
#[test]
fn apply_delta_rejects_overshoot() {
let base = base_blob(b"abcdefgh");
let mut delta = Vec::new();
delta.extend_from_slice(&varint(8)); delta.extend_from_slice(&varint(4)); delta.push(0b1001_0001);
delta.push(0);
delta.push(8);
let err = apply_delta(&base, &delta).expect_err("overshoot must fail");
assert!(
matches!(
err,
PackchainError::MalformedDelta {
reason: "produced object exceeds announced destination size"
}
),
"expected MalformedDelta overshoot error, got {err:?}",
);
}
#[test]
fn apply_delta_overshoot_check_fires_after_single_default_size_copy() {
    // The base holds 0x1_0000 bytes and the copy op has no size bytes, so the
    // default length applies; the announced destination is only 4 bytes.
    let base_payload = vec![b'x'; 0x1_0000];
    let base = base_blob(&base_payload);
    let mut delta = Vec::new();
    delta.extend_from_slice(&varint(0x1_0000));
    delta.extend_from_slice(&varint(4));
    delta.extend_from_slice(&[0b1000_0001, 0]);
    let err = apply_delta(&base, &delta).expect_err("default-size overshoot must fail");
    let is_overshoot = matches!(
        err,
        PackchainError::MalformedDelta {
            reason: "produced object exceeds announced destination size"
        }
    );
    assert!(
        is_overshoot,
        "expected MalformedDelta overshoot error after first op, got {err:?}",
    );
}
#[test]
fn apply_delta_exact_match_does_not_trip_overshoot_check() {
    // The single copy yields exactly the announced 4 bytes, so the overshoot
    // guard must stay quiet.
    let base = base_blob(b"abcd");
    let mut delta = Vec::new();
    delta.extend_from_slice(&varint(4));
    delta.extend_from_slice(&varint(4));
    delta.extend_from_slice(&[0b1001_0001, 0, 4]);
    let applied = apply_delta(&base, &delta).expect("exact-match delta applies");
    assert_eq!(applied.payload, b"abcd");
}
use crate::object_store::mock::MockStore;
use flate2::Compression;
use flate2::write::ZlibEncoder;
use std::io::Write;
/// Encodes a pack-entry header in git's variable-length format.
///
/// The first byte holds the continuation flag (bit 7), the 3-bit object type
/// (bits 4..=6) and the low four bits of `size`. Each following byte carries
/// seven more size bits, least-significant group first, and has bit 7 set on
/// every byte except the last.
///
/// # Panics
/// If `type_id` does not fit in three bits. Without this check it would bleed
/// into the continuation flag, or be silently truncated, and the header would
/// describe a different entry than the caller asked for.
#[allow(clippy::cast_possible_truncation)]
fn encode_pack_entry_header(type_id: u8, mut size: u64) -> Vec<u8> {
    assert!(
        type_id <= 0b111,
        "pack object type {type_id} does not fit in the 3-bit type field"
    );
    let mut out = Vec::new();
    // Masked to four bits, so the narrowing cast is lossless.
    let low4 = (size & 0x0f) as u8;
    size >>= 4;
    let mut byte = (type_id << 4) | low4;
    if size != 0 {
        byte |= 0x80;
    }
    out.push(byte);
    while size != 0 {
        // Masked to seven bits, so the narrowing cast is lossless.
        let mut next = (size & 0x7f) as u8;
        size >>= 7;
        if size != 0 {
            next |= 0x80;
        }
        out.push(next);
    }
    out
}
/// Encodes an OFS_DELTA backward distance the way git writes it.
///
/// The output is big-endian 7-bit groups, with bit 7 set on every byte except
/// the last. Each higher-order group is stored minus one, so that multi-byte
/// encodings never overlap shorter ones.
#[allow(clippy::cast_possible_truncation)]
fn encode_ofs_delta_distance(distance: u64) -> Vec<u8> {
    // The least-significant group is the final byte and has no continuation flag.
    let mut encoded = vec![(distance & 0x7f) as u8];
    let mut remaining = distance >> 7;
    while remaining > 0 {
        remaining -= 1;
        // Prepend: groups are emitted most-significant first.
        encoded.insert(0, 0x80 | (remaining & 0x7f) as u8);
        remaining >>= 7;
    }
    encoded
}
/// Zlib-compresses `data` at the default level, as a pack entry body would be.
///
/// Panics if the in-memory encoder fails (test helper).
fn zlib_compress(data: &[u8]) -> Vec<u8> {
    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data).expect("zlib encode");
    let compressed = encoder.finish().expect("zlib finish");
    compressed
}
/// Builds a git delta that reconstructs `payload` from insert opcodes alone.
///
/// The base object is never copied from, but its length is still announced.
/// Layout: `varint(base_size) ‖ varint(payload.len()) ‖ insert ops…`. Each
/// insert op is a length byte in `1..=0x7f` followed by that many literal
/// bytes. Longer payloads are split across several inserts. An empty payload
/// yields no opcodes at all, because a zero length byte is the reserved
/// opcode, not an empty insert.
#[allow(clippy::cast_possible_truncation)]
fn make_insert_delta(base_size: u64, payload: &[u8]) -> Vec<u8> {
    // Largest literal run a single insert opcode can carry.
    const MAX_INSERT: usize = 0x7f;
    let put_varint = |mut v: u64, buf: &mut Vec<u8>| loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            buf.push(byte);
            return;
        }
        buf.push(byte | 0x80);
    };
    let mut d = Vec::new();
    put_varint(base_size, &mut d);
    put_varint(payload.len() as u64, &mut d);
    for chunk in payload.chunks(MAX_INSERT) {
        // `chunks` never yields an empty slice and caps each at 0x7f bytes, so
        // the opcode is always a valid, non-zero insert length.
        d.push(chunk.len() as u8);
        d.extend_from_slice(chunk);
    }
    d
}
/// Appends one pack entry to `pack` and records its start offset in `offsets`.
///
/// The entry is: header (type + decompressed size), then an optional
/// OFS_DELTA back-distance, then the zlib-compressed payload.
#[allow(clippy::cast_possible_truncation)]
fn push_pack_entry(
    pack: &mut Vec<u8>,
    offsets: &mut Vec<u64>,
    type_id: u8,
    ofs_delta_distance: Option<u64>,
    decompressed_payload: &[u8],
) {
    offsets.push(pack.len() as u64);
    let header = encode_pack_entry_header(type_id, decompressed_payload.len() as u64);
    pack.extend_from_slice(&header);
    if let Some(distance) = ofs_delta_distance {
        let encoded_distance = encode_ofs_delta_distance(distance);
        pack.extend_from_slice(&encoded_distance);
    }
    let body = zlib_compress(decompressed_payload);
    pack.extend_from_slice(&body);
}
/// Seeds `cache` with an entry for `(prefix, content_sha)` whose sorted
/// offsets are `offsets`.
///
/// The underlying gix index is the zero-object placeholder from
/// `minimal_v2_idx`, and the resident size is fixed at 1 024 bytes.
fn install_cached_index(
    cache: &PackIndexCache,
    prefix: &str,
    content_sha: &Sha40,
    offsets: Vec<u64>,
) {
    let key = (prefix.to_owned(), content_sha.clone());
    let entry = Arc::new(CachedIndex {
        file: minimal_v2_idx(),
        sorted_offsets: offsets,
        bytes: 1_024,
    });
    cache.insert(key, entry);
}
/// Builds a parsed version-2 `.idx` describing zero objects.
///
/// The layout is the magic number, version 2, an all-zero 256-entry fan-out
/// table, and zeroed pack and index checksums.
fn minimal_v2_idx() -> gix_pack::index::File<Vec<u8>> {
    const FANOUT_BYTES: usize = 256 * 4;
    const TRAILER_BYTES: usize = 40;
    let mut raw = Vec::with_capacity(8 + FANOUT_BYTES + TRAILER_BYTES);
    raw.extend_from_slice(b"\xfftOc");
    raw.extend_from_slice(&2u32.to_be_bytes());
    // Fan-out table followed by the two checksums, all zero bytes.
    raw.resize(raw.len() + FANOUT_BYTES + TRAILER_BYTES, 0);
    let path = std::path::PathBuf::from("dummy.idx");
    gix_pack::index::File::from_data(raw, path, gix_hash::Kind::Sha1)
        .expect("hand-crafted minimal v2 idx parses")
}
#[tokio::test]
async fn decode_entry_rejects_when_depth_already_over_cap() {
    // The store and index cache stay empty; the entry is a plain blob, and the
    // incoming depth alone is already past the cap.
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let chain: Vec<ChainSegment> = Vec::new();
    let content_sha = sha40(SHA_A);
    let (mut pack, mut offsets) = (Vec::new(), Vec::new());
    push_pack_entry(&mut pack, &mut offsets, 3, None, b"x");
    let entry_offset = offsets[0];
    let entry_bytes = &pack[usize::try_from(entry_offset).unwrap()..];
    let mut depth = MAX_DELTA_DEPTH + 1;
    let err = decode_entry(
        &store,
        None,
        &chain,
        &content_sha,
        entry_offset,
        entry_bytes,
        &cache,
        &mut depth,
    )
    .await
    .expect_err("over-cap depth must fail");
    assert!(
        matches!(err, PackchainError::DeltaTooDeep { max } if max == MAX_DELTA_DEPTH),
        "expected DeltaTooDeep, got {err:?}",
    );
}
#[tokio::test]
async fn decode_entry_at_cap_with_non_delta_base_succeeds() {
    // Depth sits exactly at the cap, and a non-delta blob must still decode.
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let chain: Vec<ChainSegment> = Vec::new();
    let content_sha = sha40(SHA_A);
    let (mut pack, mut offsets) = (Vec::new(), Vec::new());
    push_pack_entry(&mut pack, &mut offsets, 3, None, b"deepest-base");
    let entry_offset = offsets[0];
    let entry_bytes = &pack[usize::try_from(entry_offset).unwrap()..];
    let mut depth = MAX_DELTA_DEPTH;
    let resolved = decode_entry(
        &store,
        None,
        &chain,
        &content_sha,
        entry_offset,
        entry_bytes,
        &cache,
        &mut depth,
    )
    .await
    .expect("blob at MAX boundary must decode");
    assert_eq!(resolved.payload, b"deepest-base");
    assert_eq!(resolved.kind, ObjectKind::Blob);
}
#[tokio::test]
async fn ofs_delta_recursion_consumes_depth_budget() {
    // The pack is a blob followed by an OFS_DELTA onto it. Starting at the cap,
    // following the delta to its base must exceed the budget.
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let chain: Vec<ChainSegment> = Vec::new();
    let content_sha = sha40(SHA_A);
    let base_payload = b"base-blob";
    let (mut pack, mut offsets) = (Vec::new(), Vec::new());
    push_pack_entry(&mut pack, &mut offsets, 3, None, base_payload);
    let delta = make_insert_delta(base_payload.len() as u64, b"reconstructed");
    // OFS_DELTA stores the backward distance from the delta entry to its base.
    let back_distance = pack.len() as u64 - offsets[0];
    push_pack_entry(&mut pack, &mut offsets, 6, Some(back_distance), &delta);
    store.insert(pack_key(None, &content_sha), Bytes::from(pack.clone()));
    install_cached_index(&cache, "", &content_sha, offsets.clone());
    let delta_offset = offsets[1];
    let delta_bytes = &pack[usize::try_from(delta_offset).unwrap()..];
    let mut depth = MAX_DELTA_DEPTH;
    let err = decode_entry(
        &store,
        None,
        &chain,
        &content_sha,
        delta_offset,
        delta_bytes,
        &cache,
        &mut depth,
    )
    .await
    .expect_err("OFS_DELTA recursion must trip the depth guard");
    assert!(
        matches!(err, PackchainError::DeltaTooDeep { max } if max == MAX_DELTA_DEPTH),
        "expected DeltaTooDeep from OFS_DELTA recursion, got {err:?}",
    );
}
/// Test store that forwards everything to a `MockStore` but lies about object
/// sizes in `head`.
struct FakeSizeStore {
    /// Size reported by every `head` call, regardless of the real object.
    fake_size: u64,
    /// Backing store that serves all real reads and writes.
    inner: MockStore,
}
#[async_trait::async_trait]
impl ObjectStore for FakeSizeStore {
    // The one non-forwarding method: returns the real metadata with `size`
    // replaced by `fake_size`.
    async fn head(
        &self,
        key: &str,
    ) -> Result<crate::object_store::ObjectMeta, ObjectStoreError> {
        let mut meta = self.inner.head(key).await?;
        meta.size = self.fake_size;
        Ok(meta)
    }

    // Everything below delegates unchanged to `inner`.
    async fn list(
        &self,
        prefix: &str,
    ) -> Result<Vec<crate::object_store::ObjectMeta>, ObjectStoreError> {
        self.inner.list(prefix).await
    }
    async fn get_to_file(
        &self,
        key: &str,
        dest: &std::path::Path,
        opts: crate::object_store::GetOpts,
    ) -> Result<(), ObjectStoreError> {
        self.inner.get_to_file(key, dest, opts).await
    }
    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
        self.inner.get_bytes(key).await
    }
    async fn get_bytes_range(
        &self,
        key: &str,
        range: std::ops::Range<u64>,
    ) -> Result<Bytes, ObjectStoreError> {
        self.inner.get_bytes_range(key, range).await
    }
    async fn put_bytes(
        &self,
        key: &str,
        body: Bytes,
        opts: crate::object_store::PutOpts,
    ) -> Result<(), ObjectStoreError> {
        self.inner.put_bytes(key, body, opts).await
    }
    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
        self.inner.put_if_absent(key, body).await
    }
    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
        self.inner.copy(src, dst).await
    }
    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
        self.inner.delete(key).await
    }
}
#[tokio::test]
async fn fetch_entry_bytes_terminal_entry_under_cap_succeeds() {
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let content_sha = sha40(SHA_A);
    // Ten bytes 0x00..=0x09.
    let body: Vec<u8> = (0u8..10).collect();
    store.insert(pack_key(None, &content_sha), Bytes::from(body.clone()));
    // Only one indexed entry, at offset 2, so it runs to the end of the pack.
    install_cached_index(&cache, "", &content_sha, vec![2]);
    let cache_key = (String::new(), content_sha.clone());
    let idx = cache.get(&cache_key).expect("cache hit");
    let got = fetch_entry_bytes(&store, None, &content_sha, 2, &idx)
        .await
        .expect("terminal entry under cap must succeed");
    assert_eq!(got.as_ref(), &body[2..]);
}
#[tokio::test]
async fn fetch_entry_bytes_terminal_entry_over_cap_rejected() {
    let content_sha = sha40(SHA_A);
    let cache = PackIndexCache::default();
    let inner = MockStore::new();
    inner.insert(pack_key(None, &content_sha), Bytes::from_static(b"stub"));
    install_cached_index(&cache, "", &content_sha, vec![0]);
    let cache_key = (String::new(), content_sha.clone());
    let idx = cache.get(&cache_key).expect("cache hit");
    // The stored body is tiny, but this store's `head` reports one byte
    // over the range cap.
    let store = FakeSizeStore {
        fake_size: MAX_RANGE_BYTES + 1,
        inner,
    };
    let err = fetch_entry_bytes(&store, None, &content_sha, 0, &idx)
        .await
        .expect_err("terminal entry above cap must be rejected");
    assert!(
        matches!(
            err,
            PackchainError::MalformedPackEntry { offset: 0, ref reason }
                if reason.contains("exceeds") && reason.contains("cap")
        ),
        "expected MalformedPackEntry size-cap error, got {err:?}",
    );
}
#[tokio::test]
async fn fetch_entry_bytes_terminal_entry_offset_past_eof_rejected() {
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let content_sha = sha40(SHA_A);
    // The pack is 3 bytes long, but its only indexed entry claims offset 100.
    store.insert(pack_key(None, &content_sha), Bytes::from_static(b"abc"));
    install_cached_index(&cache, "", &content_sha, vec![100]);
    let cache_key = (String::new(), content_sha.clone());
    let idx = cache.get(&cache_key).expect("cache hit");
    let err = fetch_entry_bytes(&store, None, &content_sha, 100, &idx)
        .await
        .expect_err("offset beyond EOF must be rejected");
    assert!(
        matches!(
            err,
            PackchainError::MalformedPackEntry { offset: 100, ref reason }
                if reason.contains("beyond pack EOF")
        ),
        "expected MalformedPackEntry EOF error, got {err:?}",
    );
}
#[tokio::test]
async fn ofs_delta_below_cap_decodes() {
    // The pack is a blob plus an insert-only OFS_DELTA onto it, decoded
    // starting from depth 0.
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let chain: Vec<ChainSegment> = Vec::new();
    let content_sha = sha40(SHA_A);
    let base_payload = b"base";
    let (mut pack, mut offsets) = (Vec::new(), Vec::new());
    push_pack_entry(&mut pack, &mut offsets, 3, None, base_payload);
    let delta = make_insert_delta(base_payload.len() as u64, b"hi");
    let back_distance = pack.len() as u64 - offsets[0];
    push_pack_entry(&mut pack, &mut offsets, 6, Some(back_distance), &delta);
    store.insert(pack_key(None, &content_sha), Bytes::from(pack.clone()));
    install_cached_index(&cache, "", &content_sha, offsets.clone());
    let delta_offset = offsets[1];
    let delta_bytes = &pack[usize::try_from(delta_offset).unwrap()..];
    let mut depth = 0u32;
    let resolved = decode_entry(
        &store,
        None,
        &chain,
        &content_sha,
        delta_offset,
        delta_bytes,
        &cache,
        &mut depth,
    )
    .await
    .expect("OFS_DELTA decodes below cap");
    assert_eq!(resolved.payload, b"hi");
    assert_eq!(resolved.kind, ObjectKind::Blob);
}
use crate::packchain::keys::chain_key;
use crate::packchain::schema::ChainManifest;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Builds a single-segment chain manifest.
///
/// `tip`, `full_at` and the segment sha are all `tip_hex`. The segment has no
/// parent and points at `packs/{pack_sha_hex}.pack` with a nominal size of
/// 1 024 bytes.
fn make_chain_with(tip_hex: &str, pack_sha_hex: &str) -> ChainManifest {
    let tip = sha40(tip_hex);
    let segment = ChainSegment {
        sha: tip.clone(),
        parent_sha: None,
        pack: format!("packs/{pack_sha_hex}.pack"),
        bytes: 1_024,
    };
    ChainManifest {
        v: ChainManifest::SCHEMA_VERSION,
        full_at: tip.clone(),
        tip,
        segments: vec![segment],
    }
}
#[test]
fn chain_references_pack_key_matches_pack_and_idx_keys() {
    let chain = make_chain_with(SHA_A, SHA_B);
    // Both the pack body and its index belong to the referenced pack.
    for extension in ["pack", "idx"] {
        let key = format!("packs/{SHA_B}.{extension}");
        assert!(chain_references_pack_key(&chain, None, &key).unwrap());
    }
}
#[test]
fn chain_references_pack_key_returns_false_for_unreferenced_pack() {
    let chain = make_chain_with(SHA_A, SHA_B);
    // SHA_C is not a pack this chain uses, under either extension.
    for extension in ["pack", "idx"] {
        let key = format!("packs/{SHA_C}.{extension}");
        assert!(!chain_references_pack_key(&chain, None, &key).unwrap());
    }
}
#[test]
fn chain_references_pack_key_respects_prefix() {
    let chain = make_chain_with(SHA_A, SHA_B);
    let prefixed = format!("repo/packs/{SHA_B}.pack");
    let unprefixed = format!("packs/{SHA_B}.pack");
    // Under prefix "repo", only the prefixed key names this chain's pack.
    assert!(chain_references_pack_key(&chain, Some("repo"), &prefixed).unwrap());
    assert!(!chain_references_pack_key(&chain, Some("repo"), &unprefixed).unwrap());
}
#[test]
fn chain_references_pack_key_returns_false_for_malformed_missing_key() {
    let chain = make_chain_with(SHA_A, SHA_B);
    let wrong_extension = format!("packs/{SHA_B}.bin");
    // None of these keys has the packs/<sha>.(pack|idx) shape.
    for key in [
        "weird/key",
        "packs/not-a-sha.pack",
        wrong_extension.as_str(),
        "",
    ] {
        assert!(!chain_references_pack_key(&chain, None, key).unwrap());
    }
}
/// Test store that serves a scripted sequence of chain-manifest bodies and
/// counts chain and path-index reads.
struct EvolvingChainStore {
    /// Key whose `get_bytes` reads are served from `bodies`.
    chain_key: String,
    /// Bodies returned in order on successive chain reads; the last one repeats.
    bodies: Vec<Bytes>,
    /// Number of chain-key reads so far.
    calls: AtomicUsize,
    /// Number of `get_bytes` reads whose key ends in `/path-index.json`.
    path_index_calls: AtomicUsize,
    /// Backing store for every other key.
    inner: MockStore,
}
impl EvolvingChainStore {
    /// Wraps `inner`, scripting the reads of `chain_key` with `bodies`.
    ///
    /// # Panics
    /// If `bodies` is empty.
    fn new(inner: MockStore, chain_key: String, bodies: Vec<Bytes>) -> Self {
        assert!(!bodies.is_empty(), "must supply at least one chain body");
        let (calls, path_index_calls) = (AtomicUsize::new(0), AtomicUsize::new(0));
        Self {
            chain_key,
            bodies,
            inner,
            calls,
            path_index_calls,
        }
    }

    /// Number of times the chain manifest has been read.
    fn chain_calls(&self) -> usize {
        let calls = &self.calls;
        calls.load(Ordering::SeqCst)
    }

    /// Number of times any `…/path-index.json` has been read.
    fn path_index_calls(&self) -> usize {
        let calls = &self.path_index_calls;
        calls.load(Ordering::SeqCst)
    }
}
crate::delegate_to_inner_impl! {
    impl ObjectStore for EvolvingChainStore {
        forward: list, get_to_file, get_bytes_range,
        put_bytes, put_if_absent,
        head, copy, delete;
        // Chain reads come from the scripted bodies (sticking on the last).
        // Other reads go to `inner`, and path-index reads are counted on the way.
        async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
            if key != self.chain_key {
                if key.ends_with("/path-index.json") {
                    self.path_index_calls.fetch_add(1, Ordering::SeqCst);
                }
                return self.inner.get_bytes(key).await;
            }
            let nth_read = self.calls.fetch_add(1, Ordering::SeqCst);
            let last_body = self.bodies.len() - 1;
            Ok(self.bodies[nth_read.min(last_body)].clone())
        }
    }
}
/// Builds the raw bytes of a version-2 pack `.idx` that describes exactly one
/// object, `target_sha`, at `pack_offset` within its pack.
///
/// Layout: magic `\xfftOc`, version 2, a 256-entry cumulative fan-out table,
/// one 20-byte object id, one zeroed CRC32, one 4-byte offset, and zeroed pack
/// and index checksums.
///
/// # Panics
/// If `pack_offset` has its most-significant bit set. In the v2 format that
/// bit turns the value into an index into the 64-bit large-offset table,
/// which this helper does not emit.
fn build_one_object_v2_idx(target_sha: &Sha40, pack_offset: u32) -> Vec<u8> {
    assert!(
        pack_offset < 0x8000_0000,
        "pack_offset {pack_offset:#x} needs the large-offset table, which this helper does not build",
    );
    let oid = sha40_to_object_id(target_sha);
    let sha_bytes = oid.as_bytes();
    let first_byte = sha_bytes[0];
    let mut data = Vec::with_capacity(8 + 256 * 4 + 20 + 4 + 4 + 20 + 20);
    data.extend_from_slice(b"\xfftOc");
    data.extend_from_slice(&2u32.to_be_bytes());
    // fan-out[b] = number of objects whose first id byte is <= b.
    for bucket in 0..=u8::MAX {
        let count = u32::from(bucket >= first_byte);
        data.extend_from_slice(&count.to_be_bytes());
    }
    data.extend_from_slice(sha_bytes);
    data.extend_from_slice(&0u32.to_be_bytes()); // CRC32 (zeroed)
    data.extend_from_slice(&pack_offset.to_be_bytes());
    data.extend_from_slice(&[0u8; 20]); // pack checksum (zeroed)
    data.extend_from_slice(&[0u8; 20]); // index checksum (zeroed)
    data
}
/// Pretty-JSON bytes of the single-segment manifest from `make_chain_with`.
fn chain_json_bytes(tip_hex: &str, pack_sha_hex: &str) -> Bytes {
    let manifest = make_chain_with(tip_hex, pack_sha_hex);
    let json = manifest.to_json_pretty().expect("chain serialise");
    json.into()
}
#[tokio::test]
async fn read_with_pack_missing_retries_succeeds_after_chain_reload() {
    let inner = MockStore::new();
    let cache = PackIndexCache::default();
    let p1_sha = sha40(SHA_A);
    let p2_sha = sha40(SHA_B);
    let blob_payload = b"recovered blob";
    let blob_oid_sha = sha40(SHA_C);
    let blob_oid = sha40_to_object_id(&blob_oid_sha);
    // Only pack p2 (one blob at offset 0) and its index are uploaded. Pack p1,
    // which the initial manifest names, does not exist.
    let mut pack = Vec::new();
    let mut offsets = Vec::new();
    push_pack_entry(&mut pack, &mut offsets, 3, None, blob_payload);
    // `pack` is not used again, so move it instead of cloning.
    inner.insert(pack_key(None, &p2_sha), Bytes::from(pack));
    let idx_bytes = build_one_object_v2_idx(&blob_oid_sha, 0);
    inner.insert(pack_idx_key(None, &p2_sha), Bytes::from(idx_bytes));
    let chain_key = chain_key(None, "refs/heads/main");
    // v1 (the starting manifest) names the missing p1; the single reload serves v2 (p2).
    let v1 = chain_json_bytes(SHA_A, p1_sha.as_str());
    let v2 = chain_json_bytes(SHA_A, p2_sha.as_str());
    let store = EvolvingChainStore::new(inner, chain_key, vec![v2]);
    let initial = ChainManifest::from_json_bytes(&v1).expect("chain v1 parses");
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    let resolved = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect("retry must succeed after chain reload");
    assert_eq!(resolved.payload, blob_payload);
    assert_eq!(resolved.kind, ObjectKind::Blob);
    assert_eq!(
        store.chain_calls(),
        1,
        "exactly one chain reload should have fired"
    );
    assert_eq!(
        store.path_index_calls(),
        0,
        "retry path must not reload path-index.json",
    );
}
#[tokio::test]
async fn read_with_pack_missing_retries_does_not_reload_path_index() {
    let inner = MockStore::new();
    let cache = PackIndexCache::default();
    let (p1_sha, p2_sha) = (sha40(SHA_A), sha40(SHA_B));
    let blob_payload = b"recovered blob";
    let blob_oid_sha = sha40(SHA_C);
    let blob_oid = sha40_to_object_id(&blob_oid_sha);
    let (mut pack, mut offsets) = (Vec::new(), Vec::new());
    push_pack_entry(&mut pack, &mut offsets, 3, None, blob_payload);
    inner.insert(pack_key(None, &p2_sha), Bytes::from(pack));
    let idx_bytes = build_one_object_v2_idx(&blob_oid_sha, 0);
    inner.insert(pack_idx_key(None, &p2_sha), Bytes::from(idx_bytes));
    // A path index does exist, so an unwanted read of it would find a real
    // object rather than fail.
    inner.insert("refs/heads/main/path-index.json", Bytes::from_static(b"{}"));
    let chain_key = chain_key(None, "refs/heads/main");
    let stale_chain = chain_json_bytes(SHA_A, p1_sha.as_str());
    let fresh_chain = chain_json_bytes(SHA_A, p2_sha.as_str());
    let store = EvolvingChainStore::new(inner, chain_key, vec![fresh_chain]);
    let initial = ChainManifest::from_json_bytes(&stale_chain).expect("chain v1 parses");
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    let resolved = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect("retry must succeed");
    assert_eq!(resolved.payload, blob_payload);
    assert_eq!(store.chain_calls(), 1);
    let path_index_reads = store.path_index_calls();
    assert_eq!(
        path_index_reads,
        0,
        "retry path read path-index.json {path_index_reads} times; must be zero",
    );
}
#[tokio::test]
async fn read_with_pack_missing_retries_fails_fast_when_chain_still_references_missing_pack() {
    // Nothing is uploaded, and the reloaded manifest is identical to the
    // initial one, so it still names the same missing pack.
    let inner = MockStore::new();
    let cache = PackIndexCache::default();
    let p1_sha = sha40(SHA_A);
    let blob_oid = sha40_to_object_id(&sha40(SHA_C));
    let chain_key = chain_key(None, "refs/heads/main");
    let manifest_body = chain_json_bytes(SHA_A, p1_sha.as_str());
    let initial = ChainManifest::from_json_bytes(&manifest_body).expect("chain parses");
    let store = EvolvingChainStore::new(inner, chain_key, vec![manifest_body]);
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    let err = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect_err("missing pack still in chain must fail fast");
    match err {
        PackchainError::PackMissing { key } => assert!(
            key.contains(&format!("packs/{SHA_A}")),
            "PackMissing key should name the missing pack, got {key}",
        ),
        unexpected => panic!("expected fail-fast PackMissing, got {unexpected:?}"),
    }
    assert_eq!(store.chain_calls(), 1);
}
#[tokio::test(start_paused = true)]
async fn read_with_pack_missing_retries_surfaces_exhausted_after_max_retries() {
    let inner = MockStore::new();
    let cache = PackIndexCache::default();
    let blob_oid = sha40_to_object_id(&sha40(SHA_C));
    let max_retries = usize::try_from(PACK_MISSING_MAX_RETRIES).unwrap();
    // Pack names come from the retry budget instead of a fixed list, so the
    // test still exercises exhaustion if the budget changes. Index 0 is named
    // by the initial manifest, and indices 1..=max_retries+1 are served by
    // successive chain reloads. None of these packs is uploaded, and every
    // reload names a different pack, so the "chain still references the
    // missing pack" fail-fast path never triggers.
    let pack_shas: Vec<String> = (0..max_retries + 2)
        .map(|i| format!("{i:040x}"))
        .collect();
    let chain_key = chain_key(None, "refs/heads/main");
    let v1 = chain_json_bytes(SHA_A, &pack_shas[0]);
    let reload_bodies: Vec<Bytes> = pack_shas[1..]
        .iter()
        .map(|sha| chain_json_bytes(SHA_A, sha))
        .collect();
    let initial = ChainManifest::from_json_bytes(&v1).expect("chain v1 parses");
    let store = EvolvingChainStore::new(inner, chain_key, reload_bodies);
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    let err = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect_err("exhausted retries must error");
    match err {
        PackchainError::ConcurrentGcRetriesExhausted {
            last_missing_key,
            attempts,
        } => {
            assert_eq!(attempts, PACK_MISSING_MAX_RETRIES);
            assert!(
                last_missing_key.contains(&pack_shas[max_retries]),
                "last missing key should name pack[{max_retries}], got {last_missing_key}"
            );
        }
        other => panic!("expected ConcurrentGcRetriesExhausted, got {other:?}"),
    }
    assert_eq!(store.chain_calls(), max_retries + 1);
}
#[tokio::test]
async fn read_with_pack_missing_retries_does_not_retry_on_non_pack_missing_errors() {
    let inner = MockStore::new();
    let cache = PackIndexCache::default();
    let p1_sha = sha40(SHA_A);
    // The index exists but is garbage: this is a parse failure, not a missing
    // pack, so no chain reload may be attempted.
    let garbage_idx = Bytes::from_static(b"not a real idx");
    inner.insert(pack_idx_key(None, &p1_sha), garbage_idx);
    let chain_key = chain_key(None, "refs/heads/main");
    let manifest_body = chain_json_bytes(SHA_A, p1_sha.as_str());
    let initial = ChainManifest::from_json_bytes(&manifest_body).expect("chain parses");
    let store = EvolvingChainStore::new(inner, chain_key, vec![manifest_body]);
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    let blob_oid = sha40_to_object_id(&sha40(SHA_C));
    let err = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect_err("malformed idx must surface immediately");
    let is_malformed = matches!(err, PackchainError::MalformedPackEntry { .. });
    assert!(
        is_malformed,
        "expected MalformedPackEntry passthrough, got {err:?}"
    );
    assert_eq!(store.chain_calls(), 0);
}
#[tokio::test]
async fn read_with_pack_missing_retries_surfaces_chain_reload_error() {
    use crate::object_store::mock::Fault;
    // No pack is uploaded, so the first read reports it missing. The reload
    // of the chain manifest is then made to fail at the transport level.
    let store = MockStore::new();
    let cache = PackIndexCache::default();
    let p1_sha = sha40(SHA_A);
    let blob_oid = sha40_to_object_id(&sha40(SHA_C));
    let manifest_key = chain_key(None, "refs/heads/main");
    let manifest_body = chain_json_bytes(SHA_A, p1_sha.as_str());
    let initial = ChainManifest::from_json_bytes(&manifest_body).expect("chain v1 parses");
    let remote_ref = RefName::new("refs/heads/main").expect("ref name valid");
    store.arm(Fault::NetworkOnGetBytes { key: manifest_key });
    let err = read_with_pack_missing_retries(
        &store,
        None,
        &remote_ref,
        "refs/heads/main",
        initial,
        &blob_oid,
        &cache,
    )
    .await
    .expect_err("chain reload failure must surface as an error");
    let is_store_error = matches!(err, PackchainError::Store(_));
    assert!(
        is_store_error,
        "expected PackchainError::Store wrapping the chain-reload transport error; \
         a regression that swallowed the reload error would yield \
         ConcurrentGcRetriesExhausted or the original PackMissing instead. got {err:?}"
    );
    assert_eq!(
        store.pending_faults(),
        0,
        "armed chain-reload fault must have fired exactly once"
    );
}
}