// git_remote_object_store/packchain/gc.rs

//! Two-phase mark-and-sweep garbage collection for orphan packs
//! (issue #66, Phase 5 of #52).
//!
//! Orphan packs are pack files in `<prefix>/packs/` that no
//! `chain.json` references. They accumulate from:
//!
//! - **Force push**: replaces a chain's segments; old packs become orphan.
//! - **Lost-race push**: a pre-lock pack upload by the loser of a
//!   concurrent push (Phase 2 design — packs upload pre-lock to keep
//!   the lock window short, and the loser's pack is left orphan).
//! - **Aborted push**: a crash between pack upload and chain.json
//!   commit leaves orphans the next push doesn't reach.
//! - **Branch deletion**: `delete-branch` removes `chain.json` and
//!   `path-index.json` but does not touch `<prefix>/packs/`. The
//!   issue umbrella's "exclusively owned by that branch" claim is
//!   wrong under content-hash dedup; pack keys can be shared across
//!   branches that ever pushed identical object sets. The baseline
//!   bundle (`<prefix>/<ref>/<full_at>.bundle`) is tombstoned rather
//!   than deleted synchronously (issue #143), so an in-flight fetcher
//!   that already read the prior `chain.json` can still complete its
//!   range GET; the bundle is reclaimed by [`sweep`] after the grace
//!   window.
//! - **Compaction** (when implemented): a chain rewrite leaves the
//!   superseded segment packs orphan.
//! - **Missing `.idx`** (rare): a `.pack` whose sibling `.idx` was
//!   manually deleted is treated as orphan and tombstoned.
//!
//! ## Two-phase mark-and-sweep
//!
//! Naive deletion ("delete every pack older than 24 h") races a
//! concurrent fetch on a freshly-orphaned pack: the pack's
//! `last_modified` reflects upload time, not orphan time. The
//! mark/sweep split fixes this by tombstoning at orphan time and
//! deferring deletion until after a configurable grace window.
//!
//! ### Phase 1 (mark)
//!
//! 1. List `<prefix>/packs/` to snapshot the packs currently on the
//!    bucket. Packs-first is deliberate (issue #135): see "Concurrency"
//!    below.
//! 2. List `<prefix>/refs/**/chain.json` across every ref namespace
//!    (`refs/heads/`, `refs/tags/`, `refs/notes/`, etc.), parse each,
//!    collect referenced pack content-shas.
//! 3. **Fail closed** on parse error: abort, log the bad key, do not
//!    write tombstones. A corrupt chain could under-report the
//!    referenced set and tombstone live packs.
//! 4. Derive the orphan set (`on_bucket - referenced`) and write
//!    `<prefix>/gc/tombstones-<run_id>-<rfc3339>.json`.
//!
//! ### Phase 2 (sweep)
//!
//! 1. List `<prefix>/gc/tombstones-*.json`.
//! 2. For each tombstone past the grace age:
//!    - Re-derive the orphan set from the *current* chain state.
//!      Repeated **per tombstone**, not cached across the sweep: a
//!      concurrent push committing `chain.json` mid-sweep would let
//!      a cached snapshot delete a pack the new chain references,
//!      permanently dangling the reference (issue #140). Force-revert
//!      is the canonical trigger — deterministic gix pack emission
//!      lets the new push reuse the tombstoned pack key without
//!      re-uploading. The cost is one `list("refs/")` per eligible
//!      tombstone vs one per sweep; correctness wins over the linear
//!      overhead for the O(1)-eligible-tombstones common case.
//!    - For each pack still orphan, delete `.pack` + `.idx`
//!      idempotently (a prior partial sweep is fine).
//!    - Delete the tombstone itself.
//! 3. Younger tombstones survive for the next sweep.
//!
//! ### Baseline-bundle tombstones (issues #134, #143)
//!
//! Baseline bundles at `<prefix>/<ref>/<full_at>.bundle` are NOT
//! reapable by the mark/sweep flow above — they live outside
//! `<prefix>/packs/`, so [`list_pack_shas`] never sees them. The
//! compact, force-push, and `delete-branch` code paths instead enqueue
//! a baseline tombstone at `<prefix>/gc/baseline-tomb-<uuid>.json`
//! whenever they supersede or remove a baseline. Sweep processes
//! those alongside pack tombstones: after the grace window expires it
//! re-checks the current `chain.json` for the ref (skipping the delete
//! if a later push re-baselined to the same SHA), then deletes the
//! bundle and the tombstone. The bundle stays in place for the entire
//! grace window, so a concurrent fetch that read the prior
//! `chain.json` before the compact/force-push committed can still
//! download it.
//!
//! ### `--force`
//!
//! Skips ONLY the grace window. The live-pack re-check still runs:
//! a tombstone whose SHA appears in the current chain set is left
//! alone. This closes the race where `mark()` snapshots packs after a
//! concurrent push has uploaded `packs/<sha>.{pack,idx}` but has not
//! yet committed `chain.json` — by sweep time the chain has landed
//! and the pack is live, so the stale tombstone must not delete it.
//! A `tracing::warn!` line records the operator's choice.
//!
//! ## Concurrency
//!
//! Two operators running `gc` simultaneously each get a `UUIDv4` run id
//! → distinct tombstone files, no clobber. Concurrent sweeps tolerate
//! `NotFound` on already-deleted packs.
//!
//! Mark lists packs first, then chains (issue #135). With this order,
//! a push landing during mark either:
//!
//! - uploaded its pack *after* [`list_pack_shas`] — the pack is not in
//!   the on-bucket snapshot, so it cannot enter the orphan set
//!   regardless of when its chain commits; or
//! - uploaded its pack *before* [`list_pack_shas`] AND committed
//!   `chain.json` before [`list_referenced_packs`] — the pack is in the
//!   referenced set, so it is filtered out of orphans; or
//! - uploaded its pack *before* [`list_pack_shas`] and has not yet
//!   committed `chain.json` by the time [`list_referenced_packs`] runs
//!   — the pack is tombstoned, but the grace window leaves it readable
//!   long enough for the push to complete (the genuine-orphan case for
//!   an aborted push is exactly what the GC is designed to reap).
//!
//! The reverse order (chains-first) is the bug fixed by #135: a chain
//! commit landing between the chain list and the pack list would let a
//! freshly-uploaded pack appear in [`list_pack_shas`] without appearing
//! in [`list_referenced_packs`], producing a false-positive tombstone.
//! Sweep's per-tombstone re-derive (issue #140) would usually catch
//! that at sweep time, but a `--force` sweep run in the same session as
//! mark (e.g. `compact --with-gc`) could still delete the live pack
//! before the push's chain commit lands.
//!
//! The grace window separately covers a fetch reading an old chain
//! whose packs are about to be swept.

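/*
The Phase 1 derivation described in the module docs can be sketched
with plain sets. This is a minimal standalone illustration under stated
assumptions, not this module's implementation: `collect_referenced` and
`orphan_set` are hypothetical helpers, pack SHAs are simplified to
`String`, and each chain is modelled as the result of an
already-attempted parse.

```rust
use std::collections::HashSet;

/// Union the pack SHAs of every chain, failing closed: the first chain
/// that did not parse aborts the whole collection, so a corrupt chain
/// can never shrink the referenced set.
fn collect_referenced(
    chains: Vec<Result<Vec<String>, String>>,
) -> Result<HashSet<String>, String> {
    let mut referenced = HashSet::new();
    for chain in chains {
        // `?` propagates the parse error instead of skipping the chain.
        referenced.extend(chain?);
    }
    Ok(referenced)
}

/// `on_bucket - referenced`, sorted so the result is deterministic.
fn orphan_set(on_bucket: &HashSet<String>, referenced: &HashSet<String>) -> Vec<String> {
    let mut orphans: Vec<String> = on_bucket.difference(referenced).cloned().collect();
    orphans.sort();
    orphans
}
```

In this sketch the pack snapshot is taken by the caller before the
chains are parsed, matching the packs-first ordering described under
"Concurrency".
*/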
use std::collections::HashSet;

use bytes::Bytes;
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::git::RefName;
use crate::keys;
use crate::object_store::{ObjectStore, ObjectStoreError, PutOpts};
use crate::protocol::fetch::MAX_FETCH_CONCURRENCY;

use super::PackchainError;
use super::manifest::load_chain;
use super::schema::{ChainManifest, Sha40};

/// Default grace window between mark and sweep (24 hours). A pack
/// tombstoned during mark is only deletable after this duration has
/// elapsed since `marked_at`.
pub const DEFAULT_GRACE_HOURS: u64 = 24;

/// Decision returned by [`check_grace_window`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GraceDecision {
    /// `marked_at` is recent enough that the tombstone must remain.
    Within,
    /// `marked_at` is older than `grace_hours`; the sweep may proceed.
    Past,
}

/// Format `now` as an RFC 3339 string and wrap the underlying
/// formatter error in a [`PackchainError::Io`]. Centralises the
/// `OffsetDateTime::format` + `map_err` shape previously inlined at
/// the two tombstone-write paths in [`write_baseline_tombstone`] and
/// [`mark`].
fn rfc3339_now() -> Result<String, PackchainError> {
    OffsetDateTime::now_utc().format(&Rfc3339).map_err(|e| {
        PackchainError::Io(std::io::Error::other(format!("rfc3339 format failed: {e}")))
    })
}

/// Parse `marked_at` as RFC 3339 and decide whether `now - marked_at`
/// has crossed `grace_hours`. A negative age (tombstone marked in the
/// future under operator clock skew) is treated as
/// [`GraceDecision::Within`] so a sweep does not run prematurely.
///
/// `kind` is interpolated into both the parse-error message and the
/// `debug!` log line so the two sweep call paths (pack tombstones vs.
/// baseline tombstones) stay distinguishable in operator logs.
fn check_grace_window(
    marked_at: &str,
    grace_hours: u64,
    kind: &'static str,
) -> Result<GraceDecision, PackchainError> {
    let marked_at_ts = OffsetDateTime::parse(marked_at, &Rfc3339).map_err(|e| {
        PackchainError::Io(std::io::Error::other(format!(
            "{kind} marked_at parse failed: {e}"
        )))
    })?;
    let age_hours = (OffsetDateTime::now_utc() - marked_at_ts).whole_hours();
    // Negative age = a tombstone marked in the future (operator clock
    // skew). Treat as "still within grace" rather than sweeping
    // prematurely. The `try_into` is the canonical way to compare an
    // `i64` against an unsigned grace window without a sign-loss cast.
    let within = age_hours
        .try_into()
        .map_or(true, |hours: u64| hours < grace_hours);
    Ok(if within {
        GraceDecision::Within
    } else {
        GraceDecision::Past
    })
}
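
/*
The signed-to-unsigned comparison in `check_grace_window` can be tried
in isolation. The sketch below is a hypothetical standalone helper
(`within_grace` is not part of this module) that takes the age as a
plain `i64` of whole hours instead of parsing a timestamp.

```rust
/// Returns `true` while a tombstone is still inside its grace window.
/// `age_hours` is negative when the marking host's clock ran ahead.
fn within_grace(age_hours: i64, grace_hours: u64) -> bool {
    // `u64::try_from` fails for negative ages; `map_or(true, ..)` then
    // keeps the tombstone instead of sweeping it early.
    u64::try_from(age_hours).map_or(true, |hours| hours < grace_hours)
}
```

With these semantics an age equal to the grace window already counts
as past it, and a grace window of zero never holds a tombstone back.
*/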

/// Best-effort version of [`write_baseline_tombstone`]: writes the
/// tombstone and, on error, logs at `warn` with the orphan key and
/// `source` discriminator (`"force-push"` / `"compact"`). Used by
/// callers that run AFTER `chain.json` is durable, where a tombstone
/// failure must NOT propagate as a push/compact failure — retrying the
/// caller would short-circuit through `AlreadyMinimal` (compact) or
/// the no-op same-SHA branch (push) and never re-attempt the cleanup,
/// leaving the orphaned bundle without a tombstone.
///
/// Returns `true` iff a tombstone was successfully written. Callers
/// that emit a success-only debug trace check the return value;
/// `false` covers both the same-SHA short-circuit and a warned-on
/// write error.
pub(crate) async fn write_baseline_tombstone_best_effort(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    prior_full_sha: &Sha40,
    current_full_sha: &Sha40,
    source: &'static str,
) -> bool {
    match write_baseline_tombstone(store, prefix, ref_name, prior_full_sha, current_full_sha).await
    {
        Ok(()) => prior_full_sha != current_full_sha,
        Err(e) => {
            let orphan_key = keys::bundle_key(prefix, ref_name.as_str(), prior_full_sha.as_str());
            warn!(
                source,
                ref_path = %ref_name.as_str(),
                key = %orphan_key,
                error = %e,
                "baseline tombstone write failed (chain.json already committed); \
                 orphan bundle left for manual cleanup",
            );
            false
        }
    }
}

/// Environment variable that overrides [`DEFAULT_GRACE_HOURS`] when
/// set to a positive integer. Mirrors the shape of
/// `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS` used by the protocol REPL.
pub(crate) const ENV_GC_GRACE_HOURS: &str = "GIT_REMOTE_OBJECT_STORE_GC_GRACE_HOURS";

/// On-bucket schema version this build reads and writes.
pub const TOMBSTONE_SCHEMA_VERSION: u32 = 1;

/// Reject a parsed schema version that does not match
/// [`TOMBSTONE_SCHEMA_VERSION`]. Shared by the two tombstone parsers
/// ([`Tombstone::from_json_bytes`] and [`BaselineTombstone::from_json_bytes`])
/// so both report the same `UnsupportedSchemaVersion` shape against
/// the same constant.
fn check_tombstone_schema_version(found: u32) -> Result<(), PackchainError> {
    if found == TOMBSTONE_SCHEMA_VERSION {
        Ok(())
    } else {
        Err(PackchainError::UnsupportedSchemaVersion {
            found,
            expected: TOMBSTONE_SCHEMA_VERSION,
        })
    }
}

/// On-bucket tombstone — a record of one mark phase's orphan set.
///
/// Lives at `<prefix>/gc/tombstones-<run_id>-<rfc3339>.json`. The
/// timestamp in the filename is for human inspection; the
/// authoritative `marked_at` is the field inside the JSON body.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Tombstone {
    /// Schema version. Always [`TOMBSTONE_SCHEMA_VERSION`] when written.
    pub(crate) v: u32,
    /// `UUIDv4` run identifier. Two concurrent `gc` runs each get a
    /// distinct id, so their tombstone keys don't clobber.
    pub(crate) run_id: String,
    /// RFC 3339 timestamp at which the mark phase produced this set.
    /// Sweep compares this against the grace window.
    pub(crate) marked_at: String,
    /// Content-shas of orphan packs at mark time. Sweep re-checks
    /// each against the current chain state before deleting.
    pub(crate) orphan_packs: Vec<Sha40>,
}

impl Tombstone {
    /// Parse `bytes` as a tombstone JSON, validating the schema
    /// version before returning.
    ///
    /// # Errors
    ///
    /// - [`PackchainError::ParseJson`] for malformed JSON / missing
    ///   fields / `Sha40` validation failures.
    /// - [`PackchainError::UnsupportedSchemaVersion`] when `v` is not
    ///   [`TOMBSTONE_SCHEMA_VERSION`].
    pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
        let parsed: Self = serde_json::from_slice(bytes)?;
        check_tombstone_schema_version(parsed.v)?;
        Ok(parsed)
    }

    /// Render to pretty-printed JSON bytes.
    ///
    /// # Errors
    ///
    /// `serde_json::to_vec_pretty` is infallible for this schema
    /// today, but the function returns `Result` for forward
    /// compatibility with future fields.
    pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
        Ok(serde_json::to_vec_pretty(self)?)
    }
}

/// On-bucket tombstone for a superseded baseline bundle (issues #134, #143).
///
/// Lives at `<prefix>/gc/baseline-tomb-<uuid>.json`. Written by
/// [`super::compact`], [`super::push`], and the management
/// `delete-branch` flow whenever a chain rewrite or ref removal makes
/// a `<prefix>/<ref>/<sha>.bundle` unreachable. Unlike pack
/// tombstones the body names a specific (ref, sha) — there is exactly
/// one bundle key per record — so [`sweep`] does not need to re-derive
/// an orphan set from listings.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct BaselineTombstone {
    /// Schema version. Always [`TOMBSTONE_SCHEMA_VERSION`] when written.
    pub(crate) v: u32,
    /// RFC 3339 timestamp at which the tombstone was written. Sweep
    /// compares this against the grace window.
    pub(crate) marked_at: String,
    /// Ref the orphaned bundle belonged to (e.g. `refs/heads/main`).
    /// Stored as a raw string for forward compatibility with whatever
    /// `RefName` accepts at sweep time.
    pub(crate) ref_name: String,
    /// Content-SHA of the bundle (matches the `<sha>.bundle` filename).
    /// Sweep skips the delete when the ref's current `chain.full_at`
    /// equals this SHA (a later push re-baselined to the same tip).
    pub(crate) sha: Sha40,
}

impl BaselineTombstone {
    /// Parse `bytes` as a baseline tombstone JSON, validating the
    /// schema version before returning.
    ///
    /// # Errors
    ///
    /// - [`PackchainError::ParseJson`] for malformed JSON / missing
    ///   fields / `Sha40` validation failures.
    /// - [`PackchainError::UnsupportedSchemaVersion`] when `v` is not
    ///   [`TOMBSTONE_SCHEMA_VERSION`].
    pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
        let parsed: Self = serde_json::from_slice(bytes)?;
        check_tombstone_schema_version(parsed.v)?;
        Ok(parsed)
    }

    /// Render to pretty-printed JSON bytes.
    ///
    /// # Errors
    ///
    /// Same contract as [`Tombstone::to_json_pretty`].
    pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
        Ok(serde_json::to_vec_pretty(self)?)
    }
}

/// Write a baseline tombstone for the bundle at
/// `<prefix>/<ref_name>/<sha>.bundle` (issue #134).
///
/// Called from [`super::compact`] and [`super::push`] after the new
/// `chain.json` is durable — at that point the bundle has no chain
/// reference and is eligible for deletion, but a fetch that loaded
/// the prior chain may still be about to GET it. The tombstone
/// defers the delete to the next `gc sweep` past the grace window.
///
/// `prior_full_sha` is the SHA of the superseded baseline; `current_full_sha`
/// is the new chain's `full_at`. When they are equal the function
/// returns without writing a tombstone — the keys alias the same live
/// bundle (compact left `full_at` unchanged, or force-push targeted
/// the same tip).
///
/// # Errors
///
/// Returns [`PackchainError::Store`] on a PUT failure. Callers run
/// this AFTER `chain.json` is committed and must treat the failure as
/// best-effort: log a warning and report success, since retrying
/// would short-circuit through `AlreadyMinimal` and never re-attempt
/// the cleanup.
pub(crate) async fn write_baseline_tombstone(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    prior_full_sha: &Sha40,
    current_full_sha: &Sha40,
) -> Result<(), PackchainError> {
    if prior_full_sha == current_full_sha {
        return Ok(());
    }
    write_baseline_tombstone_unconditional(store, prefix, ref_name, prior_full_sha).await
}

/// Write a baseline tombstone naming `orphan_sha` regardless of any
/// successor SHA — used by `delete-branch` (issue #143), which
/// removes the chain entirely and has no replacement baseline to
/// compare against. Otherwise identical in shape to
/// [`write_baseline_tombstone`]: the body is parsed by
/// [`sweep_one_baseline_tombstone`], which sees a `chain.json`-less
/// ref (the synchronous sweep deletes it) and proceeds with the
/// deferred bundle delete after the grace window.
///
/// # Errors
///
/// Returns [`PackchainError::Store`] on a PUT failure. Callers run
/// this BEFORE the synchronous sweep that removes the rest of the
/// ref's objects; a failure here should fall back to immediate
/// bundle deletion so the operator's "ref is gone" intent is still
/// satisfied, rather than leaving a half-tombstoned half-deleted
/// state behind.
pub(crate) async fn write_baseline_tombstone_for_orphan(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    orphan_sha: &Sha40,
) -> Result<(), PackchainError> {
    write_baseline_tombstone_unconditional(store, prefix, ref_name, orphan_sha).await
}

/// Attempt to tombstone the baseline bundle for a delete so the
/// synchronous sweep loop can skip it (issue #143 / #203).
/// Returns the bundle key that was deferred, or `None` when deferral
/// is not actionable (bundle-engine ref with no `chain.json`,
/// unparseable `chain.json`, no `<full_at>.bundle` in the listing, or
/// a tombstone PUT failure).
///
/// The `None` fall-through is the correct behaviour for every
/// "deferral is not actionable" case: with no tombstone, `gc sweep`
/// has nothing to reclaim, so the sweep loop must remove the bundle
/// synchronously instead. A logged warning surfaces the rarer
/// load/parse/PUT failures for operator review without blocking the
/// delete.
///
/// Runs UNDER the per-ref lock (#158): a concurrent push that landed
/// between the tombstone and the chain.json delete would otherwise
/// leave the bucket with a tombstone referencing a SHA no longer in
/// the chain, and `gc sweep` would reclaim a live bundle.
///
/// `log_context` discriminates the warn-event source for log
/// scraping (`"packchain delete"`, `"delete-branch"`, etc.).
///
/// Replaces the previous per-call-site clones in
/// `manage::branch::ManageBranch::try_tombstone_baseline` and
/// `packchain::push::try_tombstone_baseline_for_delete` (#221).
pub(crate) async fn try_write_baseline_tombstone(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    fresh: &[crate::object_store::ObjectMeta],
    log_context: &'static str,
) -> Option<String> {
    let chain = match load_chain(store, prefix, remote_ref).await {
        Ok(Some(chain)) => chain,
        Ok(None) => return None,
        Err(err) => {
            warn!(
                source = log_context,
                ref_path = %remote_ref.as_str(),
                error = %err,
                "chain.json read/parse failed; falling back to synchronous bundle delete",
            );
            return None;
        }
    };
    let bundle_key = keys::bundle_key(prefix, remote_ref.as_str(), chain.full_at.as_str());
    // The baseline bundle must actually be in the under-lock listing
    // — otherwise the deferred delete has nothing to defer (it was
    // already gone, or the chain points outside our prefix). A
    // mismatched `full_at` against listing reality is the canonical
    // "chain.json points at a missing bundle" doctor case; immediate
    // sweep is the right fallback there too.
    if !fresh.iter().any(|m| m.key == bundle_key) {
        return None;
    }
    match write_baseline_tombstone_for_orphan(store, prefix, remote_ref, &chain.full_at).await {
        Ok(()) => Some(bundle_key),
        Err(err) => {
            warn!(
                source = log_context,
                ref_path = %remote_ref.as_str(),
                key = %bundle_key,
                error = %err,
                "baseline tombstone write failed; falling back to synchronous bundle delete",
            );
            None
        }
    }
}

/// Return the set of bundle keys (full `<prefix>/<ref>/<sha>.bundle`
/// paths) currently named by any baseline tombstone under
/// `<prefix>/gc/baseline-tomb-*.json`.
///
/// Issue #157: the bundle engine derives `list`'s per-ref `<sha>`
/// from the bundle keys themselves (unlike packchain, which reads
/// `chain.tip`). Once a force-push tombstones the prior bundle, the
/// listing must hide that bundle so:
///
/// 1. The `list` wire output does not advertise two SHAs for the
///    same ref (which would emit two `<sha> <ref>\n` lines and
///    confuse git), and
/// 2. The under-lock multi-bundle guard in
///    `crate::protocol::push::perform_push_under_lock` does not
///    refuse the next push with the "multiple bundles" wire error.
///
/// The bundle keys themselves remain readable at their original
/// paths — fetchers that already advertised the tombstoned SHA
/// (issue #157's race) complete normally. `gc sweep` reclaims them
/// after the grace window.
///
/// Returns full bucket keys, not stems, so callers can compare
/// directly against `ObjectMeta::key` without re-deriving the prefix.
/// A tombstone whose `ref_name` no longer parses as a `RefName` is
/// skipped with a warn (mirrors `sweep_one_baseline_tombstone`'s
/// invalid-ref handling) — the bundle key is unknowable but the
/// tombstone itself remains for operator review.
///
/// # Errors
///
/// Propagates [`ObjectStoreError`] from the listing call. Per-tombstone
/// parse failures are skipped with a warn — a single malformed
/// tombstone must not block every listing.
pub(crate) async fn tombstoned_bundle_keys(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
) -> Result<HashSet<String>, ObjectStoreError> {
    let prefix_str = prefix.unwrap_or("");
    let gc_listing = gc_listing_prefix(prefix_str);
    let metas = match store.list(&gc_listing).await {
        Ok(m) => m,
        // NotFound on an unsupported listing prefix on a fresh bucket
        // is normal — no tombstones to hide.
        Err(ObjectStoreError::NotFound(_)) => return Ok(HashSet::new()),
        Err(e) => return Err(e),
    };
    let mut keys = HashSet::new();
    for meta in metas {
        if !is_baseline_tombstone_key(&meta.key, prefix_str) {
            continue;
        }
        let body = match store.get_bytes(&meta.key).await {
            Ok(b) => b,
            Err(ObjectStoreError::NotFound(_)) => continue, // raced delete
            Err(e) => return Err(e),
        };
        let tombstone = match BaselineTombstone::from_json_bytes(&body) {
            Ok(t) => t,
            Err(e) => {
                warn!(
                    key = %meta.key,
                    error = %e,
                    "tombstoned_bundle_keys: skipping unparseable baseline tombstone",
                );
                continue;
            }
        };
        let Ok(ref_name) = RefName::new(tombstone.ref_name.clone()) else {
            warn!(
                key = %meta.key,
                ref_name = %tombstone.ref_name,
                "tombstoned_bundle_keys: skipping tombstone with invalid ref_name",
            );
            continue;
        };
        keys.insert(keys::bundle_key(prefix, &ref_name, tombstone.sha.as_str()));
    }
    Ok(keys)
}
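
/*
How a caller might consume the set returned by `tombstoned_bundle_keys`
is not shown in this file. The sketch below is an assumption about that
call site, using a hypothetical `visible_bundles` helper and plain
`String` keys instead of `ObjectMeta`: it hides every bundle key that a
baseline tombstone names while leaving the objects themselves untouched.

```rust
use std::collections::HashSet;

/// Keep only the `.bundle` keys in `listing` that no baseline
/// tombstone names. Non-bundle keys are dropped as well.
fn visible_bundles<'a>(listing: &'a [String], tombstoned: &HashSet<String>) -> Vec<&'a str> {
    listing
        .iter()
        .map(String::as_str)
        // Full-key comparison: both sides are complete bucket keys.
        .filter(|key| key.ends_with(".bundle") && !tombstoned.contains(*key))
        .collect()
}
```

Comparing full keys is what makes the filter a plain set lookup, which
is the reason the function above returns full bucket keys, not stems.
*/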

/// Shared body of [`write_baseline_tombstone`] and
/// [`write_baseline_tombstone_for_orphan`]: emit a
/// `<prefix>/gc/baseline-tomb-<uuid>.json` record naming
/// `(ref_name, orphan_sha)`. Centralised so the two call shapes
/// (with-successor and unconditional) cannot drift on the JSON body
/// shape or the key namespace.
async fn write_baseline_tombstone_unconditional(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    orphan_sha: &Sha40,
) -> Result<(), PackchainError> {
    let marked_at = rfc3339_now()?;
    let tombstone = BaselineTombstone {
        v: TOMBSTONE_SCHEMA_VERSION,
        marked_at,
        ref_name: ref_name.as_str().to_owned(),
        sha: orphan_sha.clone(),
    };
    let key = baseline_tombstone_key(prefix.unwrap_or(""), &Uuid::new_v4().to_string());
    let body = Bytes::from(tombstone.to_json_pretty()?);
    store.put_bytes(&key, body, PutOpts::default()).await?;
    debug!(
        key = %key,
        ref_path = %ref_name.as_str(),
        sha = %orphan_sha.as_str(),
        "gc: baseline tombstone written",
    );
    Ok(())
}

/// Outcome of [`mark`].
#[derive(Debug, Clone)]
pub struct MarkOutcome {
    /// `UUIDv4` run id assigned to this mark pass. Embedded in the
    /// tombstone filename and body.
    pub run_id: String,
    /// Number of orphan packs identified.
    pub orphan_count: usize,
    /// Bucket key the tombstone was written to.
    pub tombstone_key: String,
}

/// Outcome of [`sweep`].
#[derive(Debug, Clone, Default)]
pub struct SweepOutcome {
    /// Tombstones whose packs were deleted (and which were themselves
    /// deleted as a result).
    pub swept_tombstones: usize,
    /// Tombstones still inside the grace window — left for the next
    /// sweep.
    pub deferred_tombstones: usize,
    /// Pack file deletions executed (counts both `.pack` and `.idx`
    /// deletions, so two per orphan in the typical case).
    pub deleted_objects: usize,
    /// Tombstoned packs that were no longer orphan at sweep time
    /// (re-referenced between mark and sweep, or deleted by an
    /// earlier sweep). Skipped without error.
    pub skipped_repointed_packs: usize,
}

/// Knobs for [`mark`].
#[derive(Debug, Clone, Copy, Default)]
pub struct MarkOpts {
    /// When `true`, list and report but do not write a tombstone file
    /// or modify the bucket. Used by `doctor` to surface orphan stats.
    pub dry_run: bool,
}

/// Knobs for [`sweep`].
#[derive(Debug, Clone, Copy)]
pub struct SweepOpts {
    /// Grace duration in hours. Tombstones with `marked_at` younger
    /// than this stay deferred. Ignored when `force` is `true`.
    pub grace_hours: u64,
    /// When `true`, skip the grace check. The live-pack re-derive
    /// still runs — a tombstone whose SHA is now referenced by a
    /// committed chain is left alone (closes the mark/commit race
    /// from #117). The grace window is the only safety check this
    /// flag suppresses; concurrent fetches that still hold a SHA in
    /// flight are NOT protected by either path.
    pub force: bool,
}

impl Default for SweepOpts {
    fn default() -> Self {
        Self {
            grace_hours: DEFAULT_GRACE_HOURS,
            force: false,
        }
    }
}
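
/*
The interaction between `grace_hours`, `force`, and the live-pack
re-check can be summarised as a small decision function. This is a
simplified per-pack sketch under stated assumptions: `SweepAction` and
`decide` are hypothetical names, and the real sweep works per tombstone
against bucket listings, not on three booleans.

```rust
#[derive(Debug, PartialEq, Eq)]
enum SweepAction {
    /// Tombstone is inside the grace window and `force` is off.
    Defer,
    /// Pack is referenced by a current chain; leave it alone.
    SkipLive,
    /// Pack is still orphan; delete `.pack` and `.idx`.
    Delete,
}

fn decide(past_grace: bool, force: bool, referenced_now: bool) -> SweepAction {
    // `force` suppresses the grace check and nothing else.
    if !force && !past_grace {
        return SweepAction::Defer;
    }
    // The liveness re-check runs on both the forced and unforced path.
    if referenced_now {
        SweepAction::SkipLive
    } else {
        SweepAction::Delete
    }
}
```

In this model no combination of flags deletes a pack that a committed
chain references, which is the property the `force` documentation
above describes.
*/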

/// Resolve the grace window from the environment: the
/// [`ENV_GC_GRACE_HOURS`] override when it is set to a positive
/// integer, otherwise [`DEFAULT_GRACE_HOURS`]. Unset variables,
/// non-numeric values, and zero all fall back to the default (a zero
/// grace would defeat the point of the mark/sweep design).
#[must_use]
pub(crate) fn grace_hours_from_env() -> u64 {
    std::env::var(ENV_GC_GRACE_HOURS)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .filter(|h| *h > 0)
        .unwrap_or(DEFAULT_GRACE_HOURS)
}

/// Resolve a caller-supplied `Option<u64>` grace-hours value to a
/// concrete count, deferring to [`grace_hours_from_env`] when the
/// caller passes `None` (#221).
///
/// Unlike [`crate::protocol::push::resolve_lock_ttl_seconds`], this
/// resolver does **not** clamp `Some(0)`. A zero grace window is a
/// legitimate operator intent — "sweep without a grace window, e.g.
/// in force mode" — and is the canonical value used by force-sweep
/// tests (`SweepOpts { grace_hours: 0, force: true }`). The lock-TTL
/// clamp protects against `acquire_lock` treating every held lock as
/// instantly stale (#208); grace-hours has no analogous foot-gun.
#[must_use]
pub(crate) fn resolve_grace_hours(opt: Option<u64>) -> u64 {
    opt.unwrap_or_else(grace_hours_from_env)
}
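
/*
The two resolvers above treat zero differently depending on where it
comes from. The sketch below restates that precedence as pure
functions so it can be exercised without touching the process
environment. `grace_from_raw` and `resolve` are hypothetical stand-ins,
and the raw environment value is passed in as an `Option<&str>`.

```rust
const DEFAULT_GRACE_HOURS: u64 = 24;

/// Environment-style input: only a positive integer overrides the default.
fn grace_from_raw(raw: Option<&str>) -> u64 {
    raw.and_then(|v| v.parse::<u64>().ok())
        .filter(|h| *h > 0)
        .unwrap_or(DEFAULT_GRACE_HOURS)
}

/// Caller-supplied input wins outright, including an explicit zero.
fn resolve(explicit: Option<u64>, raw_env: Option<&str>) -> u64 {
    explicit.unwrap_or_else(|| grace_from_raw(raw_env))
}
```

An explicit `Some(0)` therefore survives, while a `0` arriving through
the environment falls back to the default.
*/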
695
696/// Run the mark phase: snapshot every pack on the bucket, then every
697/// chain, then write a tombstone naming the orphans.
698///
699/// `prefix` is the repository prefix without leading or trailing
700/// slashes — pass an empty string for bucket-root repositories.
701///
702/// # Ordering
703///
704/// Listings run packs-first, chains-second (issue #135). The reverse
705/// order races a concurrent push that uploads a new pack between the
706/// two listings and commits its `chain.json` *between the chain list
707/// and the pack list*: the new pack would appear in the on-bucket set
708/// but not in the referenced set, producing a false-positive tombstone.
709/// Packs-first inverts the staleness: the referenced set is always at
710/// least as fresh as the on-bucket set, so a pack appearing in the
711/// snapshot is either also in the chain set (saved) or genuinely
712/// orphan at some point during the mark (correctly tombstoned, with
713/// the grace window covering in-flight pushes).
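///
/// The race can be modelled with plain sets, as a hedged sketch. The
/// `orphans` helper and the pack names `"a"` / `"b"` are hypothetical;
/// the snapshots stand for the bucket before and after a concurrent
/// push that uploads pack `"b"` and commits a chain referencing it.
///
/// ```rust
/// use std::collections::HashSet;
///
/// // Hypothetical helper: packs on the bucket that no chain references.
/// fn orphans(
///     on_bucket: &HashSet<&'static str>,
///     referenced: &HashSet<&'static str>,
/// ) -> Vec<String> {
///     let mut out: Vec<String> = on_bucket
///         .difference(referenced)
///         .map(|s| s.to_string())
///         .collect();
///     out.sort();
///     out
/// }
///
/// let packs_before = HashSet::from(["a"]);
/// let chains_before = HashSet::from(["a"]);
/// let packs_after = HashSet::from(["a", "b"]);
/// let chains_after = HashSet::from(["a", "b"]);
///
/// // Chains-first: stale chain list, fresh pack list. `b` is live but
/// // looks orphan.
/// assert_eq!(orphans(&packs_after, &chains_before), vec!["b".to_string()]);
/// // Packs-first: stale pack list, fresh chain list. Nothing is flagged.
/// assert!(orphans(&packs_before, &chains_after).is_empty());
/// ```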
///
/// # Errors
///
/// - Any chain.json that fails to parse aborts the mark with
///   [`PackchainError::ParseJson`] / [`PackchainError::InvalidSha`] /
///   [`PackchainError::UnsupportedSchemaVersion`]. The tombstone is
///   not written. Operators must repair the bad chain (or remove it)
///   before re-running.
/// - [`PackchainError::Store`] / [`PackchainError::Io`] for transport
///   or local-I/O failures.
///
/// # Example
///
/// ```no_run
/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use git_remote_object_store::Remote;
/// use git_remote_object_store::packchain::gc::{MarkOpts, mark};
///
/// let remote = Remote::connect("s3+https://bucket/repo?engine=packchain").await?;
/// let outcome = mark(remote.store(), remote.prefix(), MarkOpts::default()).await?;
/// println!(
///     "{} orphan pack(s) tombstoned (run id {})",
///     outcome.orphan_count, outcome.run_id,
/// );
/// # Ok(())
/// # }
/// ```
pub async fn mark(
    store: &dyn ObjectStore,
    prefix: &str,
    opts: MarkOpts,
) -> Result<MarkOutcome, PackchainError> {
    // Packs-first ordering (issue #135): the on-bucket snapshot must
    // be at least as stale as the referenced set. A pack uploaded by a
    // concurrent push between these two listings is harmless — it
    // either is not in `on_bucket` (uploaded after the pack list) or
    // is in `referenced` (uploaded before the pack list AND its
    // chain.json committed before the chain list). The reverse order
    // produces false-positive tombstones; see the module docstring.
    let on_bucket = list_pack_shas(store, prefix).await?;
    let referenced = list_referenced_packs(store, prefix).await?;
    let orphans: Vec<Sha40> = on_bucket
        .into_iter()
        .filter(|sha| !referenced.contains(sha))
        .collect();

    let run_id = Uuid::new_v4().to_string();
    let marked_at = rfc3339_now()?;
    let tombstone_key = tombstone_key(prefix, &run_id, &marked_at);
    let orphan_count = orphans.len();
    let tombstone = Tombstone {
        v: TOMBSTONE_SCHEMA_VERSION,
        run_id: run_id.clone(),
        marked_at,
        orphan_packs: orphans,
    };
    let outcome = MarkOutcome {
        run_id,
        orphan_count,
        tombstone_key,
    };

    if opts.dry_run {
        debug!(
            run_id = %outcome.run_id,
            orphans = outcome.orphan_count,
            "gc mark: dry-run, not writing tombstone",
        );
        return Ok(outcome);
    }

    if outcome.orphan_count == 0 {
        info!(run_id = %outcome.run_id, "gc mark: no orphans; skipping tombstone");
        return Ok(outcome);
    }

    let body = Bytes::from(tombstone.to_json_pretty()?);
    store
        .put_bytes(&outcome.tombstone_key, body, PutOpts::default())
        .await?;
    info!(
        run_id = %outcome.run_id,
        orphans = outcome.orphan_count,
        key = %outcome.tombstone_key,
        "gc mark: tombstone written",
    );
    Ok(outcome)
}

/// Run the sweep phase: walk tombstones, delete eligible orphans.
///
/// `prefix` follows the same rules as in [`mark`]: no leading or
/// trailing slashes, and an empty string for bucket-root repositories.
///
/// # Errors
///
/// Sweep is best-effort: a single tombstone failure does not abort
/// the run (errors are logged and the next tombstone is tried).
/// Returns [`PackchainError::Store`] only when the initial
/// tombstone-list call fails.
///
/// # Example
///
/// ```no_run
/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use git_remote_object_store::Remote;
/// use git_remote_object_store::packchain::gc::{SweepOpts, sweep};
///
/// let remote = Remote::connect("s3+https://bucket/repo?engine=packchain").await?;
/// let outcome = sweep(
///     remote.store(),
///     remote.prefix(),
///     SweepOpts::default(),
/// )
/// .await?;
/// println!(
///     "swept {} tombstone(s), deleted {} object(s), deferred {}",
///     outcome.swept_tombstones,
///     outcome.deleted_objects,
///     outcome.deferred_tombstones,
/// );
/// # Ok(())
/// # }
/// ```
pub async fn sweep(
    store: &dyn ObjectStore,
    prefix: &str,
    opts: SweepOpts,
) -> Result<SweepOutcome, PackchainError> {
    let tombstones_prefix = gc_listing_prefix(prefix);
    let metas = store.list(&tombstones_prefix).await?;
    let mut outcome = SweepOutcome::default();

    if opts.force {
        warn!("gc sweep: --force in effect; skipping grace window");
    }

    for meta in metas {
        if !meta.key.as_bytes().ends_with(b".json") {
            continue;
        }
        let step = if is_tombstone_key(&meta.key, prefix) {
            sweep_one_tombstone(store, prefix, &meta.key, opts).await
        } else if is_baseline_tombstone_key(&meta.key, prefix) {
            sweep_one_baseline_tombstone(store, prefix, &meta.key, opts).await
        } else {
            continue;
        };
        match step {
            Ok(SweepStep::Deferred) => outcome.deferred_tombstones += 1,
            Ok(SweepStep::Swept {
                deleted_objects,
                skipped_repointed_packs,
            }) => {
                outcome.swept_tombstones += 1;
                outcome.deleted_objects += deleted_objects;
                outcome.skipped_repointed_packs += skipped_repointed_packs;
            }
            Err(e) => {
                warn!(key = %meta.key, error = %e, "gc sweep: tombstone failed");
            }
        }
    }
    Ok(outcome)
}

#[derive(Debug)]
enum SweepStep {
    Deferred,
    Swept {
        deleted_objects: usize,
        skipped_repointed_packs: usize,
    },
}

async fn sweep_one_tombstone(
    store: &dyn ObjectStore,
    prefix: &str,
    tombstone_key: &str,
    opts: SweepOpts,
) -> Result<SweepStep, PackchainError> {
    let body = match store.get_bytes(tombstone_key).await {
        Ok(b) => b,
        Err(ObjectStoreError::NotFound(_)) => {
            // Concurrent sweep already cleaned this up.
            return Ok(SweepStep::Swept {
                deleted_objects: 0,
                skipped_repointed_packs: 0,
            });
        }
        Err(e) => return Err(PackchainError::Store(e)),
    };
    let tombstone = Tombstone::from_json_bytes(&body)?;

    if !opts.force
        && check_grace_window(&tombstone.marked_at, opts.grace_hours, "tombstone")?
            == GraceDecision::Within
    {
        debug!(
            key = %tombstone_key,
            marked_at = %tombstone.marked_at,
            "gc sweep: tombstone within grace window",
        );
        return Ok(SweepStep::Deferred);
    }

    // Re-derive the live referenced set per pack delete, AFTER the
    // grace check passes; never cache it across iterations (issue #140
    // for the cross-tombstone race, issue #152 for the cross-pack
    // race inside a single tombstone). With one snapshot per
    // tombstone, a concurrent push that commits chain.json after the
    // snapshot would still lose a live pack named later in the same
    // tombstone's `orphan_packs` vector, because `mark()` packs all
    // orphans for a run into one tombstone body. Force-revert is the
    // canonical trigger: gix pack emission is deterministic for the
    // same object set, so the new pack key aliases the tombstoned one
    // and the push skips upload, only touching chain.json. Per-pack
    // re-listing costs one extra `list("refs/")` + bounded-parallel
    // chain GETs per orphan pack; for the rare GC maintenance path
    // this is acceptable overhead in exchange for closing the
    // cross-pack window. The recompute also runs under --force: that
    // flag suppresses the grace window only, NOT this guard
    // (issue #117). A residual TOCTOU window remains between this
    // listing and the delete that follows; the fix shrinks the window
    // from "one snapshot per tombstone" to "one snapshot per pack",
    // which is the bound the issue asks for.
    let mut deleted_objects = 0usize;
    let mut skipped_repointed_packs = 0usize;
    for sha in &tombstone.orphan_packs {
        // Always honour the live-pack guard, including under --force.
        // See the recompute comment above and issue #117 for why.
        let referenced = list_referenced_packs(store, prefix).await?;
        if referenced.contains(sha) {
            skipped_repointed_packs += 1;
            debug!(
                sha = %sha.as_str(),
                "gc sweep: tombstoned pack re-referenced; skipping",
            );
            continue;
        }
        let pack_key = super::keys::pack_key(Some(prefix), sha);
        let idx_key = super::keys::pack_idx_key(Some(prefix), sha);
        if delete_idempotent(store, &pack_key).await? {
            deleted_objects += 1;
        }
        if delete_idempotent(store, &idx_key).await? {
            deleted_objects += 1;
        }
    }
    // Drop the tombstone last so a sweep crash mid-deletion leaves a
    // tombstone the next sweep can finish.
    delete_idempotent(store, tombstone_key).await?;
    info!(
        key = %tombstone_key,
        deleted = deleted_objects,
        skipped = skipped_repointed_packs,
        "gc sweep: tombstone applied",
    );
    Ok(SweepStep::Swept {
        deleted_objects,
        skipped_repointed_packs,
    })
}

/// Sweep one baseline tombstone (issue #134). Parses the tombstone,
/// honours the grace window, and re-checks the ref's current
/// `chain.full_at`. If the ref has been re-baselined to the tombstoned
/// SHA, the bundle is live again and is kept; otherwise the bundle is
/// idempotently deleted. The tombstone is deleted last, unless the
/// pre-delete recheck (issue #153) or an unparseable ref name
/// (issue #146) defers it.
///
/// The live-state recheck mirrors the pack sweep's
/// `referenced.contains` guard: a tombstone written by a force-push
/// can be invalidated by a subsequent force-push that lands on the
/// same SHA, in which case the bundle is once again live.
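///
/// # Example
///
/// A self-contained sketch of the live-state decision only, under the
/// assumption that SHAs are modelled as plain strings. `Action` and
/// `decide` are hypothetical names; the real function works on
/// `ChainManifest` and `BaselineTombstone` and also handles the grace
/// window and the deferred cases.
///
/// ```rust
/// #[derive(Debug, PartialEq)]
/// enum Action {
///     KeepBundle,
///     DeleteBundle,
/// }
///
/// // `current_full_at` models `chain.full_at` of the ref's live chain,
/// // or `None` when the chain no longer exists.
/// fn decide(current_full_at: Option<&str>, tombstoned_sha: &str) -> Action {
///     if current_full_at.is_some_and(|f| f == tombstoned_sha) {
///         Action::KeepBundle
///     } else {
///         Action::DeleteBundle
///     }
/// }
///
/// assert_eq!(decide(Some("aaa"), "aaa"), Action::KeepBundle); // re-baselined
/// assert_eq!(decide(Some("bbb"), "aaa"), Action::DeleteBundle); // moved on
/// assert_eq!(decide(None, "aaa"), Action::DeleteBundle); // ref deleted
/// ```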
async fn sweep_one_baseline_tombstone(
    store: &dyn ObjectStore,
    prefix: &str,
    tombstone_key: &str,
    opts: SweepOpts,
) -> Result<SweepStep, PackchainError> {
    let body = match store.get_bytes(tombstone_key).await {
        Ok(b) => b,
        Err(ObjectStoreError::NotFound(_)) => {
            return Ok(SweepStep::Swept {
                deleted_objects: 0,
                skipped_repointed_packs: 0,
            });
        }
        Err(e) => return Err(PackchainError::Store(e)),
    };
    let tombstone = BaselineTombstone::from_json_bytes(&body)?;

    if !opts.force
        && check_grace_window(&tombstone.marked_at, opts.grace_hours, "baseline tombstone")?
            == GraceDecision::Within
    {
        debug!(
            key = %tombstone_key,
            marked_at = %tombstone.marked_at,
            "gc sweep: baseline tombstone within grace window",
        );
        return Ok(SweepStep::Deferred);
    }

    // Re-check the live chain. A subsequent push that re-baselined to
    // the same SHA (force-push at the same tip, or compact short-cut)
    // makes this bundle live again — leave it alone, drop the now-stale
    // tombstone. A missing ref (chain deleted) means the bundle is also
    // unreachable; proceed with the delete.
    let ref_name = match RefName::new(tombstone.ref_name.clone()) {
        Ok(r) => r,
        Err(e) => {
            // Issue #146: a tombstone whose recorded ref_name no longer
            // parses as a RefName cannot be turned into a bundle key, so
            // deleting the tombstone would orphan the bundle with no
            // record on the bucket. Preserve both records and surface
            // the ref_name + tombstone key at error! so an operator can
            // locate and reconcile the corruption manually. Reachable
            // today only via manual bucket tampering or a future schema
            // tweak that loosens what BaselineTombstone::ref_name
            // accepts at write time relative to RefName::new at read
            // time.
            error!(
                key = %tombstone_key,
                ref_name = %tombstone.ref_name,
                sha = %tombstone.sha.as_str(),
                error = %e,
                "gc sweep: baseline tombstone names invalid ref; preserving \
                 tombstone and bundle for operator review",
            );
            return Ok(SweepStep::Deferred);
        }
    };
    let prefix_opt = (!prefix.is_empty()).then_some(prefix);
    let chain = load_chain(store, prefix_opt, &ref_name).await?;
    let mut skipped_repointed_packs = 0usize;
    let mut deleted_objects = 0usize;
    let still_live = chain.as_ref().is_some_and(|c| c.full_at == tombstone.sha);
    if still_live {
        skipped_repointed_packs += 1;
        debug!(
            key = %tombstone_key,
            ref_path = %ref_name.as_str(),
            sha = %tombstone.sha.as_str(),
            "gc sweep: baseline re-referenced; skipping delete",
        );
    } else {
        // Issue #153: re-read the chain IMMEDIATELY before deleting the
        // bundle. The earlier `load_chain` above closes the common
        // stale-tombstone path, but a concurrent force-push or compact
        // can re-baseline the ref to the tombstoned SHA between that
        // read and the delete that follows; without this recheck, sweep
        // would erase a now-live bundle and then drop the tombstone.
        // The window between this recheck and the bundle delete is
        // bounded by a single network round-trip — the same bound the
        // pack-tombstone path uses (issues #140, #152). When the
        // recheck catches the race, return `Deferred` so the tombstone
        // is preserved for a future sweep (mirrors #146's
        // operator-review pattern) — the chain may flip back to a
        // different SHA before then, in which case the tombstone
        // becomes actionable again.
        let recheck = load_chain(store, prefix_opt, &ref_name).await?;
        if recheck.as_ref().is_some_and(|c| c.full_at == tombstone.sha) {
            debug!(
                key = %tombstone_key,
                ref_path = %ref_name.as_str(),
                sha = %tombstone.sha.as_str(),
                "gc sweep: baseline re-referenced between checks; deferring",
            );
            return Ok(SweepStep::Deferred);
        }
        let bundle_key = keys::bundle_key(prefix_opt, &ref_name, tombstone.sha.as_str());
        if delete_idempotent(store, &bundle_key).await? {
            deleted_objects += 1;
        }
    }
    // Drop the tombstone last so a crash mid-delete leaves it for the
    // next sweep to finish.
    delete_idempotent(store, tombstone_key).await?;
    info!(
        key = %tombstone_key,
        deleted = deleted_objects,
        skipped = skipped_repointed_packs,
        "gc sweep: baseline tombstone applied",
    );
    Ok(SweepStep::Swept {
        deleted_objects,
        skipped_repointed_packs,
    })
}

/// `<prefix>/gc/` prefix for [`ObjectStore::list`]. Empty `prefix`
/// drops the leading slash (matches the project's bucket-root rule).
fn gc_listing_prefix(prefix: &str) -> String {
    keys::join(Some(prefix), "gc/")
}

/// Build a tombstone key. The `marked_at` segment may contain `:`
/// characters; S3 / Azure both accept colons in keys.
fn tombstone_key(prefix: &str, run_id: &str, marked_at: &str) -> String {
    keys::join(
        Some(prefix),
        &format!("gc/tombstones-{run_id}-{marked_at}.json"),
    )
}

/// Key-namespace fragment for baseline tombstones (issue #134).
/// Composed with a bucket prefix via [`keys::join`] / [`baseline_tombstone_listing_prefix`]
/// to form the full listable prefix; the UUID-suffixed body filename
/// is appended by [`baseline_tombstone_key`].
///
/// Single source of truth for the on-bucket key shape — production
/// builders and test assertions both compose against this constant
/// rather than embedding the literal string (#221).
pub(crate) const BASELINE_TOMBSTONE_KEY_FRAGMENT: &str = "gc/baseline-tomb-";

/// Build a baseline tombstone key. UUID-keyed so concurrent compacts
/// / force-pushes across different refs never clobber, and the
/// timestamp lives in the body rather than the filename to keep the
/// `is_baseline_tombstone_key` predicate cheap.
fn baseline_tombstone_key(prefix: &str, run_id: &str) -> String {
    keys::join(
        Some(prefix),
        &format!("{BASELINE_TOMBSTONE_KEY_FRAGMENT}{run_id}.json"),
    )
}

/// Listable prefix for every baseline tombstone under `prefix`
/// (e.g. `"repo/gc/baseline-tomb-"`). Composes
/// [`BASELINE_TOMBSTONE_KEY_FRAGMENT`] with the bucket prefix via
/// [`keys::join`] so callers don't open-code the literal.
pub(crate) fn baseline_tombstone_listing_prefix(prefix: Option<&str>) -> String {
    keys::join(prefix, BASELINE_TOMBSTONE_KEY_FRAGMENT)
}

/// Robust check that `key` is a tombstone under our prefix. Guards
/// against unrelated `.json` files in `<prefix>/gc/` and against a
/// regression where a future schema rev moves the prefix.
///
/// Root-prefix (`prefix == ""`) case: `expected_prefix` is just
/// `"gc/tombstones-"`, so every `gc/tombstones-*.json` key at the
/// bucket root matches. That is the intended behaviour — a root
/// repo owns the entire `gc/` namespace.
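///
/// # Example
///
/// A self-contained sketch of the matching rule. The local `join` is a
/// hypothetical stand-in for [`keys::join`], assumed here to drop the
/// separator when the prefix is empty; the key names are made up for
/// illustration.
///
/// ```rust
/// // Hypothetical stand-in for `keys::join`: an empty or absent prefix
/// // contributes no leading slash.
/// fn join(prefix: Option<&str>, suffix: &str) -> String {
///     match prefix {
///         Some(p) if !p.is_empty() => format!("{p}/{suffix}"),
///         _ => suffix.to_string(),
///     }
/// }
///
/// fn is_tombstone_key(key: &str, prefix: &str) -> bool {
///     key.starts_with(&join(Some(prefix), "gc/tombstones-"))
/// }
///
/// assert!(is_tombstone_key("repo/gc/tombstones-1.json", "repo"));
/// // Unrelated `.json` files and baseline tombstones do not match.
/// assert!(!is_tombstone_key("repo/gc/notes.json", "repo"));
/// assert!(!is_tombstone_key("repo/gc/baseline-tomb-1.json", "repo"));
/// // Keys under another repository's prefix do not match either.
/// assert!(!is_tombstone_key("other/gc/tombstones-1.json", "repo"));
/// // Root repository: the bucket-level `gc/` namespace belongs to it.
/// assert!(is_tombstone_key("gc/tombstones-1.json", ""));
/// ```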
fn is_tombstone_key(key: &str, prefix: &str) -> bool {
    let expected_prefix = keys::join(Some(prefix), "gc/tombstones-");
    key.starts_with(&expected_prefix)
}

/// Robust check that `key` is a baseline tombstone under our prefix
/// (issue #134). Mirrors [`is_tombstone_key`] for the
/// [`BASELINE_TOMBSTONE_KEY_FRAGMENT`] namespace.
fn is_baseline_tombstone_key(key: &str, prefix: &str) -> bool {
    key.starts_with(&baseline_tombstone_listing_prefix(Some(prefix)))
}

/// List every `<prefix>/refs/**/chain.json` (across every ref
/// namespace — `refs/heads/`, `refs/tags/`, `refs/notes/`, etc.) and
/// union the pack content-shas they reference. Fail closed on parse
/// error.
async fn list_referenced_packs(
    store: &dyn ObjectStore,
    prefix: &str,
) -> Result<HashSet<Sha40>, PackchainError> {
    let refs_prefix = keys::join(Some(prefix), "refs/");
    let metas = store.list(&refs_prefix).await?;

    // Bounded-parallel `get_bytes` per chain.json, parse-as-fetched.
    // Mirrors `list::list_refs` (#89 widened the listing prefix to
    // all `refs/` namespaces, so candidate count scales with branches
    // + tags + notes). `MAX_FETCH_CONCURRENCY` (= 8) is the same bound
    // Phase 3 fetch uses for chain pack downloads. `try_fold` folds
    // each body into the set as soon as `buffer_unordered` yields it,
    // so parse overlaps the next batch's fetch latency and no
    // intermediate `Vec<Bytes>` is held.
    //
    // Fail-closed semantics: a transport failure on any GET, or a
    // parse failure on any chain, aborts the run, so the mark phase
    // never tombstones live packs on the strength of a corrupt chain
    // that under-reports its references.
    futures::stream::iter(
        metas
            .into_iter()
            .filter(|m| super::keys::is_chain_json_key(&m.key))
            .map(|m| m.key),
    )
    .map(|key| async move { store.get_bytes(&key).await.map_err(PackchainError::Store) })
    .buffer_unordered(MAX_FETCH_CONCURRENCY)
    .try_fold(HashSet::<Sha40>::new(), |mut acc, body| async move {
        let chain = ChainManifest::from_json_bytes(&body)?;
        for segment in chain.segments {
            // gc fails closed on a malformed pack key — the chain is
            // corrupt and tombstoning live packs based on it would be
            // unsafe. Uses the same `MalformedPackEntry` variant as
            // every other consumer (read, fetch, compact) so error
            // wording stays aligned across the engine.
            let sha = super::keys::segment_pack_sha(&segment)?;
            acc.insert(sha);
        }
        Ok(acc)
    })
    .await
}

/// List every `<prefix>/packs/*.pack` and `*.idx` and return the union
/// of their content-shas. The set is keyed by sha, so a `.pack` whose
/// `.idx` is missing still counts (and can be tombstoned), and vice
/// versa.
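///
/// # Example
///
/// A self-contained sketch of the basename handling only. `pack_stem`
/// is a hypothetical helper and the keys are made up; the real function
/// additionally validates each stem with `Sha40::try_new` and silently
/// skips anything that is not a valid sha.
///
/// ```rust
/// // Hypothetical helper: strip the directory part, then a `.pack` or
/// // `.idx` suffix. Returns `None` for any other file.
/// fn pack_stem(key: &str) -> Option<&str> {
///     let basename = key.rsplit('/').next()?;
///     basename
///         .strip_suffix(".pack")
///         .or_else(|| basename.strip_suffix(".idx"))
/// }
///
/// // A pack and its index collapse to the same stem, so either one
/// // alone is enough to put the sha in the set.
/// assert_eq!(pack_stem("repo/packs/abc.pack"), Some("abc"));
/// assert_eq!(pack_stem("repo/packs/abc.idx"), Some("abc"));
/// assert_eq!(pack_stem("repo/packs/README"), None);
/// ```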
async fn list_pack_shas(
    store: &dyn ObjectStore,
    prefix: &str,
) -> Result<HashSet<Sha40>, PackchainError> {
    let packs_prefix = keys::join(Some(prefix), "packs/");
    let metas = store.list(&packs_prefix).await?;
    let mut shas: HashSet<Sha40> = HashSet::new();
    for meta in metas {
        let basename = meta
            .key
            .rsplit('/')
            .next()
            .expect("rsplit('/') always yields at least one element");
        let candidate = basename
            .strip_suffix(".pack")
            .or_else(|| basename.strip_suffix(".idx"));
        if let Some(sha) = candidate
            && let Ok(parsed) = Sha40::try_new(sha)
        {
            shas.insert(parsed);
        }
    }
    Ok(shas)
}

/// Idempotent delete: returns `Ok(true)` on a real delete, `Ok(false)`
/// when the object was already absent (concurrent sweep raced ahead,
/// or a partial sweep ran earlier). Any other store error propagates
/// as [`PackchainError::Store`].
async fn delete_idempotent(store: &dyn ObjectStore, key: &str) -> Result<bool, PackchainError> {
    match store.delete(key).await {
        Ok(()) => Ok(true),
        Err(ObjectStoreError::NotFound(_)) => Ok(false),
        Err(e) => Err(PackchainError::Store(e)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::git::RefName;
    use crate::object_store::mock::MockStore;
    use crate::packchain::manifest::write_chain;
    use crate::packchain::schema::ChainSegment;

    const SHA_TIP: &str = "0000000000000000000000000000000000000001";
    const SHA_FULL: &str = "0000000000000000000000000000000000000002";
    const SHA_PACK_LIVE: &str = "1111111111111111111111111111111111111111";
    const SHA_PACK_ORPHAN: &str = "2222222222222222222222222222222222222222";
    const SHA_PACK_ORPHAN_2: &str = "3333333333333333333333333333333333333333";

    fn sha40(s: &str) -> Sha40 {
        Sha40::try_new(s).unwrap()
    }

    fn ref_main() -> RefName {
        RefName::new("refs/heads/main").unwrap()
    }

    fn segment(pack_sha: &str, parent: Option<&str>) -> ChainSegment {
        ChainSegment {
            sha: sha40(SHA_TIP),
            parent_sha: parent.map(sha40),
            pack: format!("packs/{pack_sha}.pack"),
            bytes: 1_024,
        }
    }

    async fn seed_live_chain(store: &MockStore, prefix: Option<&str>) {
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(store, prefix, &ref_main(), &chain)
            .await
            .unwrap();
    }

    fn insert_pack_pair(store: &MockStore, prefix: Option<&str>, sha: &str) {
        let pack_key = super::super::keys::pack_key(prefix, &sha40(sha));
        let idx_key = super::super::keys::pack_idx_key(prefix, &sha40(sha));
        store.insert(pack_key, Bytes::from_static(b"PACKDATA"));
        store.insert(idx_key, Bytes::from_static(b"IDXDATA"));
    }

    // --- mark -----------------------------------------------------------

    #[tokio::test]
    async fn mark_with_no_chains_treats_all_packs_as_orphan() {
        let store = MockStore::new();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(outcome.orphan_count, 1);
        // Tombstone written to the correct prefix.
        let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
        let parsed = Tombstone::from_json_bytes(&body).unwrap();
        assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
    }

    #[tokio::test]
    async fn mark_skips_chain_referenced_packs() {
        let store = MockStore::new();
        seed_live_chain(&store, Some("repo")).await;
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(outcome.orphan_count, 1);
        let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
        let parsed = Tombstone::from_json_bytes(&body).unwrap();
        assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
    }

    #[tokio::test]
    async fn mark_no_orphans_skips_tombstone_write() {
        let store = MockStore::new();
        seed_live_chain(&store, Some("repo")).await;
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(outcome.orphan_count, 0);
        // No tombstone listed.
        let metas = store.list("repo/gc/").await.unwrap();
        assert!(
            metas.is_empty(),
            "tombstone must not exist for empty orphan set"
        );
    }

    #[tokio::test]
    async fn mark_dry_run_does_not_write_tombstone() {
        let store = MockStore::new();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
        let outcome = mark(&store, "repo", MarkOpts { dry_run: true })
            .await
            .unwrap();
        assert_eq!(outcome.orphan_count, 1);
        let metas = store.list("repo/gc/").await.unwrap();
        assert!(metas.is_empty(), "dry-run must not write tombstone");
    }

    #[tokio::test]
    async fn mark_treats_tag_chain_referenced_packs_as_live() {
        // A pack referenced only from a chain under refs/tags/ must
        // not be tombstoned. (Regression for issue #89.)
        let store = MockStore::new();
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        let tag_ref = RefName::new("refs/tags/v1").unwrap();
        write_chain(&store, Some("repo"), &tag_ref, &chain)
            .await
            .unwrap();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);

        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
        assert!(
            referenced.contains(&sha40(SHA_PACK_LIVE)),
            "pack referenced from refs/tags/ chain must be in the live set",
        );

        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(outcome.orphan_count, 1);
        let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
        let parsed = Tombstone::from_json_bytes(&body).unwrap();
        assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
    }

    #[tokio::test]
    async fn mark_treats_notes_chain_referenced_packs_as_live() {
        // refs/notes/commits is the standard git notes ref. A pack
        // referenced only from a notes chain must not be tombstoned.
        let store = MockStore::new();
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        let notes_ref = RefName::new("refs/notes/commits").unwrap();
        write_chain(&store, Some("repo"), &notes_ref, &chain)
            .await
            .unwrap();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);

        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
        assert!(
            referenced.contains(&sha40(SHA_PACK_LIVE)),
            "pack referenced from refs/notes/ chain must be in the live set",
        );

        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(outcome.orphan_count, 0);
    }

    #[tokio::test]
    async fn list_referenced_packs_unions_across_namespaces() {
        // A live chain in refs/heads/ AND in refs/tags/ both
        // contribute to the referenced set.
        let store = MockStore::new();
        let head_chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &head_chain)
            .await
            .unwrap();
        let tag_chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
        };
        let tag_ref = RefName::new("refs/tags/v1").unwrap();
        write_chain(&store, Some("repo"), &tag_ref, &tag_chain)
            .await
            .unwrap();

        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
        assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
        assert!(referenced.contains(&sha40(SHA_PACK_ORPHAN_2)));
        assert_eq!(referenced.len(), 2);
    }

    #[tokio::test]
1447    async fn list_referenced_packs_ignores_sibling_artefacts() {
1448        // path-index.json, .bundle baselines, and other artefacts
1449        // under refs/<namespace>/<name>/ must not be parsed as
1450        // chain.json.
1451        let store = MockStore::new();
1452        seed_live_chain(&store, Some("repo")).await;
1453        // Add sibling artefacts that share the ref directory.
1454        store.insert(
1455            "repo/refs/heads/main/path-index.json",
1456            Bytes::from_static(b"{}"),
1457        );
1458        store.insert(
1459            format!("repo/refs/heads/main/{SHA_TIP}.bundle"),
1460            Bytes::from_static(b"BUNDLE"),
1461        );
1462        // And a tombstone-style key under refs/ that must be filtered.
1463        store.insert(
1464            "repo/refs/tags/v1/path-index.json",
1465            Bytes::from_static(b"{}"),
1466        );
1467
1468        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1469        assert_eq!(referenced.len(), 1);
1470        assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
1471    }
1472
    #[tokio::test]
    async fn list_referenced_packs_empty_for_no_chains() {
        let store = MockStore::new();
        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
        assert!(referenced.is_empty());
    }

    #[tokio::test]
    async fn list_referenced_packs_unions_many_chains_with_bounded_parallel_fetch() {
        // Regression guard for the buffer_unordered fetch path:
        // exercise more chain.json bodies than MAX_FETCH_CONCURRENCY
        // (= 8) so multiple batches must complete and union without
        // dropping any pack sha. Spans heads, tags, and notes so the
        // listing prefix widening from #89 stays exercised.
        let store = MockStore::new();
        let chain_count = MAX_FETCH_CONCURRENCY * 3 + 1;
        let namespaces = ["refs/heads", "refs/tags", "refs/notes"];
        let mut expected: HashSet<Sha40> = HashSet::new();
        for i in 0..chain_count {
            let pack_sha = format!("{:040x}", 0x1000 + i);
            let pack_sha40 = sha40(&pack_sha);
            let namespace = namespaces[i % namespaces.len()];
            let ref_name = RefName::new(format!("{namespace}/r{i}")).unwrap();
            let chain = ChainManifest {
                v: 1,
                tip: sha40(SHA_TIP),
                full_at: sha40(SHA_FULL),
                segments: vec![ChainSegment {
                    sha: sha40(SHA_TIP),
                    parent_sha: None,
                    pack: format!("packs/{pack_sha}.pack"),
                    bytes: 1_024,
                }],
            };
            write_chain(&store, Some("repo"), &ref_name, &chain)
                .await
                .unwrap();
            expected.insert(pack_sha40);
        }

        let referenced = list_referenced_packs(&store, "repo").await.unwrap();
        assert_eq!(referenced, expected);
    }

    #[tokio::test]
    async fn mark_fails_closed_on_corrupt_chain() {
        let store = MockStore::new();
        // chain.json with malformed JSON.
        store.insert(
            "repo/refs/heads/main/chain.json",
            Bytes::from_static(b"{not valid json"),
        );
        let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
        assert!(matches!(err, PackchainError::ParseJson(_)));
        // No tombstone written.
        let metas = store.list("repo/gc/").await.unwrap();
        assert!(metas.is_empty());
    }

    #[tokio::test]
    async fn mark_fails_closed_on_unsupported_schema_version() {
        let store = MockStore::new();
        store.insert(
            "repo/refs/heads/main/chain.json",
            Bytes::from_static(
                br#"{"v":2,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000002","segments":[]}"#,
            ),
        );
        let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
        assert!(matches!(
            err,
            PackchainError::UnsupportedSchemaVersion { .. }
        ));
    }

    // --- sweep ----------------------------------------------------------

    fn sha_set<I: IntoIterator<Item = &'static str>>(shas: I) -> Vec<Sha40> {
        shas.into_iter().map(sha40).collect()
    }

    fn write_tombstone(
        store: &MockStore,
        prefix: &str,
        marked_at: &str,
        shas: Vec<Sha40>,
    ) -> String {
        let run_id = Uuid::new_v4().to_string();
        let key = tombstone_key(prefix, &run_id, marked_at);
        let body = Tombstone {
            v: 1,
            run_id,
            marked_at: marked_at.to_string(),
            orphan_packs: shas,
        }
        .to_json_pretty()
        .unwrap();
        store.insert(&key, Bytes::from(body));
        key
    }

    #[tokio::test]
    async fn sweep_inside_grace_defers_tombstone() {
        let store = MockStore::new();
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        let tombstone = write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);

        let outcome = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: false,
            },
        )
        .await
        .unwrap();
        assert_eq!(outcome.deferred_tombstones, 1);
        assert_eq!(outcome.swept_tombstones, 0);
        // Tombstone and packs survive.
        store.get_bytes(&tombstone).await.unwrap();
        store
            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn sweep_after_grace_deletes_orphan_packs_and_tombstone() {
        let store = MockStore::new();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let tombstone = write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deleted_objects, 2, "pack + idx");
        // Tombstone and packs gone.
        let pack_err = store
            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
            .await
            .unwrap_err();
        assert!(matches!(pack_err, ObjectStoreError::NotFound(_)));
        let tomb_err = store.get_bytes(&tombstone).await.unwrap_err();
        assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn sweep_skips_repointed_packs() {
        // A tombstoned pack got re-referenced by a chain rewrite
        // before the grace expired. Sweep must NOT delete it.
        let store = MockStore::new();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        // The tombstone names SHA_PACK_LIVE — but a chain now references it.
        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.skipped_repointed_packs, 1);
        assert_eq!(outcome.deleted_objects, 0);
        // Pack still present.
        store
            .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn sweep_force_bypasses_grace_only_not_live_recheck() {
        // Regression for #117: --force must skip ONLY the grace window,
        // not the live-pack re-check. A fresh tombstone names a pack
        // that has since been referenced by a committed chain — the
        // classic outcome of mark() snapshotting between a concurrent
        // push's pack upload and its chain.json commit. Sweep with
        // --force must NOT delete that pack.
        let store = MockStore::new();
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_LIVE]));
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);

        let outcome = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: true,
            },
        )
        .await
        .unwrap();
        // Grace was bypassed (fresh tombstone got processed instead of
        // deferred), but the live-pack guard fired and the pack stayed.
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deferred_tombstones, 0);
        assert_eq!(outcome.skipped_repointed_packs, 1);
        assert_eq!(outcome.deleted_objects, 0);
        store
            .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
            .await
            .expect("live pack must survive --force sweep");
        store
            .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.idx"))
            .await
            .expect("live idx must survive --force sweep");
    }

    #[tokio::test]
    async fn sweep_force_deletes_truly_orphan_pack_inside_grace() {
        // The happy path for --force: a fresh tombstone naming a pack
        // that is NOT in any chain. Grace is bypassed, the live-pack
        // re-check finds the SHA absent, the pack is deleted.
        let store = MockStore::new();
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);

        let outcome = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: true,
            },
        )
        .await
        .unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deferred_tombstones, 0);
        assert_eq!(outcome.skipped_repointed_packs, 0);
        assert_eq!(outcome.deleted_objects, 2);
        let err = store
            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
            .await
            .unwrap_err();
        assert!(matches!(err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn sweep_tolerates_already_deleted_pack() {
        // Tombstone names a pack that no longer exists on the bucket
        // (e.g. a previous partial sweep deleted the .pack but
        // crashed before deleting the .idx). Sweep must complete
        // without error.
        let store = MockStore::new();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
        // No pack inserted.
        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deleted_objects, 0);
    }

    #[tokio::test]
    async fn sweep_handles_multiple_tombstones_independently() {
        let store = MockStore::new();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        // One stale tombstone (must sweep) + one fresh (must defer).
        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
        write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN_2]));
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN_2);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deferred_tombstones, 1);
        assert_eq!(outcome.deleted_objects, 2);
    }

    // --- end-to-end ---------------------------------------------------

    #[tokio::test]
    async fn mark_then_force_sweep_round_trips() {
        let store = MockStore::new();
        seed_live_chain(&store, Some("repo")).await;
        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);

        let mark_out = mark(&store, "repo", MarkOpts::default()).await.unwrap();
        assert_eq!(mark_out.orphan_count, 1);

        // Force sweep — bypass grace.
        let sweep_out = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: true,
            },
        )
        .await
        .unwrap();
        assert_eq!(sweep_out.swept_tombstones, 1);
        assert_eq!(sweep_out.deleted_objects, 2);

        // Live pack survives, orphan pack is gone.
        store
            .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
            .await
            .unwrap();
        let err = store
            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
            .await
            .unwrap_err();
        assert!(matches!(err, ObjectStoreError::NotFound(_)));
    }

    // --- baseline tombstones (issue #134) -----------------------------

    fn insert_baseline_bundle(store: &MockStore, prefix: Option<&str>, sha: &str) -> String {
        let key = keys::bundle_key(prefix, ref_main(), sha);
        store.insert(&key, Bytes::from_static(b"BUNDLE"));
        key
    }

    fn write_baseline_tombstone_at(
        store: &MockStore,
        prefix: &str,
        marked_at: &str,
        sha: &str,
    ) -> String {
        let key = baseline_tombstone_key(prefix, &Uuid::new_v4().to_string());
        let body = BaselineTombstone {
            v: TOMBSTONE_SCHEMA_VERSION,
            marked_at: marked_at.to_owned(),
            ref_name: ref_main().as_str().to_owned(),
            sha: sha40(sha),
        }
        .to_json_pretty()
        .unwrap();
        store.insert(&key, Bytes::from(body));
        key
    }

    /// Issue #157: [`tombstoned_bundle_keys`] enumerates every bundle
    /// key currently named by a baseline tombstone, regardless of
    /// which engine wrote the tombstone. The bundle engine relies on
    /// this set to hide tombstoned bundles from `list` and from the
    /// under-lock multi-bundle guard.
    #[tokio::test]
    async fn tombstoned_bundle_keys_returns_bundle_paths_for_each_tombstone() {
        let store = MockStore::new();
        // Two tombstones for distinct (ref, sha) pairs.
        write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &sha40(SHA_FULL))
            .await
            .unwrap();
        let other_ref = RefName::new("refs/heads/feature").unwrap();
        write_baseline_tombstone_for_orphan(&store, Some("repo"), &other_ref, &sha40(SHA_TIP))
            .await
            .unwrap();

        let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
        assert_eq!(keys.len(), 2, "one bundle key per tombstone (got {keys:?})");
        assert!(keys.contains(&format!("repo/refs/heads/main/{SHA_FULL}.bundle")));
        assert!(keys.contains(&format!("repo/refs/heads/feature/{SHA_TIP}.bundle")));
    }

    /// A fresh bucket with no `gc/` directory must not error — empty
    /// set is the right answer.
    #[tokio::test]
    async fn tombstoned_bundle_keys_empty_when_no_tombstones() {
        let store = MockStore::new();
        let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
        assert!(keys.is_empty(), "empty bucket yields no tombstoned keys");
    }

    /// Root-prefix repos (no `<prefix>/` segment, keys collapse to
    /// `gc/baseline-tomb-*.json` and `refs/heads/<ref>/<sha>.bundle`)
    /// must produce the same tombstone → bundle-key mapping. All other
    /// tests use `Some("repo")`; this one covers the
    /// `prefix.unwrap_or("")` path in `tombstoned_bundle_keys`.
    #[tokio::test]
    async fn tombstoned_bundle_keys_handles_root_prefix() {
        let store = MockStore::new();
        write_baseline_tombstone_for_orphan(&store, None, &ref_main(), &sha40(SHA_FULL))
            .await
            .unwrap();

        let keys = tombstoned_bundle_keys(&store, None).await.unwrap();
        assert_eq!(keys.len(), 1, "got {keys:?}");
        assert!(
            keys.contains(&format!("refs/heads/main/{SHA_FULL}.bundle")),
            "root-prefix bundle key (no leading repo/) must be produced; got {keys:?}",
        );
    }

    /// An unparseable tombstone must not block the rest. Mirrors
    /// `sweep_one_baseline_tombstone`'s tolerance for bad records:
    /// the sweep loop logs a warn and continues.
    #[tokio::test]
    async fn tombstoned_bundle_keys_skips_unparseable_tombstones() {
        let store = MockStore::new();
        // One good tombstone + one garbage tombstone-keyed file.
        write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &sha40(SHA_FULL))
            .await
            .unwrap();
        store.insert(
            "repo/gc/baseline-tomb-garbage.json",
            Bytes::from_static(b"not json"),
        );

        let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
        assert_eq!(
            keys.len(),
            1,
            "good tombstone must still be returned despite garbage sibling",
        );
        assert!(keys.contains(&format!("repo/refs/heads/main/{SHA_FULL}.bundle")));
    }

    #[tokio::test]
    async fn write_baseline_tombstone_round_trips() {
        // Writer + parser agree on the on-bucket shape. Regression
        // guard: a future serde tweak that broke the JSON layout would
        // make sweep silently skip every baseline tombstone.
        let store = MockStore::new();
        let prior = sha40(SHA_FULL);
        let current = sha40(SHA_TIP);
        write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, &current)
            .await
            .unwrap();
        let metas = store.list("repo/gc/").await.unwrap();
        let tomb_key = metas
            .iter()
            .find(|m| {
                m.key
                    .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
            })
            .map(|m| m.key.clone())
            .expect("baseline tombstone written");
        let body = store.get_bytes(&tomb_key).await.unwrap();
        let parsed = BaselineTombstone::from_json_bytes(&body).unwrap();
        assert_eq!(parsed.v, TOMBSTONE_SCHEMA_VERSION);
        assert_eq!(parsed.ref_name, "refs/heads/main");
        assert_eq!(parsed.sha, prior);
    }

    #[tokio::test]
    async fn write_baseline_tombstone_skips_when_prior_equals_current() {
        // No-op when the keys alias: a tombstone in this case would
        // later cause sweep to delete the live baseline bundle.
        let store = MockStore::new();
        let sha = sha40(SHA_FULL);
        write_baseline_tombstone(&store, Some("repo"), &ref_main(), &sha, &sha)
            .await
            .unwrap();
        let metas = store.list("repo/gc/").await.unwrap();
        assert!(
            metas.is_empty(),
            "aliasing prior/current must not write a tombstone",
        );
    }

    #[tokio::test]
    async fn sweep_defers_baseline_tombstone_within_grace_window() {
        // Issue #134: a fetch that started before compact must be able
        // to read the prior baseline within the grace window. Concrete
        // manifestation: a baseline tombstone marked "now" is left
        // alone, and the bundle it names stays on the bucket.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        let tomb_key = write_baseline_tombstone_at(&store, "repo", &now, SHA_FULL);

        let outcome = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: false,
            },
        )
        .await
        .unwrap();
        assert_eq!(outcome.deferred_tombstones, 1);
        assert_eq!(outcome.swept_tombstones, 0);
        assert_eq!(outcome.deleted_objects, 0);
        store
            .get_bytes(&bundle_key)
            .await
            .expect("bundle must survive sweep within grace");
        store
            .get_bytes(&tomb_key)
            .await
            .expect("tombstone must survive sweep within grace");
    }

    #[tokio::test]
    async fn sweep_reclaims_baseline_tombstone_after_grace_window() {
        // Issue #134: past the grace window, sweep deletes the bundle
        // and the tombstone. This is the path that reclaims the
        // orphan baseline left in place by compact / force-push.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deferred_tombstones, 0);
        assert_eq!(outcome.deleted_objects, 1, "bundle delete");
        let bundle_err = store.get_bytes(&bundle_key).await.unwrap_err();
        assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
        let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
        assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn sweep_skips_re_baselined_bundle_after_grace() {
        // A later push re-baselined to the SAME SHA the tombstone names
        // (force-push at the same tip, or compact short-cut). Sweep
        // must NOT delete the bundle — it is live again. The
        // now-stale tombstone is dropped.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        // Live chain points at the same SHA the tombstone names.
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.skipped_repointed_packs, 1);
        assert_eq!(outcome.deleted_objects, 0);
        store
            .get_bytes(&bundle_key)
            .await
            .expect("re-baselined bundle must survive");
        let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
        assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn sweep_baseline_tolerates_already_deleted_bundle() {
        // The bundle was deleted out of band (operator cleanup, or a
        // ref deletion that happened to sweep it). Sweep must finish
        // cleanly.
        let store = MockStore::new();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
        // No bundle inserted.

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deleted_objects, 0);
        let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
        assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn sweep_baseline_force_bypasses_grace_only_not_live_recheck() {
        // --force on a fresh baseline tombstone whose SHA is now live
        // (re-baselined). Grace is bypassed (tombstone is processed),
        // but the live-state guard fires and the bundle stays.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_FULL),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        write_baseline_tombstone_at(&store, "repo", &now, SHA_FULL);

        let outcome = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: true,
            },
        )
        .await
        .unwrap();
        assert_eq!(outcome.swept_tombstones, 1);
        assert_eq!(outcome.deferred_tombstones, 0);
        assert_eq!(outcome.skipped_repointed_packs, 1);
        assert_eq!(outcome.deleted_objects, 0);
        store
            .get_bytes(&bundle_key)
            .await
            .expect("live bundle must survive --force sweep");
    }

    #[tokio::test]
    async fn sweep_processes_pack_and_baseline_tombstones_in_one_pass() {
        // Mixed tombstone types under `<prefix>/gc/`. Sweep must
        // dispatch each to the right handler without mis-counting or
        // skipping.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
        write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(outcome.swept_tombstones, 2);
        // pack + idx + bundle = 3 deletions
        assert_eq!(outcome.deleted_objects, 3);
        let bundle_err = store.get_bytes(&bundle_key).await.unwrap_err();
        assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
        let pack_err = store
            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
            .await
            .unwrap_err();
        assert!(matches!(pack_err, ObjectStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn compact_to_sweep_round_trip_simulates_concurrent_fetch_then_gc() {
        // End-to-end issue #134 scenario: compact writes a tombstone
        // (we simulate by hand to avoid pulling in the full compact
        // fixture), an in-flight fetch reads the prior bundle within
        // grace and succeeds, and a later sweep past the grace
        // reclaims it.
        let store = MockStore::new();
        let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
        // Compact moved the baseline to a new SHA — simulate by
        // writing a chain pointing to SHA_TIP as full_at.
        let chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_TIP),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&store, Some("repo"), &ref_main(), &chain)
            .await
            .unwrap();
        let prior = sha40(SHA_FULL);
        let current = sha40(SHA_TIP);
        write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, &current)
            .await
            .unwrap();

        // In-flight fetch: bundle GET within grace MUST succeed.
        let body = store.get_bytes(&bundle_key).await.unwrap();
        assert_eq!(&body[..], b"BUNDLE");
        let in_grace = sweep(
            &store,
            "repo",
            SweepOpts {
                grace_hours: 24,
                force: false,
            },
        )
        .await
        .unwrap();
        assert_eq!(in_grace.deferred_tombstones, 1);
        store
            .get_bytes(&bundle_key)
            .await
            .expect("bundle must survive in-grace sweep");

        // Backdate the tombstone past the grace and re-sweep —
        // bundle is reaped.
        let metas = store.list("repo/gc/").await.unwrap();
        let tomb_key = metas
            .iter()
            .find(|m| {
                m.key
                    .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
            })
            .map(|m| m.key.clone())
            .unwrap();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let body = store.get_bytes(&tomb_key).await.unwrap();
        let mut tomb: BaselineTombstone = serde_json::from_slice(&body).unwrap();
        tomb.marked_at = stale;
        let new_body = serde_json::to_vec_pretty(&tomb).unwrap();
        store.insert(&tomb_key, Bytes::from(new_body));

        let post_grace = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        assert_eq!(post_grace.swept_tombstones, 1);
        assert_eq!(post_grace.deleted_objects, 1);
        let err = store.get_bytes(&bundle_key).await.unwrap_err();
        assert!(matches!(err, ObjectStoreError::NotFound(_)));
    }

2202    #[tokio::test]
2203    async fn sweep_preserves_corrupt_baseline_tombstone_for_diagnosis() {
2204        // Issue #146: a baseline tombstone whose `ref_name` no longer
2205        // passes `RefName::new` cannot be turned into a bundle key, so
2206        // deleting the tombstone would orphan the bundle on the bucket
2207        // with no record. Sweep must preserve BOTH records (tombstone
2208        // and any bundle it would have named under the raw string), and
2209        // signal `Deferred` so an operator can reconcile manually.
2210        let store = MockStore::new();
2211        // Seed a "bundle" at the raw-string path the tombstone names,
2212        // so a regression that reconstructs a key from the raw ref_name
2213        // and deletes it would also be caught here.
2214        let bad_ref = "refs/heads/[bad]";
2215        assert!(
2216            RefName::new(bad_ref).is_err(),
2217            "fixture relies on this ref_name failing RefName::new",
2218        );
2219        let bundle_key = format!("repo/{bad_ref}/{SHA_FULL}.bundle");
2220        store.insert(&bundle_key, Bytes::from_static(b"BUNDLE"));
2221
2222        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2223            .format(&Rfc3339)
2224            .unwrap();
2225        let tomb_key = baseline_tombstone_key("repo", &Uuid::new_v4().to_string());
2226        let body = BaselineTombstone {
2227            v: TOMBSTONE_SCHEMA_VERSION,
2228            marked_at: stale,
2229            ref_name: bad_ref.to_owned(),
2230            sha: sha40(SHA_FULL),
2231        }
2232        .to_json_pretty()
2233        .unwrap();
2234        store.insert(&tomb_key, Bytes::from(body));
2235
2236        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2237        assert_eq!(
2238            outcome.deferred_tombstones, 1,
2239            "corrupt tombstone counts as deferred, not swept",
2240        );
2241        assert_eq!(outcome.swept_tombstones, 0);
2242        assert_eq!(outcome.deleted_objects, 0);
2243        // Tombstone survives (operator must inspect).
2244        let surviving = store
2245            .get_bytes(&tomb_key)
2246            .await
2247            .expect("corrupt tombstone must survive sweep");
2248        let parsed = BaselineTombstone::from_json_bytes(&surviving).unwrap();
2249        assert_eq!(parsed.ref_name, bad_ref);
2250        // Bundle at the would-be key survives (the key was unreachable
2251        // through the normal RefName path, but sweep must not have
2252        // reconstructed it from the raw string either).
2253        store
2254            .get_bytes(&bundle_key)
2255            .await
2256            .expect("orphan bundle must survive corrupt-tombstone sweep");
2257    }
2258
2259    // --- per-tombstone live-pack recompute (issue #140) --------------
2260
2261    /// One-shot post-delete hook used by [`PostDeleteHookStore`].
2262    type PostDeleteHook = Box<dyn FnOnce(&MockStore) + Send>;
2263
2264    /// Test-only [`ObjectStore`] decorator that runs a one-shot
2265    /// callback the first time `delete()` succeeds on a key matching
2266    /// `trigger_prefix`, *after* the inner delete completes. Used to
2267    /// inject a concurrent push (writing a fresh `chain.json`) between
2268    /// successive `sweep_one_tombstone` iterations and verify that the
2269    /// per-tombstone live-pack recompute picks it up.
2270    ///
2271    /// Every other trait method forwards to the inner store unchanged.
2272    struct PostDeleteHookStore {
2273        inner: MockStore,
2274        hook: std::sync::Mutex<Option<PostDeleteHook>>,
2275        /// Key-prefix the hook fires on. The pack-tombstone case
2276        /// uses `<prefix>/gc/tombstones-`; the test never deletes
2277        /// other keys before the intended trigger so this stays
2278        /// unambiguous.
2279        trigger_prefix: String,
2280    }
2281
2282    impl PostDeleteHookStore {
2283        fn new(
2284            inner: MockStore,
2285            trigger_prefix: impl Into<String>,
2286            hook: impl FnOnce(&MockStore) + Send + 'static,
2287        ) -> Self {
2288            Self {
2289                inner,
2290                hook: std::sync::Mutex::new(Some(Box::new(hook))),
2291                trigger_prefix: trigger_prefix.into(),
2292            }
2293        }
2294    }
2295
2296    crate::delegate_to_inner_impl! {
2297        impl ObjectStore for PostDeleteHookStore {
2298            forward: list, get_to_file, get_bytes, get_bytes_range,
2299                     put_bytes, put_path, put_if_absent,
2300                     head, copy;
2301
2302            async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
2303                let result = self.inner.delete(key).await;
2304                if result.is_ok()
2305                    && key.starts_with(&self.trigger_prefix)
2306                    && let Some(hook) = self.hook.lock().unwrap().take()
2307                {
2308                    hook(&self.inner);
2309                }
2310                result
2311            }
2312        }
2313    }
2314
2315    #[tokio::test]
2316    async fn sweep_re_derives_referenced_set_per_tombstone() {
2317        // Issue #140 regression: a concurrent push committing
2318        // chain.json between two `sweep_one_tombstone` iterations
2319        // must not let sweep delete a pack the new chain references.
2320        //
2321        // Layout: two stale tombstones, each naming a distinct pack
2322        // on its own ref. After the FIRST tombstone is fully
2323        // processed and deleted, the post-delete hook fires and
2324        // writes BOTH refs' `chain.json` files — simulating a
2325        // concurrent push that committed chain.json for the second
2326        // ref between sweep's two iterations. The second iteration
2327        // must re-derive the live set and skip the delete.
2328        //
2329        // Pre-fix: the once-per-sweep snapshot is empty for both
2330        // iterations and BOTH packs are deleted (`deleted_objects = 4`).
2331        // Post-fix: the second iteration's recompute picks up the new
2332        // chain and the second pack survives
2333        // (`deleted_objects = 2`, `skipped_repointed_packs = 1`).
2334        //
2335        // The hook writes chains for both refs (rather than guessing
2336        // which tombstone runs first) so the assertions are independent
2337        // of MockStore iteration order. Writing the first ref's chain
2338        // is a no-op for that pack — its delete already happened
2339        // before the hook fired — and the second ref's chain is what
2340        // protects the still-pending pack.
2341        let inner = MockStore::new();
2342        let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
2343            .format(&Rfc3339)
2344            .unwrap();
2345        let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2346            .format(&Rfc3339)
2347            .unwrap();
2348        write_tombstone(&inner, "repo", &stale_a, sha_set([SHA_PACK_ORPHAN]));
2349        write_tombstone(&inner, "repo", &stale_b, sha_set([SHA_PACK_ORPHAN_2]));
2350        insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
2351        insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
2352
2353        // After the FIRST tombstone delete completes, simulate the
2354        // concurrent push by committing chain.json files for both
2355        // refs at once.
2356        let store = PostDeleteHookStore::new(inner, "repo/gc/tombstones-", |inner| {
2357            for (ref_path, pack_sha) in [
2358                ("repo/refs/heads/branch_a/chain.json", SHA_PACK_ORPHAN),
2359                ("repo/refs/heads/branch_b/chain.json", SHA_PACK_ORPHAN_2),
2360            ] {
2361                let chain = ChainManifest {
2362                    v: 1,
2363                    tip: sha40(SHA_TIP),
2364                    full_at: sha40(SHA_FULL),
2365                    segments: vec![segment(pack_sha, None)],
2366                };
2367                let body =
2368                    serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2369                inner.insert(ref_path, Bytes::from(body));
2370            }
2371        });
2372
2373        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2374        // Both tombstones processed.
2375        assert_eq!(outcome.swept_tombstones, 2);
2376        // Whichever tombstone ran first deleted its pack pair (2
2377        // objects). The second iteration's recompute saw the
2378        // freshly-committed chain and skipped the delete.
2379        assert_eq!(outcome.deleted_objects, 2);
2380        assert_eq!(outcome.skipped_repointed_packs, 1);
2381
2382        // Exactly one of the two packs survives — the one whose
2383        // tombstone was processed second.
2384        let first_survives = store
2385            .inner
2386            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
2387            .await
2388            .is_ok();
2389        let second_survives = store
2390            .inner
2391            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
2392            .await
2393            .is_ok();
2394        assert!(
2395            first_survives ^ second_survives,
2396            "exactly one pack must survive: \
2397             first_survives={first_survives}, second_survives={second_survives}",
2398        );
2399    }
2400
2401    #[tokio::test]
2402    async fn sweep_re_derives_referenced_set_per_pack_within_tombstone() {
2403        // Issue #152 regression: a concurrent push committing
2404        // chain.json AFTER `sweep_one_tombstone`'s referenced-set
2405        // snapshot but BEFORE a later pack in the SAME tombstone is
2406        // reached must not let sweep delete the now-live pack.
2407        //
2408        // Layout: one stale tombstone naming TWO orphan packs (the
2409        // shape `mark()` produces — every orphan SHA for a run goes
2410        // into one tombstone body). The post-delete hook fires on
2411        // the FIRST `packs/` delete (the first pack's `.pack` key,
2412        // mid-iter-1) and writes a chain.json that references the
2413        // SECOND pack — simulating a concurrent push that landed
2414        // after the per-tombstone snapshot.
2415        //
2416        // Pre-fix (single snapshot per tombstone, taken before the
2417        // loop): `referenced` is empty for both iterations and both
2418        // packs are deleted (`deleted_objects = 4`,
2419        // `skipped_repointed_packs = 0`). Post-fix (per-pack
2420        // recompute): the second iteration's fresh recompute picks
2421        // up the new chain and skips the delete
2422        // (`deleted_objects = 2`, `skipped_repointed_packs = 1`).
2423        //
2424        // The pack order inside `orphan_packs` is the Vec insertion
2425        // order — deterministic — so the assertion is on the EXACT
2426        // surviving pack key (`SHA_PACK_ORPHAN_2`), not a generic
2427        // "one of two survives". This catches a regression that
2428        // dropped the recompute entirely (both deleted) AND a
2429        // regression that kept the recompute but forgot to skip on
2430        // re-reference (would also delete the second pack).
2431        let inner = MockStore::new();
2432        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2433            .format(&Rfc3339)
2434            .unwrap();
2435        write_tombstone(
2436            &inner,
2437            "repo",
2438            &stale,
2439            sha_set([SHA_PACK_ORPHAN, SHA_PACK_ORPHAN_2]),
2440        );
2441        insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
2442        insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
2443
2444        // Fire on the first `packs/` delete (the first pack's `.pack`
2445        // key), i.e. mid-iteration 1: iter-1's referenced-set check
2446        // has already passed, and iter-2 has not yet run its
2447        // `list_referenced_packs`. The hook writes a chain.json that
2448        // re-references the SECOND pack, which the per-pack recompute
2449        // must observe.
2450        let store = PostDeleteHookStore::new(inner, "repo/packs/", |inner| {
2451            let chain = ChainManifest {
2452                v: 1,
2453                tip: sha40(SHA_TIP),
2454                full_at: sha40(SHA_FULL),
2455                segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
2456            };
2457            let body =
2458                serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2459            inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
2460        });
2461
2462        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2463        assert_eq!(outcome.swept_tombstones, 1);
2464        // First pack: deleted (.pack + .idx = 2). Second pack: skipped
2465        // because the per-pack recompute saw the freshly-committed
2466        // chain referencing it.
2467        assert_eq!(
2468            outcome.deleted_objects, 2,
2469            "only the first pack's pair deleted; the second was re-referenced",
2470        );
2471        assert_eq!(
2472            outcome.skipped_repointed_packs, 1,
2473            "second iteration's per-pack recompute must skip the re-referenced pack",
2474        );
2475        // Exact surviving pack: the second one (the one the
2476        // concurrent push re-referenced). Asserting on the specific
2477        // key — not a broad "one survives" — catches a regression
2478        // that flipped the iteration order or skipped the wrong pack.
2479        store
2480            .inner
2481            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
2482            .await
2483            .expect("re-referenced pack must survive");
2484        store
2485            .inner
2486            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.idx"))
2487            .await
2488            .expect("re-referenced pack idx must survive");
2489        // First pack is gone.
2490        let first_err = store
2491            .inner
2492            .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
2493            .await
2494            .unwrap_err();
2495        assert!(matches!(first_err, ObjectStoreError::NotFound(_)));
2496    }
2497
2498    #[tokio::test]
2499    async fn sweep_reclaims_genuinely_orphan_pack_with_per_tombstone_recompute() {
2500        // Sanity: the per-tombstone recompute does NOT regress the
2501        // normal sweep path. A stale tombstone naming a pack with no
2502        // chain reference is reclaimed exactly as before.
2503        let store = MockStore::new();
2504        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2505            .format(&Rfc3339)
2506            .unwrap();
2507        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
2508        insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
2509        // No chain.json at all: referenced set is empty for every
2510        // recompute pass.
2511
2512        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2513        assert_eq!(outcome.swept_tombstones, 1);
2514        assert_eq!(outcome.deleted_objects, 2);
2515        assert_eq!(outcome.skipped_repointed_packs, 0);
2516    }
2517
2518    #[tokio::test]
2519    async fn sweep_one_baseline_tombstone_re_reads_chain_per_tombstone() {
2520        // Companion to `sweep_re_derives_referenced_set_per_tombstone`
2521        // for the baseline tombstone path. `sweep_one_baseline_tombstone`
2522        // calls `load_chain` inside the function, so each tombstone in
2523        // a sweep pass sees a freshly-loaded chain. A regression that
2524        // hoisted the chain load out of the per-tombstone loop would
2525        // let a concurrent push between iterations slip past the
2526        // re-baselined-to-same-SHA guard, deleting a bundle that is
2527        // once again live.
2528        //
2529        // Layout: two stale baseline tombstones for the same ref both
2530        // naming SHA_FULL, plus a baseline bundle at SHA_FULL. With
2531        // no live chain initially, the first iteration sees
2532        // `chain.is_none()` → `still_live = false` → deletes the
2533        // bundle. The post-delete hook (firing after the tombstone
2534        // delete that follows the bundle delete) writes a chain whose
2535        // `full_at == SHA_FULL`, simulating a force-push that
2536        // re-baselined to the same SHA. The second iteration must
2537        // re-read the chain, observe `full_at == tombstone.sha`, set
2538        // `still_live = true`, and refuse to re-delete (the bundle is
2539        // already gone anyway, but the assertion is on the counter:
2540        // `skipped_repointed_packs == 1`).
2541        let inner = MockStore::new();
2542        let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
2543        let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
2544            .format(&Rfc3339)
2545            .unwrap();
2546        let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2547            .format(&Rfc3339)
2548            .unwrap();
2549        let tomb_a = write_baseline_tombstone_at(&inner, "repo", &stale_a, SHA_FULL);
2550        let tomb_b = write_baseline_tombstone_at(&inner, "repo", &stale_b, SHA_FULL);
2551
2552        // Trigger on the baseline-tomb prefix: the hook fires AFTER
2553        // the FIRST iteration's tombstone delete completes (the
2554        // tombstone delete is the last delete in `sweep_one_baseline_tombstone`),
2555        // which is precisely the window in which a concurrent
2556        // force-push could land before the second iteration's chain
2557        // re-read.
2558        let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
2559        let store = PostDeleteHookStore::new(inner, &tomb_listing, |inner| {
2560            let chain = ChainManifest {
2561                v: 1,
2562                tip: sha40(SHA_TIP),
2563                full_at: sha40(SHA_FULL),
2564                segments: vec![segment(SHA_PACK_LIVE, None)],
2565            };
2566            let body =
2567                serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2568            inner.insert("repo/refs/heads/main/chain.json", Bytes::from(body));
2569        });
2570
2571        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2572        // Both baseline tombstones processed.
2573        assert_eq!(outcome.swept_tombstones, 2);
2574        // First iteration deleted the bundle (1 object). Second
2575        // iteration's fresh chain read showed full_at == tombstone.sha
2576        // and skipped the delete — the recompute is per-tombstone.
2577        assert_eq!(outcome.deleted_objects, 1, "only one bundle delete");
2578        assert_eq!(
2579            outcome.skipped_repointed_packs, 1,
2580            "second iteration must see the re-baselined chain and skip",
2581        );
2582        // Both tombstones are gone.
2583        for key in [&tomb_a, &tomb_b] {
2584            let err = store.inner.get_bytes(key).await.unwrap_err();
2585            assert!(matches!(err, ObjectStoreError::NotFound(_)));
2586        }
2587        // The bundle was deleted by the first iteration, before the
2588        // hook fired, so its absence proves nothing about the second
2589        // iteration. The `skipped_repointed_packs` assertion above is
2590        // what proves the chain is re-read per tombstone; this check
2591        // only confirms the first iteration's delete went through.
2592        let bundle_err = store.inner.get_bytes(&bundle_key).await.unwrap_err();
2593        assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
2594    }
2595
2596    #[tokio::test]
2597    async fn sweep_protects_pack_when_concurrent_push_aliases_existing_key() {
2598        // Issue #140's canonical scenario, framed as the issue
2599        // describes it: a force-revert republishes a pack with the
2600        // SAME content SHA as the tombstoned pack (deterministic gix
2601        // pack emission). The concurrent push only updates
2602        // chain.json; the pack key is reused. Sweep must observe
2603        // the new chain reference and leave the pack alone.
2604        //
2605        // Modelled at the post-fix invariant level: the chain
2606        // referencing the tombstoned SHA exists when
2607        // `sweep_one_tombstone` runs its recompute, and the pack is
2608        // preserved with `skipped_repointed_packs += 1`.
2609        let store = MockStore::new();
2610        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2611            .format(&Rfc3339)
2612            .unwrap();
2613        write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
2614        // Insert the pack, then commit chain.json referencing it —
2615        // identical-content SHA path through the engine ends here.
2616        insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
2617        let chain = ChainManifest {
2618            v: 1,
2619            tip: sha40(SHA_TIP),
2620            full_at: sha40(SHA_FULL),
2621            segments: vec![segment(SHA_PACK_LIVE, None)],
2622        };
2623        write_chain(&store, Some("repo"), &ref_main(), &chain)
2624            .await
2625            .unwrap();
2626
2627        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2628        assert_eq!(outcome.swept_tombstones, 1);
2629        assert_eq!(outcome.skipped_repointed_packs, 1);
2630        assert_eq!(outcome.deleted_objects, 0);
2631        store
2632            .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
2633            .await
2634            .expect("aliased pack must survive sweep");
2635    }
2636
2637    #[tokio::test]
2638    async fn grace_hours_env_override_falls_back_for_unset_or_invalid() {
2639        // `EnvGuard` holds the per-key lock for the whole test and
2640        // restores the prior value on drop, including on panic.
2641        let env = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
2642        // Unset returns default.
2643        env.clear();
2644        assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2645        // Non-numeric falls back.
2646        env.set_to("not-a-number");
2647        assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2648        // Zero falls back (would defeat the design).
2649        env.set_to("0");
2650        assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2651        // Positive integer wins.
2652        env.set_to("72");
2653        assert_eq!(grace_hours_from_env(), 72);
2654    }
2655
2656    #[test]
2657    fn resolve_grace_hours_honours_some_zero() {
2658        // The key semantic divergence from `resolve_lock_ttl_seconds`:
2659        // `Some(0)` is a legitimate "no grace window" operator intent
2660        // (force-mode tests like `delete_tombstone_is_reaped_by_gc_sweep`
2661        // depend on this), so the resolver must NOT clamp it. A
2662        // regression that copy-pasted the lock-TTL filter would silently
2663        // turn `--grace-hours 0` into `--grace-hours <env-default>`.
2664        assert_eq!(resolve_grace_hours(Some(0)), 0);
2665    }
2666
2667    #[test]
2668    fn resolve_grace_hours_returns_explicit_value() {
2669        assert_eq!(resolve_grace_hours(Some(7)), 7);
2670    }
2671
2672    #[tokio::test]
2673    async fn resolve_grace_hours_falls_back_to_env_for_none() {
2674        let env = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
2675        env.set_to("72");
2676        assert_eq!(resolve_grace_hours(None), 72);
2677    }
2678
2679    // --- mark list-order race (issue #135) ---------------------------
2680
2681    /// One-shot post-`list` hook used by [`PostListHookStore`].
2682    type PostListHook = Box<dyn FnOnce(&MockStore) + Send>;
2683
2684    /// Test-only [`ObjectStore`] decorator that runs a one-shot
2685    /// callback the first time `list()` returns successfully, *after*
2686    /// the inner list completes. Used to simulate a concurrent push
2687    /// that uploads a new pack AND commits its `chain.json` between
2688    /// `mark`'s two listings — the regression scenario for issue #135.
2689    /// Firing on the first list (regardless of prefix) means the hook
2690    /// runs between `list_pack_shas` and `list_referenced_packs` under
2691    /// either ordering, so the test exercises the race against both
2692    /// the buggy chains-first order and the fixed packs-first order.
2693    struct PostListHookStore {
2694        inner: MockStore,
2695        hook: std::sync::Mutex<Option<PostListHook>>,
2696    }
2697
2698    impl PostListHookStore {
2699        fn new(inner: MockStore, hook: impl FnOnce(&MockStore) + Send + 'static) -> Self {
2700            Self {
2701                inner,
2702                hook: std::sync::Mutex::new(Some(Box::new(hook))),
2703            }
2704        }
2705    }
2706
2707    crate::delegate_to_inner_impl! {
2708        impl ObjectStore for PostListHookStore {
2709            forward: get_to_file, get_bytes, get_bytes_range,
2710                     put_bytes, put_path, put_if_absent,
2711                     head, copy, delete;
2712
2713            async fn list(
2714                &self,
2715                prefix: &str,
2716            ) -> Result<Vec<crate::object_store::ObjectMeta>, ObjectStoreError> {
2717                let result = self.inner.list(prefix).await;
2718                if result.is_ok() {
2719                    let hook = self.hook.lock().unwrap().take();
2720                    if let Some(hook) = hook {
2721                        hook(&self.inner);
2722                    }
2723                }
2724                result
2725            }
2726        }
2727    }
2728
2729    #[tokio::test]
2730    async fn mark_packs_first_ordering_avoids_false_positive_under_concurrent_push() {
2731        // Issue #135 regression: a concurrent push that uploads a new
2732        // pack AND commits its chain.json between mark's two listings
2733        // must not be tombstoned as orphan.
2734        //
2735        // The hook fires after the FIRST list call (under the fixed
2736        // ordering, the one against `<prefix>/packs/`) and inserts a
2737        // new pack pair plus a chain.json referencing it. With
2738        // packs-first ordering, the new pack is absent from the
2739        // on-bucket snapshot (already taken) and present in the
2740        // referenced set (the chain.json is committed before the
2741        // chain list runs). Either condition alone keeps the new
2742        // pack out of the orphan set.
2743        //
2744        // Pre-fix (chains-first ordering): the hook would fire after
2745        // the chain list, the chain commit would miss the chain
2746        // listing, and the pack would appear in `list_pack_shas` →
2747        // tombstoned as a false positive. The fix flips the ordering
2748        // so this test asserts orphan_count == 0.
2749        let inner = MockStore::new();
2750        // Seed an existing live chain + its pack so the test exercises
2751        // a realistic non-empty state.
2752        seed_live_chain(&inner, Some("repo")).await;
2753        insert_pack_pair(&inner, Some("repo"), SHA_PACK_LIVE);
2754
2755        let store = PostListHookStore::new(inner, |inner| {
2756            // Simulate the concurrent push landing mid-mark: upload a
2757            // fresh pack AND commit its chain.json BEFORE mark's
2758            // second listing runs.
2759            insert_pack_pair(inner, Some("repo"), SHA_PACK_ORPHAN);
2760            let new_chain = ChainManifest {
2761                v: 1,
2762                tip: sha40(SHA_TIP),
2763                full_at: sha40(SHA_FULL),
2764                segments: vec![segment(SHA_PACK_ORPHAN, None)],
2765            };
2766            // `write_chain` is async; the hook is sync, so insert
2767            // chain.json directly at the canonical key.
2768            let body =
2769                serde_json::to_vec_pretty(&new_chain).expect("chain.json serializes for the test");
2770            inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
2771        });
2772
2773        let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
2774        // The fresh pack must NOT be tombstoned: under packs-first
2775        // ordering it is either absent from `on_bucket` or present in
2776        // `referenced`.
2777        assert_eq!(
2778            outcome.orphan_count, 0,
2779            "packs-first ordering must not tombstone packs uploaded \
2780             during mark whose chain commits before the chain listing"
2781        );
2782        // No tombstone object emitted for an empty orphan set.
2783        let gc_metas = store.inner.list("repo/gc/").await.unwrap();
2784        assert!(gc_metas.is_empty(), "no tombstone for empty orphan set");
2785    }
2786
2787    // --- baseline-tombstone post-recheck race (issue #153) -----------
2788
2789    /// One-shot post-`get_bytes` hook used by [`PostGetHookStore`].
2790    type PostGetHook = Box<dyn FnOnce(&MockStore) + Send>;
2791
2792    /// Test-only [`ObjectStore`] decorator that runs a one-shot
2793    /// callback the first time `get_bytes()` succeeds on `trigger_key`,
2794    /// *after* the inner read completes. Used to deterministically
2795    /// simulate a concurrent force-push landing between the initial
2796    /// `load_chain` in `sweep_one_baseline_tombstone` and the
2797    /// immediate-pre-delete recheck added by issue #153.
2798    ///
2799    /// The `trigger_key` filter is exact-match so the hook fires only
2800    /// on the targeted chain.json read, not on the tombstone-body
2801    /// `get_bytes` that runs earlier in the same sweep.
    struct PostGetHookStore {
        inner: MockStore,
        hook: std::sync::Mutex<Option<PostGetHook>>,
        trigger_key: String,
    }

    impl PostGetHookStore {
        fn new(
            inner: MockStore,
            trigger_key: impl Into<String>,
            hook: impl FnOnce(&MockStore) + Send + 'static,
        ) -> Self {
            Self {
                inner,
                hook: std::sync::Mutex::new(Some(Box::new(hook))),
                trigger_key: trigger_key.into(),
            }
        }

        /// `true` once the hook has been consumed; used to witness
        /// that the production code reached the targeted read rather
        /// than skipping past it on a different branch.
        fn hook_fired(&self) -> bool {
            self.hook.lock().unwrap().is_none()
        }
    }

    crate::delegate_to_inner_impl! {
        impl ObjectStore for PostGetHookStore {
            forward: list, get_to_file, get_bytes_range,
                     put_bytes, put_path, put_if_absent,
                     head, copy, delete;

            async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
                let result = self.inner.get_bytes(key).await;
                if result.is_ok()
                    && key == self.trigger_key
                    && let Some(hook) = self.hook.lock().unwrap().take()
                {
                    hook(&self.inner);
                }
                result
            }
        }
    }

    #[tokio::test]
    async fn sweep_baseline_defers_when_recheck_observes_re_baseline() {
        // Issue #153 regression: a concurrent force-push or compact may
        // re-baseline the ref to the tombstoned SHA between the initial
        // `load_chain` and the bundle delete that follows. Without the
        // immediate-pre-delete recheck, sweep would erase a now-live
        // bundle and then drop its tombstone.
        //
        // Layout: a baseline bundle at SHA_FULL, a stale baseline
        // tombstone naming SHA_FULL, and an initial chain.json with
        // `full_at = SHA_TIP` (different from SHA_FULL) so the initial
        // check sees `still_live = false` and proceeds toward the
        // delete. The PostGetHookStore fires AFTER the first read of
        // `chain.json` (the initial `load_chain`) and overwrites it
        // with a chain whose `full_at = SHA_FULL`, modelling the
        // concurrent force-push landing in the gap.
        //
        // With the fix in place: the immediate-pre-delete recheck
        // observes the new state, `still_live` flips to true, and
        // sweep returns `Deferred`, preserving BOTH the bundle and
        // the tombstone so a future sweep can retry.
        let inner = MockStore::new();
        let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
        // Initial chain points at a different SHA, so the first
        // `still_live` check is false and the code falls into the
        // delete branch where the recheck now lives.
        let initial_chain = ChainManifest {
            v: 1,
            tip: sha40(SHA_TIP),
            full_at: sha40(SHA_TIP),
            segments: vec![segment(SHA_PACK_LIVE, None)],
        };
        write_chain(&inner, Some("repo"), &ref_main(), &initial_chain)
            .await
            .unwrap();
        let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
            .format(&Rfc3339)
            .unwrap();
        let tomb_key = write_baseline_tombstone_at(&inner, "repo", &stale, SHA_FULL);

        // Hook fires AFTER the FIRST get_bytes of chain.json (the
        // initial load_chain). Before the fix, that was the only
        // chain read; the bundle delete that followed would erase a
        // live bundle. After the fix, a second get_bytes (the
        // immediate-pre-delete recheck) sees this updated state.
        let chain_key = "repo/refs/heads/main/chain.json";
        let store = PostGetHookStore::new(inner, chain_key, move |inner| {
            let re_baselined = ChainManifest {
                v: 1,
                tip: sha40(SHA_TIP),
                full_at: sha40(SHA_FULL),
                segments: vec![segment(SHA_PACK_LIVE, None)],
            };
            let body = serde_json::to_vec_pretty(&re_baselined)
                .expect("chain.json serializes for the test");
            inner.insert(chain_key, Bytes::from(body));
        });

        let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
        // The recheck caught the race: tombstone deferred for a
        // future sweep.
        assert_eq!(
            outcome.deferred_tombstones, 1,
            "recheck must defer when the chain re-baselined to the \
             tombstoned SHA between checks",
        );
        assert_eq!(outcome.swept_tombstones, 0);
        assert_eq!(outcome.deleted_objects, 0);
        assert_eq!(outcome.skipped_repointed_packs, 0);

        // Witness that the production code actually reached the
        // initial chain.json read, where the hook injects the
        // re-baseline (otherwise the hook would still be armed and
        // the test would be vacuously passing).
        assert!(
            store.hook_fired(),
            "production code must have read chain.json so the hook \
             could inject the concurrent re-baseline",
        );

        // Bundle MUST survive: that is the whole point of the fix.
        store
            .inner
            .get_bytes(&bundle_key)
            .await
            .expect("re-baselined bundle must survive sweep");
        // Tombstone MUST survive: it is preserved for a future sweep
        // to retry once the race settles.
        store
            .inner
            .get_bytes(&tomb_key)
            .await
            .expect("tombstone must survive deferred path");
    }
}