use std::path::Path;
use std::sync::Arc;
use futures::stream::{StreamExt, TryStreamExt};
use tempfile::TempDir;
use time::{Duration, OffsetDateTime};
use tracing::{debug, info, warn};
use crate::git::{self, RefName, Sha};
use crate::keys;
use crate::object_store::{GetOpts, ObjectStore, PutOpts};
use crate::protocol::fetch::MAX_FETCH_CONCURRENCY;
use crate::protocol::push::{acquire_lock, lock_key, release_lock};
use super::PackchainError;
use super::audit::{COMPACT_BYTES_THRESHOLD, COMPACT_SEGMENTS_THRESHOLD};
use super::fetch::install_pack;
use super::gc::write_baseline_tombstone_best_effort;
use super::keys::{pack_idx_key, pack_key, pack_key_from_relative};
use super::manifest::{load_chain, write_chain, write_path_index};
use super::pack::build_baseline_pack;
use super::schema::{ChainManifest, ChainSegment, Sha40};
/// Caller-supplied knobs for a single [`compact`] run.
#[derive(Debug, Clone, Copy)]
pub(crate) struct CompactOpts {
    /// When `true`, the segment-count / byte-size heuristic is bypassed.
    /// The "already minimal" short-circuit still applies.
    pub(crate) force: bool,
    /// TTL handed to `acquire_lock` when taking the per-ref lock.
    pub(crate) lock_ttl: Duration,
}
/// Summary of what a [`compact`] call did (or declined to do) for one ref.
#[derive(Debug, Clone)]
pub(crate) struct CompactOutcome {
    /// Which of the possible paths the call took.
    pub(crate) action: CompactAction,
    /// The ref that was inspected, as the string form of its `RefName`.
    pub(crate) ref_path: String,
    /// Number of segments in the chain as loaded before taking the lock.
    pub(crate) prior_segments: usize,
    /// Saturating sum of the `bytes` of those segments.
    pub(crate) prior_bytes: u64,
    /// Content sha of the freshly built pack; `None` unless the action is
    /// [`CompactAction::Compacted`].
    pub(crate) new_pack_sha: Option<String>,
    /// Size of the freshly built pack; `0` unless the action is
    /// [`CompactAction::Compacted`].
    pub(crate) new_pack_bytes: u64,
}
/// The path a [`compact`] call ended up taking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CompactAction {
    /// The chain was rewritten to a single segment.
    Compacted,
    /// The chain was within both thresholds and `force` was not set.
    SkippedUnderThreshold,
    /// The chain already had exactly one segment with `tip == full_at`.
    AlreadyMinimal,
    /// The per-ref lock could not be acquired.
    LockContended,
}
pub(crate) async fn compact(
store: Arc<dyn ObjectStore>,
prefix: Option<&str>,
ref_name: &RefName,
opts: CompactOpts,
) -> Result<CompactOutcome, PackchainError> {
let store_ref = store.as_ref();
let prior = load_chain(store_ref, prefix, ref_name)
.await?
.ok_or_else(|| PackchainError::ChainAbsent {
ref_name: ref_name.as_str().to_owned(),
})?;
let prior_segments = prior.segments.len();
let prior_bytes = prior
.segments
.iter()
.map(|s| s.bytes)
.fold(0u64, u64::saturating_add);
if prior_segments == 1 && prior.tip == prior.full_at {
return Ok(CompactOutcome {
action: CompactAction::AlreadyMinimal,
ref_path: ref_name.as_str().to_owned(),
prior_segments,
prior_bytes,
new_pack_sha: None,
new_pack_bytes: 0,
});
}
if !opts.force
&& prior_segments <= COMPACT_SEGMENTS_THRESHOLD
&& prior_bytes <= COMPACT_BYTES_THRESHOLD
{
debug!(
ref_path = %ref_name.as_str(),
segments = prior_segments,
bytes = prior_bytes,
"compact: heuristic did not trigger; skipping",
);
return Ok(CompactOutcome {
action: CompactAction::SkippedUnderThreshold,
ref_path: ref_name.as_str().to_owned(),
prior_segments,
prior_bytes,
new_pack_sha: None,
new_pack_bytes: 0,
});
}
let lock = lock_key(prefix, ref_name);
let now = OffsetDateTime::now_utc();
let Some(guard) = acquire_lock(Arc::clone(&store), &lock, opts.lock_ttl, now)
.await
.map_err(PackchainError::Store)?
else {
return Ok(CompactOutcome {
action: CompactAction::LockContended,
ref_path: ref_name.as_str().to_owned(),
prior_segments,
prior_bytes,
new_pack_sha: None,
new_pack_bytes: 0,
});
};
let result = compact_under_lock(store.as_ref(), prefix, ref_name).await;
let release_result = release_lock(guard).await;
match (&result, release_result) {
(Ok(_), Err(e)) => {
warn!(key = %lock, error = %e, "compact: failed to release per-ref lock");
}
(Err(_), Err(e)) => {
warn!(
key = %lock,
error = %e,
"compact: lock release failed (compact already errored)",
);
}
_ => {}
}
let lock_outcome = result?;
Ok(CompactOutcome {
action: CompactAction::Compacted,
ref_path: ref_name.as_str().to_owned(),
prior_segments,
prior_bytes,
new_pack_sha: Some(lock_outcome.new_pack_sha),
new_pack_bytes: lock_outcome.new_pack_bytes,
})
}
/// What `compact_under_lock` hands back to `compact` after a successful rewrite.
struct CompactUnderLockOutcome {
    /// Content sha of the newly built baseline pack, as a string.
    new_pack_sha: String,
    /// Size in bytes of the newly built baseline pack.
    new_pack_bytes: u64,
}
async fn compact_under_lock(
store: &dyn ObjectStore,
prefix: Option<&str>,
ref_name: &RefName,
) -> Result<CompactUnderLockOutcome, PackchainError> {
let chain =
load_chain(store, prefix, ref_name)
.await?
.ok_or_else(|| PackchainError::ChainAbsent {
ref_name: ref_name.as_str().to_owned(),
})?;
let tip_sha = Sha::from_hex(chain.tip.as_str()).expect("Sha40 always parses as a gix Sha");
let scratch = TempDir::new().map_err(PackchainError::Io)?;
let repo_dir = scratch.path().join("repo");
let download_dir = scratch.path().join("downloads");
let output_dir = scratch.path().join("output");
std::fs::create_dir(&repo_dir).map_err(PackchainError::Io)?;
std::fs::create_dir(&download_dir).map_err(PackchainError::Io)?;
std::fs::create_dir(&output_dir).map_err(PackchainError::Io)?;
{
let repo_dir = repo_dir.clone();
tokio::task::spawn_blocking(move || {
gix::init_bare(&repo_dir)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
})
.await?
.map_err(|e| PackchainError::Io(std::io::Error::other(e.to_string())))?;
}
download_chain_artefacts(store, prefix, ref_name, &chain, &download_dir).await?;
install_chain_into_repo(&repo_dir, &download_dir, &chain).await?;
let new_pack = {
let repo_dir = repo_dir.clone();
let output_dir = output_dir.clone();
tokio::task::spawn_blocking(move || -> Result<_, PackchainError> {
let repo = gix::open(&repo_dir).map_err(crate::git::GitError::from)?;
let peeled = crate::git::peel_tag_chain(&repo, tip_sha).map_err(PackchainError::Git)?;
drop(repo);
build_baseline_pack(&repo_dir, peeled, &output_dir)
})
.await??
};
let path_index = {
let repo_dir = repo_dir.clone();
tokio::task::spawn_blocking(move || -> Result<_, PackchainError> {
let repo = gix::open(&repo_dir).map_err(crate::git::GitError::from)?;
let peeled = crate::git::peel_tag_chain(&repo, tip_sha).map_err(PackchainError::Git)?;
super::git::extract_path_index(&repo, &peeled, tip_sha)
})
.await??
};
let bundle_path = git::bundle_at(&repo_dir, &output_dir, tip_sha, chain.tip.as_str())
.await
.map_err(PackchainError::Git)?;
let new_pack_key = pack_key(prefix, &new_pack.content_sha);
let new_idx_key = pack_idx_key(prefix, &new_pack.content_sha);
let new_bundle_key = keys::bundle_key(prefix, ref_name, chain.tip.as_str());
let prior_full_sha = chain.full_at.clone();
upload_pack_and_idx(
store,
&new_pack_key,
&new_idx_key,
&new_pack.pack_path,
&new_pack.idx_path,
)
.await?;
upload_bundle(store, &new_bundle_key, &bundle_path).await?;
let new_segment = ChainSegment {
sha: chain.tip.clone(),
parent_sha: None,
pack: pack_key(None, &new_pack.content_sha),
bytes: new_pack.pack_bytes,
};
let new_chain = ChainManifest {
v: ChainManifest::SCHEMA_VERSION,
tip: chain.tip.clone(),
full_at: chain.tip.clone(),
segments: vec![new_segment],
};
write_chain(store, prefix, ref_name, &new_chain).await?;
if let Some(ref index) = path_index {
write_path_index(store, prefix, ref_name, index).await?;
}
tombstone_prior_baseline_bundle(store, prefix, ref_name, &prior_full_sha, &chain.tip).await;
info!(
ref_path = %ref_name.as_str(),
prior_segments = chain.segments.len(),
new_pack_sha = %new_pack.content_sha.as_str(),
new_pack_bytes = new_pack.pack_bytes,
"compact: chain rewritten to single segment",
);
Ok(CompactUnderLockOutcome {
new_pack_sha: new_pack.content_sha.as_str().to_owned(),
new_pack_bytes: new_pack.pack_bytes,
})
}
/// Download every segment pack of `chain`, plus the baseline bundle at
/// `chain.full_at`, into `download_dir`.
///
/// Packs land at `<segment sha>.pack` and the bundle at `<full_at>.bundle`.
/// Each segment's pack key is validated with `segment_pack_sha` before any
/// download starts, so a malformed manifest fails without touching the store.
/// Downloads run concurrently, at most `MAX_FETCH_CONCURRENCY` at a time.
///
/// # Errors
///
/// The validation error for the first malformed segment, or the first
/// download error.
async fn download_chain_artefacts(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    chain: &ChainManifest,
    download_dir: &Path,
) -> Result<(), PackchainError> {
    let mut tasks: Vec<DownloadTask> = Vec::with_capacity(chain.segments.len() + 1);

    for seg in &chain.segments {
        // Only the validation matters here; the parsed sha is not needed.
        super::keys::segment_pack_sha(seg)?;
        tasks.push(DownloadTask {
            key: pack_key_from_relative(prefix, &seg.pack),
            dest: download_dir.join(format!("{}.pack", seg.sha.as_str())),
            kind: "pack",
        });
    }

    let baseline = chain.full_at.as_str();
    tasks.push(DownloadTask {
        key: keys::bundle_key(prefix, ref_name, baseline),
        dest: download_dir.join(format!("{}.bundle", baseline)),
        kind: "bundle",
    });

    let downloads = futures::stream::iter(tasks)
        .map(|task| async move { task.run(store).await })
        .buffer_unordered(MAX_FETCH_CONCURRENCY);
    downloads.try_collect::<Vec<_>>().await?;

    Ok(())
}
/// One object to fetch from the store into the scratch download directory.
#[derive(Debug, Clone)]
struct DownloadTask {
    /// Object-store key to read.
    key: String,
    /// Local file the object is written to.
    dest: std::path::PathBuf,
    /// Short label (`"pack"` or `"bundle"`) used only in the debug log.
    kind: &'static str,
}
impl DownloadTask {
    /// Fetch `self.key` from `store` into `self.dest`, logging at debug level
    /// on success.
    ///
    /// # Errors
    ///
    /// [`PackchainError::Store`] wrapping the store's error.
    async fn run(self, store: &dyn ObjectStore) -> Result<(), PackchainError> {
        let Self { key, dest, kind } = self;
        match store.get_to_file(&key, &dest, GetOpts::default()).await {
            Ok(_) => {
                debug!(key = %key, kind = kind, "compact: downloaded");
                Ok(())
            }
            Err(e) => Err(PackchainError::Store(e)),
        }
    }
}
/// Populate the scratch repo at `repo_dir` from the artefacts previously
/// downloaded into `download_dir`.
///
/// The baseline bundle at `chain.full_at` is unbundled first; then each
/// segment's `<sha>.pack` is installed on a blocking thread, walking
/// `chain.segments` in reverse order.
///
/// # Errors
///
/// [`PackchainError::Git`] if unbundling fails; otherwise the `install_pack`
/// error translated into a [`PackchainError`], or a task-join error.
///
/// # Panics
///
/// Panics if `chain.full_at` does not parse as a `Sha`.
async fn install_chain_into_repo(
    repo_dir: &Path,
    download_dir: &Path,
    chain: &ChainManifest,
) -> Result<(), PackchainError> {
    /// Fold a fetch-layer error into this module's error type, keeping the
    /// variants that have a direct counterpart and stringifying the rest.
    fn into_packchain_error(err: crate::protocol::fetch::FetchError) -> PackchainError {
        use crate::protocol::fetch::FetchError;
        match err {
            FetchError::Packchain(p) => p,
            FetchError::Git(g) => PackchainError::Git(g),
            FetchError::Io(io) => PackchainError::Io(io),
            other => PackchainError::Io(std::io::Error::other(other.to_string())),
        }
    }

    let baseline_sha =
        Sha::from_hex(chain.full_at.as_str()).expect("Sha40 always parses as a gix Sha");
    git::unbundle_at(repo_dir, download_dir, baseline_sha)
        .await
        .map_err(PackchainError::Git)?;

    for segment in chain.segments.iter().rev() {
        let pack_file = download_dir.join(format!("{}.pack", segment.sha.as_str()));
        let target_repo = repo_dir.to_path_buf();
        let installed =
            tokio::task::spawn_blocking(move || install_pack(&target_repo, &pack_file)).await?;
        installed.map_err(into_packchain_error)?;
    }

    Ok(())
}
async fn upload_pack_and_idx(
store: &dyn ObjectStore,
pack_key: &str,
idx_key: &str,
pack_path: &Path,
idx_path: &Path,
) -> Result<(), PackchainError> {
tokio::try_join!(
store.put_path(pack_key, pack_path, PutOpts::default()),
store.put_path(idx_key, idx_path, PutOpts::default()),
)
.map_err(PackchainError::Store)?;
Ok(())
}
/// Upload the bundle file at `bundle_path` to the store under `key`.
///
/// # Errors
///
/// [`PackchainError::Store`] wrapping the store's error.
async fn upload_bundle(
    store: &dyn ObjectStore,
    key: &str,
    bundle_path: &Path,
) -> Result<(), PackchainError> {
    let uploaded = store.put_path(key, bundle_path, PutOpts::default()).await;
    uploaded.map(|_| ()).map_err(PackchainError::Store)
}
/// Ask the gc module to tombstone the baseline bundle at `prior_full_sha`
/// now that the chain's baseline is `current_full_sha`.
///
/// This never fails from the caller's point of view: it delegates to
/// `write_baseline_tombstone_best_effort` and only emits a debug log when
/// that helper reports a tombstone was written.
async fn tombstone_prior_baseline_bundle(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    prior_full_sha: &Sha40,
    current_full_sha: &Sha40,
) {
    let tombstoned = write_baseline_tombstone_best_effort(
        store,
        prefix,
        ref_name,
        prior_full_sha,
        current_full_sha,
        "compact",
    )
    .await;

    if !tombstoned {
        return;
    }

    debug!(
        ref_path = %ref_name.as_str(),
        prior = %prior_full_sha.as_str(),
        "compact: prior baseline bundle tombstoned for gc sweep",
    );
}
#[cfg(test)]
mod tests {
    //! Tests for `compact` and its helpers, run against the in-memory
    //! `MockStore`.

