use std::collections::HashSet;
use bytes::Bytes;
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::git::RefName;
use crate::keys;
use crate::object_store::{ObjectStore, ObjectStoreError, PutOpts};
use crate::protocol::fetch::MAX_FETCH_CONCURRENCY;
use super::PackchainError;
use super::manifest::load_chain;
use super::schema::{ChainManifest, Sha40};
/// Default sweep grace window, in hours.
///
/// Used by `SweepOpts::default()` and as the fallback of
/// `grace_hours_from_env` when the override is unset or not a positive `u64`.
pub const DEFAULT_GRACE_HOURS: u64 = 24;
/// Result of comparing a tombstone's `marked_at` against the grace window.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum GraceDecision {
    /// Still inside the grace window; the tombstone must not be applied yet.
    Within,
    /// The grace window has elapsed; the tombstone may be applied.
    Past,
}
fn rfc3339_now() -> Result<String, PackchainError> {
OffsetDateTime::now_utc().format(&Rfc3339).map_err(|e| {
PackchainError::Io(std::io::Error::other(format!("rfc3339 format failed: {e}")))
})
}
fn check_grace_window(
marked_at: &str,
grace_hours: u64,
kind: &'static str,
) -> Result<GraceDecision, PackchainError> {
let marked_at_ts = OffsetDateTime::parse(marked_at, &Rfc3339).map_err(|e| {
PackchainError::Io(std::io::Error::other(format!(
"{kind} marked_at parse failed: {e}"
)))
})?;
let age_hours = (OffsetDateTime::now_utc() - marked_at_ts).whole_hours();
let within = age_hours
.try_into()
.map_or(true, |hours: u64| hours < grace_hours);
Ok(if within {
GraceDecision::Within
} else {
GraceDecision::Past
})
}
/// Best-effort wrapper around `write_baseline_tombstone`, for callers whose
/// `chain.json` update is already committed.
///
/// Returns `true` only when a tombstone for `prior_full_sha` was written,
/// i.e. the write succeeded and the baseline actually changed. A failed write
/// is logged at `warn` together with the orphaned bundle key. It is reported
/// as `false` and never propagated.
pub(crate) async fn write_baseline_tombstone_best_effort(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    prior_full_sha: &Sha40,
    current_full_sha: &Sha40,
) -> bool {
    let result =
        write_baseline_tombstone(store, prefix, ref_name, prior_full_sha, current_full_sha).await;
    if let Err(e) = result {
        let orphan_key = keys::bundle_key(prefix, ref_name.as_str(), prior_full_sha.as_str());
        warn!(
            source,
            ref_path = %ref_name.as_str(),
            key = %orphan_key,
            error = %e,
            "baseline tombstone write failed (chain.json already committed); \
             orphan bundle left for manual cleanup",
        );
        return false;
    }
    prior_full_sha != current_full_sha
}
/// Environment variable read by `grace_hours_from_env` to override
/// `DEFAULT_GRACE_HOURS`. Only a positive `u64` takes effect.
pub(crate) const ENV_GC_GRACE_HOURS: &str = "GIT_REMOTE_OBJECT_STORE_GC_GRACE_HOURS";
/// Schema version written into, and required of, both pack tombstones
/// (`Tombstone`) and baseline tombstones (`BaselineTombstone`).
pub const TOMBSTONE_SCHEMA_VERSION: u32 = 1;
/// Rejects a tombstone whose `v` field differs from `TOMBSTONE_SCHEMA_VERSION`.
///
/// # Errors
/// `PackchainError::UnsupportedSchemaVersion` carrying both the found and the
/// expected version.
fn check_tombstone_schema_version(found: u32) -> Result<(), PackchainError> {
    if found != TOMBSTONE_SCHEMA_VERSION {
        return Err(PackchainError::UnsupportedSchemaVersion {
            found,
            expected: TOMBSTONE_SCHEMA_VERSION,
        });
    }
    Ok(())
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Tombstone {
pub(crate) v: u32,
pub(crate) run_id: String,
pub(crate) marked_at: String,
pub(crate) orphan_packs: Vec<Sha40>,
}
impl Tombstone {
pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
let parsed: Self = serde_json::from_slice(bytes)?;
check_tombstone_schema_version(parsed.v)?;
Ok(parsed)
}
pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
Ok(serde_json::to_vec_pretty(self)?)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct BaselineTombstone {
pub(crate) v: u32,
pub(crate) marked_at: String,
pub(crate) ref_name: String,
pub(crate) sha: Sha40,
}
impl BaselineTombstone {
pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
let parsed: Self = serde_json::from_slice(bytes)?;
check_tombstone_schema_version(parsed.v)?;
Ok(parsed)
}
pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
Ok(serde_json::to_vec_pretty(self)?)
}
}
/// Tombstones the bundle for `prior_full_sha` when the ref's baseline moved
/// to `current_full_sha`. Does nothing when the two are equal.
///
/// # Errors
/// Timestamp, serialization, or store failures from the tombstone write.
pub(crate) async fn write_baseline_tombstone(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    prior_full_sha: &Sha40,
    current_full_sha: &Sha40,
) -> Result<(), PackchainError> {
    if prior_full_sha == current_full_sha {
        Ok(())
    } else {
        write_baseline_tombstone_unconditional(store, prefix, ref_name, prior_full_sha).await
    }
}
/// Tombstones the bundle for `orphan_sha` on `ref_name` without any
/// prior/current comparison (contrast `write_baseline_tombstone`).
///
/// # Errors
/// Timestamp, serialization, or store failures from the tombstone write.
pub(crate) async fn write_baseline_tombstone_for_orphan(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    orphan_sha: &Sha40,
) -> Result<(), PackchainError> {
    write_baseline_tombstone_unconditional(store, prefix, ref_name, orphan_sha).await?;
    Ok(())
}
/// Tries to tombstone the bundle named by `remote_ref`'s current
/// `chain.full_at`, and returns that bundle key on success.
///
/// Returns `None`, telling the caller to fall back to a synchronous bundle
/// delete (per the log messages), in any of these cases:
/// - the ref has no chain;
/// - the chain cannot be read or parsed;
/// - the bundle key is not among `fresh`;
/// - the tombstone write fails.
///
/// Read and write failures are logged at `warn`, tagged with `log_context`.
pub(crate) async fn try_write_baseline_tombstone(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    fresh: &[crate::object_store::ObjectMeta],
    log_context: &'static str,
) -> Option<String> {
    let loaded = load_chain(store, prefix, remote_ref).await;
    let chain = match loaded {
        // `?` on the inner Option returns `None` when no chain exists.
        Ok(found) => found?,
        Err(err) => {
            warn!(
                source = log_context,
                ref_path = %remote_ref.as_str(),
                error = %err,
                "chain.json read/parse failed; falling back to synchronous bundle delete",
            );
            return None;
        }
    };
    let bundle_key = keys::bundle_key(prefix, remote_ref.as_str(), chain.full_at.as_str());
    if fresh.iter().all(|meta| meta.key != bundle_key) {
        return None;
    }
    if let Err(err) =
        write_baseline_tombstone_for_orphan(store, prefix, remote_ref, &chain.full_at).await
    {
        warn!(
            source = log_context,
            ref_path = %remote_ref.as_str(),
            key = %bundle_key,
            error = %err,
            "baseline tombstone write failed; falling back to synchronous bundle delete",
        );
        return None;
    }
    Some(bundle_key)
}
/// Collects the bundle keys currently covered by a baseline tombstone.
///
/// Lists `gc/` under `prefix`, reads every baseline tombstone there, and maps
/// each one to `keys::bundle_key(prefix, ref_name, sha)`.
/// - A `NotFound` listing yields an empty set.
/// - Tombstones that vanish between list and get are skipped silently.
/// - Tombstones that fail to parse or name an invalid ref are skipped with a
///   `warn`.
///
/// # Errors
/// Any other store error from the listing or a fetch.
pub(crate) async fn tombstoned_bundle_keys(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
) -> Result<HashSet<String>, ObjectStoreError> {
    let root = prefix.unwrap_or("");
    let listing = match store.list(&gc_listing_prefix(root)).await {
        Err(ObjectStoreError::NotFound(_)) => return Ok(HashSet::new()),
        other => other?,
    };
    let candidates = listing
        .into_iter()
        .filter(|meta| is_baseline_tombstone_key(&meta.key, root));
    let mut bundle_keys = HashSet::new();
    for meta in candidates {
        let body = match store.get_bytes(&meta.key).await {
            Ok(bytes) => bytes,
            Err(ObjectStoreError::NotFound(_)) => continue,
            Err(e) => return Err(e),
        };
        let tombstone = match BaselineTombstone::from_json_bytes(&body) {
            Ok(parsed) => parsed,
            Err(e) => {
                warn!(
                    key = %meta.key,
                    error = %e,
                    "tombstoned_bundle_keys: skipping unparseable baseline tombstone",
                );
                continue;
            }
        };
        let Ok(ref_name) = RefName::new(tombstone.ref_name.clone()) else {
            warn!(
                key = %meta.key,
                ref_name = %tombstone.ref_name,
                "tombstoned_bundle_keys: skipping tombstone with invalid ref_name",
            );
            continue;
        };
        bundle_keys.insert(keys::bundle_key(prefix, &ref_name, tombstone.sha.as_str()));
    }
    Ok(bundle_keys)
}
/// Writes a `BaselineTombstone` for `orphan_sha` on `ref_name`, unconditionally.
///
/// The key is `gc/baseline-tomb-<uuid>.json` under `prefix`, with a fresh
/// random UUID per call.
///
/// # Errors
/// Timestamp formatting, JSON serialization, or the store `put`.
async fn write_baseline_tombstone_unconditional(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    orphan_sha: &Sha40,
) -> Result<(), PackchainError> {
    let payload = BaselineTombstone {
        v: TOMBSTONE_SCHEMA_VERSION,
        marked_at: rfc3339_now()?,
        ref_name: ref_name.as_str().to_owned(),
        sha: orphan_sha.clone(),
    }
    .to_json_pretty()?;
    let tomb_id = Uuid::new_v4().to_string();
    let key = baseline_tombstone_key(prefix.unwrap_or(""), &tomb_id);
    store
        .put_bytes(&key, Bytes::from(payload), PutOpts::default())
        .await?;
    debug!(
        key = %key,
        ref_path = %ref_name.as_str(),
        sha = %orphan_sha.as_str(),
        "gc: baseline tombstone written",
    );
    Ok(())
}
/// Summary returned by `mark`.
#[derive(Clone, Debug)]
pub struct MarkOutcome {
    /// Random identifier of this mark run.
    pub run_id: String,
    /// Number of packs found with no referencing chain.
    pub orphan_count: usize,
    /// Tombstone key for this run. It is not written on dry runs or when
    /// `orphan_count` is zero.
    pub tombstone_key: String,
}
/// Totals returned by `sweep`.
///
/// Tombstones whose processing returned an error are logged but not counted
/// in any field.
#[derive(Clone, Debug, Default)]
pub struct SweepOutcome {
    /// Tombstones fully processed, including ones already gone when fetched.
    pub swept_tombstones: usize,
    /// Tombstones left in place for a later sweep.
    pub deferred_tombstones: usize,
    /// Objects this sweep deleted; already-missing objects are not counted.
    pub deleted_objects: usize,
    /// Tombstoned packs/baselines kept because they were referenced again.
    pub skipped_repointed_packs: usize,
}
/// Options for `mark`.
#[derive(Clone, Copy, Debug, Default)]
pub struct MarkOpts {
    /// Compute orphans and the tombstone key, but do not write the tombstone.
    pub dry_run: bool,
}
/// Options for `sweep`.
#[derive(Clone, Copy, Debug)]
pub struct SweepOpts {
    /// Minimum tombstone age, in whole hours, before it is applied.
    pub grace_hours: u64,
    /// Skip the grace-window check. The re-reference checks still run.
    pub force: bool,
}

