use std::collections::HashMap;
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use gix_pack::Find as _;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, warn};
use crate::git::{self, RefName, Sha};
use crate::keys;
use crate::object_store::{GetOpts, ObjectStore, ObjectStoreError};
use crate::protocol::fetch::{
FetchError, FetchedRefs, MAX_FETCH_CONCURRENCY, ShallowBoundaries, parse_fetch_args,
};
use super::PackchainError;
use super::manifest::load_chain;
use super::retry::{
PACK_MISSING_MAX_RETRIES, PACK_MISSING_RETRY_BACKOFFS, chain_references_pack_key,
};
use super::schema::{ChainManifest, ChainSegment, Sha40};
/// Fetches every ref named in `cmds` concurrently and, for shallow fetches,
/// writes the resulting shallow file.
///
/// Each command is parsed with [`parse_fetch_args`] and handed to its own
/// task running [`fetch_one`]. All tasks share one semaphore sized by
/// `MAX_FETCH_CONCURRENCY` and one [`ShallowBoundaries`] accumulator.
///
/// # Errors
///
/// Every task is awaited even after a failure. The first error observed is
/// returned; later ones are only logged at debug level. A panicked or
/// cancelled task is converted into a [`FetchError`] the same way. When
/// `depth` is set and no task failed, a failure to write the shallow file is
/// returned instead.
pub(crate) async fn fetch_batch(
    ctx: &super::super::protocol::BatchCtx,
    cmds: Vec<String>,
    fetched_refs: FetchedRefs,
    depth: Option<NonZeroU32>,
) -> Result<(), FetchError> {
    if cmds.is_empty() {
        return Ok(());
    }
    debug!(
        count = cmds.len(),
        depth = ?depth,
        "fetching packchain refs"
    );

    let permits = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
    let boundaries = ShallowBoundaries::new();
    let mut workers: JoinSet<Result<(), FetchError>> = JoinSet::new();

    for cmd in cmds {
        // Each task owns its own handles so it can outlive this loop body.
        let store = Arc::clone(&ctx.store);
        let semaphore = Arc::clone(&permits);
        let prefix = ctx.prefix.clone();
        let repo_dir = Arc::clone(&ctx.repo_dir);
        let fetched_refs = fetched_refs.clone();
        let boundaries = boundaries.clone();
        workers.spawn(async move {
            let (sha, ref_name) = parse_fetch_args(&cmd)?;
            let one = FetchOneCtx {
                store,
                semaphore,
                prefix: prefix.as_deref(),
                repo_dir: repo_dir.as_path(),
                sha,
                ref_name: &ref_name,
                fetched_refs: &fetched_refs,
                depth,
                boundaries: &boundaries,
            };
            fetch_one(one).await
        });
    }

    // Drain every task; remember only the first failure.
    let mut first_err: Option<FetchError> = None;
    while let Some(joined) = workers.join_next().await {
        let outcome: Result<(), FetchError> = match joined {
            Ok(task_result) => task_result,
            Err(join_err) => Err(join_err.into()),
        };
        let Err(err) = outcome else {
            continue;
        };
        if first_err.is_some() {
            debug!(error = %err, "additional packchain fetch task error (first error already captured)");
        } else {
            first_err = Some(err);
        }
    }

    if let Some(err) = first_err {
        return Err(err);
    }

    // The shallow file is only written for depth-limited fetches that fully succeeded.
    if depth.is_some() {
        let collected = boundaries.drain();
        let repo_dir = ctx.repo_dir.as_path().to_path_buf();
        tokio::task::spawn_blocking(move || git::write_shallow_file(&repo_dir, &collected))
            .await??;
    }
    Ok(())
}
/// Arguments for a single [`fetch_one`] call, bundled into one value so a
/// per-ref task can be started with a single parameter.
struct FetchOneCtx<'a> {
/// Object store the chain manifest, packs and baseline bundle are read from.
store: Arc<dyn ObjectStore>,
/// Shared permit pool; the full (non-shallow) download tasks acquire a permit
/// from it before downloading.
semaphore: Arc<Semaphore>,
/// Optional key prefix passed through to the key-building helpers.
prefix: Option<&'a str>,
/// Local repository directory that is opened with `gix::open`.
repo_dir: &'a Path,
/// Commit requested for `ref_name`.
sha: Sha,
/// Ref whose chain manifest is loaded.
ref_name: &'a RefName,
/// Shas already fetched; a sha found here skips the download step.
fetched_refs: &'a FetchedRefs,
/// Shallow depth, or `None` for a full fetch.
depth: Option<NonZeroU32>,
/// Accumulator extended with the shallow boundary ids computed for `sha`.
boundaries: &'a ShallowBoundaries,
}
/// Fetches one ref and, for shallow fetches, records its shallow boundaries.
///
/// The download is skipped when `ctx.sha` is already present in
/// `ctx.fetched_refs`; otherwise the sha is inserted after a successful
/// fetch. When `ctx.depth` is set, the shallow boundaries for the sha are
/// computed on a blocking thread and appended to `ctx.boundaries`. This
/// happens even when the download itself was skipped.
///
/// # Errors
///
/// Propagates any error from the fetch, from opening the repository, from
/// the boundary computation, or from joining the blocking task.
async fn fetch_one(ctx: FetchOneCtx<'_>) -> Result<(), FetchError> {
    let sha = ctx.sha;
    let ref_name = ctx.ref_name;

    if !ctx.fetched_refs.contains(&sha) {
        fetch_with_pack_missing_retries(
            &ctx.store,
            &ctx.semaphore,
            ctx.prefix,
            ctx.repo_dir,
            ref_name,
            sha,
            ctx.depth,
        )
        .await?;
        ctx.fetched_refs.insert(sha);
    } else {
        debug!(%sha, ref_name = %ref_name, "skipping fetch: already fetched in this session");
    }

    // Full fetches have no shallow bookkeeping to do.
    let Some(depth) = ctx.depth else {
        return Ok(());
    };

    let repo_path = ctx.repo_dir.to_path_buf();
    let ids = tokio::task::spawn_blocking(move || {
        let repo = gix::open(&repo_path).map_err(crate::git::GitError::from)?;
        git::shallow_boundaries(&repo, sha, depth)
    })
    .await??;
    ctx.boundaries.extend(ids);
    Ok(())
}
/// Runs [`fetch_once`] and retries it when a pack disappears underneath the
/// fetch but the chain has meanwhile stopped referencing that pack.
///
/// After a `PackMissing` failure the chain manifest is reloaded:
///
/// * if the chain no longer exists, `ChainAbsent` is returned;
/// * if the reloaded chain still references the missing key, the original
///   `PackMissing` is returned, because retrying cannot help;
/// * otherwise the fetch is retried after sleeping for
///   `PACK_MISSING_RETRY_BACKOFFS[attempt]`, up to `PACK_MISSING_MAX_RETRIES`
///   retries, after which `ConcurrentGcRetriesExhausted` is returned.
///
/// Any other outcome of [`fetch_once`], success included, is returned
/// unchanged.
// Allowed because the arguments mirror `fetch_once`, which this wraps one-to-one.
#[allow(clippy::too_many_arguments)]
async fn fetch_with_pack_missing_retries(
    store: &Arc<dyn ObjectStore>,
    semaphore: &Arc<Semaphore>,
    prefix: Option<&str>,
    repo_dir: &Path,
    ref_name: &RefName,
    sha: Sha,
    depth: Option<NonZeroU32>,
) -> Result<(), FetchError> {
    let mut attempt: u32 = 0;
    loop {
        // Only a missing pack is a retry candidate; everything else is final.
        let missing_key =
            match fetch_once(store, semaphore, prefix, repo_dir, ref_name, sha, depth).await {
                Err(FetchError::Packchain(PackchainError::PackMissing { key })) => key,
                other => return other,
            };

        let Some(reloaded) = load_chain(store.as_ref(), prefix, ref_name)
            .await
            .map_err(FetchError::Packchain)?
        else {
            return Err(FetchError::Packchain(PackchainError::ChainAbsent {
                ref_name: ref_name.as_str().to_owned(),
            }));
        };

        let still_referenced = chain_references_pack_key(&reloaded, prefix, &missing_key)
            .map_err(FetchError::Packchain)?;
        if still_referenced {
            // The current chain still wants this pack, so it is genuinely gone.
            return Err(FetchError::Packchain(PackchainError::PackMissing {
                key: missing_key,
            }));
        }

        if attempt >= PACK_MISSING_MAX_RETRIES {
            warn!(
                ref_name = %ref_name.as_str(),
                last_missing_key = %missing_key,
                attempts = attempt,
                "fetch: exhausted pack-missing retries against concurrent GC"
            );
            return Err(FetchError::Packchain(
                PackchainError::ConcurrentGcRetriesExhausted {
                    last_missing_key: missing_key,
                    attempts: attempt,
                },
            ));
        }

        debug!(
            ref_name = %ref_name.as_str(),
            missing_key = %missing_key,
            attempt = attempt,
            "fetch: PackMissing on chain no longer references the pack — retrying after GC race"
        );
        tokio::time::sleep(PACK_MISSING_RETRY_BACKOFFS[attempt as usize]).await;
        attempt += 1;
    }
}
/// Performs one fetch attempt for `ref_name`.
///
/// Loads the chain manifest, decides on a blocking thread which segments the
/// local repository is missing (see [`select_needed_segments`]), creates a
/// scratch directory for downloads, and then dispatches to [`fetch_shallow`]
/// when `depth` is set or to [`fetch_full`] otherwise.
///
/// # Errors
///
/// Returns `ChainAbsent` when no chain manifest exists for the ref, and
/// propagates errors from the segment walk, from creating the temporary
/// directory, and from the selected fetch strategy.
async fn fetch_once(
    store: &Arc<dyn ObjectStore>,
    semaphore: &Arc<Semaphore>,
    prefix: Option<&str>,
    repo_dir: &Path,
    ref_name: &RefName,
    sha: Sha,
    depth: Option<NonZeroU32>,
) -> Result<(), FetchError> {
    let Some(chain) = load_chain(store.as_ref(), prefix, ref_name)
        .await
        .map_err(FetchError::Packchain)?
    else {
        return Err(FetchError::Packchain(PackchainError::ChainAbsent {
            ref_name: ref_name.as_str().to_owned(),
        }));
    };

    // The walk opens the repository, so it runs off the async executor.
    let (needed, need_baseline) = {
        let walk_chain = chain.clone();
        let walk_dir = repo_dir.to_path_buf();
        tokio::task::spawn_blocking(move || select_needed_segments(&walk_dir, &walk_chain))
            .await??
    };

    let temp_dir = tempfile::Builder::new()
        .prefix("git_remote_object_store_packchain_fetch_")
        .tempdir()?;

    // `full_at` is only parsed when the baseline bundle is actually required.
    let baseline_sha = if need_baseline {
        Some(
            Sha::from_hex(chain.full_at.as_str())
                .expect("chain.full_at is a Sha40 — guaranteed 40 lowercase hex bytes"),
        )
    } else {
        None
    };

    match depth {
        Some(depth) => {
            fetch_shallow(
                store.as_ref(),
                prefix,
                repo_dir,
                temp_dir.path(),
                ref_name,
                sha,
                &needed,
                baseline_sha,
                depth,
            )
            .await?;
        }
        None => {
            fetch_full(
                store,
                semaphore,
                prefix,
                repo_dir,
                temp_dir.path(),
                ref_name,
                &needed,
                baseline_sha,
            )
            .await?;
        }
    }
    Ok(())
}
/// Determines which chain segments the local repository still lacks.
///
/// Segments are examined in manifest order. The scan stops at the first
/// segment whose `sha` is already present in the repository's object
/// database.
///
/// Returns `(needed, need_baseline)`:
///
/// * `needed` holds clones of the segments that precede the first locally
///   known one, or of every segment when none is known;
/// * `need_baseline` is `true` only when no segment sha was found locally.
///
/// # Errors
///
/// Fails when the repository at `repo_dir` cannot be opened.
fn select_needed_segments(
    repo_dir: &Path,
    chain: &ChainManifest,
) -> Result<(Vec<ChainSegment>, bool), FetchError> {
    let repo = gix::open(repo_dir).map_err(crate::git::GitError::from)?;
    let odb = repo.objects.clone().into_inner();

    let first_known = chain
        .segments
        .iter()
        .position(|segment| odb.contains(&sha40_to_object_id(&segment.sha)));

    let selection = match first_known {
        Some(idx) => (chain.segments[..idx].to_vec(), false),
        None => (chain.segments.clone(), true),
    };
    Ok(selection)
}
/// Converts a manifest [`Sha40`] into a `gix_hash::ObjectId`.
///
/// # Panics
///
/// Panics if the hex text is rejected by `Sha::from_hex`. The `expect`
/// message records the assumption that a `Sha40` always holds 40 lowercase
/// hex characters.
fn sha40_to_object_id(sha: &Sha40) -> gix_hash::ObjectId {
    let parsed = Sha::from_hex(sha.as_str()).expect("Sha40 is 40-lowercase-hex by construction");
    *parsed.as_object_id()
}
#[allow(clippy::too_many_arguments)] async fn fetch_full(
store: &Arc<dyn ObjectStore>,
semaphore: &Arc<Semaphore>,
prefix: Option<&str>,
repo_dir: &Path,
temp_path: &Path,
ref_name: &RefName,
needed: &[ChainSegment],
baseline_sha: Option<Sha>,
) -> Result<(), FetchError> {
if needed.is_empty() && baseline_sha.is_none() {
debug!(ref_name = %ref_name, "packchain fetch: receiver already up to date");
return Ok(());
}
let downloads = spawn_full_downloads(
store,
semaphore,
prefix,
ref_name,
needed,
baseline_sha,
temp_path,
)?;
let mut drained = drain_full_downloads(downloads, needed.len()).await?;
if let Some((sha, _bundle_path)) = drained.baseline {
git::unbundle_at(repo_dir, temp_path, sha).await?;
}
for segment in needed.iter().rev() {
let pack_path = drained.segments.remove(&segment.sha).ok_or_else(|| {
FetchError::Packchain(PackchainError::PackBuild(
"segment download succeeded but path is missing".to_owned(),
))
})?;
let repo_dir = repo_dir.to_path_buf();
tokio::task::spawn_blocking(move || install_pack(&repo_dir, &pack_path)).await??;
}
Ok(())
}
/// Successful results collected by `drain_full_downloads` once every download
/// task has finished.
struct DrainedDownloads {
/// Downloaded pack file path for each segment, keyed by the segment's `sha`.
segments: HashMap<Sha40, PathBuf>,
/// Sha and bundle file path of the baseline, when a baseline download ran.
baseline: Option<(Sha, PathBuf)>,
}
/// Starts one download task per needed segment, plus one for the baseline
/// bundle when `baseline_sha` is set.
///
/// Each task first acquires a permit from `semaphore`, then downloads its
/// artifact into `temp_path`. Packs are stored as `<content sha>.pack` and
/// the baseline as `<baseline sha>.bundle`.
///
/// # Errors
///
/// Fails before spawning a segment's task when `segment_pack_sha` rejects
/// that segment. Download errors surface later, when the returned
/// [`JoinSet`] is drained.
///
/// # Panics
///
/// A task panics if the semaphore has been closed.
// Allowed because the arguments are the shared fetch context, passed through unchanged.
#[allow(clippy::too_many_arguments)]
fn spawn_full_downloads(
    store: &Arc<dyn ObjectStore>,
    semaphore: &Arc<Semaphore>,
    prefix: Option<&str>,
    ref_name: &RefName,
    needed: &[ChainSegment],
    baseline_sha: Option<Sha>,
    temp_path: &Path,
) -> Result<JoinSet<Result<DownloadedArtifact, FetchError>>, FetchError> {
    let mut tasks: JoinSet<Result<DownloadedArtifact, FetchError>> = JoinSet::new();

    for segment in needed {
        // Validate the pack entry before committing any resources to it.
        let content_sha = super::keys::segment_pack_sha(segment).map_err(FetchError::Packchain)?;
        let object_key = super::keys::pack_key_from_relative(prefix, &segment.pack);
        let pack_path = temp_path.join(format!("{}.pack", content_sha.as_str()));
        let sha = segment.sha.clone();
        let task_store = Arc::clone(store);
        let permits = Arc::clone(semaphore);
        tasks.spawn(async move {
            let _permit = permits
                .acquire_owned()
                .await
                .expect("fetch semaphore is owned by this batch and never closed");
            download_pack(task_store.as_ref(), &object_key, &pack_path).await?;
            Ok(DownloadedArtifact::Segment { sha, pack_path })
        });
    }

    if let Some(baseline_sha) = baseline_sha {
        let object_key = keys::bundle_key(prefix, ref_name, baseline_sha);
        let bundle_path = temp_path.join(format!("{baseline_sha}.bundle"));
        let task_store = Arc::clone(store);
        let permits = Arc::clone(semaphore);
        tasks.spawn(async move {
            let _permit = permits
                .acquire_owned()
                .await
                .expect("fetch semaphore is owned by this batch and never closed");
            download_baseline(task_store.as_ref(), &object_key, &bundle_path).await?;
            Ok(DownloadedArtifact::Baseline {
                sha: baseline_sha,
                bundle_path,
            })
        });
    }

    Ok(tasks)
}
/// Waits for every download task and sorts the results into segments and the
/// baseline.
///
/// `needed_len` is only a capacity hint for the segment map.
///
/// # Errors
///
/// All tasks are awaited even after a failure. If any task failed, panicked
/// or was cancelled, the first such error is returned and later ones are
/// logged at debug level.
async fn drain_full_downloads(
    mut downloads: JoinSet<Result<DownloadedArtifact, FetchError>>,
    needed_len: usize,
) -> Result<DrainedDownloads, FetchError> {
    let mut segments: HashMap<Sha40, PathBuf> = HashMap::with_capacity(needed_len);
    let mut baseline: Option<(Sha, PathBuf)> = None;
    let mut first_err: Option<FetchError> = None;

    while let Some(joined) = downloads.join_next().await {
        let outcome: Result<DownloadedArtifact, FetchError> = match joined {
            Ok(task_result) => task_result,
            Err(join_err) => Err(join_err.into()),
        };
        let artifact = match outcome {
            Ok(artifact) => artifact,
            Err(e) => {
                if first_err.is_some() {
                    debug!(error = %e, "additional packchain download error (first error already captured)");
                } else {
                    first_err = Some(e);
                }
                continue;
            }
        };
        match artifact {
            DownloadedArtifact::Segment { sha, pack_path } => {
                segments.insert(sha, pack_path);
            }
            DownloadedArtifact::Baseline { sha, bundle_path } => {
                baseline = Some((sha, bundle_path));
            }
        }
    }

    match first_err {
        Some(e) => Err(e),
        None => Ok(DrainedDownloads { segments, baseline }),
    }
}
/// Shallow fetch: downloads and installs segments one at a time, stopping
/// early when possible.
///
/// For each segment in `needed`, in manifest order, the pack is downloaded
/// into `temp_path` and installed. The shallow boundaries for `tip_sha` at
/// `depth` are then recomputed. As soon as that computation yields at least
/// one id, the function returns without touching the remaining segments or
/// the baseline.
///
/// If the loop finishes without an early return and `baseline_sha` is set,
/// the baseline bundle is downloaded and unbundled.
///
/// # Errors
///
/// Propagates malformed pack entries, download failures, install failures,
/// repository open and boundary errors, and unbundle failures.
// Allowed because every argument is an independent input to the per-segment steps.
#[allow(clippy::too_many_arguments)]
async fn fetch_shallow(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    repo_dir: &Path,
    temp_path: &Path,
    ref_name: &RefName,
    tip_sha: Sha,
    needed: &[ChainSegment],
    baseline_sha: Option<Sha>,
    depth: NonZeroU32,
) -> Result<(), FetchError> {
    for segment in needed {
        let content_sha = super::keys::segment_pack_sha(segment).map_err(FetchError::Packchain)?;
        let object_key = super::keys::pack_key_from_relative(prefix, &segment.pack);
        let pack_path = temp_path.join(format!("{}.pack", content_sha.as_str()));
        download_pack(store, &object_key, &pack_path).await?;

        let install_dir = repo_dir.to_path_buf();
        tokio::task::spawn_blocking(move || install_pack(&install_dir, &pack_path)).await??;

        // Re-check after each install whether any boundary ids are produced yet.
        let walk_dir = repo_dir.to_path_buf();
        let ids = tokio::task::spawn_blocking(move || {
            let repo = gix::open(&walk_dir).map_err(crate::git::GitError::from)?;
            git::shallow_boundaries(&repo, tip_sha, depth)
        })
        .await??;
        if !ids.is_empty() {
            return Ok(());
        }
    }

    let Some(baseline_sha) = baseline_sha else {
        return Ok(());
    };
    let object_key = keys::bundle_key(prefix, ref_name, baseline_sha);
    let bundle_path = temp_path.join(format!("{baseline_sha}.bundle"));
    download_baseline(store, &object_key, &bundle_path).await?;
    git::unbundle_at(repo_dir, temp_path, baseline_sha).await?;
    Ok(())
}
/// Result of one successful download task started by `spawn_full_downloads`.
enum DownloadedArtifact {
/// A segment pack: the segment's `sha` and the path the pack was written to.
Segment { sha: Sha40, pack_path: PathBuf },
/// The baseline bundle: its sha and the path the bundle was written to.
Baseline { sha: Sha, bundle_path: PathBuf },
}
/// Downloads the pack stored under `key` into the file `dest`.
///
/// # Errors
///
/// A `NotFound` from the store becomes `PackchainError::PackMissing` carrying
/// `key`. Every other store error is wrapped in `FetchError::Store`.
async fn download_pack(store: &dyn ObjectStore, key: &str, dest: &Path) -> Result<(), FetchError> {
    store
        .get_to_file(key, dest, GetOpts::default())
        .await
        .map_err(|err| match err {
            ObjectStoreError::NotFound(_) => {
                FetchError::Packchain(PackchainError::PackMissing {
                    key: key.to_owned(),
                })
            }
            other => FetchError::Store(other),
        })
}
/// Downloads the baseline bundle stored under `key` into the file `dest`.
///
/// # Errors
///
/// A `NotFound` from the store becomes `PackchainError::BaselineMissing`
/// carrying `key`. Every other store error is wrapped in `FetchError::Store`.
async fn download_baseline(
    store: &dyn ObjectStore,
    key: &str,
    dest: &Path,
) -> Result<(), FetchError> {
    store
        .get_to_file(key, dest, GetOpts::default())
        .await
        .map_err(|err| match err {
            ObjectStoreError::NotFound(_) => {
                FetchError::Packchain(PackchainError::BaselineMissing {
                    key: key.to_owned(),
                })
            }
            other => FetchError::Store(other),
        })
}
/// Installs the pack file at `pack_path` into the repository at `repo_dir`.
///
/// The pack is streamed through `gix_pack::Bundle::write_to_directory` into
/// `<git dir>/objects/pack`, which is created if absent. The repository's
/// own object database is passed to the writer as its object lookup. If the
/// writer reports a keep-file path, that file is removed afterwards; a
/// keep-file that is already gone is not an error.
///
/// # Errors
///
/// Returns a Git error when the repository cannot be opened, `FetchError::Io`
/// for directory creation, pack open or keep-file removal failures, and
/// `PackchainError::PackIndexWrite` when writing the pack fails.
pub(crate) fn install_pack(repo_dir: &Path, pack_path: &Path) -> Result<(), FetchError> {
    use std::fs;
    use std::io::BufReader;

    let repo = gix::open(repo_dir).map_err(crate::git::GitError::from)?;
    let pack_dir = repo.git_dir().join("objects/pack");
    fs::create_dir_all(&pack_dir).map_err(FetchError::Io)?;

    let mut reader = BufReader::new(fs::File::open(pack_path).map_err(FetchError::Io)?);
    // Never set: this write is not interruptible from here.
    let interrupted = AtomicBool::new(false);
    let options = gix_pack::bundle::write::Options {
        object_hash: gix_hash::Kind::Sha1,
        ..Default::default()
    };

    let outcome = gix_pack::Bundle::write_to_directory(
        &mut reader,
        Some(&pack_dir),
        &mut gix::progress::Discard,
        &interrupted,
        Some(repo.objects.clone().into_inner()),
        options,
    )
    .map_err(|e| FetchError::Packchain(PackchainError::PackIndexWrite(Box::new(e))))?;

    let Some(keep_path) = outcome.keep_path else {
        return Ok(());
    };
    let removal = fs::remove_file(&keep_path);
    match removal {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(FetchError::Io(e)),
    }
}
// Unit tests for the packchain fetch path: segment selection, malformed pack
// keys, and the pack-missing retry loop.
#[cfg(test)]
mod tests {
use super::*;
use crate::object_store::mock::MockStore;
use crate::packchain::keys::chain_key;
use bytes::Bytes;
/// The ref name shared by every test in this module.
fn ref_main() -> RefName {
RefName::new("refs/heads/main").expect("ref")
}
/// In a freshly initialised repository no segment sha is known, so every
/// segment is needed and the baseline is required.
#[tokio::test]
async fn select_needed_segments_returns_all_for_empty_repo() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let chain = ChainManifest {
v: 1,
tip: Sha40::try_new("0000000000000000000000000000000000000001").unwrap(),
full_at: Sha40::try_new("0000000000000000000000000000000000000001").unwrap(),
segments: vec![ChainSegment {
sha: Sha40::try_new("0000000000000000000000000000000000000001").unwrap(),
parent_sha: None,
pack: "packs/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.pack".to_owned(),
bytes: 1_024,
}],
};
let (needed, need_baseline) =
select_needed_segments(repo_dir.path(), &chain).expect("walk");
assert_eq!(needed.len(), 1);
assert!(need_baseline);
}
/// When the second segment's sha is a commit that already exists locally,
/// only the first segment is needed and the baseline is not.
#[tokio::test]
async fn select_needed_segments_stops_at_first_known_ancestor() {
use gix::actor::SignatureRef;
use gix::bstr::BStr;
use gix_hash::ObjectId;
let repo_dir = tempfile::tempdir().unwrap();
let repo = gix::init(repo_dir.path()).unwrap();
let signature = SignatureRef {
name: BStr::new("Tester"),
email: BStr::new("t@example.com"),
time: "0 +0000",
};
// Build a real commit (blob -> tree -> commit) so its id is in the object database.
let blob = repo.write_blob(b"v1").unwrap().detach();
let tree = repo
.write_object(&gix::objs::Tree {
entries: vec![gix::objs::tree::Entry {
mode: gix::objs::tree::EntryKind::Blob.into(),
filename: "a.txt".into(),
oid: blob,
}],
})
.unwrap()
.detach();
let c1_oid = repo
.commit_as(
signature,
signature,
"refs/heads/main",
"first",
tree,
std::iter::empty::<ObjectId>(),
)
.unwrap()
.detach();
let c1_sha40 = Sha40::from_oid(&c1_oid).unwrap();
// `newer` is a made-up sha that was never written to the repository.
let newer = Sha40::try_new("ffffffffffffffffffffffffffffffffffffffff").unwrap();
let chain = ChainManifest {
v: 1,
tip: newer.clone(),
full_at: c1_sha40.clone(),
segments: vec![
ChainSegment {
sha: newer.clone(),
parent_sha: Some(c1_sha40.clone()),
pack: "packs/0000000000000000000000000000000000000001.pack".to_owned(),
bytes: 1_024,
},
ChainSegment {
sha: c1_sha40.clone(),
parent_sha: None,
pack: "packs/0000000000000000000000000000000000000002.pack".to_owned(),
bytes: 2_048,
},
],
};
let (needed, need_baseline) =
select_needed_segments(repo_dir.path(), &chain).expect("walk");
assert_eq!(
needed.len(),
1,
"walk must stop at the known ancestor; only segments[0] is needed",
);
assert_eq!(needed[0].sha, newer);
assert!(
!need_baseline,
"baseline is NOT needed when the walk found a known ancestor mid-chain",
);
}
/// With a store that holds no chain manifest, `fetch_one` must report
/// `ChainAbsent` naming the requested ref.
#[tokio::test]
async fn fetch_returns_chain_absent_when_chain_missing() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let store: Arc<dyn ObjectStore> = Arc::new(MockStore::new());
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
let bogus_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: bogus_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth: None,
boundaries: &boundaries,
})
.await;
match result {
Err(FetchError::Packchain(PackchainError::ChainAbsent { ref_name: r })) => {
assert_eq!(r, "refs/heads/main");
}
other => panic!("expected ChainAbsent, got {other:?}"),
}
}
/// Builds a single segment with a fixed sha and the given (possibly
/// malformed) `pack` entry.
fn segment_with_pack(pack: &str) -> ChainSegment {
ChainSegment {
sha: Sha40::try_new("2222222222222222222222222222222222222222").unwrap(),
parent_sha: None,
pack: pack.to_owned(),
bytes: 1_024,
}
}
/// Asserts that `fetch_full` rejects a segment whose `pack` entry is
/// malformed with `MalformedPackEntry { offset: 0, .. }`.
async fn assert_fetch_full_rejects(pack: &str) {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let store: Arc<dyn ObjectStore> = Arc::new(MockStore::new());
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let needed = vec![segment_with_pack(pack)];
let result = fetch_full(
&store,
&semaphore,
Some("repo"),
repo_dir.path(),
temp_dir.path(),
&ref_main(),
&needed,
None,
)
.await;
match result {
Err(FetchError::Packchain(PackchainError::MalformedPackEntry {
offset: 0,
reason,
})) => {
assert!(
reason.contains("is not of the form"),
"reason should describe the expected shape, got: {reason}",
);
// Every string contains the empty string, so that check would be vacuous.
if !pack.is_empty() {
assert!(
reason.contains(pack),
"reason should also name the offending key, got: {reason}",
);
}
}
other => panic!("expected MalformedPackEntry for pack `{pack}`, got {other:?}"),
}
}
/// Asserts that `fetch_shallow` rejects a segment whose `pack` entry is
/// malformed with `MalformedPackEntry { offset: 0, .. }`.
async fn assert_fetch_shallow_rejects(pack: &str) {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let store = MockStore::new();
let needed = vec![segment_with_pack(pack)];
let tip_sha = Sha::from_hex("2222222222222222222222222222222222222222").unwrap();
let depth = NonZeroU32::new(1).unwrap();
let result = fetch_shallow(
&store,
Some("repo"),
repo_dir.path(),
temp_dir.path(),
&ref_main(),
tip_sha,
&needed,
None,
depth,
)
.await;
match result {
Err(FetchError::Packchain(PackchainError::MalformedPackEntry {
offset: 0,
reason,
})) => {
assert!(
reason.contains("is not of the form"),
"reason should describe the expected shape, got: {reason}",
);
// Every string contains the empty string, so that check would be vacuous.
if !pack.is_empty() {
assert!(
reason.contains(pack),
"reason should also name the offending key, got: {reason}",
);
}
}
other => panic!("expected MalformedPackEntry for pack `{pack}`, got {other:?}"),
}
}
/// `fetch_full` rejects an empty pack entry.
#[tokio::test]
async fn fetch_full_rejects_empty_pack_key() {
assert_fetch_full_rejects("").await;
}
/// `fetch_full` rejects a pack entry that lacks the `packs/….pack` shape.
#[tokio::test]
async fn fetch_full_rejects_unstructured_pack_key() {
assert_fetch_full_rejects("wrong").await;
}
/// `fetch_full` rejects a correctly shaped pack entry whose stem is not hex.
#[tokio::test]
async fn fetch_full_rejects_non_hex_pack_key() {
assert_fetch_full_rejects("packs/notahex.pack").await;
}
/// `fetch_shallow` rejects an empty pack entry.
#[tokio::test]
async fn fetch_shallow_rejects_empty_pack_key() {
assert_fetch_shallow_rejects("").await;
}
/// `fetch_shallow` rejects a pack entry that lacks the `packs/….pack` shape.
#[tokio::test]
async fn fetch_shallow_rejects_unstructured_pack_key() {
assert_fetch_shallow_rejects("wrong").await;
}
/// `fetch_shallow` rejects a correctly shaped pack entry whose stem is not hex.
#[tokio::test]
async fn fetch_shallow_rejects_non_hex_pack_key() {
assert_fetch_shallow_rejects("packs/notahex.pack").await;
}
/// Store wrapper whose chain manifest changes between reads: the n-th
/// `get_bytes` call for `chain_key` returns `bodies[n]`, and the last body
/// is repeated once the list is exhausted.
struct EvolvingChainStore {
inner: MockStore,
chain_key: String,
bodies: std::sync::Mutex<Vec<Bytes>>,
calls: std::sync::atomic::AtomicUsize,
}
impl EvolvingChainStore {
/// Creates the wrapper; panics when `bodies` is empty.
fn new(inner: MockStore, chain_key: String, bodies: Vec<Bytes>) -> Self {
assert!(!bodies.is_empty(), "must supply at least one chain body");
Self {
inner,
chain_key,
bodies: std::sync::Mutex::new(bodies),
calls: std::sync::atomic::AtomicUsize::new(0),
}
}
/// Number of `get_bytes` calls made for the chain key so far.
fn chain_calls(&self) -> usize {
self.calls.load(std::sync::atomic::Ordering::SeqCst)
}
}
// Only `get_bytes` is overridden; the listed methods are forwarded to `inner`.
crate::delegate_to_inner_impl! {
impl ObjectStore for EvolvingChainStore {
forward: list, get_to_file, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy, delete;
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
if key == self.chain_key {
let idx = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let guard = self.bodies.lock().unwrap();
// Clamp so reads past the end keep returning the final body.
let pick = idx.min(guard.len() - 1);
return Ok(guard[pick].clone());
}
self.inner.get_bytes(key).await
}
}
}
/// Store wrapper whose chain manifest exists for the first read only: the
/// first `get_bytes` for `chain_key` returns `initial`, every later one
/// returns `NotFound`.
struct VanishingChainStore {
inner: MockStore,
chain_key: String,
initial: Bytes,
calls: std::sync::atomic::AtomicUsize,
}
// Only `get_bytes` is overridden; the listed methods are forwarded to `inner`.
crate::delegate_to_inner_impl! {
impl ObjectStore for VanishingChainStore {
forward: list, get_to_file, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy, delete;
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
if key == self.chain_key {
let idx = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if idx == 0 {
return Ok(self.initial.clone());
}
return Err(ObjectStoreError::NotFound(key.to_owned()));
}
self.inner.get_bytes(key).await
}
}
}
/// Chain with one segment whose sha, `tip` and `full_at` are all `tip_hex`
/// and whose pack entry is `packs/<pack_sha_hex>.pack`.
fn one_segment_chain(tip_hex: &str, pack_sha_hex: &str) -> ChainManifest {
ChainManifest {
v: 1,
tip: Sha40::try_new(tip_hex).unwrap(),
full_at: Sha40::try_new(tip_hex).unwrap(),
segments: vec![ChainSegment {
sha: Sha40::try_new(tip_hex).unwrap(),
parent_sha: None,
pack: format!("packs/{pack_sha_hex}.pack"),
bytes: 1_024,
}],
}
}
/// Writes a one-file commit on `refs/heads/main` into the repository at
/// `repo_dir` and returns its id as a `Sha40`.
fn seed_commit(repo_dir: &Path) -> Sha40 {
use gix::actor::SignatureRef;
use gix::bstr::BStr;
use gix_hash::ObjectId;
let repo = gix::open(repo_dir).unwrap();
let signature = SignatureRef {
name: BStr::new("Tester"),
email: BStr::new("t@example.com"),
time: "0 +0000",
};
let blob = repo.write_blob(b"hello").unwrap().detach();
let tree = repo
.write_object(&gix::objs::Tree {
entries: vec![gix::objs::tree::Entry {
mode: gix::objs::tree::EntryKind::Blob.into(),
filename: "f".into(),
oid: blob,
}],
})
.unwrap()
.detach();
let oid = repo
.commit_as(
signature,
signature,
"refs/heads/main",
"seed",
tree,
std::iter::empty::<ObjectId>(),
)
.unwrap()
.detach();
Sha40::from_oid(&oid).unwrap()
}
/// Chain with one segment whose sha (and `full_at`) is `segment_sha`, used
/// with a sha that already exists locally so the segment walk needs nothing.
fn chain_with_known_segment(
tip: &Sha40,
segment_sha: &Sha40,
pack_sha_hex: &str,
) -> ChainManifest {
ChainManifest {
v: 1,
tip: tip.clone(),
full_at: segment_sha.clone(),
segments: vec![ChainSegment {
sha: segment_sha.clone(),
parent_sha: None,
pack: format!("packs/{pack_sha_hex}.pack"),
bytes: 1_024,
}],
}
}
/// Full fetch: the first chain names a pack that is absent from the store;
/// the reloaded chain no longer names it and points at a commit that already
/// exists locally, so the retry succeeds. Three chain reads are expected
/// (initial load, reload, and the retry's load).
#[tokio::test(start_paused = true)]
async fn fetch_retries_after_pack_missing_when_reload_drops_segment() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let known_sha = seed_commit(repo_dir.path());
let initial_pack = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
let reload_pack = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let tip = "1111111111111111111111111111111111111111";
let inner = MockStore::new();
let reload_chain =
chain_with_known_segment(&Sha40::try_new(tip).unwrap(), &known_sha, reload_pack);
let initial_bytes = Bytes::from(
one_segment_chain(tip, initial_pack)
.to_json_pretty()
.unwrap(),
);
let reload_bytes = Bytes::from(reload_chain.to_json_pretty().unwrap());
let key = chain_key(Some("repo"), ref_main());
inner.insert(&key, initial_bytes.clone());
let evolving = Arc::new(EvolvingChainStore::new(
inner,
key,
vec![initial_bytes, reload_bytes],
));
// Keep `evolving` so the call counter can be inspected after the fetch.
let store: Arc<dyn ObjectStore> = Arc::clone(&evolving) as _;
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
let tip_sha = Sha::from_hex(tip).unwrap();
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: tip_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth: None,
boundaries: &boundaries,
})
.await;
result.expect("retry must succeed once reload drops the missing-pack reference");
assert_eq!(
evolving.chain_calls(),
3,
"retry must perform the full reload-verify-retry sequence",
);
}
/// Every chain read names a different pack and none of them is in the store,
/// so each reload looks like a GC race until the retry budget runs out and
/// `ConcurrentGcRetriesExhausted` is returned.
#[tokio::test(start_paused = true)]
async fn fetch_surfaces_exhausted_after_max_retries() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
// Eight distinct pack names: "000…0", "111…1", …, "777…7".
let pack_shas: [String; 8] = std::array::from_fn(|i| {
let nibble = char::from_digit(u32::try_from(i).unwrap(), 16).unwrap();
std::iter::repeat_n(nibble, 40).collect()
});
let tip = "ffffffffffffffffffffffffffffffffffffffff";
let bodies: Vec<Bytes> = pack_shas
.iter()
.map(|p| Bytes::from(one_segment_chain(tip, p).to_json_pretty().unwrap()))
.collect();
let inner = MockStore::new();
let key = chain_key(Some("repo"), ref_main());
inner.insert(&key, bodies[0].clone());
let store: Arc<dyn ObjectStore> = Arc::new(EvolvingChainStore::new(inner, key, bodies));
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
let tip_sha = Sha::from_hex(tip).unwrap();
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: tip_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth: None,
boundaries: &boundaries,
})
.await;
match result {
Err(FetchError::Packchain(PackchainError::ConcurrentGcRetriesExhausted {
last_missing_key,
attempts,
})) => {
assert_eq!(attempts, PACK_MISSING_MAX_RETRIES);
assert!(
last_missing_key.contains(&pack_shas[6]),
"last missing key should name pack_shas[6], got {last_missing_key}"
);
}
other => panic!("expected ConcurrentGcRetriesExhausted, got {other:?}"),
}
}
/// The chain is readable once, then disappears: the reload after the
/// pack-missing failure must surface `ChainAbsent`.
#[tokio::test(start_paused = true)]
async fn fetch_surfaces_chain_absent_when_chain_vanishes_mid_fetch() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let pack_sha = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
let tip = "1111111111111111111111111111111111111111";
let inner = MockStore::new();
let initial = Bytes::from(one_segment_chain(tip, pack_sha).to_json_pretty().unwrap());
let key = chain_key(Some("repo"), ref_main());
inner.insert(&key, initial.clone());
let store: Arc<dyn ObjectStore> = Arc::new(VanishingChainStore {
inner,
chain_key: key,
initial,
calls: std::sync::atomic::AtomicUsize::new(0),
});
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
let tip_sha = Sha::from_hex(tip).unwrap();
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: tip_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth: None,
boundaries: &boundaries,
})
.await;
match result {
Err(FetchError::Packchain(PackchainError::ChainAbsent { ref_name: r })) => {
assert_eq!(r, "refs/heads/main");
}
other => panic!("expected ChainAbsent, got {other:?}"),
}
}
/// Shallow counterpart of the retry test above: the same evolving chain,
/// fetched with `depth = 1` for the locally seeded commit.
#[tokio::test(start_paused = true)]
async fn fetch_shallow_retries_after_pack_missing_when_reload_drops_segment() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let known_sha = seed_commit(repo_dir.path());
let initial_pack = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
let reload_pack = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let tip = "1111111111111111111111111111111111111111";
let inner = MockStore::new();
let initial_bytes = Bytes::from(
one_segment_chain(tip, initial_pack)
.to_json_pretty()
.unwrap(),
);
let reload_bytes = Bytes::from(
chain_with_known_segment(&Sha40::try_new(tip).unwrap(), &known_sha, reload_pack)
.to_json_pretty()
.unwrap(),
);
let key = chain_key(Some("repo"), ref_main());
inner.insert(&key, initial_bytes.clone());
let evolving = Arc::new(EvolvingChainStore::new(
inner,
key,
vec![initial_bytes, reload_bytes],
));
let store: Arc<dyn ObjectStore> = evolving.clone();
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
// The requested sha is the seeded commit, which the repository already contains.
let tip_sha = Sha::from_hex(known_sha.as_str()).unwrap();
let depth = NonZeroU32::new(1);
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: tip_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth,
boundaries: &boundaries,
})
.await;
result.expect("shallow retry must succeed once reload drops the missing-pack reference");
assert_eq!(
evolving.chain_calls(),
3,
"shallow retry must perform the full reload-verify-retry sequence",
);
}
/// When the reloaded chain still names the missing pack, the failure is not a
/// GC race and `PackMissing` must be surfaced with the pack's key.
#[tokio::test]
async fn fetch_surfaces_pack_missing_when_chain_references_absent_pack() {
let repo_dir = tempfile::tempdir().unwrap();
gix::init(repo_dir.path()).unwrap();
let store_inner = MockStore::new();
let chain = ChainManifest {
v: 1,
tip: Sha40::try_new("1111111111111111111111111111111111111111").unwrap(),
full_at: Sha40::try_new("1111111111111111111111111111111111111111").unwrap(),
segments: vec![ChainSegment {
sha: Sha40::try_new("1111111111111111111111111111111111111111").unwrap(),
parent_sha: None,
pack: "packs/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.pack".to_owned(),
bytes: 1_024,
}],
};
// Only the chain manifest is stored; the pack it names is deliberately absent.
store_inner.insert(
chain_key(Some("repo"), ref_main()),
Bytes::from(chain.to_json_pretty().unwrap()),
);
let store: Arc<dyn ObjectStore> = Arc::new(store_inner);
let semaphore = Arc::new(Semaphore::new(MAX_FETCH_CONCURRENCY));
let fetched_refs = FetchedRefs::new();
let boundaries = ShallowBoundaries::new();
let ref_name = ref_main();
let tip_sha = Sha::from_hex("1111111111111111111111111111111111111111").unwrap();
let result = fetch_one(FetchOneCtx {
store,
semaphore,
prefix: Some("repo"),
repo_dir: repo_dir.path(),
sha: tip_sha,
ref_name: &ref_name,
fetched_refs: &fetched_refs,
depth: None,
boundaries: &boundaries,
})
.await;
match result {
Err(FetchError::Packchain(PackchainError::PackMissing { key })) => {
assert!(
key.contains("packs/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.pack"),
"key should name the absent pack, got: {key}",
);
}
other => panic!("expected PackMissing, got {other:?}"),
}
}
}