    use super::*;
    use crate::object_store::mock::MockStore;
    use crate::packchain::gc::baseline_tombstone_listing_prefix;
    use crate::packchain::manifest::write_chain;
    use crate::packchain::pack::{build_baseline_pack, build_incremental_pack};
    use crate::packchain::schema::{ChainSegment, Sha40};
    use bytes::Bytes;
    use gix::actor::SignatureRef;
    use gix::bstr::BStr;
    use gix_hash::ObjectId;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Fixed author/committer identity used for every fixture commit.
    fn signature() -> SignatureRef<'static> {
        SignatureRef {
            name: BStr::new("Tester"),
            email: BStr::new("t@example.com"),
            time: "0 +0000",
        }
    }

    /// The ref every test operates on: `refs/heads/main`.
    fn ref_main() -> RefName {
        RefName::new("refs/heads/main").unwrap()
    }

    /// Build a `Sha40` from a string, panicking on invalid input.
    fn sha40(s: &str) -> Sha40 {
        Sha40::try_new(s).unwrap()
    }

    /// `CompactOpts` with a 60-second lock TTL and the given `force` flag.
    fn opts(force: bool) -> CompactOpts {
        CompactOpts {
            force,
            lock_ttl: Duration::seconds(60),
        }
    }

    /// Create a repo with three linear commits on `refs/heads/main`.
    ///
    /// Commit `i` (0-based) has a tree holding the single file `f{i}`.
    /// Returns the repo's temp dir and the three commit shas, oldest first.
    fn fixture_three_commits() -> (TempDir, Sha, Sha, Sha) {
        let tmp = TempDir::new().unwrap();
        let repo = gix::init(tmp.path()).unwrap();
        let mut prev: Option<ObjectId> = None;
        let mut shas: Vec<ObjectId> = Vec::new();
        for (i, content) in [b"hello-1".as_slice(), b"hello-2", b"hello-3"]
            .iter()
            .enumerate()
        {
            let blob = repo.write_blob(content).unwrap().detach();
            let tree = repo
                .write_object(&gix::objs::Tree {
                    entries: vec![gix::objs::tree::Entry {
                        mode: gix::objs::tree::EntryKind::Blob.into(),
                        filename: format!("f{i}").into(),
                        oid: blob,
                    }],
                })
                .unwrap()
                .detach();
            let parents: Vec<ObjectId> = prev.into_iter().collect();
            let c = repo
                .commit_as(
                    signature(),
                    signature(),
                    "refs/heads/main",
                    format!("commit {i}"),
                    tree,
                    parents,
                )
                .unwrap()
                .detach();
            shas.push(c);
            prev = Some(c);
        }
        (
            tmp,
            Sha::from_object_id(shas[0]),
            Sha::from_object_id(shas[1]),
            Sha::from_object_id(shas[2]),
        )
    }

    /// Wrap a clone of the mock store in the `Arc<dyn ObjectStore>` that
    /// `compact` expects.
    fn arc_store(store: &MockStore) -> Arc<dyn ObjectStore> {
        Arc::new(store.clone())
    }

    /// Seed `store` with a three-segment chain for `refs/heads/main`.
    ///
    /// Uploads a baseline pack at c1, incremental packs c1..c2 and c2..c3
    /// (each with its idx), a bundle at c1, the chain manifest (segments
    /// listed tip-first) and a path index at c3. Returns the fixture repo dir
    /// (kept alive for the caller), the manifest, and the three commit shas.
    async fn lay_down_three_segment_chain(
        store: &MockStore,
        prefix: &str,
    ) -> (TempDir, ChainManifest, Sha, Sha, Sha) {
        let (repo_dir, c1, c2, c3) = fixture_three_commits();
        let out_dir = TempDir::new().unwrap();
        let baseline = build_baseline_pack(
            repo_dir.path(),
            crate::git::PeeledTip::Commit {
                commit: c1,
                tag_chain: Vec::new(),
            },
            out_dir.path(),
        )
        .unwrap();
        let inc2 = build_incremental_pack(repo_dir.path(), c1, c2, &[], out_dir.path()).unwrap();
        let inc3 = build_incremental_pack(repo_dir.path(), c2, c3, &[], out_dir.path()).unwrap();
        for built in [&baseline, &inc2, &inc3] {
            let pack_key = format!("{}/packs/{}.pack", prefix, built.content_sha.as_str());
            let idx_key = format!("{}/packs/{}.idx", prefix, built.content_sha.as_str());
            let pack_bytes = std::fs::read(&built.pack_path).unwrap();
            let idx_bytes = std::fs::read(&built.idx_path).unwrap();
            store.insert(pack_key, Bytes::from(pack_bytes));
            store.insert(idx_key, Bytes::from(idx_bytes));
        }
        let c1_spec = c1.to_string();
        let bundle_path = crate::git::bundle_at(repo_dir.path(), out_dir.path(), c1, &c1_spec)
            .await
            .unwrap();
        let bundle_bytes = std::fs::read(&bundle_path).unwrap();
        let bundle_key = format!("{prefix}/refs/heads/main/{c1}.bundle");
        store.insert(bundle_key, Bytes::from(bundle_bytes));
        let chain = ChainManifest {
            v: ChainManifest::SCHEMA_VERSION,
            tip: sha40(&c3.to_string()),
            full_at: sha40(&c1.to_string()),
            segments: vec![
                ChainSegment {
                    sha: sha40(&c3.to_string()),
                    parent_sha: Some(sha40(&c2.to_string())),
                    pack: format!("packs/{}.pack", inc3.content_sha.as_str()),
                    bytes: inc3.pack_bytes,
                },
                ChainSegment {
                    sha: sha40(&c2.to_string()),
                    parent_sha: Some(sha40(&c1.to_string())),
                    pack: format!("packs/{}.pack", inc2.content_sha.as_str()),
                    bytes: inc2.pack_bytes,
                },
                ChainSegment {
                    sha: sha40(&c1.to_string()),
                    parent_sha: None,
                    pack: format!("packs/{}.pack", baseline.content_sha.as_str()),
                    bytes: baseline.pack_bytes,
                },
            ],
        };
        write_chain(store, Some(prefix), &ref_main(), &chain)
            .await
            .unwrap();
        let path_index = {
            let repo = gix::open(repo_dir.path()).unwrap();
            let peeled = crate::git::peel_tag_chain(&repo, c3).unwrap();
            crate::packchain::git::extract_path_index(&repo, &peeled, c3)
                .unwrap()
                .expect("commit-tipped fixture must produce a path-index")
        };
        crate::packchain::manifest::write_path_index(store, Some(prefix), &ref_main(), &path_index)
            .await
            .unwrap();
        (repo_dir, chain, c1, c2, c3)
    }

    /// A one-segment chain with `tip == full_at` is reported as already
    /// minimal, even with `force` set.
    #[tokio::test]
    async fn compact_short_circuits_for_single_segment_at_tip() {
        let store = MockStore::new();
        let chain = ChainManifest {
            v: 1,
            tip: sha40("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
            full_at: sha40("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
            segments: vec![ChainSegment {
                sha: sha40("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
                parent_sha: None,
                pack: "packs/1111111111111111111111111111111111111111.pack".into(),
                bytes: 100,
            }],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::AlreadyMinimal);
    }

    /// A small two-segment chain is skipped when `force` is not set.
    #[tokio::test]
    async fn compact_skips_when_under_threshold_without_force() {
        let store = MockStore::new();
        let chain = ChainManifest {
            v: 1,
            tip: sha40("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
            full_at: sha40("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
            segments: vec![
                ChainSegment {
                    sha: sha40("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
                    parent_sha: Some(sha40("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
                    pack: "packs/1111111111111111111111111111111111111111.pack".into(),
                    bytes: 1024,
                },
                ChainSegment {
                    sha: sha40("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
                    parent_sha: None,
                    pack: "packs/2222222222222222222222222222222222222222.pack".into(),
                    bytes: 1024,
                },
            ],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(false))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::SkippedUnderThreshold);
        assert_eq!(outcome.prior_segments, 2);
    }

    /// With an object already present at the lock key, `compact` reports
    /// contention and leaves that object in place.
    #[tokio::test]
    async fn compact_returns_lock_contended_when_lock_held_recently() {
        let store = MockStore::new();
        let (_repo, _chain, _c1, _c2, _c3) = lay_down_three_segment_chain(&store, "repo").await;
        let lock_key = lock_key(Some("repo"), &ref_main());
        store.insert(&lock_key, Bytes::new());
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::LockContended);
        assert!(store.contains(&lock_key));
    }

    /// Compacting a ref with no manifest fails with `ChainAbsent`.
    #[tokio::test]
    async fn compact_chain_absent_returns_chain_absent_error() {
        let store = MockStore::new();
        let err = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap_err();
        assert!(matches!(err, PackchainError::ChainAbsent { .. }));
    }

    /// End-to-end happy path: three segments become one, the new pack, idx
    /// and bundle are uploaded, the prior bundle is kept but tombstoned, the
    /// path index is rewritten, and the lock is released.
    #[tokio::test]
    async fn compact_force_collapses_three_segment_chain_to_one() {
        let store = MockStore::new();
        let (_repo, prior, c1, _c2, c3) = lay_down_three_segment_chain(&store, "repo").await;
        // Remove the seeded path index so its presence afterwards proves
        // compact wrote it.
        let path_index_key = "repo/refs/heads/main/path-index.json";
        store.delete(path_index_key).await.unwrap();
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::Compacted);
        assert_eq!(outcome.prior_segments, 3);
        let new_chain = crate::packchain::manifest::load_chain(&store, Some("repo"), &ref_main())
            .await
            .unwrap()
            .expect("chain present");
        assert_eq!(new_chain.tip, prior.tip);
        assert_eq!(
            new_chain.full_at, new_chain.tip,
            "full_at must equal tip after compact",
        );
        assert_eq!(new_chain.segments.len(), 1);
        assert_eq!(new_chain.segments[0].sha, prior.tip);
        assert_eq!(
            new_chain.segments[0].parent_sha, None,
            "post-compact chain root segment has no parent",
        );
        let new_pack_sha = outcome
            .new_pack_sha
            .as_ref()
            .expect("Compacted action carries new pack sha");
        let pack_key = format!("repo/packs/{new_pack_sha}.pack");
        let idx_key = format!("repo/packs/{new_pack_sha}.idx");
        assert!(store.contains(&pack_key), "new pack must be uploaded");
        assert!(store.contains(&idx_key), "new idx must be uploaded");
        let bundle_key = format!("repo/refs/heads/main/{c3}.bundle");
        assert!(
            store.contains(&bundle_key),
            "fresh baseline bundle at the new tip must be uploaded",
        );
        let prior_bundle_key = format!("repo/refs/heads/main/{c1}.bundle");
        assert!(
            store.contains(&prior_bundle_key),
            "compact must leave the prior baseline bundle in place \
             during the gc grace window",
        );
        let tombstones = store.list("repo/gc/").await.unwrap();
        let baseline_tomb_count = tombstones
            .iter()
            .filter(|m| {
                m.key
                    .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
            })
            .count();
        assert_eq!(
            baseline_tomb_count, 1,
            "compact must write exactly one baseline tombstone for the prior full_at",
        );
        assert!(
            store.contains(path_index_key),
            "compact must write a fresh path-index.json at the ref",
        );
        let lock = lock_key(Some("repo"), &ref_main());
        assert!(
            !store.contains(&lock),
            "compact must release the per-ref lock on success",
        );
    }

    /// After one successful compact, a second forced compact is a no-op.
    #[tokio::test]
    async fn second_compact_after_successful_compact_is_already_minimal() {
        let store = MockStore::new();
        let (_repo, _prior, c1, _c2, _c3) = lay_down_three_segment_chain(&store, "repo").await;
        let prior_bundle_key = format!("repo/refs/heads/main/{c1}.bundle");
        assert!(store.contains(&prior_bundle_key));
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::Compacted);
        assert!(store.contains(&prior_bundle_key));
        let second = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(second.action, CompactAction::AlreadyMinimal);
    }

    /// When the prior and current baseline shas are the same, no baseline
    /// tombstone is written.
    #[tokio::test]
    async fn tombstone_prior_baseline_bundle_skips_when_keys_alias() {
        let store = MockStore::new();
        let sha = sha40("cccccccccccccccccccccccccccccccccccccccc");
        // `bundle_key` is called with `&RefName` everywhere else in this file.
        let key = keys::bundle_key(Some("repo"), &ref_main(), sha.as_str());
        store.insert(&key, Bytes::from_static(b"live"));
        tombstone_prior_baseline_bundle(&store, Some("repo"), &ref_main(), &sha, &sha).await;
        let tombstones = store.list("repo/gc/").await.unwrap();
        assert!(
            tombstones.iter().all(|m| !m
                .key
                .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))),
            "aliasing keys must not write a baseline tombstone",
        );
    }

    /// A failure while writing the baseline tombstone must not fail the
    /// compact: the chain is already rewritten and the lock is released.
    #[tokio::test]
    async fn compact_reports_success_when_post_commit_baseline_tombstone_fails() {
        let store = MockStore::new();
        let (_repo, _prior, c1, _c2, _c3) = lay_down_three_segment_chain(&store, "repo").await;
        // Presumably makes puts under the tombstone prefix fail with a
        // network error; see the mock's `Fault` definition to confirm.
        store.arm(crate::object_store::mock::Fault::NetworkOnPutBytesPrefix {
            prefix: baseline_tombstone_listing_prefix(Some("repo")),
        });
        let prior_bundle_key = format!("repo/refs/heads/main/{c1}.bundle");
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .expect("compact must report success when post-commit cleanup fails");
        assert_eq!(outcome.action, CompactAction::Compacted);
        let new_chain = crate::packchain::manifest::load_chain(&store, Some("repo"), &ref_main())
            .await
            .unwrap()
            .expect("chain present");
        assert_eq!(new_chain.segments.len(), 1);
        assert_eq!(new_chain.full_at, new_chain.tip);
        assert!(
            store.contains(&prior_bundle_key),
            "put fault must have left the prior baseline bundle in place",
        );
        let second = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(second.action, CompactAction::AlreadyMinimal);
        let lock = lock_key(Some("repo"), &ref_main());
        assert!(
            !store.contains(&lock),
            "compact must release the per-ref lock even on best-effort cleanup failure",
        );
    }

    /// Reading blob `f2` through `read_blob` yields the same bytes before and
    /// after compaction.
    #[tokio::test]
    async fn compact_round_trips_blobs_via_read_blob() {
        let store = MockStore::new();
        let (_repo, _prior, _c1, _c2, _c3) = lay_down_three_segment_chain(&store, "repo").await;
        let store_arc: Arc<dyn ObjectStore> = Arc::new(store.clone());
        let remote = crate::Remote::new_for_test(
            store_arc.clone(),
            "repo",
            crate::url::StorageEngine::Packchain,
        );
        let cache = crate::packchain::PackIndexCache::new(1024 * 1024);
        let pre = crate::packchain::read_blob(&remote, ref_main().as_str(), "f2", &cache)
            .await
            .expect("read_blob pre-compact");
        let outcome = compact(arc_store(&store), Some("repo"), &ref_main(), opts(true))
            .await
            .unwrap();
        assert_eq!(outcome.action, CompactAction::Compacted);
        // A fresh cache is used for the post-compact read.
        let cache2 = crate::packchain::PackIndexCache::new(1024 * 1024);
        let post = crate::packchain::read_blob(&remote, ref_main().as_str(), "f2", &cache2)
            .await
            .expect("read_blob post-compact");
        assert_eq!(pre, post, "blob bytes must round-trip through compact");
    }

    /// Shared body for the malformed-pack-key tests: a single-segment chain
    /// whose `pack` field is `pack` must be rejected by
    /// `download_chain_artefacts` with `MalformedPackEntry` at offset 0.
    async fn assert_download_chain_artefacts_rejects(pack: &str) {
        let store = MockStore::new();
        let download_dir = TempDir::new().unwrap();
        let chain = ChainManifest {
            v: 1,
            tip: sha40("3333333333333333333333333333333333333333"),
            full_at: sha40("3333333333333333333333333333333333333333"),
            segments: vec![ChainSegment {
                sha: sha40("3333333333333333333333333333333333333333"),
                parent_sha: None,
                pack: pack.to_owned(),
                bytes: 1_024,
            }],
        };
        let result = download_chain_artefacts(
            &store,
            Some("repo"),
            &ref_main(),
            &chain,
            download_dir.path(),
        )
        .await;
        match result {
            Err(PackchainError::MalformedPackEntry { offset: 0, reason }) => {
                assert!(
                    reason.contains("is not of the form"),
                    "reason should describe the expected shape, got: {reason}",
                );
                if !pack.is_empty() {
                    assert!(
                        reason.contains(pack),
                        "reason should also name the offending key, got: {reason}",
                    );
                }
            }
            other => panic!("expected MalformedPackEntry for pack `{pack}`, got {other:?}"),
        }
    }

    /// An empty pack key is rejected.
    #[tokio::test]
    async fn download_chain_artefacts_rejects_empty_pack_key() {
        assert_download_chain_artefacts_rejects("").await;
    }

    /// A pack key without the `packs/<sha>.pack` structure is rejected.
    #[tokio::test]
    async fn download_chain_artefacts_rejects_unstructured_pack_key() {
        assert_download_chain_artefacts_rejects("wrong").await;
    }

    /// A correctly shaped pack key with a non-hex sha is rejected.
    #[tokio::test]
    async fn download_chain_artefacts_rejects_non_hex_pack_key() {
        assert_download_chain_artefacts_rejects("packs/notahex.pack").await;
    }
}