impl Default for SweepOpts {
    /// `DEFAULT_GRACE_HOURS` grace window with `force` disabled.
    fn default() -> Self {
        let grace_hours = DEFAULT_GRACE_HOURS;
        let force = false;
        Self { grace_hours, force }
    }
}
#[must_use]
pub(crate) fn grace_hours_from_env() -> u64 {
std::env::var(ENV_GC_GRACE_HOURS)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.filter(|h| *h > 0)
.unwrap_or(DEFAULT_GRACE_HOURS)
}
#[must_use]
pub(crate) fn resolve_grace_hours(opt: Option<u64>) -> u64 {
opt.unwrap_or_else(grace_hours_from_env)
}
/// Mark phase of GC: finds every pack under `packs/` that no `chain.json`
/// under `refs/` references.
///
/// Writes a `Tombstone` to `gc/tombstones-<run_id>-<marked_at>.json` unless
/// `opts.dry_run` is set or there are no orphans. In those two cases the
/// returned `MarkOutcome::tombstone_key` names a key that was not written.
///
/// # Errors
/// Fails closed, writing no tombstone, if listing packs, listing or parsing
/// any chain, formatting the timestamp, serializing, or the `put` fails.
pub async fn mark(
    store: &dyn ObjectStore,
    prefix: &str,
    opts: MarkOpts,
) -> Result<MarkOutcome, PackchainError> {
    let candidates = list_pack_shas(store, prefix).await?;
    let live = list_referenced_packs(store, prefix).await?;
    let mut orphans: Vec<Sha40> = candidates.into_iter().collect();
    orphans.retain(|sha| !live.contains(sha));

    let run_id = Uuid::new_v4().to_string();
    let marked_at = rfc3339_now()?;
    let outcome = MarkOutcome {
        tombstone_key: tombstone_key(prefix, &run_id, &marked_at),
        orphan_count: orphans.len(),
        run_id: run_id.clone(),
    };
    let tombstone = Tombstone {
        v: TOMBSTONE_SCHEMA_VERSION,
        run_id,
        marked_at,
        orphan_packs: orphans,
    };

    // Dry-run is checked first so it is reported even when there are no orphans.
    if opts.dry_run {
        debug!(
            run_id = %outcome.run_id,
            orphans = outcome.orphan_count,
            "gc mark: dry-run, not writing tombstone",
        );
        return Ok(outcome);
    }
    if outcome.orphan_count == 0 {
        info!(run_id = %outcome.run_id, "gc mark: no orphans; skipping tombstone");
        return Ok(outcome);
    }

    let payload = tombstone.to_json_pretty()?;
    store
        .put_bytes(&outcome.tombstone_key, Bytes::from(payload), PutOpts::default())
        .await?;
    info!(
        run_id = %outcome.run_id,
        orphans = outcome.orphan_count,
        key = %outcome.tombstone_key,
        "gc mark: tombstone written",
    );
    Ok(outcome)
}
pub async fn sweep(
store: &dyn ObjectStore,
prefix: &str,
opts: SweepOpts,
) -> Result<SweepOutcome, PackchainError> {
let tombstones_prefix = gc_listing_prefix(prefix);
let metas = store.list(&tombstones_prefix).await?;
let mut outcome = SweepOutcome::default();
if opts.force {
warn!("gc sweep: --force in effect; skipping grace window");
}
for meta in metas {
if !meta.key.as_bytes().ends_with(b".json") {
continue;
}
let step = if is_tombstone_key(&meta.key, prefix) {
sweep_one_tombstone(store, prefix, &meta.key, opts).await
} else if is_baseline_tombstone_key(&meta.key, prefix) {
sweep_one_baseline_tombstone(store, prefix, &meta.key, opts).await
} else {
continue;
};
match step {
Ok(SweepStep::Deferred) => outcome.deferred_tombstones += 1,
Ok(SweepStep::Swept {
deleted_objects,
skipped_repointed_packs,
}) => {
outcome.swept_tombstones += 1;
outcome.deleted_objects += deleted_objects;
outcome.skipped_repointed_packs += skipped_repointed_packs;
}
Err(e) => {
warn!(key = %meta.key, error = %e, "gc sweep: tombstone failed");
}
}
}
Ok(outcome)
}
/// Per-tombstone result that `sweep` folds into its `SweepOutcome`.
#[derive(Debug)]
enum SweepStep {
    /// Tombstone left in place, to be retried by a later sweep.
    Deferred,
    /// Tombstone fully processed (or already gone).
    Swept {
        /// Objects actually deleted; already-missing ones are not counted.
        deleted_objects: usize,
        /// Tombstoned entries kept because they were referenced again.
        skipped_repointed_packs: usize,
    },
}
/// Applies the single pack tombstone stored at `tombstone_key`.
///
/// Outcomes:
/// - tombstone already gone (`NotFound`): `Swept` with zero counts;
/// - still inside `opts.grace_hours` and `opts.force` unset: `Deferred`,
///   nothing deleted;
/// - otherwise, each SHA in `orphan_packs` is re-checked against the live
///   chain references. Still-unreferenced SHAs have their `.pack` and then
///   `.idx` object deleted. Re-referenced SHAs are counted in
///   `skipped_repointed_packs`. The tombstone itself is deleted last.
///
/// # Errors
/// Store errors other than `NotFound`, tombstone JSON/schema errors, an
/// unparseable `marked_at`, or any `list_referenced_packs` failure. The
/// tombstone is removed only after every pack is handled, so an error leaves
/// it in place and a later sweep retries (deletes are idempotent).
async fn sweep_one_tombstone(
store: &dyn ObjectStore,
prefix: &str,
tombstone_key: &str,
opts: SweepOpts,
) -> Result<SweepStep, PackchainError> {
let body = match store.get_bytes(tombstone_key).await {
Ok(b) => b,
Err(ObjectStoreError::NotFound(_)) => {
return Ok(SweepStep::Swept {
deleted_objects: 0,
skipped_repointed_packs: 0,
});
}
Err(e) => return Err(PackchainError::Store(e)),
};
let tombstone = Tombstone::from_json_bytes(&body)?;
// `force` bypasses only this grace check; the per-pack reference re-check below always runs.
if !opts.force
&& check_grace_window(&tombstone.marked_at, opts.grace_hours, "tombstone")?
== GraceDecision::Within
{
debug!(
key = %tombstone_key,
marked_at = %tombstone.marked_at,
"gc sweep: tombstone within grace window",
);
return Ok(SweepStep::Deferred);
}
let mut deleted_objects = 0usize;
let mut skipped_repointed_packs = 0usize;
for sha in &tombstone.orphan_packs {
// Live references are rescanned for every SHA, immediately before its delete,
// presumably to keep each pack's check-to-delete race window small.
// NOTE(review): this costs one refs/ listing plus one GET per chain.json for
// every orphan; hoisting it above the loop is cheaper but widens that window.
let referenced = list_referenced_packs(store, prefix).await?;
if referenced.contains(sha) {
skipped_repointed_packs += 1;
debug!(
sha = %sha.as_str(),
"gc sweep: tombstoned pack re-referenced; skipping",
);
continue;
}
let pack_key = super::keys::pack_key(Some(prefix), sha);
let idx_key = super::keys::pack_idx_key(Some(prefix), sha);
// `delete_idempotent` returns false for already-missing objects, so they are not counted.
if delete_idempotent(store, &pack_key).await? {
deleted_objects += 1;
}
if delete_idempotent(store, &idx_key).await? {
deleted_objects += 1;
}
}
// Reached only after every listed pack was handled; any earlier `?` keeps the tombstone.
delete_idempotent(store, tombstone_key).await?;
info!(
key = %tombstone_key,
deleted = deleted_objects,
skipped = skipped_repointed_packs,
"gc sweep: tombstone applied",
);
Ok(SweepStep::Swept {
deleted_objects,
skipped_repointed_packs,
})
}
/// Applies the single baseline tombstone stored at `tombstone_key`.
///
/// Outcomes:
/// - tombstone already gone (`NotFound`): `Swept` with zero counts;
/// - still inside the grace window and `opts.force` unset: `Deferred`;
/// - `ref_name` is not a valid ref: logged at `error` and `Deferred`, so both
///   the tombstone and its bundle are kept for operator review;
/// - the ref's chain still has `full_at == sha`: the bundle is kept (counted
///   in `skipped_repointed_packs`) and the tombstone is deleted;
/// - otherwise the chain is read a second time. If it now points at `sha`,
///   the result is `Deferred`. If not, the bundle is deleted, then the
///   tombstone.
///
/// # Errors
/// Store errors other than `NotFound`, tombstone JSON/schema errors, an
/// unparseable `marked_at`, or `load_chain` failures.
async fn sweep_one_baseline_tombstone(
store: &dyn ObjectStore,
prefix: &str,
tombstone_key: &str,
opts: SweepOpts,
) -> Result<SweepStep, PackchainError> {
let body = match store.get_bytes(tombstone_key).await {
Ok(b) => b,
Err(ObjectStoreError::NotFound(_)) => {
return Ok(SweepStep::Swept {
deleted_objects: 0,
skipped_repointed_packs: 0,
});
}
Err(e) => return Err(PackchainError::Store(e)),
};
let tombstone = BaselineTombstone::from_json_bytes(&body)?;
if !opts.force
&& check_grace_window(&tombstone.marked_at, opts.grace_hours, "baseline tombstone")?
== GraceDecision::Within
{
debug!(
key = %tombstone_key,
marked_at = %tombstone.marked_at,
"gc sweep: baseline tombstone within grace window",
);
return Ok(SweepStep::Deferred);
}
// An invalid ref name cannot be mapped to a bundle key safely; defer rather than delete.
let ref_name = match RefName::new(tombstone.ref_name.clone()) {
Ok(r) => r,
Err(e) => {
error!(
key = %tombstone_key,
ref_name = %tombstone.ref_name,
sha = %tombstone.sha.as_str(),
error = %e,
"gc sweep: baseline tombstone names invalid ref; preserving \
tombstone and bundle for operator review",
);
return Ok(SweepStep::Deferred);
}
};
// Map the empty-string (root) prefix to `None` for the Option-taking helpers below.
let prefix_opt = (!prefix.is_empty()).then_some(prefix);
let chain = load_chain(store, prefix_opt, &ref_name).await?;
let mut skipped_repointed_packs = 0usize;
let mut deleted_objects = 0usize;
// A missing chain (`None`) counts as "not live", so the bundle becomes deletable.
let still_live = chain.as_ref().is_some_and(|c| c.full_at == tombstone.sha);
if still_live {
skipped_repointed_packs += 1;
debug!(
key = %tombstone_key,
ref_path = %ref_name.as_str(),
sha = %tombstone.sha.as_str(),
"gc sweep: baseline re-referenced; skipping delete",
);
} else {
// Second read just before the delete. This narrows, but (no lock is taken)
// does not eliminate, the window in which a writer re-points the ref here.
let recheck = load_chain(store, prefix_opt, &ref_name).await?;
if recheck.as_ref().is_some_and(|c| c.full_at == tombstone.sha) {
debug!(
key = %tombstone_key,
ref_path = %ref_name.as_str(),
sha = %tombstone.sha.as_str(),
"gc sweep: baseline re-referenced between checks; deferring",
);
return Ok(SweepStep::Deferred);
}
let bundle_key = keys::bundle_key(prefix_opt, &ref_name, tombstone.sha.as_str());
if delete_idempotent(store, &bundle_key).await? {
deleted_objects += 1;
}
}
delete_idempotent(store, tombstone_key).await?;
info!(
key = %tombstone_key,
deleted = deleted_objects,
skipped = skipped_repointed_packs,
"gc sweep: baseline tombstone applied",
);
Ok(SweepStep::Swept {
deleted_objects,
skipped_repointed_packs,
})
}
/// Listing prefix covering every GC artefact under `prefix` (`gc/`).
fn gc_listing_prefix(prefix: &str) -> String {
    keys::join(Some(prefix), "gc/")
}

