git_remote_object_store/packchain/gc.rs
1//! Two-phase mark-and-sweep garbage collection for orphan packs
2//! (issue #66, Phase 5 of #52).
3//!
4//! Orphan packs are pack files in `<prefix>/packs/` that no
5//! `chain.json` references. They accumulate from:
6//!
7//! - **Force push**: replaces a chain's segments; old packs become orphan.
8//! - **Lost-race push**: a pre-lock pack upload by the loser of a
9//! concurrent push (Phase 2 design — packs upload pre-lock to keep
10//! the lock window short, and the loser's pack is left orphan).
11//! - **Aborted push**: a crash between pack upload and chain.json
12//! commit leaves orphans the next push doesn't reach.
13//! - **Branch deletion**: `delete-branch` removes `chain.json` and
14//! `path-index.json` but does not touch `<prefix>/packs/`. The
15//! issue umbrella's "exclusively owned by that branch" claim is
16//! wrong under content-hash dedup; pack keys can be shared across
17//! branches that ever pushed identical object sets. The baseline
18//! bundle (`<prefix>/<ref>/<full_at>.bundle`) is tombstoned rather
19//! than deleted synchronously (issue #143), so an in-flight fetcher
20//! that already read the prior `chain.json` can still complete its
21//! range GET; the bundle is reclaimed by [`sweep`] after the grace
22//! window.
23//! - **Compaction** (when implemented): a chain rewrite leaves the
24//! superseded segment packs orphan.
25//! - **Missing `.idx`** (rare): a `.pack` whose sibling `.idx` was
26//! manually deleted is treated as orphan and tombstoned.
27//!
28//! ## Two-phase mark-and-sweep
29//!
30//! Naive deletion ("delete every pack older than 24 h") races a
31//! concurrent fetch on a freshly-orphaned pack: the pack's
32//! `last_modified` reflects upload time, not orphan time. The
33//! mark/sweep split fixes this by tombstoning at orphan time and
34//! deferring deletion until after a configurable grace window.
35//!
36//! ### Phase 1 (mark)
37//!
38//! 1. List `<prefix>/packs/` to snapshot the packs currently on the
39//! bucket. Packs-first is deliberate (issue #135): see "Concurrency"
40//! below.
41//! 2. List `<prefix>/refs/**/chain.json` across every ref namespace
42//! (`refs/heads/`, `refs/tags/`, `refs/notes/`, etc.), parse each,
43//! collect referenced pack content-shas.
44//! 3. **Fail closed** on parse error: abort, log the bad key, do not
45//! write tombstones. A corrupt chain could under-report the
46//! referenced set and tombstone live packs.
47//! 4. Derive the orphan set (`on_bucket - referenced`) and write
48//! `<prefix>/gc/tombstones-<run_id>-<rfc3339>.json`.
49//!
50//! ### Phase 2 (sweep)
51//!
52//! 1. List `<prefix>/gc/tombstones-*.json`.
53//! 2. For each tombstone past the grace age:
54//! - Re-derive the orphan set from the *current* chain state.
55//! Repeated **per tombstone**, not cached across the sweep: a
56//! concurrent push committing `chain.json` mid-sweep would let
57//! a cached snapshot delete a pack the new chain references,
58//! permanently dangling the reference (issue #140). Force-revert
59//! is the canonical trigger — deterministic gix pack emission
60//! lets the new push reuse the tombstoned pack key without
61//! re-uploading. The cost is one `list("refs/")` per eligible
62//! tombstone vs one per sweep; correctness wins over the linear
63//! overhead for the O(1)-eligible-tombstones common case.
64//! - For each pack still orphan, delete `.pack` + `.idx`
65//! idempotently (a prior partial sweep is fine).
66//! - Delete the tombstone itself.
67//! 3. Younger tombstones survive for the next sweep.
68//!
69//! ### Baseline-bundle tombstones (issues #134, #143)
70//!
71//! Baseline bundles at `<prefix>/<ref>/<full_at>.bundle` are NOT
72//! reapable by the mark/sweep flow above — they live outside
73//! `<prefix>/packs/`, so [`list_pack_shas`] never sees them. The
74//! compact, force-push, and `delete-branch` code paths instead enqueue
75//! a baseline tombstone at `<prefix>/gc/baseline-tomb-<uuid>.json`
76//! whenever they supersede or remove a baseline. Sweep processes those alongside pack
77//! tombstones: after the grace window expires it re-checks the
78//! current `chain.json` for the ref (skipping the delete if a later
79//! push re-baselined to the same SHA), then deletes the bundle and
80//! the tombstone. The bundle stays in place for the entire grace
81//! window, so a concurrent fetch that read the prior `chain.json`
82//! before the compact/force-push committed can still download it.
83//!
84//! ### `--force`
85//!
86//! Skips ONLY the grace window. The live-pack re-check still runs:
87//! a tombstone whose SHA appears in the current chain set is left
88//! alone. This closes the race where `mark()` snapshots packs after a
89//! concurrent push has uploaded `packs/<sha>.{pack,idx}` but has not
90//! yet committed `chain.json` — by sweep time the chain has landed
91//! and the pack is live, so the stale tombstone must not delete it.
92//! A `tracing::warn!` line records the operator's choice.
93//!
94//! ## Concurrency
95//!
96//! Two operators running `gc` simultaneously each get a `UUIDv4` run id
97//! → distinct tombstone files, no clobber. Concurrent sweeps tolerate
98//! `NotFound` on already-deleted packs.
99//!
100//! Mark lists packs first, then chains (issue #135). With this order,
101//! a push landing during mark either:
102//!
103//! - uploaded its pack *after* [`list_pack_shas`] — the pack is not in
104//! the on-bucket snapshot, so it cannot enter the orphan set
105//! regardless of when its chain commits; or
106//! - uploaded its pack *before* [`list_pack_shas`] AND committed
107//! `chain.json` before [`list_referenced_packs`] — the pack is in the
108//! referenced set, so it is filtered out of orphans; or
109//! - uploaded its pack *before* [`list_pack_shas`] and has not yet
110//! committed `chain.json` by the time [`list_referenced_packs`] runs
111//! — the pack is tombstoned, but the grace window leaves it readable
112//! long enough for the push to complete (the genuine-orphan case for
113//! an aborted push is exactly what the GC is designed to reap).
114//!
115//! The reverse order (chains-first) is the bug fixed by #135: a chain
116//! commit landing between the chain list and the pack list would let a
117//! freshly-uploaded pack appear in [`list_pack_shas`] without appearing
118//! in [`list_referenced_packs`], producing a false-positive tombstone.
119//! Sweep's per-tombstone re-derive (issue #140) would usually catch
120//! that at sweep time, but a `--force` sweep run in the same session as
121//! mark (e.g. `compact --with-gc`) could still delete the live pack
122//! before the push's chain commit lands.
123//!
124//! The grace window separately covers a fetch reading an old chain
125//! whose packs are about to be swept.
126
127use std::collections::HashSet;
128
129use bytes::Bytes;
130use futures::stream::{StreamExt, TryStreamExt};
131use serde::{Deserialize, Serialize};
132use time::OffsetDateTime;
133use time::format_description::well_known::Rfc3339;
134use tracing::{debug, error, info, warn};
135use uuid::Uuid;
136
137use crate::git::RefName;
138use crate::keys;
139use crate::object_store::{ObjectStore, ObjectStoreError, PutOpts};
140use crate::protocol::fetch::MAX_FETCH_CONCURRENCY;
141
142use super::PackchainError;
143use super::manifest::load_chain;
144use super::schema::{ChainManifest, Sha40};
145
146/// Default grace window between mark and sweep (24 hours). A pack
147/// tombstoned during mark is only deletable after this duration has
148/// elapsed since `marked_at`.
149pub const DEFAULT_GRACE_HOURS: u64 = 24;
150
151/// Decision returned by [`check_grace_window`].
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153enum GraceDecision {
154 /// `marked_at` is recent enough that the tombstone must remain.
155 Within,
156 /// `marked_at` is older than `grace_hours`; the sweep may proceed.
157 Past,
158}
159
160/// Format `now` as an RFC 3339 string and wrap the underlying
161/// formatter error in a [`PackchainError::Io`]. Centralises the
162/// `OffsetDateTime::format` + `map_err` shape previously inlined at
163/// the two tombstone-write paths in [`write_baseline_tombstone`] and
164/// [`mark`].
165fn rfc3339_now() -> Result<String, PackchainError> {
166 OffsetDateTime::now_utc().format(&Rfc3339).map_err(|e| {
167 PackchainError::Io(std::io::Error::other(format!("rfc3339 format failed: {e}")))
168 })
169}
170
171/// Parse `marked_at` as RFC 3339 and decide whether `now - marked_at`
172/// has crossed `grace_hours`. A negative age (tombstone marked in the
173/// future under operator clock skew) is treated as
174/// [`GraceDecision::Within`] so a sweep does not run prematurely.
175///
176/// `kind` is interpolated into both the parse-error message and the
177/// `debug!` log line so the two sweep call paths (pack tombstones vs.
178/// baseline tombstones) stay distinguishable in operator logs.
179fn check_grace_window(
180 marked_at: &str,
181 grace_hours: u64,
182 kind: &'static str,
183) -> Result<GraceDecision, PackchainError> {
184 let marked_at_ts = OffsetDateTime::parse(marked_at, &Rfc3339).map_err(|e| {
185 PackchainError::Io(std::io::Error::other(format!(
186 "{kind} marked_at parse failed: {e}"
187 )))
188 })?;
189 let age_hours = (OffsetDateTime::now_utc() - marked_at_ts).whole_hours();
190 // Negative age = a tombstone marked in the future (operator clock
191 // skew). Treat as "still within grace" rather than sweeping
192 // prematurely. The `try_into` is the canonical way to compare an
193 // `i64` against an unsigned grace window without a sign-loss cast.
194 let within = age_hours
195 .try_into()
196 .map_or(true, |hours: u64| hours < grace_hours);
197 Ok(if within {
198 GraceDecision::Within
199 } else {
200 GraceDecision::Past
201 })
202}
203
204/// Best-effort version of [`write_baseline_tombstone`]: writes the
205/// tombstone and, on error, logs at `warn` with the orphan key and
206/// `source` discriminator (`"force-push"` / `"compact"`). Used by
207/// callers that run AFTER `chain.json` is durable, where a tombstone
208/// failure must NOT propagate as a push/compact failure — retrying the
209/// caller would short-circuit through `AlreadyMinimal` (compact) or
210/// the no-op same-SHA branch (push) and never re-attempt the cleanup,
211/// leaving the orphaned bundle without a tombstone.
212///
213/// Returns `true` iff a tombstone was successfully written. Callers
214/// that emit a success-only debug trace check the return value;
215/// `false` covers both the same-SHA short-circuit and a warned-on
216/// write error.
217pub(crate) async fn write_baseline_tombstone_best_effort(
218 store: &dyn ObjectStore,
219 prefix: Option<&str>,
220 ref_name: &RefName,
221 prior_full_sha: &Sha40,
222 current_full_sha: &Sha40,
223 source: &'static str,
224) -> bool {
225 match write_baseline_tombstone(store, prefix, ref_name, prior_full_sha, current_full_sha).await
226 {
227 Ok(()) => prior_full_sha != current_full_sha,
228 Err(e) => {
229 let orphan_key = keys::bundle_key(prefix, ref_name.as_str(), prior_full_sha.as_str());
230 warn!(
231 source,
232 ref_path = %ref_name.as_str(),
233 key = %orphan_key,
234 error = %e,
235 "baseline tombstone write failed (chain.json already committed); \
236 orphan bundle left for manual cleanup",
237 );
238 false
239 }
240 }
241}
242
243/// Environment variable that overrides [`DEFAULT_GRACE_HOURS`] when
244/// set to a positive integer. Mirrors the shape of
245/// `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS` used by the protocol REPL.
246pub(crate) const ENV_GC_GRACE_HOURS: &str = "GIT_REMOTE_OBJECT_STORE_GC_GRACE_HOURS";
247
248/// On-bucket schema version this build reads and writes.
249pub const TOMBSTONE_SCHEMA_VERSION: u32 = 1;
250
251/// Reject a parsed schema version that does not match
252/// [`TOMBSTONE_SCHEMA_VERSION`]. Shared by the two tombstone parsers
253/// ([`Tombstone::from_json_bytes`] and [`BaselineTombstone::from_json_bytes`])
254/// so both report the same `UnsupportedSchemaVersion` shape against
255/// the same constant.
256fn check_tombstone_schema_version(found: u32) -> Result<(), PackchainError> {
257 if found == TOMBSTONE_SCHEMA_VERSION {
258 Ok(())
259 } else {
260 Err(PackchainError::UnsupportedSchemaVersion {
261 found,
262 expected: TOMBSTONE_SCHEMA_VERSION,
263 })
264 }
265}
266
267/// On-bucket tombstone — a record of one mark phase's orphan set.
268///
269/// Lives at `<prefix>/gc/tombstones-<run_id>-<rfc3339>.json`. The
270/// timestamp in the filename is for human inspection; the
271/// authoritative `marked_at` is the field inside the JSON body.
272#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
273pub(crate) struct Tombstone {
274 /// Schema version. Always [`TOMBSTONE_SCHEMA_VERSION`] when written.
275 pub(crate) v: u32,
276 /// `UUIDv4` run identifier. Two concurrent `gc` runs each get a
277 /// distinct id, so their tombstone keys don't clobber.
278 pub(crate) run_id: String,
279 /// RFC 3339 timestamp at which the mark phase produced this set.
280 /// Sweep compares this against the grace window.
281 pub(crate) marked_at: String,
282 /// Content-shas of orphan packs at mark time. Sweep re-checks
283 /// each against the current chain state before deleting.
284 pub(crate) orphan_packs: Vec<Sha40>,
285}
286
287impl Tombstone {
288 /// Parse `bytes` as a tombstone JSON, validating the schema
289 /// version before returning.
290 ///
291 /// # Errors
292 ///
293 /// - [`PackchainError::ParseJson`] for malformed JSON / missing
294 /// fields / `Sha40` validation failures.
295 /// - [`PackchainError::UnsupportedSchemaVersion`] when `v` is not
296 /// [`TOMBSTONE_SCHEMA_VERSION`].
297 pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
298 let parsed: Self = serde_json::from_slice(bytes)?;
299 check_tombstone_schema_version(parsed.v)?;
300 Ok(parsed)
301 }
302
303 /// Render to pretty-printed JSON bytes.
304 ///
305 /// # Errors
306 ///
307 /// `serde_json::to_vec_pretty` is infallible for this schema
308 /// today, but the function returns `Result` for forward
309 /// compatibility with future fields.
310 pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
311 Ok(serde_json::to_vec_pretty(self)?)
312 }
313}
314
315/// On-bucket tombstone for a superseded baseline bundle (issues #134, #143).
316///
317/// Lives at `<prefix>/gc/baseline-tomb-<uuid>.json`. Written by
318/// [`super::compact`], [`super::push`], and the management
319/// `delete-branch` flow whenever a chain rewrite or ref removal makes
320/// a `<prefix>/<ref>/<sha>.bundle` unreachable. Unlike pack
321/// tombstones the body names a specific (ref, sha) — there is exactly
322/// one bundle key per record — so [`sweep`] does not need to re-derive
323/// an orphan set from listings.
324#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
325pub(crate) struct BaselineTombstone {
326 /// Schema version. Always [`TOMBSTONE_SCHEMA_VERSION`] when written.
327 pub(crate) v: u32,
328 /// RFC 3339 timestamp at which the tombstone was written. Sweep
329 /// compares this against the grace window.
330 pub(crate) marked_at: String,
331 /// Ref the orphaned bundle belonged to (e.g. `refs/heads/main`).
332 /// Stored as a raw string for forward compatibility with whatever
333 /// `RefName` accepts at sweep time.
334 pub(crate) ref_name: String,
335 /// Content-SHA of the bundle (matches the `<sha>.bundle` filename).
336 /// Sweep skips the delete when the ref's current `chain.full_at`
337 /// equals this SHA (a later push re-baselined to the same tip).
338 pub(crate) sha: Sha40,
339}
340
341impl BaselineTombstone {
342 /// Parse `bytes` as a baseline tombstone JSON, validating the
343 /// schema version before returning.
344 ///
345 /// # Errors
346 ///
347 /// - [`PackchainError::ParseJson`] for malformed JSON / missing
348 /// fields / `Sha40` validation failures.
349 /// - [`PackchainError::UnsupportedSchemaVersion`] when `v` is not
350 /// [`TOMBSTONE_SCHEMA_VERSION`].
351 pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, PackchainError> {
352 let parsed: Self = serde_json::from_slice(bytes)?;
353 check_tombstone_schema_version(parsed.v)?;
354 Ok(parsed)
355 }
356
357 /// Render to pretty-printed JSON bytes.
358 pub(crate) fn to_json_pretty(&self) -> Result<Vec<u8>, PackchainError> {
359 Ok(serde_json::to_vec_pretty(self)?)
360 }
361}
362
363/// Write a baseline tombstone for the bundle at
364/// `<prefix>/<ref_name>/<sha>.bundle` (issue #134).
365///
366/// Called from [`super::compact`] and [`super::push`] after the new
367/// `chain.json` is durable — at that point the bundle has no chain
368/// reference and is eligible for deletion, but a fetch that loaded
369/// the prior chain may still be about to GET it. The tombstone
370/// defers the delete to the next `gc sweep` past the grace window.
371///
372/// `prior_full_sha` is the SHA of the superseded baseline; `current_full_sha`
373/// is the new chain's `full_at`. When they are equal the function
374/// returns without writing a tombstone — the keys alias the same live
375/// bundle (compact left `full_at` unchanged, or force-push targeted
376/// the same tip).
377///
378/// # Errors
379///
380/// Returns [`PackchainError::Store`] on a PUT failure. Callers run
381/// this AFTER `chain.json` is committed and must treat the failure as
382/// best-effort: log a warning and report success, since retrying
383/// would short-circuit through `AlreadyMinimal` and never re-attempt
384/// the cleanup.
385pub(crate) async fn write_baseline_tombstone(
386 store: &dyn ObjectStore,
387 prefix: Option<&str>,
388 ref_name: &RefName,
389 prior_full_sha: &Sha40,
390 current_full_sha: &Sha40,
391) -> Result<(), PackchainError> {
392 if prior_full_sha == current_full_sha {
393 return Ok(());
394 }
395 write_baseline_tombstone_unconditional(store, prefix, ref_name, prior_full_sha).await
396}
397
398/// Write a baseline tombstone naming `orphan_sha` regardless of any
399/// successor SHA — used by `delete-branch` (issue #143), which
400/// removes the chain entirely and has no replacement baseline to
401/// compare against. Otherwise identical in shape to
402/// [`write_baseline_tombstone`]: the body is parsed by
403/// [`sweep_one_baseline_tombstone`], which sees a `chain.json`-less
404/// ref (the synchronous sweep deletes it) and proceeds with the
405/// deferred bundle delete after the grace window.
406///
407/// # Errors
408///
409/// Returns [`PackchainError::Store`] on a PUT failure. Callers run
410/// this BEFORE the synchronous sweep that removes the rest of the
411/// ref's objects; a failure here should fall back to immediate
412/// bundle deletion so the operator's "ref is gone" intent is still
413/// satisfied, rather than leaving a half-tombstoned half-deleted
414/// state behind.
415pub(crate) async fn write_baseline_tombstone_for_orphan(
416 store: &dyn ObjectStore,
417 prefix: Option<&str>,
418 ref_name: &RefName,
419 orphan_sha: &Sha40,
420) -> Result<(), PackchainError> {
421 write_baseline_tombstone_unconditional(store, prefix, ref_name, orphan_sha).await
422}
423
424/// Attempt to tombstone the baseline bundle for a delete so the
425/// synchronous sweep loop can skip it (issue #143 / #203).
426/// Returns the bundle key that was deferred, or `None` when deferral
427/// is not actionable (bundle-engine ref with no `chain.json`,
428/// unparseable `chain.json`, no `<full_at>.bundle` in the listing, or
429/// a tombstone PUT failure).
430///
431/// The `None` fall-through is the correct behaviour for every
432/// "deferral is not actionable" case: with no tombstone, `gc sweep`
433/// has nothing to reclaim, so the sweep loop must remove the bundle
434/// synchronously instead. A logged warning surfaces the rarer
435/// load/parse/PUT failures for operator review without blocking the
436/// delete.
437///
438/// Runs UNDER the per-ref lock (#158): a concurrent push that landed
439/// between the tombstone and the chain.json delete would otherwise
440/// leave the bucket with a tombstone referencing a SHA no longer in
441/// the chain, and `gc sweep` would reclaim a live bundle.
442///
443/// `log_context` discriminates the warn-event source for log
444/// scraping (`"packchain delete"`, `"delete-branch"`, etc.).
445///
446/// Replaces the previous per-call-site clones in
447/// `manage::branch::ManageBranch::try_tombstone_baseline` and
448/// `packchain::push::try_tombstone_baseline_for_delete` (#221).
449pub(crate) async fn try_write_baseline_tombstone(
450 store: &dyn ObjectStore,
451 prefix: Option<&str>,
452 remote_ref: &RefName,
453 fresh: &[crate::object_store::ObjectMeta],
454 log_context: &'static str,
455) -> Option<String> {
456 let chain = match load_chain(store, prefix, remote_ref).await {
457 Ok(Some(chain)) => chain,
458 Ok(None) => return None,
459 Err(err) => {
460 warn!(
461 source = log_context,
462 ref_path = %remote_ref.as_str(),
463 error = %err,
464 "chain.json read/parse failed; falling back to synchronous bundle delete",
465 );
466 return None;
467 }
468 };
469 let bundle_key = keys::bundle_key(prefix, remote_ref.as_str(), chain.full_at.as_str());
470 // The baseline bundle must actually be in the under-lock listing
471 // — otherwise the deferred delete has nothing to defer (it was
472 // already gone, or the chain points outside our prefix). A
473 // mismatched `full_at` against listing reality is the canonical
474 // "chain.json points at a missing bundle" doctor case; immediate
475 // sweep is the right fallback there too.
476 if !fresh.iter().any(|m| m.key == bundle_key) {
477 return None;
478 }
479 match write_baseline_tombstone_for_orphan(store, prefix, remote_ref, &chain.full_at).await {
480 Ok(()) => Some(bundle_key),
481 Err(err) => {
482 warn!(
483 source = log_context,
484 ref_path = %remote_ref.as_str(),
485 key = %bundle_key,
486 error = %err,
487 "baseline tombstone write failed; falling back to synchronous bundle delete",
488 );
489 None
490 }
491 }
492}
493
494/// Return the set of bundle keys (full `<prefix>/<ref>/<sha>.bundle`
495/// paths) currently named by any baseline tombstone under
496/// `<prefix>/gc/baseline-tomb-*.json`.
497///
498/// Issue #157: the bundle engine derives `list`'s per-ref `<sha>`
499/// from the bundle keys themselves (unlike packchain, which reads
500/// `chain.tip`). Once a force-push tombstones the prior bundle, the
501/// listing must hide that bundle so:
502///
503/// 1. The `list` wire output does not advertise two SHAs for the
504/// same ref (which would emit two `<sha> <ref>\n` lines and
505/// confuse git), and
506/// 2. The under-lock multi-bundle guard in
507/// `crate::protocol::push::perform_push_under_lock` does not
508/// refuse the next push with the "multiple bundles" wire error.
509///
510/// The bundle keys themselves remain readable at their original
511/// paths — fetchers that already advertised the tombstoned SHA
512/// (issue #157's race) complete normally. `gc sweep` reclaims them
513/// after the grace window.
514///
515/// Returns full bucket keys, not stems, so callers can compare
516/// directly against `ObjectMeta::key` without re-deriving the prefix.
517/// A tombstone whose `ref_name` no longer parses as a `RefName` is
518/// skipped with a warn (mirrors `sweep_one_baseline_tombstone`'s
519/// invalid-ref handling) — the bundle key is unknowable but the
520/// tombstone itself remains for operator review.
521///
522/// # Errors
523///
524/// Propagates [`ObjectStoreError`] from the listing call. Per-tombstone
525/// parse failures are skipped with a warn — a single malformed
526/// tombstone must not block every listing.
527pub(crate) async fn tombstoned_bundle_keys(
528 store: &dyn ObjectStore,
529 prefix: Option<&str>,
530) -> Result<HashSet<String>, ObjectStoreError> {
531 let prefix_str = prefix.unwrap_or("");
532 let gc_listing = gc_listing_prefix(prefix_str);
533 let metas = match store.list(&gc_listing).await {
534 Ok(m) => m,
535 // NotFound on an unsupported listing prefix on a fresh bucket
536 // is normal — no tombstones to hide.
537 Err(ObjectStoreError::NotFound(_)) => return Ok(HashSet::new()),
538 Err(e) => return Err(e),
539 };
540 let mut keys = HashSet::new();
541 for meta in metas {
542 if !is_baseline_tombstone_key(&meta.key, prefix_str) {
543 continue;
544 }
545 let body = match store.get_bytes(&meta.key).await {
546 Ok(b) => b,
547 Err(ObjectStoreError::NotFound(_)) => continue, // raced delete
548 Err(e) => return Err(e),
549 };
550 let tombstone = match BaselineTombstone::from_json_bytes(&body) {
551 Ok(t) => t,
552 Err(e) => {
553 warn!(
554 key = %meta.key,
555 error = %e,
556 "tombstoned_bundle_keys: skipping unparseable baseline tombstone",
557 );
558 continue;
559 }
560 };
561 let Ok(ref_name) = RefName::new(tombstone.ref_name.clone()) else {
562 warn!(
563 key = %meta.key,
564 ref_name = %tombstone.ref_name,
565 "tombstoned_bundle_keys: skipping tombstone with invalid ref_name",
566 );
567 continue;
568 };
569 keys.insert(keys::bundle_key(prefix, &ref_name, tombstone.sha.as_str()));
570 }
571 Ok(keys)
572}
573
574/// Shared body of [`write_baseline_tombstone`] and
575/// [`write_baseline_tombstone_for_orphan`]: emit a
576/// `<prefix>/gc/baseline-tomb-<uuid>.json` record naming
577/// `(ref_name, orphan_sha)`. Centralised so the two call shapes
578/// (with-successor and unconditional) cannot drift on the JSON body
579/// shape or the key namespace.
580async fn write_baseline_tombstone_unconditional(
581 store: &dyn ObjectStore,
582 prefix: Option<&str>,
583 ref_name: &RefName,
584 orphan_sha: &Sha40,
585) -> Result<(), PackchainError> {
586 let marked_at = rfc3339_now()?;
587 let tombstone = BaselineTombstone {
588 v: TOMBSTONE_SCHEMA_VERSION,
589 marked_at,
590 ref_name: ref_name.as_str().to_owned(),
591 sha: orphan_sha.clone(),
592 };
593 let key = baseline_tombstone_key(prefix.unwrap_or(""), &Uuid::new_v4().to_string());
594 let body = Bytes::from(tombstone.to_json_pretty()?);
595 store.put_bytes(&key, body, PutOpts::default()).await?;
596 debug!(
597 key = %key,
598 ref_path = %ref_name.as_str(),
599 sha = %orphan_sha.as_str(),
600 "gc: baseline tombstone written",
601 );
602 Ok(())
603}
604
605/// Outcome of [`mark`].
606#[derive(Debug, Clone)]
607pub struct MarkOutcome {
608 /// `UUIDv4` run id assigned to this mark pass. Embedded in the
609 /// tombstone filename and body.
610 pub run_id: String,
611 /// Number of orphan packs identified.
612 pub orphan_count: usize,
613 /// Bucket key the tombstone was written to.
614 pub tombstone_key: String,
615}
616
617/// Outcome of [`sweep`].
618#[derive(Debug, Clone, Default)]
619pub struct SweepOutcome {
620 /// Tombstones whose packs were deleted (and which were themselves
621 /// deleted as a result).
622 pub swept_tombstones: usize,
623 /// Tombstones still inside the grace window — left for the next
624 /// sweep.
625 pub deferred_tombstones: usize,
626 /// Pack file deletions executed (counts both `.pack` and `.idx`
627 /// deletions, so two per orphan in the typical case).
628 pub deleted_objects: usize,
629 /// Tombstoned packs that were no longer orphan at sweep time
630 /// (re-referenced between mark and sweep, or deleted by an
631 /// earlier sweep). Skipped without error.
632 pub skipped_repointed_packs: usize,
633}
634
635/// Knobs for [`mark`].
636#[derive(Debug, Clone, Copy, Default)]
637pub struct MarkOpts {
638 /// When `true`, list and report but do not write a tombstone file
639 /// or modify the bucket. Used by `doctor` to surface orphan stats.
640 pub dry_run: bool,
641}
642
643/// Knobs for [`sweep`].
644#[derive(Debug, Clone, Copy)]
645pub struct SweepOpts {
646 /// Grace duration in hours. Tombstones with `marked_at` younger
647 /// than this stay deferred. Ignored when `force` is `true`.
648 pub grace_hours: u64,
649 /// When `true`, skip the grace check. The live-pack re-derive
650 /// still runs — a tombstone whose SHA is now referenced by a
651 /// committed chain is left alone (closes the mark/commit race
652 /// from #117). The grace window is the only safety check this
653 /// flag suppresses; concurrent fetches that still hold a SHA in
654 /// flight are NOT protected by either path.
655 pub force: bool,
656}
657
658impl Default for SweepOpts {
659 fn default() -> Self {
660 Self {
661 grace_hours: DEFAULT_GRACE_HOURS,
662 force: false,
663 }
664 }
665}
666
667/// Read [`DEFAULT_GRACE_HOURS`] subject to the
668/// [`ENV_GC_GRACE_HOURS`] override. Returns the default for unset
669/// vars, non-numeric values, or zero (a zero grace would defeat the
670/// mark/sweep design's point).
671#[must_use]
672pub(crate) fn grace_hours_from_env() -> u64 {
673 std::env::var(ENV_GC_GRACE_HOURS)
674 .ok()
675 .and_then(|v| v.parse::<u64>().ok())
676 .filter(|h| *h > 0)
677 .unwrap_or(DEFAULT_GRACE_HOURS)
678}
679
680/// Resolve a caller-supplied `Option<u64>` grace-hours value to a
681/// concrete count, deferring to [`grace_hours_from_env`] when the
682/// caller passes `None` (#221).
683///
684/// Unlike [`crate::protocol::push::resolve_lock_ttl_seconds`], this
685/// resolver does **not** clamp `Some(0)`. A zero grace window is a
686/// legitimate operator intent — "sweep without a grace window, e.g.
687/// in force mode" — and is the canonical value used by force-sweep
688/// tests (`SweepOpts { grace_hours: 0, force: true }`). The lock-TTL
689/// clamp protects against `acquire_lock` treating every held lock as
690/// instantly stale (#208); grace-hours has no analogous foot-gun.
691#[must_use]
692pub(crate) fn resolve_grace_hours(opt: Option<u64>) -> u64 {
693 opt.unwrap_or_else(grace_hours_from_env)
694}
695
696/// Run the mark phase: snapshot every pack on the bucket, then every
697/// chain, then write a tombstone naming the orphans.
698///
699/// `prefix` is the repository prefix without leading or trailing
700/// slashes — pass an empty string for bucket-root repositories.
701///
702/// # Ordering
703///
704/// Listings run packs-first, chains-second (issue #135). The reverse
705/// order races a concurrent push that uploads a new pack between the
706/// two listings and commits its `chain.json` *between the chain list
707/// and the pack list*: the new pack would appear in the on-bucket set
708/// but not in the referenced set, producing a false-positive tombstone.
709/// Packs-first inverts the staleness: the referenced set is always at
710/// least as fresh as the on-bucket set, so a pack appearing in the
711/// snapshot is either also in the chain set (saved) or genuinely
712/// orphan at some point during the mark (correctly tombstoned, with
713/// the grace window covering in-flight pushes).
714///
715/// # Errors
716///
717/// - Any chain.json that fails to parse aborts the mark with
718/// [`PackchainError::ParseJson`] / [`PackchainError::InvalidSha`] /
719/// [`PackchainError::UnsupportedSchemaVersion`]. The tombstone is
720/// not written. Operators must repair the bad chain (or remove it)
721/// before re-running.
722/// - [`PackchainError::Store`] / [`PackchainError::Io`] for transport
723/// or local-I/O failures.
724///
725/// # Example
726///
727/// ```no_run
728/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
729/// use git_remote_object_store::Remote;
730/// use git_remote_object_store::packchain::gc::{MarkOpts, mark};
731///
732/// let remote = Remote::connect("s3+https://bucket/repo?engine=packchain").await?;
733/// let outcome = mark(remote.store(), remote.prefix(), MarkOpts::default()).await?;
734/// println!(
735/// "{} orphan pack(s) tombstoned (run id {})",
736/// outcome.orphan_count, outcome.run_id,
737/// );
738/// # Ok(())
739/// # }
740/// ```
741pub async fn mark(
742 store: &dyn ObjectStore,
743 prefix: &str,
744 opts: MarkOpts,
745) -> Result<MarkOutcome, PackchainError> {
746 // Packs-first ordering (issue #135): the on-bucket snapshot must
747 // be at least as stale as the referenced set. A pack uploaded by a
748 // concurrent push between these two listings is harmless — it
749 // either is not in `on_bucket` (uploaded after the pack list) or
750 // is in `referenced` (uploaded before the pack list AND its
751 // chain.json committed before the chain list). The reverse order
752 // produces false-positive tombstones; see the module docstring.
753 let on_bucket = list_pack_shas(store, prefix).await?;
754 let referenced = list_referenced_packs(store, prefix).await?;
755 let orphans: Vec<Sha40> = on_bucket
756 .into_iter()
757 .filter(|sha| !referenced.contains(sha))
758 .collect();
759
760 let run_id = Uuid::new_v4().to_string();
761 let marked_at = rfc3339_now()?;
762 let tombstone_key = tombstone_key(prefix, &run_id, &marked_at);
763 let orphan_count = orphans.len();
764 let tombstone = Tombstone {
765 v: TOMBSTONE_SCHEMA_VERSION,
766 run_id: run_id.clone(),
767 marked_at,
768 orphan_packs: orphans,
769 };
770 let outcome = MarkOutcome {
771 run_id,
772 orphan_count,
773 tombstone_key,
774 };
775
776 if opts.dry_run {
777 debug!(
778 run_id = %outcome.run_id,
779 orphans = outcome.orphan_count,
780 "gc mark: dry-run, not writing tombstone",
781 );
782 return Ok(outcome);
783 }
784
785 if outcome.orphan_count == 0 {
786 info!(run_id = %outcome.run_id, "gc mark: no orphans; skipping tombstone");
787 return Ok(outcome);
788 }
789
790 let body = Bytes::from(tombstone.to_json_pretty()?);
791 store
792 .put_bytes(&outcome.tombstone_key, body, PutOpts::default())
793 .await?;
794 info!(
795 run_id = %outcome.run_id,
796 orphans = outcome.orphan_count,
797 key = %outcome.tombstone_key,
798 "gc mark: tombstone written",
799 );
800 Ok(outcome)
801}
802
803/// Run the sweep phase: walk tombstones, delete eligible orphans.
804///
805/// `prefix` and the threading semantics match [`mark`].
806///
807/// # Errors
808///
809/// Sweep is best-effort: a single tombstone failure does not abort
810/// the run (errors are logged and the next tombstone is tried).
811/// Returns [`PackchainError::Store`] only when the initial
812/// tombstone-list call fails.
813///
814/// # Example
815///
816/// ```no_run
817/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
818/// use git_remote_object_store::Remote;
819/// use git_remote_object_store::packchain::gc::{SweepOpts, sweep};
820///
821/// let remote = Remote::connect("s3+https://bucket/repo?engine=packchain").await?;
822/// let outcome = sweep(
823/// remote.store(),
824/// remote.prefix(),
825/// SweepOpts::default(),
826/// )
827/// .await?;
828/// println!(
829/// "swept {} tombstone(s), deleted {} object(s), deferred {}",
830/// outcome.swept_tombstones,
831/// outcome.deleted_objects,
832/// outcome.deferred_tombstones,
833/// );
834/// # Ok(())
835/// # }
836/// ```
837pub async fn sweep(
838 store: &dyn ObjectStore,
839 prefix: &str,
840 opts: SweepOpts,
841) -> Result<SweepOutcome, PackchainError> {
842 let tombstones_prefix = gc_listing_prefix(prefix);
843 let metas = store.list(&tombstones_prefix).await?;
844 let mut outcome = SweepOutcome::default();
845
846 if opts.force {
847 warn!("gc sweep: --force in effect; skipping grace window");
848 }
849
850 for meta in metas {
851 if !meta.key.as_bytes().ends_with(b".json") {
852 continue;
853 }
854 let step = if is_tombstone_key(&meta.key, prefix) {
855 sweep_one_tombstone(store, prefix, &meta.key, opts).await
856 } else if is_baseline_tombstone_key(&meta.key, prefix) {
857 sweep_one_baseline_tombstone(store, prefix, &meta.key, opts).await
858 } else {
859 continue;
860 };
861 match step {
862 Ok(SweepStep::Deferred) => outcome.deferred_tombstones += 1,
863 Ok(SweepStep::Swept {
864 deleted_objects,
865 skipped_repointed_packs,
866 }) => {
867 outcome.swept_tombstones += 1;
868 outcome.deleted_objects += deleted_objects;
869 outcome.skipped_repointed_packs += skipped_repointed_packs;
870 }
871 Err(e) => {
872 warn!(key = %meta.key, error = %e, "gc sweep: tombstone failed");
873 }
874 }
875 }
876 Ok(outcome)
877}
878
879#[derive(Debug)]
880enum SweepStep {
881 Deferred,
882 Swept {
883 deleted_objects: usize,
884 skipped_repointed_packs: usize,
885 },
886}
887
888async fn sweep_one_tombstone(
889 store: &dyn ObjectStore,
890 prefix: &str,
891 tombstone_key: &str,
892 opts: SweepOpts,
893) -> Result<SweepStep, PackchainError> {
894 let body = match store.get_bytes(tombstone_key).await {
895 Ok(b) => b,
896 Err(ObjectStoreError::NotFound(_)) => {
897 // Concurrent sweep already cleaned this up.
898 return Ok(SweepStep::Swept {
899 deleted_objects: 0,
900 skipped_repointed_packs: 0,
901 });
902 }
903 Err(e) => return Err(PackchainError::Store(e)),
904 };
905 let tombstone = Tombstone::from_json_bytes(&body)?;
906
907 if !opts.force
908 && check_grace_window(&tombstone.marked_at, opts.grace_hours, "tombstone")?
909 == GraceDecision::Within
910 {
911 debug!(
912 key = %tombstone_key,
913 marked_at = %tombstone.marked_at,
914 "gc sweep: tombstone within grace window",
915 );
916 return Ok(SweepStep::Deferred);
917 }
918
919 // Re-derive the live referenced set per pack delete, AFTER the
920 // grace check passes — never cache across iterations (issue #140
921 // for the cross-tombstone race, issue #152 for the cross-pack
922 // race inside a single tombstone). A concurrent push committing
923 // chain.json after a per-tombstone snapshot still loses a live
924 // pack named later in the same tombstone's `orphan_packs` vector,
925 // because `mark()` packs all orphans for a run into one tombstone
926 // body. Force-revert is the canonical trigger: gix pack emission
927 // is deterministic for the same object set, so the new pack key
928 // aliases the tombstoned one and the push skips upload, only
929 // touching chain.json. Per-pack re-listing costs one extra
930 // `list("refs/")` + bounded-parallel chain GETs per orphan pack;
931 // for the rare GC maintenance path this is acceptable overhead in
932 // exchange for closing the cross-pack window. The recompute also
933 // runs under --force: that flag suppresses the grace window only,
934 // NOT this guard (issue #117). A residual TOCTOU window remains
935 // between this listing and the delete that follows; the fix
936 // shrinks the window from "one snapshot per tombstone" to "one
937 // snapshot per pack", which is the bound the issue asks for.
938 let mut deleted_objects = 0usize;
939 let mut skipped_repointed_packs = 0usize;
940 for sha in &tombstone.orphan_packs {
941 // Always honour the live-pack guard, including under --force.
942 // See the recompute comment above and issue #117 for why.
943 let referenced = list_referenced_packs(store, prefix).await?;
944 if referenced.contains(sha) {
945 skipped_repointed_packs += 1;
946 debug!(
947 sha = %sha.as_str(),
948 "gc sweep: tombstoned pack re-referenced; skipping",
949 );
950 continue;
951 }
952 let pack_key = super::keys::pack_key(Some(prefix), sha);
953 let idx_key = super::keys::pack_idx_key(Some(prefix), sha);
954 if delete_idempotent(store, &pack_key).await? {
955 deleted_objects += 1;
956 }
957 if delete_idempotent(store, &idx_key).await? {
958 deleted_objects += 1;
959 }
960 }
961 // Drop the tombstone last so a sweep crash mid-deletion leaves a
962 // tombstone the next sweep can finish.
963 delete_idempotent(store, tombstone_key).await?;
964 info!(
965 key = %tombstone_key,
966 deleted = deleted_objects,
967 skipped = skipped_repointed_packs,
968 "gc sweep: tombstone applied",
969 );
970 Ok(SweepStep::Swept {
971 deleted_objects,
972 skipped_repointed_packs,
973 })
974}
975
976/// Sweep one baseline tombstone (issue #134). Parses the tombstone,
977/// honours the grace window, re-checks the ref's current `chain.full_at`
978/// to skip a re-baselined-to-same-SHA case, and then idempotently
979/// deletes both the bundle and the tombstone.
980///
981/// The live-state recheck mirrors the pack sweep's
982/// `referenced.contains` guard: a tombstone written by a force-push
983/// can be invalidated by a subsequent force-push that lands on the
984/// same SHA, in which case the bundle is once again live.
985async fn sweep_one_baseline_tombstone(
986 store: &dyn ObjectStore,
987 prefix: &str,
988 tombstone_key: &str,
989 opts: SweepOpts,
990) -> Result<SweepStep, PackchainError> {
991 let body = match store.get_bytes(tombstone_key).await {
992 Ok(b) => b,
993 Err(ObjectStoreError::NotFound(_)) => {
994 return Ok(SweepStep::Swept {
995 deleted_objects: 0,
996 skipped_repointed_packs: 0,
997 });
998 }
999 Err(e) => return Err(PackchainError::Store(e)),
1000 };
1001 let tombstone = BaselineTombstone::from_json_bytes(&body)?;
1002
1003 if !opts.force
1004 && check_grace_window(&tombstone.marked_at, opts.grace_hours, "baseline tombstone")?
1005 == GraceDecision::Within
1006 {
1007 debug!(
1008 key = %tombstone_key,
1009 marked_at = %tombstone.marked_at,
1010 "gc sweep: baseline tombstone within grace window",
1011 );
1012 return Ok(SweepStep::Deferred);
1013 }
1014
1015 // Re-check the live chain. A subsequent push that re-baselined to
1016 // the same SHA (force-push at the same tip, or compact short-cut)
1017 // makes this bundle live again — leave it alone, drop the now-stale
1018 // tombstone. A missing ref (chain deleted) means the bundle is also
1019 // unreachable; proceed with the delete.
1020 let ref_name = match RefName::new(tombstone.ref_name.clone()) {
1021 Ok(r) => r,
1022 Err(e) => {
1023 // Issue #146: a tombstone whose recorded ref_name no longer
1024 // parses as a RefName cannot be turned into a bundle key, so
1025 // deleting the tombstone would orphan the bundle with no
1026 // record on the bucket. Preserve both records and surface
1027 // the ref_name + tombstone key at error! so an operator can
1028 // locate and reconcile the corruption manually. Reachable
1029 // today only via manual bucket tampering or a future schema
1030 // tweak that loosens what BaselineTombstone::ref_name
1031 // accepts at write time relative to RefName::new at read
1032 // time.
1033 error!(
1034 key = %tombstone_key,
1035 ref_name = %tombstone.ref_name,
1036 sha = %tombstone.sha.as_str(),
1037 error = %e,
1038 "gc sweep: baseline tombstone names invalid ref; preserving \
1039 tombstone and bundle for operator review",
1040 );
1041 return Ok(SweepStep::Deferred);
1042 }
1043 };
1044 let prefix_opt = (!prefix.is_empty()).then_some(prefix);
1045 let chain = load_chain(store, prefix_opt, &ref_name).await?;
1046 let mut skipped_repointed_packs = 0usize;
1047 let mut deleted_objects = 0usize;
1048 let still_live = chain.as_ref().is_some_and(|c| c.full_at == tombstone.sha);
1049 if still_live {
1050 skipped_repointed_packs += 1;
1051 debug!(
1052 key = %tombstone_key,
1053 ref_path = %ref_name.as_str(),
1054 sha = %tombstone.sha.as_str(),
1055 "gc sweep: baseline re-referenced; skipping delete",
1056 );
1057 } else {
1058 // Issue #153: re-read the chain IMMEDIATELY before deleting the
1059 // bundle. The earlier `load_chain` above closes the common
1060 // stale-tombstone path, but a concurrent force-push or compact
1061 // can re-baseline the ref to the tombstoned SHA between that
1062 // read and the delete that follows; without this recheck, sweep
1063 // would erase a now-live bundle and then drop the tombstone.
1064 // The window between this recheck and the bundle delete is
1065 // bounded by a single network round-trip — the same bound the
1066 // pack-tombstone path uses (issues #140, #152). When the
1067 // recheck catches the race, return `Deferred` so the tombstone
1068 // is preserved for a future sweep (mirrors #146's
1069 // operator-review pattern) — the chain may flip back to a
1070 // different SHA before then, in which case the tombstone
1071 // becomes actionable again.
1072 let recheck = load_chain(store, prefix_opt, &ref_name).await?;
1073 if recheck.as_ref().is_some_and(|c| c.full_at == tombstone.sha) {
1074 debug!(
1075 key = %tombstone_key,
1076 ref_path = %ref_name.as_str(),
1077 sha = %tombstone.sha.as_str(),
1078 "gc sweep: baseline re-referenced between checks; deferring",
1079 );
1080 return Ok(SweepStep::Deferred);
1081 }
1082 let bundle_key = keys::bundle_key(prefix_opt, &ref_name, tombstone.sha.as_str());
1083 if delete_idempotent(store, &bundle_key).await? {
1084 deleted_objects += 1;
1085 }
1086 }
1087 // Drop the tombstone last so a crash mid-delete leaves it for the
1088 // next sweep to finish.
1089 delete_idempotent(store, tombstone_key).await?;
1090 info!(
1091 key = %tombstone_key,
1092 deleted = deleted_objects,
1093 skipped = skipped_repointed_packs,
1094 "gc sweep: baseline tombstone applied",
1095 );
1096 Ok(SweepStep::Swept {
1097 deleted_objects,
1098 skipped_repointed_packs,
1099 })
1100}
1101
1102/// `<prefix>/gc/` prefix for [`ObjectStore::list`]. Empty `prefix`
1103/// drops the leading slash (matches the project's bucket-root rule).
1104fn gc_listing_prefix(prefix: &str) -> String {
1105 keys::join(Some(prefix), "gc/")
1106}
1107
1108/// Build a tombstone key. The `marked_at` segment may contain `:`
1109/// characters; S3 / Azure both accept colons in keys.
1110fn tombstone_key(prefix: &str, run_id: &str, marked_at: &str) -> String {
1111 keys::join(
1112 Some(prefix),
1113 &format!("gc/tombstones-{run_id}-{marked_at}.json"),
1114 )
1115}
1116
1117/// Key-namespace fragment for baseline tombstones (issue #134).
1118/// Composed with a bucket prefix via [`keys::join`] / [`baseline_tombstone_listing_prefix`]
1119/// to form the full listable prefix; the UUID-suffixed body filename
1120/// is appended by [`baseline_tombstone_key`].
1121///
1122/// Single source of truth for the on-bucket key shape — production
1123/// builders and test assertions both compose against this constant
1124/// rather than embedding the literal string (#221).
1125pub(crate) const BASELINE_TOMBSTONE_KEY_FRAGMENT: &str = "gc/baseline-tomb-";
1126
1127/// Build a baseline tombstone key. UUID-keyed so concurrent compacts
1128/// / force-pushes across different refs never clobber, and the
1129/// timestamp lives in the body rather than the filename to keep the
1130/// `is_baseline_tombstone_key` predicate cheap.
1131fn baseline_tombstone_key(prefix: &str, run_id: &str) -> String {
1132 keys::join(
1133 Some(prefix),
1134 &format!("{BASELINE_TOMBSTONE_KEY_FRAGMENT}{run_id}.json"),
1135 )
1136}
1137
1138/// Listable prefix for every baseline tombstone under `prefix`
1139/// (e.g. `"repo/gc/baseline-tomb-"`). Composes
1140/// [`BASELINE_TOMBSTONE_KEY_FRAGMENT`] with the bucket prefix via
1141/// [`keys::join`] so callers don't open-code the literal.
1142pub(crate) fn baseline_tombstone_listing_prefix(prefix: Option<&str>) -> String {
1143 keys::join(prefix, BASELINE_TOMBSTONE_KEY_FRAGMENT)
1144}
1145
1146/// Robust check that `key` is a tombstone under our prefix. Guards
1147/// against unrelated `.json` files in `<prefix>/gc/` and against a
1148/// regression where a future schema rev moves the prefix.
1149///
1150/// Root-prefix (`prefix == ""`) case: `expected_prefix` is just
1151/// `"gc/tombstones-"`, so every `gc/tombstones-*.json` key at the
1152/// bucket root matches. That is the intended behaviour — a root
1153/// repo owns the entire `gc/` namespace.
1154fn is_tombstone_key(key: &str, prefix: &str) -> bool {
1155 let expected_prefix = keys::join(Some(prefix), "gc/tombstones-");
1156 key.starts_with(&expected_prefix)
1157}
1158
1159/// Robust check that `key` is a baseline tombstone under our prefix
1160/// (issue #134). Mirrors [`is_tombstone_key`] for the
1161/// [`BASELINE_TOMBSTONE_KEY_FRAGMENT`] namespace.
1162fn is_baseline_tombstone_key(key: &str, prefix: &str) -> bool {
1163 key.starts_with(&baseline_tombstone_listing_prefix(Some(prefix)))
1164}
1165
1166/// List every `<prefix>/refs/**/chain.json` (across every ref
1167/// namespace — `refs/heads/`, `refs/tags/`, `refs/notes/`, etc.) and
1168/// union the pack content-shas they reference. Fail closed on parse
1169/// error.
1170async fn list_referenced_packs(
1171 store: &dyn ObjectStore,
1172 prefix: &str,
1173) -> Result<HashSet<Sha40>, PackchainError> {
1174 let refs_prefix = keys::join(Some(prefix), "refs/");
1175 let metas = store.list(&refs_prefix).await?;
1176
1177 // Bounded-parallel `get_bytes` per chain.json, parse-as-fetched.
1178 // Mirrors `list::list_refs` (#89 widened the listing prefix to
1179 // all `refs/` namespaces, so candidate count scales with branches
1180 // + tags + notes). `MAX_FETCH_CONCURRENCY` (= 8) is the same bound
1181 // Phase 3 fetch uses for chain pack downloads. `try_fold` folds
1182 // each body into the set as soon as `buffer_unordered` yields it,
1183 // so parse overlaps the next batch's fetch latency and no
1184 // intermediate `Vec<Bytes>` is held.
1185 //
1186 // Fail-closed semantics: a transport failure on any GET, or a
1187 // parse failure on any chain, aborts the run — the mark phase
1188 // cannot tombstone live packs because of an under-reporting
1189 // corrupt chain.
1190 futures::stream::iter(
1191 metas
1192 .into_iter()
1193 .filter(|m| super::keys::is_chain_json_key(&m.key))
1194 .map(|m| m.key),
1195 )
1196 .map(|key| async move { store.get_bytes(&key).await.map_err(PackchainError::Store) })
1197 .buffer_unordered(MAX_FETCH_CONCURRENCY)
1198 .try_fold(HashSet::<Sha40>::new(), |mut acc, body| async move {
1199 let chain = ChainManifest::from_json_bytes(&body)?;
1200 for segment in chain.segments {
1201 // gc fails closed on a malformed pack key — the chain is
1202 // corrupt and tombstoning live packs based on it would be
1203 // unsafe. Uses the same `MalformedPackEntry` variant as
1204 // every other consumer (read, fetch, compact) so error
1205 // wording stays aligned across the engine.
1206 let sha = super::keys::segment_pack_sha(&segment)?;
1207 acc.insert(sha);
1208 }
1209 Ok(acc)
1210 })
1211 .await
1212}
1213
1214/// List every `<prefix>/packs/*.pack` and `*.idx` and return the union
1215/// of their content-shas. The set is keyed by sha so a pack with a
1216/// missing-but-tombstoneable idx still counts (and vice versa).
1217async fn list_pack_shas(
1218 store: &dyn ObjectStore,
1219 prefix: &str,
1220) -> Result<HashSet<Sha40>, PackchainError> {
1221 let packs_prefix = keys::join(Some(prefix), "packs/");
1222 let metas = store.list(&packs_prefix).await?;
1223 let mut shas: HashSet<Sha40> = HashSet::new();
1224 for meta in metas {
1225 let basename = meta
1226 .key
1227 .rsplit('/')
1228 .next()
1229 .expect("rsplit('/') on a non-empty key yields at least one element");
1230 let candidate = basename
1231 .strip_suffix(".pack")
1232 .or_else(|| basename.strip_suffix(".idx"));
1233 if let Some(sha) = candidate
1234 && let Ok(parsed) = Sha40::try_new(sha)
1235 {
1236 shas.insert(parsed);
1237 }
1238 }
1239 Ok(shas)
1240}
1241
1242/// Best-effort delete: returns `Ok(true)` on a real delete, `Ok(false)`
1243/// when the object was already absent (concurrent sweep raced ahead,
1244/// or a partial sweep ran earlier).
1245async fn delete_idempotent(store: &dyn ObjectStore, key: &str) -> Result<bool, PackchainError> {
1246 match store.delete(key).await {
1247 Ok(()) => Ok(true),
1248 Err(ObjectStoreError::NotFound(_)) => Ok(false),
1249 Err(e) => Err(PackchainError::Store(e)),
1250 }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255 use super::*;
1256 use crate::git::RefName;
1257 use crate::object_store::mock::MockStore;
1258 use crate::packchain::manifest::write_chain;
1259 use crate::packchain::schema::ChainSegment;
1260
1261 const SHA_TIP: &str = "0000000000000000000000000000000000000001";
1262 const SHA_FULL: &str = "0000000000000000000000000000000000000002";
1263 const SHA_PACK_LIVE: &str = "1111111111111111111111111111111111111111";
1264 const SHA_PACK_ORPHAN: &str = "2222222222222222222222222222222222222222";
1265 const SHA_PACK_ORPHAN_2: &str = "3333333333333333333333333333333333333333";
1266
1267 fn sha40(s: &str) -> Sha40 {
1268 Sha40::try_new(s).unwrap()
1269 }
1270
1271 fn ref_main() -> RefName {
1272 RefName::new("refs/heads/main").unwrap()
1273 }
1274
1275 fn segment(pack_sha: &str, parent: Option<&str>) -> ChainSegment {
1276 ChainSegment {
1277 sha: sha40(SHA_TIP),
1278 parent_sha: parent.map(sha40),
1279 pack: format!("packs/{pack_sha}.pack"),
1280 bytes: 1_024,
1281 }
1282 }
1283
1284 async fn seed_live_chain(store: &MockStore, prefix: Option<&str>) {
1285 let chain = ChainManifest {
1286 v: 1,
1287 tip: sha40(SHA_TIP),
1288 full_at: sha40(SHA_FULL),
1289 segments: vec![segment(SHA_PACK_LIVE, None)],
1290 };
1291 write_chain(store, prefix, &ref_main(), &chain)
1292 .await
1293 .unwrap();
1294 }
1295
1296 fn insert_pack_pair(store: &MockStore, prefix: Option<&str>, sha: &str) {
1297 let pack_key = super::super::keys::pack_key(prefix, &sha40(sha));
1298 let idx_key = super::super::keys::pack_idx_key(prefix, &sha40(sha));
1299 store.insert(pack_key, Bytes::from_static(b"PACKDATA"));
1300 store.insert(idx_key, Bytes::from_static(b"IDXDATA"));
1301 }
1302
1303 // --- mark -----------------------------------------------------------
1304
1305 #[tokio::test]
1306 async fn mark_with_no_chains_treats_all_packs_as_orphan() {
1307 let store = MockStore::new();
1308 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1309 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1310 assert_eq!(outcome.orphan_count, 1);
1311 // Tombstone written to the correct prefix.
1312 let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
1313 let parsed = Tombstone::from_json_bytes(&body).unwrap();
1314 assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
1315 }
1316
1317 #[tokio::test]
1318 async fn mark_skips_chain_referenced_packs() {
1319 let store = MockStore::new();
1320 seed_live_chain(&store, Some("repo")).await;
1321 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1322 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1323 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1324 assert_eq!(outcome.orphan_count, 1);
1325 let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
1326 let parsed = Tombstone::from_json_bytes(&body).unwrap();
1327 assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
1328 }
1329
1330 #[tokio::test]
1331 async fn mark_no_orphans_skips_tombstone_write() {
1332 let store = MockStore::new();
1333 seed_live_chain(&store, Some("repo")).await;
1334 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1335 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1336 assert_eq!(outcome.orphan_count, 0);
1337 // No tombstone listed.
1338 let metas = store.list("repo/gc/").await.unwrap();
1339 assert!(
1340 metas.is_empty(),
1341 "tombstone must not exist for empty orphan set"
1342 );
1343 }
1344
1345 #[tokio::test]
1346 async fn mark_dry_run_does_not_write_tombstone() {
1347 let store = MockStore::new();
1348 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1349 let outcome = mark(&store, "repo", MarkOpts { dry_run: true })
1350 .await
1351 .unwrap();
1352 assert_eq!(outcome.orphan_count, 1);
1353 let metas = store.list("repo/gc/").await.unwrap();
1354 assert!(metas.is_empty(), "dry-run must not write tombstone");
1355 }
1356
1357 #[tokio::test]
1358 async fn mark_treats_tag_chain_referenced_packs_as_live() {
1359 // A pack referenced only from a chain under refs/tags/ must
1360 // not be tombstoned. (Regression for issue #89.)
1361 let store = MockStore::new();
1362 let chain = ChainManifest {
1363 v: 1,
1364 tip: sha40(SHA_TIP),
1365 full_at: sha40(SHA_FULL),
1366 segments: vec![segment(SHA_PACK_LIVE, None)],
1367 };
1368 let tag_ref = RefName::new("refs/tags/v1").unwrap();
1369 write_chain(&store, Some("repo"), &tag_ref, &chain)
1370 .await
1371 .unwrap();
1372 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1373 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1374
1375 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1376 assert!(
1377 referenced.contains(&sha40(SHA_PACK_LIVE)),
1378 "pack referenced from refs/tags/ chain must be in the live set",
1379 );
1380
1381 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1382 assert_eq!(outcome.orphan_count, 1);
1383 let body = store.get_bytes(&outcome.tombstone_key).await.unwrap();
1384 let parsed = Tombstone::from_json_bytes(&body).unwrap();
1385 assert_eq!(parsed.orphan_packs, vec![sha40(SHA_PACK_ORPHAN)]);
1386 }
1387
1388 #[tokio::test]
1389 async fn mark_treats_notes_chain_referenced_packs_as_live() {
1390 // refs/notes/commits is the standard git notes ref. A pack
1391 // referenced only from a notes chain must not be tombstoned.
1392 let store = MockStore::new();
1393 let chain = ChainManifest {
1394 v: 1,
1395 tip: sha40(SHA_TIP),
1396 full_at: sha40(SHA_FULL),
1397 segments: vec![segment(SHA_PACK_LIVE, None)],
1398 };
1399 let notes_ref = RefName::new("refs/notes/commits").unwrap();
1400 write_chain(&store, Some("repo"), ¬es_ref, &chain)
1401 .await
1402 .unwrap();
1403 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1404
1405 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1406 assert!(
1407 referenced.contains(&sha40(SHA_PACK_LIVE)),
1408 "pack referenced from refs/notes/ chain must be in the live set",
1409 );
1410
1411 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1412 assert_eq!(outcome.orphan_count, 0);
1413 }
1414
1415 #[tokio::test]
1416 async fn list_referenced_packs_unions_across_namespaces() {
1417 // A live chain in refs/heads/ AND in refs/tags/ both
1418 // contribute to the referenced set.
1419 let store = MockStore::new();
1420 let head_chain = ChainManifest {
1421 v: 1,
1422 tip: sha40(SHA_TIP),
1423 full_at: sha40(SHA_FULL),
1424 segments: vec![segment(SHA_PACK_LIVE, None)],
1425 };
1426 write_chain(&store, Some("repo"), &ref_main(), &head_chain)
1427 .await
1428 .unwrap();
1429 let tag_chain = ChainManifest {
1430 v: 1,
1431 tip: sha40(SHA_TIP),
1432 full_at: sha40(SHA_FULL),
1433 segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
1434 };
1435 let tag_ref = RefName::new("refs/tags/v1").unwrap();
1436 write_chain(&store, Some("repo"), &tag_ref, &tag_chain)
1437 .await
1438 .unwrap();
1439
1440 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1441 assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
1442 assert!(referenced.contains(&sha40(SHA_PACK_ORPHAN_2)));
1443 assert_eq!(referenced.len(), 2);
1444 }
1445
1446 #[tokio::test]
1447 async fn list_referenced_packs_ignores_sibling_artefacts() {
1448 // path-index.json, .bundle baselines, and other artefacts
1449 // under refs/<namespace>/<name>/ must not be parsed as
1450 // chain.json.
1451 let store = MockStore::new();
1452 seed_live_chain(&store, Some("repo")).await;
1453 // Add sibling artefacts that share the ref directory.
1454 store.insert(
1455 "repo/refs/heads/main/path-index.json",
1456 Bytes::from_static(b"{}"),
1457 );
1458 store.insert(
1459 format!("repo/refs/heads/main/{SHA_TIP}.bundle"),
1460 Bytes::from_static(b"BUNDLE"),
1461 );
1462 // And a tombstone-style key under refs/ that must be filtered.
1463 store.insert(
1464 "repo/refs/tags/v1/path-index.json",
1465 Bytes::from_static(b"{}"),
1466 );
1467
1468 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1469 assert_eq!(referenced.len(), 1);
1470 assert!(referenced.contains(&sha40(SHA_PACK_LIVE)));
1471 }
1472
1473 #[tokio::test]
1474 async fn list_referenced_packs_empty_for_no_chains() {
1475 let store = MockStore::new();
1476 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1477 assert!(referenced.is_empty());
1478 }
1479
1480 #[tokio::test]
1481 async fn list_referenced_packs_unions_many_chains_with_bounded_parallel_fetch() {
1482 // Regression guard for the buffer_unordered fetch path:
1483 // exercise more chain.json bodies than MAX_FETCH_CONCURRENCY
1484 // (= 8) so multiple batches must complete and union without
1485 // dropping any pack sha. Spans heads, tags, and notes so the
1486 // listing prefix widening from #89 stays exercised.
1487 let store = MockStore::new();
1488 let chain_count = MAX_FETCH_CONCURRENCY * 3 + 1;
1489 let namespaces = ["refs/heads", "refs/tags", "refs/notes"];
1490 let mut expected: HashSet<Sha40> = HashSet::new();
1491 for i in 0..chain_count {
1492 let pack_sha = format!("{:040x}", 0x1000 + i);
1493 let pack_sha40 = sha40(&pack_sha);
1494 let namespace = namespaces[i % namespaces.len()];
1495 let ref_name = RefName::new(format!("{namespace}/r{i}")).unwrap();
1496 let chain = ChainManifest {
1497 v: 1,
1498 tip: sha40(SHA_TIP),
1499 full_at: sha40(SHA_FULL),
1500 segments: vec![ChainSegment {
1501 sha: sha40(SHA_TIP),
1502 parent_sha: None,
1503 pack: format!("packs/{pack_sha}.pack"),
1504 bytes: 1_024,
1505 }],
1506 };
1507 write_chain(&store, Some("repo"), &ref_name, &chain)
1508 .await
1509 .unwrap();
1510 expected.insert(pack_sha40);
1511 }
1512
1513 let referenced = list_referenced_packs(&store, "repo").await.unwrap();
1514 assert_eq!(referenced, expected);
1515 }
1516
1517 #[tokio::test]
1518 async fn mark_fails_closed_on_corrupt_chain() {
1519 let store = MockStore::new();
1520 // chain.json with malformed JSON.
1521 store.insert(
1522 "repo/refs/heads/main/chain.json",
1523 Bytes::from_static(b"{not valid json"),
1524 );
1525 let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
1526 assert!(matches!(err, PackchainError::ParseJson(_)));
1527 // No tombstone written.
1528 let metas = store.list("repo/gc/").await.unwrap();
1529 assert!(metas.is_empty());
1530 }
1531
1532 #[tokio::test]
1533 async fn mark_fails_closed_on_unsupported_schema_version() {
1534 let store = MockStore::new();
1535 store.insert(
1536 "repo/refs/heads/main/chain.json",
1537 Bytes::from_static(
1538 br#"{"v":2,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000002","segments":[]}"#,
1539 ),
1540 );
1541 let err = mark(&store, "repo", MarkOpts::default()).await.unwrap_err();
1542 assert!(matches!(
1543 err,
1544 PackchainError::UnsupportedSchemaVersion { .. }
1545 ));
1546 }
1547
1548 // --- sweep ----------------------------------------------------------
1549
1550 fn sha_set<I: IntoIterator<Item = &'static str>>(shas: I) -> Vec<Sha40> {
1551 shas.into_iter().map(sha40).collect()
1552 }
1553
1554 fn write_tombstone(
1555 store: &MockStore,
1556 prefix: &str,
1557 marked_at: &str,
1558 shas: Vec<Sha40>,
1559 ) -> String {
1560 let run_id = Uuid::new_v4().to_string();
1561 let key = tombstone_key(prefix, &run_id, marked_at);
1562 let body = Tombstone {
1563 v: 1,
1564 run_id,
1565 marked_at: marked_at.to_string(),
1566 orphan_packs: shas,
1567 }
1568 .to_json_pretty()
1569 .unwrap();
1570 store.insert(&key, Bytes::from(body));
1571 key
1572 }
1573
1574 #[tokio::test]
1575 async fn sweep_inside_grace_defers_tombstone() {
1576 let store = MockStore::new();
1577 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
1578 let tombstone = write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
1579 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1580
1581 let outcome = sweep(
1582 &store,
1583 "repo",
1584 SweepOpts {
1585 grace_hours: 24,
1586 force: false,
1587 },
1588 )
1589 .await
1590 .unwrap();
1591 assert_eq!(outcome.deferred_tombstones, 1);
1592 assert_eq!(outcome.swept_tombstones, 0);
1593 // Tombstone and packs survive.
1594 store.get_bytes(&tombstone).await.unwrap();
1595 store
1596 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
1597 .await
1598 .unwrap();
1599 }
1600
1601 #[tokio::test]
1602 async fn sweep_after_grace_deletes_orphan_packs_and_tombstone() {
1603 let store = MockStore::new();
1604 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
1605 .format(&Rfc3339)
1606 .unwrap();
1607 let tombstone = write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
1608 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1609
1610 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
1611 assert_eq!(outcome.swept_tombstones, 1);
1612 assert_eq!(outcome.deleted_objects, 2, "pack + idx");
1613 // Tombstone and packs gone.
1614 let pack_err = store
1615 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
1616 .await
1617 .unwrap_err();
1618 assert!(matches!(pack_err, ObjectStoreError::NotFound(_)));
1619 let tomb_err = store.get_bytes(&tombstone).await.unwrap_err();
1620 assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
1621 }
1622
1623 #[tokio::test]
1624 async fn sweep_skips_repointed_packs() {
1625 // A tombstoned pack got re-referenced by a chain rewrite
1626 // before the grace expired. Sweep must NOT delete it.
1627 let store = MockStore::new();
1628 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
1629 .format(&Rfc3339)
1630 .unwrap();
1631 // The tombstone names SHA_PACK_LIVE — but a chain now references it.
1632 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
1633 let chain = ChainManifest {
1634 v: 1,
1635 tip: sha40(SHA_TIP),
1636 full_at: sha40(SHA_FULL),
1637 segments: vec![segment(SHA_PACK_LIVE, None)],
1638 };
1639 write_chain(&store, Some("repo"), &ref_main(), &chain)
1640 .await
1641 .unwrap();
1642 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1643
1644 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
1645 assert_eq!(outcome.swept_tombstones, 1);
1646 assert_eq!(outcome.skipped_repointed_packs, 1);
1647 assert_eq!(outcome.deleted_objects, 0);
1648 // Pack still present.
1649 store
1650 .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
1651 .await
1652 .unwrap();
1653 }
1654
1655 #[tokio::test]
1656 async fn sweep_force_bypasses_grace_only_not_live_recheck() {
1657 // Regression for #117: --force must skip ONLY the grace window,
1658 // not the live-pack re-check. A fresh tombstone names a pack
1659 // that has since been referenced by a committed chain — the
1660 // classic outcome of mark() snapshotting between a concurrent
1661 // push's pack upload and its chain.json commit. Sweep with
1662 // --force must NOT delete that pack.
1663 let store = MockStore::new();
1664 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
1665 write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_LIVE]));
1666 let chain = ChainManifest {
1667 v: 1,
1668 tip: sha40(SHA_TIP),
1669 full_at: sha40(SHA_FULL),
1670 segments: vec![segment(SHA_PACK_LIVE, None)],
1671 };
1672 write_chain(&store, Some("repo"), &ref_main(), &chain)
1673 .await
1674 .unwrap();
1675 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1676
1677 let outcome = sweep(
1678 &store,
1679 "repo",
1680 SweepOpts {
1681 grace_hours: 24,
1682 force: true,
1683 },
1684 )
1685 .await
1686 .unwrap();
1687 // Grace was bypassed (fresh tombstone got processed instead of
1688 // deferred), but the live-pack guard fired and the pack stayed.
1689 assert_eq!(outcome.swept_tombstones, 1);
1690 assert_eq!(outcome.deferred_tombstones, 0);
1691 assert_eq!(outcome.skipped_repointed_packs, 1);
1692 assert_eq!(outcome.deleted_objects, 0);
1693 store
1694 .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
1695 .await
1696 .expect("live pack must survive --force sweep");
1697 store
1698 .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.idx"))
1699 .await
1700 .expect("live idx must survive --force sweep");
1701 }
1702
1703 #[tokio::test]
1704 async fn sweep_force_deletes_truly_orphan_pack_inside_grace() {
1705 // The happy path for --force: a fresh tombstone naming a pack
1706 // that is NOT in any chain. Grace is bypassed, the live-pack
1707 // re-check finds the SHA absent, the pack is deleted.
1708 let store = MockStore::new();
1709 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
1710 write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN]));
1711 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1712
1713 let outcome = sweep(
1714 &store,
1715 "repo",
1716 SweepOpts {
1717 grace_hours: 24,
1718 force: true,
1719 },
1720 )
1721 .await
1722 .unwrap();
1723 assert_eq!(outcome.swept_tombstones, 1);
1724 assert_eq!(outcome.deferred_tombstones, 0);
1725 assert_eq!(outcome.skipped_repointed_packs, 0);
1726 assert_eq!(outcome.deleted_objects, 2);
1727 let err = store
1728 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
1729 .await
1730 .unwrap_err();
1731 assert!(matches!(err, ObjectStoreError::NotFound(_)));
1732 }
1733
1734 #[tokio::test]
1735 async fn sweep_tolerates_already_deleted_pack() {
1736 // Tombstone names a pack that no longer exists on the bucket
1737 // (e.g. a previous partial sweep deleted the .pack but
1738 // crashed before deleting the .idx). Sweep must complete
1739 // without error.
1740 let store = MockStore::new();
1741 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
1742 .format(&Rfc3339)
1743 .unwrap();
1744 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
1745 // No pack inserted.
1746 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
1747 assert_eq!(outcome.swept_tombstones, 1);
1748 assert_eq!(outcome.deleted_objects, 0);
1749 }
1750
1751 #[tokio::test]
1752 async fn sweep_handles_multiple_tombstones_independently() {
1753 let store = MockStore::new();
1754 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
1755 .format(&Rfc3339)
1756 .unwrap();
1757 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
1758 // One stale tombstone (must sweep) + one fresh (must defer).
1759 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
1760 write_tombstone(&store, "repo", &now, sha_set([SHA_PACK_ORPHAN_2]));
1761 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1762 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN_2);
1763
1764 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
1765 assert_eq!(outcome.swept_tombstones, 1);
1766 assert_eq!(outcome.deferred_tombstones, 1);
1767 assert_eq!(outcome.deleted_objects, 2);
1768 }
1769
1770 // --- end-to-end ---------------------------------------------------
1771
1772 #[tokio::test]
1773 async fn mark_then_force_sweep_round_trips() {
1774 let store = MockStore::new();
1775 seed_live_chain(&store, Some("repo")).await;
1776 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
1777 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
1778
1779 let mark_out = mark(&store, "repo", MarkOpts::default()).await.unwrap();
1780 assert_eq!(mark_out.orphan_count, 1);
1781
1782 // Force sweep — bypass grace.
1783 let sweep_out = sweep(
1784 &store,
1785 "repo",
1786 SweepOpts {
1787 grace_hours: 24,
1788 force: true,
1789 },
1790 )
1791 .await
1792 .unwrap();
1793 assert_eq!(sweep_out.swept_tombstones, 1);
1794 assert_eq!(sweep_out.deleted_objects, 2);
1795
1796 // Live pack survives, orphan pack is gone.
1797 store
1798 .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
1799 .await
1800 .unwrap();
1801 let err = store
1802 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
1803 .await
1804 .unwrap_err();
1805 assert!(matches!(err, ObjectStoreError::NotFound(_)));
1806 }
1807
1808 // --- baseline tombstones (issue #134) -----------------------------
1809
1810 fn insert_baseline_bundle(store: &MockStore, prefix: Option<&str>, sha: &str) -> String {
1811 let key = keys::bundle_key(prefix, ref_main(), sha);
1812 store.insert(&key, Bytes::from_static(b"BUNDLE"));
1813 key
1814 }
1815
1816 fn write_baseline_tombstone_at(
1817 store: &MockStore,
1818 prefix: &str,
1819 marked_at: &str,
1820 sha: &str,
1821 ) -> String {
1822 let key = baseline_tombstone_key(prefix, &Uuid::new_v4().to_string());
1823 let body = BaselineTombstone {
1824 v: TOMBSTONE_SCHEMA_VERSION,
1825 marked_at: marked_at.to_owned(),
1826 ref_name: ref_main().as_str().to_owned(),
1827 sha: sha40(sha),
1828 }
1829 .to_json_pretty()
1830 .unwrap();
1831 store.insert(&key, Bytes::from(body));
1832 key
1833 }
1834
1835 /// Issue #157: [`tombstoned_bundle_keys`] enumerates every bundle
1836 /// key currently named by a baseline tombstone, regardless of
1837 /// which engine wrote the tombstone. The bundle engine relies on
1838 /// this set to hide tombstoned bundles from `list` and from the
1839 /// under-lock multi-bundle guard.
1840 #[tokio::test]
1841 async fn tombstoned_bundle_keys_returns_bundle_paths_for_each_tombstone() {
1842 let store = MockStore::new();
1843 // Two tombstones for distinct (ref, sha) pairs.
1844 write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &sha40(SHA_FULL))
1845 .await
1846 .unwrap();
1847 let other_ref = RefName::new("refs/heads/feature").unwrap();
1848 write_baseline_tombstone_for_orphan(&store, Some("repo"), &other_ref, &sha40(SHA_TIP))
1849 .await
1850 .unwrap();
1851
1852 let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
1853 assert_eq!(keys.len(), 2, "one bundle key per tombstone (got {keys:?})");
1854 assert!(keys.contains(&format!("repo/refs/heads/main/{SHA_FULL}.bundle")));
1855 assert!(keys.contains(&format!("repo/refs/heads/feature/{SHA_TIP}.bundle")));
1856 }
1857
1858 /// A fresh bucket with no `gc/` directory must not error — empty
1859 /// set is the right answer.
1860 #[tokio::test]
1861 async fn tombstoned_bundle_keys_empty_when_no_tombstones() {
1862 let store = MockStore::new();
1863 let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
1864 assert!(keys.is_empty(), "empty bucket yields no tombstoned keys");
1865 }
1866
1867 /// Root-prefix repos (no `<prefix>/` segment, keys collapse to
1868 /// `gc/baseline-tomb-*.json` and `refs/heads/<ref>/<sha>.bundle`)
1869 /// must produce the same tombstone → bundle-key mapping. All other
1870 /// tests cover `Some("repo")`; this is the negative-control for
1871 /// the `prefix.unwrap_or("")` path in `tombstoned_bundle_keys`.
1872 #[tokio::test]
1873 async fn tombstoned_bundle_keys_handles_root_prefix() {
1874 let store = MockStore::new();
1875 write_baseline_tombstone_for_orphan(&store, None, &ref_main(), &sha40(SHA_FULL))
1876 .await
1877 .unwrap();
1878
1879 let keys = tombstoned_bundle_keys(&store, None).await.unwrap();
1880 assert_eq!(keys.len(), 1, "got {keys:?}");
1881 assert!(
1882 keys.contains(&format!("refs/heads/main/{SHA_FULL}.bundle")),
1883 "root-prefix bundle key (no leading repo/) must be produced; got {keys:?}",
1884 );
1885 }
1886
1887 /// An unparseable tombstone must not block the rest. Mirrors
1888 /// `sweep_one_baseline_tombstone`'s tolerance for bad records:
1889 /// the sweep loop logs a warn and continues.
1890 #[tokio::test]
1891 async fn tombstoned_bundle_keys_skips_unparseable_tombstones() {
1892 let store = MockStore::new();
1893 // One good tombstone + one garbage tombstone-keyed file.
1894 write_baseline_tombstone_for_orphan(&store, Some("repo"), &ref_main(), &sha40(SHA_FULL))
1895 .await
1896 .unwrap();
1897 store.insert(
1898 "repo/gc/baseline-tomb-garbage.json",
1899 Bytes::from_static(b"not json"),
1900 );
1901
1902 let keys = tombstoned_bundle_keys(&store, Some("repo")).await.unwrap();
1903 assert_eq!(
1904 keys.len(),
1905 1,
1906 "good tombstone must still be returned despite garbage sibling",
1907 );
1908 assert!(keys.contains(&format!("repo/refs/heads/main/{SHA_FULL}.bundle")));
1909 }
1910
1911 #[tokio::test]
1912 async fn write_baseline_tombstone_round_trips() {
1913 // Writer + parser agree on the on-bucket shape. Regression
1914 // guard: a future serde tweak that broke the JSON layout would
1915 // make sweep silently skip every baseline tombstone.
1916 let store = MockStore::new();
1917 let prior = sha40(SHA_FULL);
1918 let current = sha40(SHA_TIP);
1919 write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, ¤t)
1920 .await
1921 .unwrap();
1922 let metas = store.list("repo/gc/").await.unwrap();
1923 let tomb_key = metas
1924 .iter()
1925 .find(|m| {
1926 m.key
1927 .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
1928 })
1929 .map(|m| m.key.clone())
1930 .expect("baseline tombstone written");
1931 let body = store.get_bytes(&tomb_key).await.unwrap();
1932 let parsed = BaselineTombstone::from_json_bytes(&body).unwrap();
1933 assert_eq!(parsed.v, TOMBSTONE_SCHEMA_VERSION);
1934 assert_eq!(parsed.ref_name, "refs/heads/main");
1935 assert_eq!(parsed.sha, prior);
1936 }
1937
1938 #[tokio::test]
1939 async fn write_baseline_tombstone_skips_when_prior_equals_current() {
1940 // No-op when the keys alias: a tombstone in this case would
1941 // later cause sweep to delete the live baseline bundle.
1942 let store = MockStore::new();
1943 let sha = sha40(SHA_FULL);
1944 write_baseline_tombstone(&store, Some("repo"), &ref_main(), &sha, &sha)
1945 .await
1946 .unwrap();
1947 let metas = store.list("repo/gc/").await.unwrap();
1948 assert!(
1949 metas.is_empty(),
1950 "aliasing prior/current must not write a tombstone",
1951 );
1952 }
1953
1954 #[tokio::test]
1955 async fn sweep_defers_baseline_tombstone_within_grace_window() {
1956 // Issue #134: a fetch that started before compact must be able
1957 // to read the prior baseline within the grace window. Concrete
1958 // manifestation: a baseline tombstone marked "now" is left
1959 // alone, and the bundle it names stays on the bucket.
1960 let store = MockStore::new();
1961 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
1962 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
1963 let tomb_key = write_baseline_tombstone_at(&store, "repo", &now, SHA_FULL);
1964
1965 let outcome = sweep(
1966 &store,
1967 "repo",
1968 SweepOpts {
1969 grace_hours: 24,
1970 force: false,
1971 },
1972 )
1973 .await
1974 .unwrap();
1975 assert_eq!(outcome.deferred_tombstones, 1);
1976 assert_eq!(outcome.swept_tombstones, 0);
1977 assert_eq!(outcome.deleted_objects, 0);
1978 store
1979 .get_bytes(&bundle_key)
1980 .await
1981 .expect("bundle must survive sweep within grace");
1982 store
1983 .get_bytes(&tomb_key)
1984 .await
1985 .expect("tombstone must survive sweep within grace");
1986 }
1987
1988 #[tokio::test]
1989 async fn sweep_reclaims_baseline_tombstone_after_grace_window() {
1990 // Issue #134: past the grace window, sweep deletes the bundle
1991 // and the tombstone. This is the path that reclaims the
1992 // orphan baseline left in place by compact / force-push.
1993 let store = MockStore::new();
1994 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
1995 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
1996 .format(&Rfc3339)
1997 .unwrap();
1998 let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
1999
2000 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2001 assert_eq!(outcome.swept_tombstones, 1);
2002 assert_eq!(outcome.deferred_tombstones, 0);
2003 assert_eq!(outcome.deleted_objects, 1, "bundle delete");
2004 let bundle_err = store.get_bytes(&bundle_key).await.unwrap_err();
2005 assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
2006 let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
2007 assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
2008 }
2009
2010 #[tokio::test]
2011 async fn sweep_skips_re_baselined_bundle_after_grace() {
2012 // A later push re-baselined to the SAME SHA the tombstone names
2013 // (force-push at the same tip, or compact short-cut). Sweep
2014 // must NOT delete the bundle — it is live again. The
2015 // now-stale tombstone is dropped.
2016 let store = MockStore::new();
2017 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
2018 // Live chain points at the same SHA the tombstone names.
2019 let chain = ChainManifest {
2020 v: 1,
2021 tip: sha40(SHA_TIP),
2022 full_at: sha40(SHA_FULL),
2023 segments: vec![segment(SHA_PACK_LIVE, None)],
2024 };
2025 write_chain(&store, Some("repo"), &ref_main(), &chain)
2026 .await
2027 .unwrap();
2028 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2029 .format(&Rfc3339)
2030 .unwrap();
2031 let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
2032
2033 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2034 assert_eq!(outcome.swept_tombstones, 1);
2035 assert_eq!(outcome.skipped_repointed_packs, 1);
2036 assert_eq!(outcome.deleted_objects, 0);
2037 store
2038 .get_bytes(&bundle_key)
2039 .await
2040 .expect("re-baselined bundle must survive");
2041 let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
2042 assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
2043 }
2044
2045 #[tokio::test]
2046 async fn sweep_baseline_tolerates_already_deleted_bundle() {
2047 // The bundle was deleted out of band (operator cleanup, or a
2048 // ref deletion that happened to sweep it). Sweep must finish
2049 // cleanly.
2050 let store = MockStore::new();
2051 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2052 .format(&Rfc3339)
2053 .unwrap();
2054 let tomb_key = write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
2055 // No bundle inserted.
2056
2057 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2058 assert_eq!(outcome.swept_tombstones, 1);
2059 assert_eq!(outcome.deleted_objects, 0);
2060 let tomb_err = store.get_bytes(&tomb_key).await.unwrap_err();
2061 assert!(matches!(tomb_err, ObjectStoreError::NotFound(_)));
2062 }
2063
2064 #[tokio::test]
2065 async fn sweep_baseline_force_bypasses_grace_only_not_live_recheck() {
2066 // --force on a fresh baseline tombstone whose SHA is now live
2067 // (re-baselined). Grace is bypassed (tombstone is processed),
2068 // but the live-state guard fires and the bundle stays.
2069 let store = MockStore::new();
2070 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
2071 let chain = ChainManifest {
2072 v: 1,
2073 tip: sha40(SHA_TIP),
2074 full_at: sha40(SHA_FULL),
2075 segments: vec![segment(SHA_PACK_LIVE, None)],
2076 };
2077 write_chain(&store, Some("repo"), &ref_main(), &chain)
2078 .await
2079 .unwrap();
2080 let now = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
2081 write_baseline_tombstone_at(&store, "repo", &now, SHA_FULL);
2082
2083 let outcome = sweep(
2084 &store,
2085 "repo",
2086 SweepOpts {
2087 grace_hours: 24,
2088 force: true,
2089 },
2090 )
2091 .await
2092 .unwrap();
2093 assert_eq!(outcome.swept_tombstones, 1);
2094 assert_eq!(outcome.deferred_tombstones, 0);
2095 assert_eq!(outcome.skipped_repointed_packs, 1);
2096 assert_eq!(outcome.deleted_objects, 0);
2097 store
2098 .get_bytes(&bundle_key)
2099 .await
2100 .expect("live bundle must survive --force sweep");
2101 }
2102
2103 #[tokio::test]
2104 async fn sweep_processes_pack_and_baseline_tombstones_in_one_pass() {
2105 // Mixed tombstone types under `<prefix>/gc/`. Sweep must
2106 // dispatch each to the right handler without mis-counting or
2107 // skipping.
2108 let store = MockStore::new();
2109 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
2110 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
2111 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2112 .format(&Rfc3339)
2113 .unwrap();
2114 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
2115 write_baseline_tombstone_at(&store, "repo", &stale, SHA_FULL);
2116
2117 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2118 assert_eq!(outcome.swept_tombstones, 2);
2119 // pack + idx + bundle = 3 deletions
2120 assert_eq!(outcome.deleted_objects, 3);
2121 let bundle_err = store.get_bytes(&bundle_key).await.unwrap_err();
2122 assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
2123 let pack_err = store
2124 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
2125 .await
2126 .unwrap_err();
2127 assert!(matches!(pack_err, ObjectStoreError::NotFound(_)));
2128 }
2129
2130 #[tokio::test]
2131 async fn compact_to_sweep_round_trip_simulates_concurrent_fetch_then_gc() {
2132 // End-to-end issue #134 scenario: compact writes a tombstone
2133 // (we simulate by hand to avoid pulling in the full compact
2134 // fixture), an in-flight fetch reads the prior bundle within
2135 // grace and succeeds, and a later sweep past the grace
2136 // reclaims it.
2137 let store = MockStore::new();
2138 let bundle_key = insert_baseline_bundle(&store, Some("repo"), SHA_FULL);
2139 // Compact moved the baseline to a new SHA — simulate by
2140 // writing a chain pointing to SHA_TIP as full_at.
2141 let chain = ChainManifest {
2142 v: 1,
2143 tip: sha40(SHA_TIP),
2144 full_at: sha40(SHA_TIP),
2145 segments: vec![segment(SHA_PACK_LIVE, None)],
2146 };
2147 write_chain(&store, Some("repo"), &ref_main(), &chain)
2148 .await
2149 .unwrap();
2150 let prior = sha40(SHA_FULL);
2151 let current = sha40(SHA_TIP);
2152 write_baseline_tombstone(&store, Some("repo"), &ref_main(), &prior, ¤t)
2153 .await
2154 .unwrap();
2155
2156 // In-flight fetch: bundle GET within grace MUST succeed.
2157 let body = store.get_bytes(&bundle_key).await.unwrap();
2158 assert_eq!(&body[..], b"BUNDLE");
2159 let in_grace = sweep(
2160 &store,
2161 "repo",
2162 SweepOpts {
2163 grace_hours: 24,
2164 force: false,
2165 },
2166 )
2167 .await
2168 .unwrap();
2169 assert_eq!(in_grace.deferred_tombstones, 1);
2170 store
2171 .get_bytes(&bundle_key)
2172 .await
2173 .expect("bundle must survive in-grace sweep");
2174
2175 // Backdate the tombstone past the grace and re-sweep —
2176 // bundle is reaped.
2177 let metas = store.list("repo/gc/").await.unwrap();
2178 let tomb_key = metas
2179 .iter()
2180 .find(|m| {
2181 m.key
2182 .starts_with(&baseline_tombstone_listing_prefix(Some("repo")))
2183 })
2184 .map(|m| m.key.clone())
2185 .unwrap();
2186 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2187 .format(&Rfc3339)
2188 .unwrap();
2189 let body = store.get_bytes(&tomb_key).await.unwrap();
2190 let mut tomb: BaselineTombstone = serde_json::from_slice(&body).unwrap();
2191 tomb.marked_at = stale;
2192 let new_body = serde_json::to_vec_pretty(&tomb).unwrap();
2193 store.insert(&tomb_key, Bytes::from(new_body));
2194
2195 let post_grace = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2196 assert_eq!(post_grace.swept_tombstones, 1);
2197 assert_eq!(post_grace.deleted_objects, 1);
2198 let err = store.get_bytes(&bundle_key).await.unwrap_err();
2199 assert!(matches!(err, ObjectStoreError::NotFound(_)));
2200 }
2201
2202 #[tokio::test]
2203 async fn sweep_preserves_corrupt_baseline_tombstone_for_diagnosis() {
2204 // Issue #146: a baseline tombstone whose `ref_name` no longer
2205 // passes `RefName::new` cannot be turned into a bundle key, so
2206 // deleting the tombstone would orphan the bundle on the bucket
2207 // with no record. Sweep must preserve BOTH records (tombstone
2208 // and any bundle it would have named under the raw string), and
2209 // signal `Deferred` so an operator can reconcile manually.
2210 let store = MockStore::new();
2211 // Seed a "bundle" at the raw-string path the tombstone names,
2212 // so a regression that reconstructs a key from the raw ref_name
2213 // and deletes it would also be caught here.
2214 let bad_ref = "refs/heads/[bad]";
2215 assert!(
2216 RefName::new(bad_ref).is_err(),
2217 "fixture relies on this ref_name failing RefName::new",
2218 );
2219 let bundle_key = format!("repo/{bad_ref}/{SHA_FULL}.bundle");
2220 store.insert(&bundle_key, Bytes::from_static(b"BUNDLE"));
2221
2222 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2223 .format(&Rfc3339)
2224 .unwrap();
2225 let tomb_key = baseline_tombstone_key("repo", &Uuid::new_v4().to_string());
2226 let body = BaselineTombstone {
2227 v: TOMBSTONE_SCHEMA_VERSION,
2228 marked_at: stale,
2229 ref_name: bad_ref.to_owned(),
2230 sha: sha40(SHA_FULL),
2231 }
2232 .to_json_pretty()
2233 .unwrap();
2234 store.insert(&tomb_key, Bytes::from(body));
2235
2236 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2237 assert_eq!(
2238 outcome.deferred_tombstones, 1,
2239 "corrupt tombstone counts as deferred, not swept",
2240 );
2241 assert_eq!(outcome.swept_tombstones, 0);
2242 assert_eq!(outcome.deleted_objects, 0);
2243 // Tombstone survives (operator must inspect).
2244 let surviving = store
2245 .get_bytes(&tomb_key)
2246 .await
2247 .expect("corrupt tombstone must survive sweep");
2248 let parsed = BaselineTombstone::from_json_bytes(&surviving).unwrap();
2249 assert_eq!(parsed.ref_name, bad_ref);
2250 // Bundle at the would-be key survives (the key was unreachable
2251 // through the normal RefName path, but sweep must not have
2252 // reconstructed it from the raw string either).
2253 store
2254 .get_bytes(&bundle_key)
2255 .await
2256 .expect("orphan bundle must survive corrupt-tombstone sweep");
2257 }
2258
2259 // --- per-tombstone live-pack recompute (issue #140) --------------
2260
2261 /// One-shot post-delete hook used by [`PostDeleteHookStore`].
2262 type PostDeleteHook = Box<dyn FnOnce(&MockStore) + Send>;
2263
2264 /// Test-only [`ObjectStore`] decorator that runs a one-shot
2265 /// callback the first time `delete()` succeeds on a key matching
2266 /// `trigger_prefix`, *after* the inner delete completes. Used to
2267 /// inject a concurrent push (writing a fresh `chain.json`) between
2268 /// successive `sweep_one_tombstone` iterations and verify that the
2269 /// per-tombstone live-pack recompute picks it up.
2270 ///
2271 /// Every other trait method forwards to the inner store unchanged.
2272 struct PostDeleteHookStore {
2273 inner: MockStore,
2274 hook: std::sync::Mutex<Option<PostDeleteHook>>,
2275 /// Key-prefix the hook fires on. The pack-tombstone case
2276 /// uses `<prefix>/gc/tombstones-`; the test never deletes
2277 /// other keys before the intended trigger so this stays
2278 /// unambiguous.
2279 trigger_prefix: String,
2280 }
2281
2282 impl PostDeleteHookStore {
2283 fn new(
2284 inner: MockStore,
2285 trigger_prefix: impl Into<String>,
2286 hook: impl FnOnce(&MockStore) + Send + 'static,
2287 ) -> Self {
2288 Self {
2289 inner,
2290 hook: std::sync::Mutex::new(Some(Box::new(hook))),
2291 trigger_prefix: trigger_prefix.into(),
2292 }
2293 }
2294 }
2295
2296 crate::delegate_to_inner_impl! {
2297 impl ObjectStore for PostDeleteHookStore {
2298 forward: list, get_to_file, get_bytes, get_bytes_range,
2299 put_bytes, put_path, put_if_absent,
2300 head, copy;
2301
2302 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
2303 let result = self.inner.delete(key).await;
2304 if result.is_ok()
2305 && key.starts_with(&self.trigger_prefix)
2306 && let Some(hook) = self.hook.lock().unwrap().take()
2307 {
2308 hook(&self.inner);
2309 }
2310 result
2311 }
2312 }
2313 }
2314
2315 #[tokio::test]
2316 async fn sweep_re_derives_referenced_set_per_tombstone() {
2317 // Issue #140 regression: a concurrent push committing
2318 // chain.json between two `sweep_one_tombstone` iterations
2319 // must not let sweep delete a pack the new chain references.
2320 //
2321 // Layout: two stale tombstones, each naming a distinct pack
2322 // on its own ref. After the FIRST tombstone is fully
2323 // processed and deleted, the post-delete hook fires and
2324 // writes BOTH refs' `chain.json` files — simulating a
2325 // concurrent push that committed chain.json for the second
2326 // ref between sweep's two iterations. The second iteration
2327 // must re-derive the live set and skip the delete.
2328 //
2329 // Pre-fix: the once-per-sweep snapshot is empty for both
2330 // iterations and BOTH packs are deleted (`deleted_objects = 4`).
2331 // Post-fix: the second iteration's recompute picks up the new
2332 // chain and the second pack survives
2333 // (`deleted_objects = 2`, `skipped_repointed_packs = 1`).
2334 //
2335 // The hook writes chains for both refs (rather than guessing
2336 // which tombstone runs first) so the assertions are independent
2337 // of MockStore iteration order. Writing the first ref's chain
2338 // is a no-op for that pack — its delete already happened
2339 // before the hook fired — and the second ref's chain is what
2340 // protects the still-pending pack.
2341 let inner = MockStore::new();
2342 let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
2343 .format(&Rfc3339)
2344 .unwrap();
2345 let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2346 .format(&Rfc3339)
2347 .unwrap();
2348 write_tombstone(&inner, "repo", &stale_a, sha_set([SHA_PACK_ORPHAN]));
2349 write_tombstone(&inner, "repo", &stale_b, sha_set([SHA_PACK_ORPHAN_2]));
2350 insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
2351 insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
2352
2353 // After the FIRST tombstone delete completes, simulate the
2354 // concurrent push by committing chain.json files for both
2355 // refs at once.
2356 let store = PostDeleteHookStore::new(inner, "repo/gc/tombstones-", |inner| {
2357 for (ref_path, pack_sha) in [
2358 ("repo/refs/heads/branch_a/chain.json", SHA_PACK_ORPHAN),
2359 ("repo/refs/heads/branch_b/chain.json", SHA_PACK_ORPHAN_2),
2360 ] {
2361 let chain = ChainManifest {
2362 v: 1,
2363 tip: sha40(SHA_TIP),
2364 full_at: sha40(SHA_FULL),
2365 segments: vec![segment(pack_sha, None)],
2366 };
2367 let body =
2368 serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2369 inner.insert(ref_path, Bytes::from(body));
2370 }
2371 });
2372
2373 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2374 // Both tombstones processed.
2375 assert_eq!(outcome.swept_tombstones, 2);
2376 // Whichever tombstone ran first deleted its pack pair (2
2377 // objects). The second iteration's recompute saw the
2378 // freshly-committed chain and skipped the delete.
2379 assert_eq!(outcome.deleted_objects, 2);
2380 assert_eq!(outcome.skipped_repointed_packs, 1);
2381
2382 // Exactly one of the two packs survives — the one whose
2383 // tombstone was processed second.
2384 let first_survives = store
2385 .inner
2386 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
2387 .await
2388 .is_ok();
2389 let second_survives = store
2390 .inner
2391 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
2392 .await
2393 .is_ok();
2394 assert!(
2395 first_survives ^ second_survives,
2396 "exactly one pack must survive: \
2397 first_survives={first_survives}, second_survives={second_survives}",
2398 );
2399 }
2400
2401 #[tokio::test]
2402 async fn sweep_re_derives_referenced_set_per_pack_within_tombstone() {
2403 // Issue #152 regression: a concurrent push committing
2404 // chain.json AFTER `sweep_one_tombstone`'s referenced-set
2405 // snapshot but BEFORE a later pack in the SAME tombstone is
2406 // reached must not let sweep delete the now-live pack.
2407 //
2408 // Layout: one stale tombstone naming TWO orphan packs (the
2409 // shape `mark()` produces — every orphan SHA for a run goes
2410 // into one tombstone body). The post-delete hook fires on
2411 // the FIRST `packs/` delete (the first pack's `.pack` key,
2412 // mid-iter-1) and writes a chain.json that references the
2413 // SECOND pack — simulating a concurrent push that landed
2414 // after the per-tombstone snapshot.
2415 //
2416 // Pre-fix (single snapshot per tombstone, taken before the
2417 // loop): `referenced` is empty for both iterations and both
2418 // packs are deleted (`deleted_objects = 4`,
2419 // `skipped_repointed_packs = 0`). Post-fix (per-pack
2420 // recompute): the second iteration's fresh recompute picks
2421 // up the new chain and skips the delete
2422 // (`deleted_objects = 2`, `skipped_repointed_packs = 1`).
2423 //
2424 // The pack order inside `orphan_packs` is the Vec insertion
2425 // order — deterministic — so the assertion is on the EXACT
2426 // surviving pack key (`SHA_PACK_ORPHAN_2`), not a generic
2427 // "one of two survives". This catches a regression that
2428 // dropped the recompute entirely (both deleted) AND a
2429 // regression that kept the recompute but forgot to skip on
2430 // re-reference (would also delete the second pack).
2431 let inner = MockStore::new();
2432 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2433 .format(&Rfc3339)
2434 .unwrap();
2435 write_tombstone(
2436 &inner,
2437 "repo",
2438 &stale,
2439 sha_set([SHA_PACK_ORPHAN, SHA_PACK_ORPHAN_2]),
2440 );
2441 insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN);
2442 insert_pack_pair(&inner, Some("repo"), SHA_PACK_ORPHAN_2);
2443
2444 // Fire on the first `packs/` delete (the first pack's `.pack`
2445 // key) — strictly between the two `orphan_packs` iterations
2446 // from the caller's perspective: iter-1's deletes complete,
2447 // iter-2 has not yet run its `list_referenced_packs`. The
2448 // hook writes a chain.json that re-references the SECOND
2449 // pack, which the per-pack recompute must observe.
2450 let store = PostDeleteHookStore::new(inner, "repo/packs/", |inner| {
2451 let chain = ChainManifest {
2452 v: 1,
2453 tip: sha40(SHA_TIP),
2454 full_at: sha40(SHA_FULL),
2455 segments: vec![segment(SHA_PACK_ORPHAN_2, None)],
2456 };
2457 let body =
2458 serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2459 inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
2460 });
2461
2462 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2463 assert_eq!(outcome.swept_tombstones, 1);
2464 // First pack: deleted (.pack + .idx = 2). Second pack: skipped
2465 // because the per-pack recompute saw the freshly-committed
2466 // chain referencing it.
2467 assert_eq!(
2468 outcome.deleted_objects, 2,
2469 "only the first pack's pair deleted; the second was re-referenced",
2470 );
2471 assert_eq!(
2472 outcome.skipped_repointed_packs, 1,
2473 "second iteration's per-pack recompute must skip the re-referenced pack",
2474 );
2475 // Exact surviving pack: the second one (the one the
2476 // concurrent push re-referenced). Asserting on the specific
2477 // key — not a broad "one survives" — catches a regression
2478 // that flipped the iteration order or skipped the wrong pack.
2479 store
2480 .inner
2481 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.pack"))
2482 .await
2483 .expect("re-referenced pack must survive");
2484 store
2485 .inner
2486 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN_2}.idx"))
2487 .await
2488 .expect("re-referenced pack idx must survive");
2489 // First pack is gone.
2490 let first_err = store
2491 .inner
2492 .get_bytes(&format!("repo/packs/{SHA_PACK_ORPHAN}.pack"))
2493 .await
2494 .unwrap_err();
2495 assert!(matches!(first_err, ObjectStoreError::NotFound(_)));
2496 }
2497
2498 #[tokio::test]
2499 async fn sweep_reclaims_genuinely_orphan_pack_with_per_tombstone_recompute() {
2500 // Sanity: the per-tombstone recompute does NOT regress the
2501 // normal sweep path. A stale tombstone naming a pack with no
2502 // chain reference is reclaimed exactly as before.
2503 let store = MockStore::new();
2504 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2505 .format(&Rfc3339)
2506 .unwrap();
2507 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_ORPHAN]));
2508 insert_pack_pair(&store, Some("repo"), SHA_PACK_ORPHAN);
2509 // No chain.json at all: referenced set is empty for every
2510 // recompute pass.
2511
2512 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2513 assert_eq!(outcome.swept_tombstones, 1);
2514 assert_eq!(outcome.deleted_objects, 2);
2515 assert_eq!(outcome.skipped_repointed_packs, 0);
2516 }
2517
2518 #[tokio::test]
2519 async fn sweep_one_baseline_tombstone_re_reads_chain_per_tombstone() {
2520 // Companion to `sweep_re_derives_referenced_set_per_tombstone`
2521 // for the baseline tombstone path. `sweep_one_baseline_tombstone`
2522 // calls `load_chain` inside the function, so each tombstone in
2523 // a sweep pass sees a freshly-loaded chain. A regression that
2524 // hoisted the chain load out of the per-tombstone loop would
2525 // let a concurrent push between iterations slip past the
2526 // re-baselined-to-same-SHA guard, deleting a bundle that is
2527 // once again live.
2528 //
2529 // Layout: two stale baseline tombstones for the same ref both
2530 // naming SHA_FULL, plus a baseline bundle at SHA_FULL. With
2531 // no live chain initially, the first iteration sees
2532 // `chain.is_none()` → `still_live = false` → deletes the
2533 // bundle. The post-delete hook (firing after the tombstone
2534 // delete that follows the bundle delete) writes a chain whose
2535 // `full_at == SHA_FULL`, simulating a force-push that
2536 // re-baselined to the same SHA. The second iteration must
2537 // re-read the chain, observe `full_at == tombstone.sha`, set
2538 // `still_live = true`, and refuse to re-delete (the bundle is
2539 // already gone anyway, but the assertion is on the counter:
2540 // `skipped_repointed_packs == 1`).
2541 let inner = MockStore::new();
2542 let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
2543 let stale_a = (OffsetDateTime::now_utc() - time::Duration::hours(49))
2544 .format(&Rfc3339)
2545 .unwrap();
2546 let stale_b = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2547 .format(&Rfc3339)
2548 .unwrap();
2549 let tomb_a = write_baseline_tombstone_at(&inner, "repo", &stale_a, SHA_FULL);
2550 let tomb_b = write_baseline_tombstone_at(&inner, "repo", &stale_b, SHA_FULL);
2551
2552 // Trigger on the baseline-tomb prefix: the hook fires AFTER
2553 // the FIRST iteration's tombstone delete completes (the
2554 // tombstone delete is the last delete in `sweep_one_baseline_tombstone`),
2555 // which is precisely the window in which a concurrent
2556 // force-push could land before the second iteration's chain
2557 // re-read.
2558 let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
2559 let store = PostDeleteHookStore::new(inner, &tomb_listing, |inner| {
2560 let chain = ChainManifest {
2561 v: 1,
2562 tip: sha40(SHA_TIP),
2563 full_at: sha40(SHA_FULL),
2564 segments: vec![segment(SHA_PACK_LIVE, None)],
2565 };
2566 let body =
2567 serde_json::to_vec_pretty(&chain).expect("chain.json serializes for the test");
2568 inner.insert("repo/refs/heads/main/chain.json", Bytes::from(body));
2569 });
2570
2571 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2572 // Both baseline tombstones processed.
2573 assert_eq!(outcome.swept_tombstones, 2);
2574 // First iteration deleted the bundle (1 object). Second
2575 // iteration's fresh chain read showed full_at == tombstone.sha
2576 // and skipped the delete — the recompute is per-tombstone.
2577 assert_eq!(outcome.deleted_objects, 1, "only one bundle delete");
2578 assert_eq!(
2579 outcome.skipped_repointed_packs, 1,
2580 "second iteration must see the re-baselined chain and skip",
2581 );
2582 // Both tombstones are gone.
2583 for key in [&tomb_a, &tomb_b] {
2584 let err = store.inner.get_bytes(key).await.unwrap_err();
2585 assert!(matches!(err, ObjectStoreError::NotFound(_)));
2586 }
2587 // The bundle was deleted by the first iteration. Asserting on
2588 // the counter (not the bundle's presence) is what proves the
2589 // chain is re-read per tombstone — the survival is necessarily
2590 // about the COUNTER because the first iteration already removed
2591 // the bundle before the hook fired.
2592 let bundle_err = store.inner.get_bytes(&bundle_key).await.unwrap_err();
2593 assert!(matches!(bundle_err, ObjectStoreError::NotFound(_)));
2594 }
2595
2596 #[tokio::test]
2597 async fn sweep_protects_pack_when_concurrent_push_aliases_existing_key() {
2598 // Issue #140's canonical scenario, framed as the issue
2599 // describes it: a force-revert republishes a pack with the
2600 // SAME content SHA as the tombstoned pack (deterministic gix
2601 // pack emission). The concurrent push only updates
2602 // chain.json; the pack key is reused. Sweep must observe
2603 // the new chain reference and leave the pack alone.
2604 //
2605 // Modelled at the post-fix invariant level: the chain
2606 // referencing the tombstoned SHA exists when
2607 // `sweep_one_tombstone` runs its recompute, and the pack is
2608 // preserved with `skipped_repointed_packs += 1`.
2609 let store = MockStore::new();
2610 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2611 .format(&Rfc3339)
2612 .unwrap();
2613 write_tombstone(&store, "repo", &stale, sha_set([SHA_PACK_LIVE]));
2614 // Insert the pack, then commit chain.json referencing it —
2615 // identical-content SHA path through the engine ends here.
2616 insert_pack_pair(&store, Some("repo"), SHA_PACK_LIVE);
2617 let chain = ChainManifest {
2618 v: 1,
2619 tip: sha40(SHA_TIP),
2620 full_at: sha40(SHA_FULL),
2621 segments: vec![segment(SHA_PACK_LIVE, None)],
2622 };
2623 write_chain(&store, Some("repo"), &ref_main(), &chain)
2624 .await
2625 .unwrap();
2626
2627 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2628 assert_eq!(outcome.swept_tombstones, 1);
2629 assert_eq!(outcome.skipped_repointed_packs, 1);
2630 assert_eq!(outcome.deleted_objects, 0);
2631 store
2632 .get_bytes(&format!("repo/packs/{SHA_PACK_LIVE}.pack"))
2633 .await
2634 .expect("aliased pack must survive sweep");
2635 }
2636
2637 #[tokio::test]
2638 async fn grace_hours_env_override_falls_back_for_unset_or_invalid() {
2639 // `EnvGuard` holds the per-key lock for the whole test and
2640 // restores the prior value on drop, including on panic.
2641 let env = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
2642 // Unset returns default.
2643 env.clear();
2644 assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2645 // Non-numeric falls back.
2646 env.set_to("not-a-number");
2647 assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2648 // Zero falls back (would defeat the design).
2649 env.set_to("0");
2650 assert_eq!(grace_hours_from_env(), DEFAULT_GRACE_HOURS);
2651 // Positive integer wins.
2652 env.set_to("72");
2653 assert_eq!(grace_hours_from_env(), 72);
2654 }
2655
2656 #[test]
2657 fn resolve_grace_hours_honours_some_zero() {
2658 // The key semantic divergence from `resolve_lock_ttl_seconds`:
2659 // `Some(0)` is a legitimate "no grace window" operator intent
2660 // (force-mode tests like `delete_tombstone_is_reaped_by_gc_sweep`
2661 // depend on this), so the resolver must NOT clamp it. A
2662 // regression that copy-pasted the lock-TTL filter would silently
2663 // turn `--grace-hours 0` into `--grace-hours <env-default>`.
2664 assert_eq!(resolve_grace_hours(Some(0)), 0);
2665 }
2666
2667 #[test]
2668 fn resolve_grace_hours_returns_explicit_value() {
2669 assert_eq!(resolve_grace_hours(Some(7)), 7);
2670 }
2671
2672 #[tokio::test]
2673 async fn resolve_grace_hours_falls_back_to_env_for_none() {
2674 let env = crate::test_util::EnvGuard::take(ENV_GC_GRACE_HOURS);
2675 env.set_to("72");
2676 assert_eq!(resolve_grace_hours(None), 72);
2677 }
2678
2679 // --- mark list-order race (issue #135) ---------------------------
2680
2681 /// One-shot post-`list` hook used by [`PostListHookStore`].
2682 type PostListHook = Box<dyn FnOnce(&MockStore) + Send>;
2683
2684 /// Test-only [`ObjectStore`] decorator that runs a one-shot
2685 /// callback the first time `list()` returns successfully, *after*
2686 /// the inner list completes. Used to simulate a concurrent push
2687 /// that uploads a new pack AND commits its `chain.json` between
2688 /// `mark`'s two listings — the regression scenario for issue #135.
2689 /// Firing on the first list (regardless of prefix) means the hook
2690 /// runs between `list_pack_shas` and `list_referenced_packs` under
2691 /// either ordering, so the test exercises the race against both
2692 /// the buggy chains-first order and the fixed packs-first order.
2693 struct PostListHookStore {
2694 inner: MockStore,
2695 hook: std::sync::Mutex<Option<PostListHook>>,
2696 }
2697
2698 impl PostListHookStore {
2699 fn new(inner: MockStore, hook: impl FnOnce(&MockStore) + Send + 'static) -> Self {
2700 Self {
2701 inner,
2702 hook: std::sync::Mutex::new(Some(Box::new(hook))),
2703 }
2704 }
2705 }
2706
2707 crate::delegate_to_inner_impl! {
2708 impl ObjectStore for PostListHookStore {
2709 forward: get_to_file, get_bytes, get_bytes_range,
2710 put_bytes, put_path, put_if_absent,
2711 head, copy, delete;
2712
2713 async fn list(
2714 &self,
2715 prefix: &str,
2716 ) -> Result<Vec<crate::object_store::ObjectMeta>, ObjectStoreError> {
2717 let result = self.inner.list(prefix).await;
2718 if result.is_ok() {
2719 let hook = self.hook.lock().unwrap().take();
2720 if let Some(hook) = hook {
2721 hook(&self.inner);
2722 }
2723 }
2724 result
2725 }
2726 }
2727 }
2728
2729 #[tokio::test]
2730 async fn mark_packs_first_ordering_avoids_false_positive_under_concurrent_push() {
2731 // Issue #135 regression: a concurrent push that uploads a new
2732 // pack AND commits its chain.json between mark's two listings
2733 // must not be tombstoned as orphan.
2734 //
2735 // The hook fires after the FIRST list call against
2736 // `<prefix>/packs/` and inserts a new pack pair plus a
2737 // chain.json referencing it. With the fixed packs-first
2738 // ordering, the new pack is absent from the on-bucket snapshot
2739 // (the snapshot was already taken) and present in the
2740 // referenced set (the chain.json is committed before the
2741 // chain list runs). Either way, the new pack is NOT in the
2742 // orphan set.
2743 //
2744 // Pre-fix (chains-first ordering): the hook would fire after
2745 // the chain list, the chain commit would miss the chain
2746 // listing, and the pack would appear in `list_pack_shas` →
2747 // tombstoned as a false positive. The fix flips the ordering
2748 // so this test asserts orphan_count == 0.
2749 let inner = MockStore::new();
2750 // Seed an existing live chain + its pack so the test exercises
2751 // a realistic non-empty state.
2752 seed_live_chain(&inner, Some("repo")).await;
2753 insert_pack_pair(&inner, Some("repo"), SHA_PACK_LIVE);
2754
2755 let store = PostListHookStore::new(inner, |inner| {
2756 // Simulate the concurrent push landing mid-mark: upload a
2757 // fresh pack AND commit its chain.json BEFORE mark's
2758 // second listing runs.
2759 insert_pack_pair(inner, Some("repo"), SHA_PACK_ORPHAN);
2760 let new_chain = ChainManifest {
2761 v: 1,
2762 tip: sha40(SHA_TIP),
2763 full_at: sha40(SHA_FULL),
2764 segments: vec![segment(SHA_PACK_ORPHAN, None)],
2765 };
2766 // `write_chain` is async; the hook is sync, so insert
2767 // chain.json directly at the canonical key.
2768 let body =
2769 serde_json::to_vec_pretty(&new_chain).expect("chain.json serializes for the test");
2770 inner.insert("repo/refs/heads/concurrent/chain.json", Bytes::from(body));
2771 });
2772
2773 let outcome = mark(&store, "repo", MarkOpts::default()).await.unwrap();
2774 // The fresh pack must NOT be tombstoned: under packs-first
2775 // ordering it is either absent from `on_bucket` or present in
2776 // `referenced`.
2777 assert_eq!(
2778 outcome.orphan_count, 0,
2779 "packs-first ordering must not tombstone packs uploaded \
2780 during mark whose chain commits before the chain listing"
2781 );
2782 // No tombstone object emitted for an empty orphan set.
2783 let gc_metas = store.inner.list("repo/gc/").await.unwrap();
2784 assert!(gc_metas.is_empty(), "no tombstone for empty orphan set");
2785 }
2786
2787 // --- baseline-tombstone post-recheck race (issue #153) -----------
2788
2789 /// One-shot post-`get_bytes` hook used by [`PostGetHookStore`].
2790 type PostGetHook = Box<dyn FnOnce(&MockStore) + Send>;
2791
2792 /// Test-only [`ObjectStore`] decorator that runs a one-shot
2793 /// callback the first time `get_bytes()` succeeds on `trigger_key`,
2794 /// *after* the inner read completes. Used to deterministically
2795 /// simulate a concurrent force-push landing between the initial
2796 /// `load_chain` in `sweep_one_baseline_tombstone` and the
2797 /// immediate-pre-delete recheck added by issue #153.
2798 ///
2799 /// The `trigger_key` filter is exact-match so the hook fires only
2800 /// on the targeted chain.json read, not on the tombstone-body
2801 /// `get_bytes` that runs earlier in the same sweep.
2802 struct PostGetHookStore {
2803 inner: MockStore,
2804 hook: std::sync::Mutex<Option<PostGetHook>>,
2805 trigger_key: String,
2806 }
2807
2808 impl PostGetHookStore {
2809 fn new(
2810 inner: MockStore,
2811 trigger_key: impl Into<String>,
2812 hook: impl FnOnce(&MockStore) + Send + 'static,
2813 ) -> Self {
2814 Self {
2815 inner,
2816 hook: std::sync::Mutex::new(Some(Box::new(hook))),
2817 trigger_key: trigger_key.into(),
2818 }
2819 }
2820
2821 /// `true` once the hook has been consumed — used to witness
2822 /// that the production code reached the targeted read rather
2823 /// than skipping past it on a different branch.
2824 fn hook_fired(&self) -> bool {
2825 self.hook.lock().unwrap().is_none()
2826 }
2827 }
2828
2829 crate::delegate_to_inner_impl! {
2830 impl ObjectStore for PostGetHookStore {
2831 forward: list, get_to_file, get_bytes_range,
2832 put_bytes, put_path, put_if_absent,
2833 head, copy, delete;
2834
2835 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
2836 let result = self.inner.get_bytes(key).await;
2837 if result.is_ok()
2838 && key == self.trigger_key
2839 && let Some(hook) = self.hook.lock().unwrap().take()
2840 {
2841 hook(&self.inner);
2842 }
2843 result
2844 }
2845 }
2846 }
2847
2848 #[tokio::test]
2849 async fn sweep_baseline_defers_when_recheck_observes_re_baseline() {
2850 // Issue #153 regression: a concurrent force-push or compact may
2851 // re-baseline the ref to the tombstoned SHA between the initial
2852 // `load_chain` and the bundle delete that follows. Without the
2853 // immediate-pre-delete recheck, sweep would erase a now-live
2854 // bundle and then drop its tombstone.
2855 //
2856 // Layout: a baseline bundle at SHA_FULL, a stale baseline
2857 // tombstone naming SHA_FULL, and an initial chain.json with
2858 // `full_at = SHA_TIP` (different from SHA_FULL) so the initial
2859 // check sees `still_live = false` and proceeds toward the
2860 // delete. The PostGetHookStore fires AFTER the first read of
2861 // `chain.json` (the initial `load_chain`) and overwrites it
2862 // with a chain whose `full_at = SHA_FULL` — modelling the
2863 // concurrent force-push landing in the gap.
2864 //
2865 // With the fix in place: the immediate-pre-delete recheck
2866 // observes the new state, `still_live` flips to true, and
2867 // sweep returns `Deferred` — preserving BOTH the bundle and
2868 // the tombstone so a future sweep can retry.
2869 let inner = MockStore::new();
2870 let bundle_key = insert_baseline_bundle(&inner, Some("repo"), SHA_FULL);
2871 // Initial chain points at a different SHA, so the first
2872 // `still_live` check is false and the code falls into the
2873 // delete branch where the recheck now lives.
2874 let initial_chain = ChainManifest {
2875 v: 1,
2876 tip: sha40(SHA_TIP),
2877 full_at: sha40(SHA_TIP),
2878 segments: vec![segment(SHA_PACK_LIVE, None)],
2879 };
2880 write_chain(&inner, Some("repo"), &ref_main(), &initial_chain)
2881 .await
2882 .unwrap();
2883 let stale = (OffsetDateTime::now_utc() - time::Duration::hours(48))
2884 .format(&Rfc3339)
2885 .unwrap();
2886 let tomb_key = write_baseline_tombstone_at(&inner, "repo", &stale, SHA_FULL);
2887
2888 // Hook fires AFTER the FIRST get_bytes of chain.json (the
2889 // initial load_chain). Before the fix, that was the only
2890 // chain read; the bundle delete that followed would erase a
2891 // live bundle. After the fix, a second get_bytes (the
2892 // immediate-pre-delete recheck) sees this updated state.
2893 let chain_key = "repo/refs/heads/main/chain.json";
2894 let store = PostGetHookStore::new(inner, chain_key, move |inner| {
2895 let re_baselined = ChainManifest {
2896 v: 1,
2897 tip: sha40(SHA_TIP),
2898 full_at: sha40(SHA_FULL),
2899 segments: vec![segment(SHA_PACK_LIVE, None)],
2900 };
2901 let body = serde_json::to_vec_pretty(&re_baselined)
2902 .expect("chain.json serializes for the test");
2903 inner.insert(chain_key, Bytes::from(body));
2904 });
2905
2906 let outcome = sweep(&store, "repo", SweepOpts::default()).await.unwrap();
2907 // The recheck caught the race: tombstone deferred for a
2908 // future sweep.
2909 assert_eq!(
2910 outcome.deferred_tombstones, 1,
2911 "recheck must defer when the chain re-baselined to the \
2912 tombstoned SHA between checks",
2913 );
2914 assert_eq!(outcome.swept_tombstones, 0);
2915 assert_eq!(outcome.deleted_objects, 0);
2916 assert_eq!(outcome.skipped_repointed_packs, 0);
2917
2918 // Witness that the production code actually executed the
2919 // recheck branch (otherwise the hook would still be armed and
2920 // the test would be vacuously passing).
2921 assert!(
2922 store.hook_fired(),
2923 "production code must have read chain.json so the hook \
2924 could inject the concurrent re-baseline",
2925 );
2926
2927 // Bundle MUST survive — the whole point of the fix.
2928 store
2929 .inner
2930 .get_bytes(&bundle_key)
2931 .await
2932 .expect("re-baselined bundle must survive sweep");
2933 // Tombstone MUST survive — preserved for a future sweep to
2934 // retry once the race settles.
2935 store
2936 .inner
2937 .get_bytes(&tomb_key)
2938 .await
2939 .expect("tombstone must survive deferred path");
2940 }
2941}