/// Key fragment, relative to the repo prefix, shared by every pack tombstone.
///
/// `tombstone_key` (writer) and `is_tombstone_key` (matcher) both use it, so
/// the two cannot drift apart. This mirrors `BASELINE_TOMBSTONE_KEY_FRAGMENT`.
const TOMBSTONE_KEY_FRAGMENT: &str = "gc/tombstones-";

/// Key of a pack tombstone: `gc/tombstones-<run_id>-<marked_at>.json` under `prefix`.
fn tombstone_key(prefix: &str, run_id: &str, marked_at: &str) -> String {
    keys::join(
        Some(prefix),
        &format!("{TOMBSTONE_KEY_FRAGMENT}{run_id}-{marked_at}.json"),
    )
}

/// Key fragment, relative to the repo prefix, shared by every baseline tombstone.
pub(crate) const BASELINE_TOMBSTONE_KEY_FRAGMENT: &str = "gc/baseline-tomb-";

/// Key of a baseline tombstone: `gc/baseline-tomb-<run_id>.json` under `prefix`.
fn baseline_tombstone_key(prefix: &str, run_id: &str) -> String {
    keys::join(
        Some(prefix),
        &format!("{BASELINE_TOMBSTONE_KEY_FRAGMENT}{run_id}.json"),
    )
}

/// Listing prefix that matches every baseline tombstone under `prefix`.
pub(crate) fn baseline_tombstone_listing_prefix(prefix: Option<&str>) -> String {
    keys::join(prefix, BASELINE_TOMBSTONE_KEY_FRAGMENT)
}

/// Whether `key` is a pack tombstone key under `prefix`.
fn is_tombstone_key(key: &str, prefix: &str) -> bool {
    key.starts_with(&keys::join(Some(prefix), TOMBSTONE_KEY_FRAGMENT))
}

/// Whether `key` is a baseline tombstone key under `prefix`.
fn is_baseline_tombstone_key(key: &str, prefix: &str) -> bool {
    key.starts_with(&baseline_tombstone_listing_prefix(Some(prefix)))
}
/// Returns the union of pack SHAs referenced by every `chain.json` under
/// `<prefix>/refs/`, across all ref namespaces (heads, tags, notes, ...).
///
/// Chain bodies are fetched concurrently, at most `MAX_FETCH_CONCURRENCY` at a
/// time, and folded into one set as they complete.
///
/// # Errors
/// Fails closed: the whole call aborts on any of
/// - a listing or fetch error (including `NotFound` for a chain deleted after
///   listing);
/// - a chain JSON/schema error;
/// - a segment whose pack SHA cannot be derived.
///
/// As a result, an unreadable chain never lets its packs be treated as orphans.
async fn list_referenced_packs(
store: &dyn ObjectStore,
prefix: &str,
) -> Result<HashSet<Sha40>, PackchainError> {
let refs_prefix = keys::join(Some(prefix), "refs/");
let metas = store.list(&refs_prefix).await?;
futures::stream::iter(
metas
.into_iter()
// Only chain.json files; sibling artefacts (bundles, path-index.json) share this tree.
.filter(|m| super::keys::is_chain_json_key(&m.key))
.map(|m| m.key),
)
.map(|key| async move { store.get_bytes(&key).await.map_err(PackchainError::Store) })
.buffer_unordered(MAX_FETCH_CONCURRENCY)
// try_fold stops at the first error in completion order.
.try_fold(HashSet::<Sha40>::new(), |mut acc, body| async move {
let chain = ChainManifest::from_json_bytes(&body)?;
for segment in chain.segments {
let sha = super::keys::segment_pack_sha(&segment)?;
acc.insert(sha);
}
Ok(acc)
})
.await
}
/// Returns the SHAs of every object under `<prefix>/packs/` whose file name is
/// `<sha>.pack` or `<sha>.idx` and whose stem is a valid `Sha40`.
///
/// A `.pack`/`.idx` pair contributes one SHA. Other names are silently ignored.
///
/// # Errors
/// Store listing failures.
async fn list_pack_shas(
    store: &dyn ObjectStore,
    prefix: &str,
) -> Result<HashSet<Sha40>, PackchainError> {
    let listing = store.list(&keys::join(Some(prefix), "packs/")).await?;
    let shas = listing
        .into_iter()
        .filter_map(|meta| {
            let key: &str = &meta.key;
            // Last path component; the whole key when it contains no '/'.
            let file_name = key.rsplit_once('/').map_or(key, |(_, tail)| tail);
            let stem = file_name
                .strip_suffix(".pack")
                .or_else(|| file_name.strip_suffix(".idx"))?;
            Sha40::try_new(stem).ok()
        })
        .collect::<HashSet<Sha40>>();
    Ok(shas)
}
/// Deletes `key`, treating an already-missing object as success.
///
/// Returns `Ok(true)` when the store reported a delete, and `Ok(false)` when
/// it reported `NotFound`.
///
/// # Errors
/// Any other store error, wrapped in `PackchainError::Store`.
async fn delete_idempotent(store: &dyn ObjectStore, key: &str) -> Result<bool, PackchainError> {
    match store.delete(key).await {
        Err(ObjectStoreError::NotFound(_)) => Ok(false),
        result => result.map(|()| true).map_err(PackchainError::Store),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::git::RefName;
use crate::object_store::mock::MockStore;
use crate::packchain::manifest::write_chain;
use crate::packchain::schema::ChainSegment;
const SHA_TIP: &str = "0000000000000000000000000000000000000001";
const SHA_FULL: &str = "0000000000000000000000000000000000000002";
const SHA_PACK_LIVE: &str = "1111111111111111111111111111111111111111";
const SHA_PACK_ORPHAN: &str = "2222222222222222222222222222222222222222";
const SHA_PACK_ORPHAN_2: &str = "3333333333333333333333333333333333333333";
fn sha40(s: &str) -> Sha40 {
Sha40::try_new(s).unwrap()
}
fn ref_main() -> RefName {
RefName::new("refs/heads/main").unwrap()
}
fn segment(pack_sha: &str, parent: Option<&str>) -> ChainSegment {
ChainSegment {
sha: sha40(SHA_TIP),
parent_sha: parent.map(sha40),
pack: format!("packs/{pack_sha}.pack"),
bytes: 1_024,
}
}
async fn seed_live_chain(store: &MockStore, prefix: Option<&str>) {
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(store, prefix, &ref_main(), &chain)
.await
.unwrap();
}
fn insert_pack_pair(store: &MockStore, prefix: Option<&str>, sha: &str) {
let pack_key = super::super::keys::pack_key(prefix, &sha40(sha));
let idx_key = super::super::keys::pack_idx_key(prefix, &sha40(sha));
store.insert(pack_key, Bytes::from_static(b"PACKDATA"));
store.insert(idx_key, Bytes::from_static(b"IDXDATA"));
}
#[tokio::test]
async fn mark_with_no_chains_treats_all_packs_as_orphan() {
let store = MockStore::new();
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(outcome.orphan_count, 1);
let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
let parsed = Tombstone::from_json_bytes(&body).unwrap();
assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
}
#[tokio::test]
async fn mark_skips_chain_referenced_packs() {
let store = MockStore::new();
seed_live_chain(&store, Some("repo")).await;
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(outcome.orphan_count, 1);
let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
let parsed = Tombstone::from_json_bytes(&body).unwrap();
assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
}
#[tokio::test]
async fn mark_no_orphans_skips_tombstone_write() {
let store = MockStore::new();
seed_live_chain(&store, Some("repo")).await;
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(outcome.orphan_count, 0);
let metas = store.list("repo/gc/").await.unwrap();
assert!(
metas.is_empty(),
"tombstone must not exist for empty orphan set"
);
}
#[tokio::test]
async fn mark_dry_run_does_not_write_tombstone() {
let store = MockStore::new();
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = mark(&store, "repo", MarkOpts { dry_run: true })
.await
.unwrap();
assert_eq!(outcome.orphan_count, 1);
let metas = store.list("repo/gc/").await.unwrap();
assert!(metas.is_empty(), "dry-run must not write tombstone");
}
#[tokio::test]
async fn mark_treats_tag_chain_referenced_packs_as_live() {
let store = MockStore::new();
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
let tag_ref = RefName::new("refs/tags/v1").unwrap();
write_chain(&store, Some("repo"), &tag_ref, &chain)
.await
.unwrap();
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert!(
referenced.contains(&sha40(SHA_PACK_LIVE)),
"pack referenced from refs/tags/ chain must be in the live set",
);
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(outcome.orphan_count, 1);
let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
let parsed = Tombstone::from_json_bytes(&body).unwrap();
assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
}
#[tokio::test]
async fn mark_treats_notes_chain_referenced_packs_as_live() {
let store = MockStore::new();
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
let notes_ref = RefName::new("refs/notes/commits").unwrap();
write_chain(&store, Some("repo"), ¬es_ref, &chain)
.await
.unwrap();
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert!(
referenced.contains(&sha40(SHA_PACK_LIVE)),
"pack referenced from refs/notes/ chain must be in the live set",
);
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(outcome.orphan_count, 0);
}
#[tokio::test]
async fn list_referenced_packs_unions_across_namespaces() {
let store = MockStore::new();
let head_chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(&store, Some("repo"), &ref_main(), &head_chain)
.await
.unwrap();
let tag_chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
};
let tag_ref = RefName::new("refs/tags/v1").unwrap();
write_chain(&store, Some("repo"), &tag_ref, &tag_chain)
.await
.unwrap();
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
assert!(referenced.contains(&sha40(SHA_PACK_ORPHAN_2)));
assert_eq!(referenced.len(), 2);
}
#[tokio::test]
async fn list_referenced_packs_ignores_sibling_artefacts() {
let store = MockStore::new();
seed_live_chain(&store, Some("repo")).await;
store.insert(
"repo/refs/heads/main/path-index.json",
Bytes::from_static(b"{}"),
);
store.insert(
format!("repo/refs/heads/main/{SHA_TIP}.bundle"),
Bytes::from_static(b"BUNDLE"),
);
store.insert(
"repo/refs/tags/v1/path-index.json",
Bytes::from_static(b"{}"),
);
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert_eq!(referenced.len(), 1);
assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
}
#[tokio::test]
async fn list_referenced_packs_empty_for_no_chains() {
let store = MockStore::new();
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert!(referenced.is_empty());
}
#[tokio::test]
async fn list_referenced_packs_unions_many_chains_with_bounded_parallel_fetch() {
let store = MockStore::new();
let chain_count = MAX_FETCH_CONCURRENCY * 3 + 1;
let namespaces = ["refs/heads", "refs/tags", "refs/notes"];
let mut expected: HashSet<Sha40> = HashSet::new();
for i in 0..chain_count {
let pack_sha = format!("{:040x}", 0x1000 + i);
let pack_sha40 = sha40(&pack_sha);
let namespace = namespaces[i % namespaces.len()];
let ref_name = RefName::new(format!("{namespace}/r{i}")).unwrap();
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![ChainSegment {
sha: sha40(SHA_TIP),
parent_sha: None,
pack: format!("packs/{pack_sha}.pack"),
bytes: 1_024,
}],
};
write_chain(&store, Some("repo"), &ref_name, &chain)
.await
.unwrap();
expected.insert(pack_sha40);
}
let referenced = list_referenced_packs(&store, "repo").await.unwrap();
assert_eq!(referenced, expected);
}
#[tokio::test]
async fn mark_fails_closed_on_corrupt_chain() {
let store = MockStore::new();
store.insert(
"repo/refs/heads/main/chain.json",
Bytes::from_static(b"{not valid json"),
);
let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
assert!(matches!(err, PackchainError::ParseJson(_)));
let metas = store.list("repo/gc/").await.unwrap();
assert!(metas.is_empty());
}
#[tokio::test]
async fn mark_fails_closed_on_unsupported_schema_version() {
let store = MockStore::new();
store.insert(
"repo/refs/heads/main/chain.json",
Bytes::from_static(
br#"{"v":2,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000002","segments":[]}"#,
),
);
let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
assert!(matches!(
err,
PackchainError::UnsupportedSchemaVersion { .. }
));
}
fn sha_set<I: IntoIterator<Item = &'static str>>(shas: I) -> Vec<Sha40> {
shas.into_iter().map(sha40).collect()
}
fn write_tombstone(
store: &MockStore,
prefix: &str,
marked_at: &str,
shas: Vec<Sha40>,
) -> String {
let run_id = Uuid::new_v4().to_string();
let key = tombstone_key(prefix, &run_id, marked_at);
let body = Tombstone {
v: 1,
run_id,
marked_at: marked_at.to_string(),
orphan_packs: shas,
}
.to_json_pretty()
.unwrap();
store.insert(&key, Bytes::from(body));
key
}
#[tokio::test]
async fn sweep_inside_grace_defers_tombstone() {
let store = MockStore::new();
let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
let tombstone = write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = sweep(
&store,
"repo",
SweepOpts {
grace_hours: 24,
force: false,
},
)
.await
.unwrap();
assert_eq!(outcome.deferred_tombstones, 1);
assert_eq!(outcome.swept_tombstones, 0);
store.get_bytes(&tombstone).await.unwrap();
store
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.unwrap();
}
#[tokio::test]
async fn sweep_after_grace_deletes_orphan_packs_and_tombstone() {
let store = MockStore::new();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
let tombstone = write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.deleted_objects, 2, "pack + idx");
let pack_err = store
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.unwrap_err();
assert!(matches!(pack_err, ObjectStoreError::NotFound(_)));
let tomb_err = store.get_bytes(&tombstone).await.unwrap_err();
assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
}
#[tokio::test]
async fn sweep_skips_repointed_packs() {
let store = MockStore::new();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(&store, Some("repo"), &ref_main(), &chain)
.await
.unwrap();
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.skipped_repointed_packs, 1);
assert_eq!(outcome.deleted_objects, 0);
store
.get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
.await
.unwrap();
}
#[tokio::test]
async fn sweep_force_bypasses_grace_only_not_live_recheck() {
let store = MockStore::new();
let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_LIVE]));
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(&store, Some("repo"), &ref_main(), &chain)
.await
.unwrap();
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
let outcome = sweep(
&store,
"repo",
SweepOpts {
grace_hours: 24,
force: true,
},
)
.await
.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.deferred_tombstones, 0);
assert_eq!(outcome.skipped_repointed_packs, 1);
assert_eq!(outcome.deleted_objects, 0);
store
.get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
.await
.expect("live pack must survive --force sweep");
store
.get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.idx"))
.await
.expect("live idx must survive --force sweep");
}
#[tokio::test]
async fn sweep_force_deletes_truly_orphan_pack_inside_grace() {
let store = MockStore::new();
let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let outcome = sweep(
&store,
"repo",
SweepOpts {
grace_hours: 24,
force: true,
},
)
.await
.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.deferred_tombstones, 0);
assert_eq!(outcome.skipped_repointed_packs, 0);
assert_eq!(outcome.deleted_objects, 2);
let err = store
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.unwrap_err();
assert!(matches!(err, ObjectStoreError::NotFound(_)));
}
#[tokio::test]
async fn sweep_tolerates_already_deleted_pack() {
let store = MockStore::new();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.deleted_objects, 0);
}
#[tokio::test]
async fn sweep_handles_multiple_tombstones_independently() {
let store = MockStore::new();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN_2]));
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN_2);
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(outcome.deferred_tombstones, 1);
assert_eq!(outcome.deleted_objects, 2);
}
#[tokio::test]
async fn mark_then_force_sweep_round_trips() {
let store = MockStore::new();
seed_live_chain(&store, Some("repo")).await;
insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
let mark_out = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(mark_out.orphan_count, 1);
let sweep_out = sweep(
&store,
"repo",
SweepOpts {
grace_hours: 24,
force: true,
},
)
.await
.unwrap();
assert_eq!(sweep_out.swept_tombstones, 1);
assert_eq!(sweep_out.deleted_objects, 2);
store
.get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
.await
.unwrap();
let err = store
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.unwrap_err();
assert!(matches!(err, ObjectStoreError::NotFound(_)));
}
fn insert_baseline_bundle(store: &MockStore, prefix: Option<&str>, sha: &str) -> String {
let key = keys::bundle_key(prefix, ref_main(), sha);
store.insert(&key, Bytes::from_static(b"BUNDLE"));
key
}
fn write_baseline_tombstone_at(
store: &MockStore,
prefix: &str,
marked_at: &str,
sha: &str,
) -> String {
let key = baseline_tombstone_key(prefix, &Uuid::new_v4().to_string());
let body = BaselineTombstone {
v: TOMBSTONE_SCHEMA_VERSION,
marked_at: marked_at.to_owned(),
ref_name: ref_main().as_str().to_owned(),
sha: sha40(sha),
}
.to_json_pretty()
.unwrap();
store.insert(&key, Bytes::from(body));
key
}
/// Each baseline tombstone maps back to one bundle key built from the
/// tombstone's ref and SHA under the `repo/` prefix.
#[tokio::test]
async fn tombstoned_bundle_keys_returns_bundle_paths_for_each_tombstone() {
    let store = MockStore::new();
    write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &sha40(SHA_FULL))
        .await
        .unwrap();
    let feature = RefName::new("refs/heads/feature").unwrap();
    write_baseline_tombstone_for_orphan(&store, Some("repo"), &feature, &sha40(SHA_TIP))
        .await
        .unwrap();
    let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
    assert_eq!(keys.len(), 2, "one bundle key per tombstone (got {keys:?})");
    let expected = [
        format!("repo/refs/heads/main/{SHA_FULL}.bundle"),
        format!("repo/refs/heads/feature/{SHA_TIP}.bundle"),
    ];
    for want in &expected {
        assert!(keys.contains(want));
    }
}
/// With no tombstones written, the listing yields no bundle keys.
#[tokio::test]
async fn tombstoned_bundle_keys_empty_when_no_tombstones() {
    let found = tombstoned_bundle_keys(&MockStore::new(), Some("repo"))
        .await
        .unwrap();
    assert!(found.is_empty(), "empty bucket yields no tombstoned keys");
}
/// With no repo prefix (`None`), the derived bundle key starts directly at
/// `refs/...` with no leading `repo/` segment.
#[tokio::test]
async fn tombstoned_bundle_keys_handles_root_prefix() {
    let store = MockStore::new();
    let full = sha40(SHA_FULL);
    write_baseline_tombstone_for_orphan(&store, None, &ref_main(), &full)
        .await
        .unwrap();
    let keys = tombstoned_bundle_keys(&store, None).await.unwrap();
    assert_eq!(keys.len(), 1, "got {keys:?}");
    let expected = format!("refs/heads/main/{SHA_FULL}.bundle");
    assert!(
        keys.contains(&expected),
        "root-prefix bundle key (no leading repo/) must be produced; got {keys:?}",
    );
}
/// A non-JSON object next to a valid baseline tombstone is skipped, and the
/// valid tombstone's bundle key is still returned.
#[tokio::test]
async fn tombstoned_bundle_keys_skips_unparseable_tombstones() {
    let store = MockStore::new();
    let full = sha40(SHA_FULL);
    write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &full)
        .await
        .unwrap();
    // NOTE(review): assumes this key falls under the baseline-tombstone
    // listing prefix, so the listing actually encounters the garbage body.
    let garbage_key = "repo/gc/baseline-tomb-garbage.json";
    store.insert(garbage_key, Bytes::from_static(b"not json"));
    let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
    assert_eq!(
        keys.len(),
        1,
        "good tombstone must still be returned despite garbage sibling",
    );
    let good_key = format!("repo/refs/heads/main/{SHA_FULL}.bundle");
    assert!(keys.contains(&good_key));
}
/// With distinct prior/current SHAs, `write_baseline_tombstone` writes a
/// tombstone under the baseline listing prefix.
///
/// The tombstone records the schema version, the ref name, and the *prior*
/// full SHA, which is the bundle that becomes orphaned.
#[tokio::test]
async fn write_baseline_tombstone_round_trips() {
    let store = MockStore::new();
    let prior = sha40(SHA_FULL);
    let current = sha40(SHA_TIP);
    write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, &current)
        .await
        .unwrap();
    let metas = store.list("repo/gc/").await.unwrap();
    let tomb_key = metas
        .iter()
        .find(|m| {
            m.key
                .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
        })
        .map(|m| m.key.clone())
        .expect("baseline tombstone written");
    let body = store.get_bytes(&tomb_key).await.unwrap();
    let parsed = BaselineTombstone::from_json_bytes(&body).unwrap();
    assert_eq!(parsed.v, TOMBSTONE_SCHEMA_VERSION);
    assert_eq!(parsed.ref_name, "refs/heads/main");
    assert_eq!(parsed.sha, prior);
}
/// Passing the same SHA as both prior and current is a no-op: nothing is
/// written under `repo/gc/`.
#[tokio::test]
async fn write_baseline_tombstone_skips_when_prior_equals_current() {
    let store = MockStore::new();
    let same = sha40(SHA_FULL);
    let (prior, current) = (&same, &same);
    write_baseline_tombstone(&store, Some("repo"), &ref_main(), prior, current)
        .await
        .unwrap();
    let gc_objects = store.list("repo/gc/").await.unwrap();
    assert!(
        gc_objects.is_empty(),
        "aliasing prior/current must not write a tombstone",
    );
}
/// A baseline tombstone marked "now" is deferred by a non-forced 24h sweep.
/// Both the bundle and the tombstone stay in place.
#[tokio::test]
async fn sweep_defers_baseline_tombstone_within_grace_window() {
    let store = MockStore::new();
    let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
    let marked_now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
    let tomb_key = write_baseline_tombstone_at(&store, "repo", &marked_now, SHA_FULL);
    let opts = SweepOpts {
        grace_hours: 24,
        force: false,
    };
    let outcome = sweep(&store, "repo", opts).await.unwrap();
    assert_eq!(
        (
            outcome.deferred_tombstones,
            outcome.swept_tombstones,
            outcome.deleted_objects,
        ),
        (1, 0, 0),
    );
    let survivors = [
        (&bundle_key, "bundle must survive sweep within grace"),
        (&tomb_key, "tombstone must survive sweep within grace"),
    ];
    for (key, why) in survivors {
        store.get_bytes(key).await.expect(why);
    }
}
/// A baseline tombstone marked 48h ago is swept with default options.
/// Both the orphan bundle and the tombstone are removed.
#[tokio::test]
async fn sweep_reclaims_baseline_tombstone_after_grace_window() {
    let store = MockStore::new();
    let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    let stale = two_days_ago.format(&Rfc3339).unwrap();
    let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!(outcome.swept_tombstones, 1);
    assert_eq!(outcome.deferred_tombstones, 0);
    assert_eq!(outcome.deleted_objects, 1, "bundle delete");
    for gone in [&bundle_key, &tomb_key] {
        let err = store.get_bytes(gone).await.unwrap_err();
        assert!(matches!(err, ObjectStoreError::NotFound(_)));
    }
}
/// A stale baseline tombstone whose SHA is again the ref's `full_at` is
/// retired, but its bundle is kept and counted as skipped, not deleted.
#[tokio::test]
async fn sweep_skips_re_baselined_bundle_after_grace() {
    let store = MockStore::new();
    let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
    let re_baselined = ChainManifest {
        v: 1,
        tip: sha40(SHA_TIP),
        full_at: sha40(SHA_FULL),
        segments: vec![segment(SHA_PACK_LIVE, None)],
    };
    write_chain(&store, Some("repo"), &ref_main(), &re_baselined)
        .await
        .unwrap();
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    let marked_at = two_days_ago.format(&Rfc3339).unwrap();
    let tomb_key = write_baseline_tombstone_at(&store, "repo", &marked_at, SHA_FULL);
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!(
        (
            outcome.swept_tombstones,
            outcome.skipped_repointed_packs,
            outcome.deleted_objects,
        ),
        (1, 1, 0),
    );
    store
        .get_bytes(&bundle_key)
        .await
        .expect("re-baselined bundle must survive");
    let tomb_gone = store.get_bytes(&tomb_key).await.unwrap_err();
    assert!(matches!(tomb_gone, ObjectStoreError::NotFound(_)));
}
/// A stale baseline tombstone whose bundle was never stored (or is already
/// gone) is still retired cleanly, and no objects are counted as deleted.
#[tokio::test]
async fn sweep_baseline_tolerates_already_deleted_bundle() {
    let store = MockStore::new();
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    // No bundle is inserted for SHA_FULL in this test.
    let tomb_key = write_baseline_tombstone_at(
        &store,
        "repo",
        &two_days_ago.format(&Rfc3339).unwrap(),
        SHA_FULL,
    );
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!((outcome.swept_tombstones, outcome.deleted_objects), (1, 0));
    let missing = store.get_bytes(&tomb_key).await.unwrap_err();
    assert!(matches!(missing, ObjectStoreError::NotFound(_)));
}
/// `force` skips only the grace-window check. A fresh tombstone whose
/// bundle is still the ref's `full_at` is retired, but the bundle survives.
#[tokio::test]
async fn sweep_baseline_force_bypasses_grace_only_not_live_recheck() {
    let store = MockStore::new();
    let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
    let live_chain = ChainManifest {
        v: 1,
        tip: sha40(SHA_TIP),
        full_at: sha40(SHA_FULL),
        segments: vec![segment(SHA_PACK_LIVE, None)],
    };
    write_chain(&store, Some("repo"), &ref_main(), &live_chain)
        .await
        .unwrap();
    let fresh = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
    // Only the bundle's survival is checked, so the tombstone key is unused.
    let _ = write_baseline_tombstone_at(&store, "repo", &fresh, SHA_FULL);
    let forced = SweepOpts {
        force: true,
        grace_hours: 24,
    };
    let outcome = sweep(&store, "repo", forced).await.unwrap();
    assert_eq!(outcome.swept_tombstones, 1);
    assert_eq!(outcome.deferred_tombstones, 0);
    assert_eq!(outcome.skipped_repointed_packs, 1);
    assert_eq!(outcome.deleted_objects, 0);
    store
        .get_bytes(&bundle_key)
        .await
        .expect("live bundle must survive --force sweep");
}
/// One sweep handles a stale pack tombstone and a stale baseline tombstone
/// together. It deletes the pack's two objects plus the bundle.
#[tokio::test]
async fn sweep_processes_pack_and_baseline_tombstones_in_one_pass() {
    let store = MockStore::new();
    let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
    insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    let stale = two_days_ago.format(&Rfc3339).unwrap();
    write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
    write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!(outcome.swept_tombstones, 2);
    assert_eq!(outcome.deleted_objects, 3);
    let orphan_pack = format!("repo/packs/{SHA_PACK_ORPHAN}.pack");
    for gone in [bundle_key.as_str(), orphan_pack.as_str()] {
        let err = store.get_bytes(gone).await.unwrap_err();
        assert!(matches!(err, ObjectStoreError::NotFound(_)));
    }
}
#[tokio::test]
async fn compact_to_sweep_round_trip_simulates_concurrent_fetch_then_gc() {
let store = MockStore::new();
let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_TIP),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(&store, Some("repo"), &ref_main(), &chain)
.await
.unwrap();
let prior = sha40(SHA_FULL);
let current = sha40(SHA_TIP);
write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, ¤t)
.await
.unwrap();
let body = store.get_bytes(&bundle_key).await.unwrap();
assert_eq!(&body[..], b"BUNDLE");
let in_grace = sweep(
&store,
"repo",
SweepOpts {
grace_hours: 24,
force: false,
},
)
.await
.unwrap();
assert_eq!(in_grace.deferred_tombstones, 1);
store
.get_bytes(&bundle_key)
.await
.expect("bundle must survive in-grace sweep");
let metas = store.list("repo/gc/").await.unwrap();
let tomb_key = metas
.iter()
.find(|m| {
m.key
.starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
})
.map(|m| m.key.clone())
.unwrap();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
let body = store.get_bytes(&tomb_key).await.unwrap();
let mut tomb: BaselineTombstone = serde_json::from_slice(&body).unwrap();
tomb.marked_at = stale;
let new_body = serde_json::to_vec_pretty(&tomb).unwrap();
store.insert(&tomb_key, Bytes::from(new_body));
let post_grace = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(post_grace.swept_tombstones, 1);
assert_eq!(post_grace.deleted_objects, 1);
let err = store.get_bytes(&bundle_key).await.unwrap_err();
assert!(matches!(err, ObjectStoreError::NotFound(_)));
}
/// A baseline tombstone whose `ref_name` fails `RefName::new` is left in
/// place and counted as deferred. Its bundle is also untouched, so the pair
/// can be inspected later.
#[tokio::test]
async fn sweep_preserves_corrupt_baseline_tombstone_for_diagnosis() {
    let store = MockStore::new();
    let bad_ref = "refs/heads/[bad]";
    assert!(
        RefName::new(bad_ref).is_err(),
        "fixture relies on this ref_name failing RefName::new",
    );
    let bundle_key = format!("repo/{bad_ref}/{SHA_FULL}.bundle");
    store.insert(&bundle_key, Bytes::from_static(b"BUNDLE"));
    let corrupt = BaselineTombstone {
        v: TOMBSTONE_SCHEMA_VERSION,
        marked_at: (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap(),
        ref_name: bad_ref.to_owned(),
        sha: sha40(SHA_FULL),
    };
    let tomb_key = baseline_tombstone_key("repo", &Uuid::new_v4().to_string());
    store.insert(&tomb_key, Bytes::from(corrupt.to_json_pretty().unwrap()));
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!(
        outcome.deferred_tombstones, 1,
        "corrupt tombstone counts as deferred, not swept",
    );
    assert_eq!(outcome.swept_tombstones, 0);
    assert_eq!(outcome.deleted_objects, 0);
    let surviving = store
        .get_bytes(&tomb_key)
        .await
        .expect("corrupt tombstone must survive sweep");
    assert_eq!(
        BaselineTombstone::from_json_bytes(&surviving)
            .unwrap()
            .ref_name,
        bad_ref
    );
    store
        .get_bytes(&bundle_key)
        .await
        .expect("orphan bundle must survive corrupt-tombstone sweep");
}
/// One-shot callback that receives the wrapped `MockStore`.
type PostDeleteHook = Box<dyn FnOnce(&MockStore) + Send>;
/// Test store wrapping a `MockStore` with an armed one-shot hook.
///
/// The hook fires after the first successful `delete` of a key starting with
/// `trigger_prefix`. Tests use it to inject a concurrent write mid-sweep.
struct PostDeleteHookStore {
    inner: MockStore,
    // `Some` until the hook has been taken and run.
    hook: std::sync::Mutex<Option<PostDeleteHook>>,
    trigger_prefix: String,
}
impl PostDeleteHookStore {
    /// Wraps `inner`, arming `hook` to fire on the first successful delete
    /// under `trigger_prefix`.
    fn new(
        inner: MockStore,
        trigger_prefix: impl Into<String>,
        hook: impl FnOnce(&MockStore) + Send + 'static,
    ) -> Self {
        let armed: PostDeleteHook = Box::new(hook);
        let trigger_prefix = trigger_prefix.into();
        Self {
            inner,
            hook: std::sync::Mutex::new(Some(armed)),
            trigger_prefix,
        }
    }
}
// `ObjectStore` for `PostDeleteHookStore`: the methods named after
// `forward:` are presumably passed straight through to `inner` (the macro is
// defined elsewhere). `delete` is overridden to fire the one-shot hook.
crate::delegate_to_inner_impl! {
impl ObjectStore for PostDeleteHookStore {
forward: list, get_to_file, get_bytes, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy;
async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
// Do the real delete first: the hook only observes successful deletes.
let result = self.inner.delete(key).await;
// `take()` leaves `None` behind, so the hook runs at most once per store.
// NOTE(review): the MutexGuard temporary from the `let` scrutinee most
// likely stays alive through the block body, so the hook runs with the
// lock held. This is harmless as long as the hook never touches
// `self.hook`.
if result.is_ok()
&& key.starts_with(&self.trigger_prefix)
&& let Some(hook) = self.hook.lock().unwrap().take()
{
hook(&self.inner);
}
// Return the inner store's result unchanged.
result
}
}
}
// Two stale pack tombstones, one orphan pack each. After the first pack
// tombstone is deleted, a hook publishes chains on two new branches that
// reference BOTH orphan packs. Because sweep recomputes the referenced set
// per tombstone, the second tombstone's pack must be kept (skipped) rather
// than deleted from a stale snapshot.
#[tokio::test]
async fn sweep_re_derives_referenced_set_per_tombstone() {
let inner = MockStore::new();
// Distinct ages (49h vs 48h), both past the default grace window.
let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
.format(&Rfc3339)
.unwrap();
let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
write_tombstone(&inner, "repo", &stale_a, sha_set([SHA_PACK_ORPHAN]));
write_tombstone(&inner, "repo", &stale_b, sha_set([SHA_PACK_ORPHAN_2]));
insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
// Fires after the first successful delete of a key under
// `repo/gc/tombstones-` (presumably the pack-tombstone key prefix), i.e.
// once the first tombstone has been fully processed.
let store = PostDeleteHookStore::new(inner, "repo/gc/tombstones-", |inner| {
for (ref_path, pack_sha) in [
("repo/refs/heads/branch_a/chain.json", SHA_PACK_ORPHAN),
("repo/refs/heads/branch_b/chain.json", SHA_PACK_ORPHAN_2),
] {
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(pack_sha, None)],
};
let body =
serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
inner.insert(ref_path, Bytes::from(body));
}
});
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
// Both tombstones retired; one pack pair (2 objects) deleted; one pack skipped.
assert_eq!(outcome.swept_tombstones, 2);
assert_eq!(outcome.deleted_objects, 2);
assert_eq!(outcome.skipped_repointed_packs, 1);
// Order-agnostic: whichever tombstone went first lost its pack, the other kept it.
let first_survives = store
.inner
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.is_ok();
let second_survives = store
.inner
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
.await
.is_ok();
assert!(
first_survives ^ second_survives,
"exactly one pack must survive: \
first_survives={first_survives}, second_survives={second_survives}",
);
}
// One stale tombstone naming two orphan packs. After the first pack object
// is deleted, a hook publishes a chain referencing SHA_PACK_ORPHAN_2. The
// referenced set must be recomputed per pack (not once per tombstone), so
// the second pack is skipped while the first pack's pair is fully deleted.
// NOTE(review): this relies on SHA_PACK_ORPHAN being processed before
// SHA_PACK_ORPHAN_2 within the tombstone. Presumably `sha_set` and the
// sweep yield a deterministic (sorted?) order; confirm, since the reverse
// order would delete ORPHAN_2 before the hook protects it.
#[tokio::test]
async fn sweep_re_derives_referenced_set_per_pack_within_tombstone() {
let inner = MockStore::new();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
write_tombstone(
&inner,
"repo",
&stale,
sha_set([SHA_PACK_ORPHAN, SHA_PACK_ORPHAN_2]),
);
insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
// Fires once, after the first successful delete of any key under `repo/packs/`.
let store = PostDeleteHookStore::new(inner, "repo/packs/", |inner| {
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
};
let body =
serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
});
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 1);
assert_eq!(
outcome.deleted_objects, 2,
"only the first pack's pair deleted; the second was re-referenced",
);
assert_eq!(
outcome.skipped_repointed_packs, 1,
"second iteration's per-pack recompute must skip the re-referenced pack",
);
store
.inner
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
.await
.expect("re-referenced pack must survive");
store
.inner
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.idx"))
.await
.expect("re-referenced pack idx must survive");
let first_err = store
.inner
.get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
.await
.unwrap_err();
assert!(matches!(first_err, ObjectStoreError::NotFound(_)));
}
/// Control case for the per-tombstone recompute. With no concurrent
/// writers, a stale tombstone for an unreferenced pack deletes both of that
/// pack's objects and skips nothing.
#[tokio::test]
async fn sweep_reclaims_genuinely_orphan_pack_with_per_tombstone_recompute() {
    let store = MockStore::new();
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    let marked_at = two_days_ago.format(&Rfc3339).unwrap();
    write_tombstone(&store, "repo", &marked_at, sha_set([SHA_PACK_ORPHAN]));
    insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    let counts = (
        outcome.swept_tombstones,
        outcome.deleted_objects,
        outcome.skipped_repointed_packs,
    );
    assert_eq!(counts, (1, 2, 0));
}
// Two stale baseline tombstones name the same bundle (main@SHA_FULL). After
// the first baseline tombstone is deleted, a hook re-baselines main so that
// `full_at` is SHA_FULL again. The second tombstone must re-read chain.json,
// see the re-baseline, and skip instead of deleting. The bundle itself was
// already removed while processing the first tombstone.
#[tokio::test]
async fn sweep_one_baseline_tombstone_re_reads_chain_per_tombstone() {
let inner = MockStore::new();
let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
.format(&Rfc3339)
.unwrap();
let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
let tomb_a = write_baseline_tombstone_at(&inner, "repo", &stale_a, SHA_FULL);
let tomb_b = write_baseline_tombstone_at(&inner, "repo", &stale_b, SHA_FULL);
// The hook fires once, on the first successful delete under the baseline
// tombstone listing prefix.
let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
let store = PostDeleteHookStore::new(inner, &tomb_listing, |inner| {
let chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
let body =
serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
inner.insert("repo/refs/heads/main/chain.json", Bytes::from(body));
});
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(outcome.swept_tombstones, 2);
assert_eq!(outcome.deleted_objects, 1, "only one bundle delete");
assert_eq!(
outcome.skipped_repointed_packs, 1,
"second iteration must see the re-baselined chain and skip",
);
// Both tombstones are retired regardless of delete-vs-skip.
for key in [&tomb_a, &tomb_b] {
let err = store.inner.get_bytes(key).await.unwrap_err();
assert!(matches!(err, ObjectStoreError::NotFound(_)));
}
let bundle_err = store.inner.get_bytes(&bundle_key).await.unwrap_err();
assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
}
/// A stale tombstone names a pack that a live chain references again by the
/// time sweep runs (a push aliasing the tombstoned pack key). The tombstone
/// is retired, and the pack is kept and counted as skipped.
#[tokio::test]
async fn sweep_protects_pack_when_concurrent_push_aliases_existing_key() {
    let store = MockStore::new();
    let two_days_ago = OffsetDateTime::now_utc() - time::Duration::hours(48);
    let stale = two_days_ago.format(&Rfc3339).unwrap();
    write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
    insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
    let referencing_chain = ChainManifest {
        v: 1,
        tip: sha40(SHA_TIP),
        full_at: sha40(SHA_FULL),
        segments: vec![segment(SHA_PACK_LIVE, None)],
    };
    write_chain(&store, Some("repo"), &ref_main(), &referencing_chain)
        .await
        .unwrap();
    let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
    assert_eq!(
        (
            outcome.swept_tombstones,
            outcome.skipped_repointed_packs,
            outcome.deleted_objects,
        ),
        (1, 1, 0),
    );
    let live_pack = format!("repo/packs/{SHA_PACK_LIVE}.pack");
    store
        .get_bytes(&live_pack)
        .await
        .expect("aliased pack must survive sweep");
}
/// The grace-hours env override falls back to `DEFAULT_GRACE_HOURS` when it
/// is unset, non-numeric, or `"0"`. A positive integer is used as-is.
#[tokio::test]
async fn grace_hours_env_override_falls_back_for_unset_or_invalid() {
    // The guard must stay alive for the whole test.
    let env = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
    env.clear();
    assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
    let cases = [
        ("not-a-number", DEFAULT_GRACE_HOURS),
        ("0", DEFAULT_GRACE_HOURS),
        ("72", 72),
    ];
    for (raw, expected) in cases {
        env.set_to(raw);
        assert_eq!(grace_hours_from_env(), expected);
    }
}
/// An explicit `Some(0)` is honoured as 0; it is not replaced by a default.
#[test]
fn resolve_grace_hours_honours_some_zero() {
    let resolved = resolve_grace_hours(Some(0));
    assert_eq!(resolved, 0);
}
/// An explicit `Some(n)` is returned unchanged.
#[test]
fn resolve_grace_hours_returns_explicit_value() {
    let resolved = resolve_grace_hours(Some(7));
    assert_eq!(resolved, 7);
}
/// With `None`, the value comes from the grace-hours environment variable.
#[tokio::test]
async fn resolve_grace_hours_falls_back_to_env_for_none() {
    // The guard must stay alive until the assertion has run.
    let guard = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
    guard.set_to("72");
    let from_env = resolve_grace_hours(None);
    assert_eq!(from_env, 72);
}
/// One-shot callback that receives the wrapped `MockStore`.
type PostListHook = Box<dyn FnOnce(&MockStore) + Send>;
/// Test store wrapping a `MockStore`. After the first successful `list`
/// call (any prefix), it runs a one-shot hook against the inner store.
struct PostListHookStore {
    inner: MockStore,
    // `Some` until the hook has been taken and run.
    hook: std::sync::Mutex<Option<PostListHook>>,
}
impl PostListHookStore {
    /// Wraps `inner`, arming `hook` to fire after the first successful list.
    fn new(inner: MockStore, hook: impl FnOnce(&MockStore) + Send + 'static) -> Self {
        let armed: PostListHook = Box::new(hook);
        Self {
            inner,
            hook: std::sync::Mutex::new(Some(armed)),
        }
    }
}
// `ObjectStore` for `PostListHookStore`: the methods named after `forward:`
// are presumably passed straight through to `inner` (the macro is defined
// elsewhere). `list` is overridden to fire the one-shot hook.
crate::delegate_to_inner_impl! {
impl ObjectStore for PostListHookStore {
forward: get_to_file, get_bytes, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy, delete;
async fn list(
&self,
prefix: &str,
) -> Result<Vec<crate::object_store::ObjectMeta>, ObjectStoreError> {
// The listing is captured BEFORE the hook runs, so the hook's writes are
// not visible in this result, only in later calls.
let result = self.inner.list(prefix).await;
if result.is_ok() {
// Taking into a local releases the lock before the hook runs;
// `take()` leaves `None`, so the hook fires at most once.
let hook = self.hook.lock().unwrap().take();
if let Some(hook) = hook {
hook(&self.inner);
}
}
result
}
}
}
// Simulates a push that lands while `mark` is running. Right after mark's
// first `list` call, the hook uploads a new pack and a chain referencing
// it. If mark lists packs before chains, the new pack is absent from its
// pack snapshot but its chain is present, so nothing is wrongly tombstoned.
#[tokio::test]
async fn mark_packs_first_ordering_avoids_false_positive_under_concurrent_push() {
let inner = MockStore::new();
// NOTE(review): presumably seeds a chain referencing SHA_PACK_LIVE; confirm in the helper.
seed_live_chain(&inner, Some("repo")).await;
insert_pack_pair(&inner, Some("repo"), SHA_PACK_LIVE);
// Concurrent push: new pack + a chain on another branch that references it.
let store = PostListHookStore::new(inner, |inner| {
insert_pack_pair(inner, Some("repo"), SHA_PACK_ORPHAN);
let new_chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_ORPHAN, None)],
};
let body =
serde_json::to_vec_pretty(&new_chain).expect("chain.json serializes for the test");
inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
});
let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
assert_eq!(
outcome.orphan_count, 0,
"packs-first ordering must not tombstone packs uploaded \
during mark whose chain commits before the chain listing"
);
// An empty orphan set must not produce any tombstone object.
let gc_metas = store.inner.list("repo/gc/").await.unwrap();
assert!(gc_metas.is_empty(), "no tombstone for empty orphan set");
}
/// One-shot callback that receives the wrapped `MockStore`.
type PostGetHook = Box<dyn FnOnce(&MockStore) + Send>;
/// Test store wrapping a `MockStore`. After the first successful
/// `get_bytes` of exactly `trigger_key`, it runs a one-shot hook against the
/// inner store.
struct PostGetHookStore {
    inner: MockStore,
    // `Some` until the hook has been taken and run.
    hook: std::sync::Mutex<Option<PostGetHook>>,
    trigger_key: String,
}
impl PostGetHookStore {
    /// Wraps `inner`, arming `hook` to fire after the first successful read
    /// of `trigger_key`.
    fn new(
        inner: MockStore,
        trigger_key: impl Into<String>,
        hook: impl FnOnce(&MockStore) + Send + 'static,
    ) -> Self {
        let armed: PostGetHook = Box::new(hook);
        let trigger_key = trigger_key.into();
        Self {
            inner,
            hook: std::sync::Mutex::new(Some(armed)),
            trigger_key,
        }
    }
    /// Returns `true` once the hook has been taken, i.e. it has run.
    fn hook_fired(&self) -> bool {
        let slot = self.hook.lock().unwrap();
        slot.is_none()
    }
}
// `ObjectStore` for `PostGetHookStore`: the methods named after `forward:`
// are presumably passed straight through to `inner` (the macro is defined
// elsewhere). `get_bytes` is overridden to fire the one-shot hook.
crate::delegate_to_inner_impl! {
impl ObjectStore for PostGetHookStore {
forward: list, get_to_file, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy, delete;
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
// The caller receives the bytes read BEFORE the hook runs; the hook's
// writes are only visible to later reads.
let result = self.inner.get_bytes(key).await;
// Exact-key match (not a prefix). `take()` makes this fire at most once.
// NOTE(review): the MutexGuard temporary from the `let` scrutinee most
// likely stays alive through the block body, so the hook runs with the
// lock held.
if result.is_ok()
&& key == self.trigger_key
&& let Some(hook) = self.hook.lock().unwrap().take()
{
hook(&self.inner);
}
result
}
}
}
// The chain initially has full_at = SHA_TIP, so a stale baseline tombstone
// for SHA_FULL looks reclaimable. Right after sweep's first read of main's
// chain.json, a hook re-baselines the chain to full_at = SHA_FULL. Sweep's
// recheck must observe this and DEFER: it must neither delete the bundle
// nor count a skip, and it must leave the tombstone in place.
#[tokio::test]
async fn sweep_baseline_defers_when_recheck_observes_re_baseline() {
let inner = MockStore::new();
let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
let initial_chain = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_TIP),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
write_chain(&inner, Some("repo"), &ref_main(), &initial_chain)
.await
.unwrap();
let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
.format(&Rfc3339)
.unwrap();
let tomb_key = write_baseline_tombstone_at(&inner, "repo", &stale, SHA_FULL);
// Hook fires once, after the first successful read of exactly this key.
let chain_key = "repo/refs/heads/main/chain.json";
let store = PostGetHookStore::new(inner, chain_key, move |inner| {
let re_baselined = ChainManifest {
v: 1,
tip: sha40(SHA_TIP),
full_at: sha40(SHA_FULL),
segments: vec![segment(SHA_PACK_LIVE, None)],
};
let body = serde_json::to_vec_pretty(&re_baselined)
.expect("chain.json serializes for the test");
inner.insert(chain_key, Bytes::from(body));
});
let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
assert_eq!(
outcome.deferred_tombstones, 1,
"recheck must defer when the chain re-baselined to the \
tombstoned SHA between checks",
);
assert_eq!(outcome.swept_tombstones, 0);
assert_eq!(outcome.deleted_objects, 0);
assert_eq!(outcome.skipped_repointed_packs, 0);
// Guards against a vacuous pass where sweep never read chain.json at all.
assert!(
store.hook_fired(),
"production code must have read chain.json so the hook \
could inject the concurrent re-baseline",
);
store
.inner
.get_bytes(&bundle_key)
.await
.expect("re-baselined bundle must survive sweep");
store
.inner
.get_bytes(&tomb_key)
.await
.expect("tombstone must survive deferred path");
}
}