Skip to main content

git_remote_object_store/manage/
branch.rs

1//! `delete-branch`, `protect`, `unprotect` subcommands.
2//!
3//! Each operation is anchored at `<prefix>/refs/heads/<branch>/`, the same
4//! key space the protocol REPL writes bundles into. When the URL has no
5//! repository prefix (root-of-bucket repos, `<prefix>` is empty), keys
6//! collapse to `refs/heads/<branch>/...` with no leading slash.
7//!
8//! All operator-visible output goes through a `Write`-bound writer (the
9//! `*_into` entry points) so tests can capture and assert on the
10//! messages. The public `delete()`, `protect()`, `unprotect()` methods
11//! wrap their `*_into` siblings with `std::io::stdout()` for the
12//! management CLI.
13
14use std::collections::HashSet;
15use std::io::Write;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use time::OffsetDateTime;
20use tracing::{info, warn};
21
22use super::{ManageError, Prompter};
23use crate::git::RefName;
24use crate::keys;
25use crate::object_store::{ObjectMeta, ObjectStore, ObjectStoreError, PutOpts};
26use crate::packchain::gc::try_write_baseline_tombstone;
27use crate::protocol::push::{
28    LockGuard, acquire_lock, lock_key, lock_ttl_from_env, release_lock,
29    verify_no_orphan_protected_after_delete,
30};
31
32/// Operations on a single branch within a repository.
33pub struct ManageBranch<'a> {
34    store: Arc<dyn ObjectStore>,
35    prefix: String,
36    branch: String,
37    prompter: &'a dyn Prompter,
38}
39
40impl<'a> ManageBranch<'a> {
41    /// Open a branch handle, verifying it exists by listing
42    /// `<prefix>/refs/heads/<branch>/` (or `refs/heads/<branch>/` when
43    /// `prefix` is empty).
44    ///
45    /// # Errors
46    ///
47    /// Returns [`ManageError::InvalidBranch`] if `branch` fails
48    /// `gix-validate`'s strict ref-name check. Returns
49    /// [`ManageError::BranchNotFound`] when no objects exist under the
50    /// branch prefix. Returns [`ManageError::Store`] for object-store
51    /// failures.
52    pub async fn open(
53        store: Arc<dyn ObjectStore>,
54        prefix: impl Into<String>,
55        branch: impl Into<String>,
56        prompter: &'a dyn Prompter,
57    ) -> Result<Self, ManageError> {
58        let branch = branch.into();
59        // Reject branch names that git itself would reject. S3 / Azure
60        // are case-sensitive byte stores with no path semantics, so a
61        // value like `foo/../bar` would be stored verbatim and produce
62        // unrecoverable junk under `<prefix>/refs/heads/`. The strict
63        // `RefName::is_valid` (delegating to `gix_validate::reference::name`)
64        // rejects empties, `..`, control characters, and the rest of
65        // git's invalid-ref alphabet. Use the borrow-only predicate
66        // here so we don't allocate a wrapped `RefName` we'd discard.
67        if !RefName::is_valid(&format!("refs/heads/{branch}")) {
68            return Err(ManageError::InvalidBranch(branch));
69        }
70        let mb = Self {
71            store,
72            prefix: prefix.into(),
73            branch,
74            prompter,
75        };
76        if mb.store.list(&mb.branch_prefix()).await?.is_empty() {
77            return Err(ManageError::BranchNotFound(mb.branch));
78        }
79        Ok(mb)
80    }
81
82    fn branch_prefix(&self) -> String {
83        keys::ref_listing_prefix(Some(&self.prefix), &format!("refs/heads/{}", self.branch))
84    }
85
86    fn protected_key(&self) -> String {
87        keys::join(
88            Some(&self.prefix),
89            &format!(
90                "refs/heads/{}/{}",
91                self.branch,
92                keys::PROTECTED_MARKER_SEGMENT,
93            ),
94        )
95    }
96
97    /// Delete every object under the branch's prefix after a `yes/no`
98    /// confirmation. Aborts (returns `Ok(())`) if the user answers no;
99    /// the `Cancelled` variant is reserved for prompt I/O failures.
100    ///
101    /// Refuses outright when a `PROTECTED#` marker is present under the
102    /// branch prefix — the operator must run `unprotect` first. This
103    /// mirrors the refusal the helper-protocol delete path
104    /// (`delete_remote_ref_under_lock`) emits, so a `git push :branch`
105    /// against a protected ref and a management-CLI `delete-branch` of
106    /// the same ref fail the same way.
107    ///
108    /// # Per-ref lock (#158)
109    ///
110    /// After the operator confirms the prompt, `delete-branch` acquires
111    /// the same `<prefix>/<ref>/LOCK#.lock` the helper-protocol push
112    /// and delete paths take. The lock is held across the fresh re-list,
113    /// the baseline tombstone write (#143), and the synchronous sweep.
114    /// Without it a concurrent `git push` could land a new bundle after
115    /// the post-prompt re-list, the sweep would delete only the stale
116    /// snapshot, and the ref would survive with the just-pushed bundle
117    /// even though delete-branch reported success.
118    ///
119    /// Lock acquisition runs AFTER the prompt — the prompt is
120    /// interactive and could block indefinitely, and holding the lock
121    /// across user input would make every other writer wait on the
122    /// operator's keyboard. If the lock is contended at acquisition
123    /// time the function returns [`ManageError::LockContended`] and
124    /// makes no changes. Release failures are downgraded to a `warn!`
125    /// because the lock's TTL guarantees a stale lock is recovered by
126    /// the next acquirer; matches the protocol-push pattern.
127    ///
128    /// The prompt-display and protection-marker check use a first listing
129    /// for accuracy of the displayed object count, then a **second
130    /// listing is taken under the lock immediately before the deletion
131    /// loop**. The fresh listing drives the sweep so that any concurrent
132    /// push landing under the branch prefix during the prompt window —
133    /// before the lock window opens — is caught and deleted rather than
134    /// left as a zombie object (#139). The protection-marker check is
135    /// re-evaluated on the fresh listing so a `protect` racing with the
136    /// prompt is honoured (#131) — the post-prompt re-check is what
137    /// closes the TOCTOU window between the initial marker check and
138    /// the deletion loop. If the fresh listing is empty (a concurrent
139    /// delete won the race) the function reports it and returns
140    /// `Ok(())` rather than silently claiming success.
141    ///
142    /// `NotFound` errors observed during the sweep are tolerated — they
143    /// mean a concurrent deleter swept the key first, which still
144    /// satisfies the operator's intent. Other per-key delete errors
145    /// (Network, `AccessDenied`, ...) are collected: the loop does NOT
146    /// short-circuit, every remaining key is still attempted, and the
147    /// function returns [`ManageError::PartialDelete`] with the exact
148    /// list of keys that survived so a retry can converge (#122). A
149    /// list-call failure still propagates immediately because there is
150    /// nothing to recover — without a listing the sweep cannot proceed.
151    ///
152    /// Packchain refs with a parseable `chain.json` skip immediate
153    /// deletion of the baseline bundle (`<full_at>.bundle`): a
154    /// baseline tombstone is written first and the bundle is left for
155    /// `gc sweep` to reclaim after the grace window (#143). The
156    /// synchronous sweep still removes `chain.json`,
157    /// `path-index.json`, and any other residue. The deferral protects
158    /// an in-flight fetcher that already read the prior `chain.json`
159    /// from a `BaselineMissing` range-GET failure; a fresh reader
160    /// sees the missing chain and the ref is gone from its
161    /// perspective. Bundle-engine refs, refs with an unparseable
162    /// chain, and any tombstone PUT failure fall through to immediate
163    /// bundle deletion so the operator's "ref is gone" intent is
164    /// never blocked on the tombstone path.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`ManageError::Protected`] if the branch carries a
169    /// `PROTECTED#` marker (checked on both listings),
170    /// [`ManageError::LockContended`] if another writer holds the
171    /// per-ref lock at acquisition time,
172    /// [`ManageError::Cancelled`] if the user cancels the prompt,
173    /// [`ManageError::Io`] for prompt or write I/O failures,
174    /// [`ManageError::Store`] if a list operation fails, or
175    /// [`ManageError::PartialDelete`] when one or more per-key deletes
176    /// fail with a non-`NotFound` error after every key in the fresh
177    /// listing has been attempted.
178    pub async fn delete(&self) -> Result<(), ManageError> {
179        self.delete_into(&mut std::io::stdout()).await
180    }
181
182    /// Same contract as [`delete`](Self::delete) but writes human-readable
183    /// output to `out`. Tests use this to capture the operator messages
184    /// (e.g. the "already gone" race notice from #139) so a regression
185    /// that drops the message — silently turning a concurrent race into
186    /// an apparent success — is caught.
187    ///
188    /// # Errors
189    ///
190    /// Same as [`delete`](Self::delete), plus [`ManageError::Io`] if a
191    /// write to `out` fails.
192    pub(crate) async fn delete_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
193        let listing_prefix = self.branch_prefix();
194        let initial = self.store.list(&listing_prefix).await?;
195        if keys::entries_have_protected_marker(&initial) {
196            return Err(ManageError::Protected(self.branch.clone()));
197        }
198        let prompt = format!("Delete branch {} ({} objects)?", self.branch, initial.len());
199        if !self.prompter.confirm(&prompt)? {
200            writeln!(out, "Aborted")?;
201            return Ok(());
202        }
203
204        // Acquire the per-ref lock AFTER the prompt and BEFORE the
205        // fresh re-list / tombstone / sweep. Holding the lock across
206        // the prompt would block every concurrent writer on the
207        // operator's keyboard; the lock window starts only once the
208        // operator has confirmed the intent (#158). The protocol push
209        // / delete paths and `packchain::compact` use the same lock
210        // key, so a concurrent `git push` or `compact` racing this
211        // delete is mutually excluded.
212        let ref_name = self.validated_ref_name()?;
213        let (lock_key, guard) = self.acquire_ref_lock("delete-branch").await?;
214        let work = self
215            .delete_under_lock(out, &listing_prefix, &lock_key, &initial, &ref_name)
216            .await;
217        self.release_or_warn(guard, &lock_key, "delete-branch")
218            .await;
219        work
220    }
221
222    /// The lock-held body of [`Self::delete_into`]: fresh re-list,
223    /// protection re-check, tombstone write, sweep. Extracted so the
224    /// caller's `release_lock` runs unconditionally on every exit
225    /// path. The lock key is filtered from the fresh listing so the
226    /// sweep does not delete the very lock we hold.
227    async fn delete_under_lock<W: Write>(
228        &self,
229        out: &mut W,
230        listing_prefix: &str,
231        lock: &str,
232        initial: &[ObjectMeta],
233        ref_name: &RefName,
234    ) -> Result<(), ManageError> {
235        // Re-list under the lock so concurrent pushes that landed
236        // during the prompt window — before the lock window opened —
237        // are included in the deletion set. With the lock now held,
238        // no further writes can sneak in between this listing and the
239        // sweep. Filter out the lock key itself: we hold it and the
240        // release tail removes it; sweeping it mid-critical-section
241        // would let another acquirer take the lock under us.
242        let fresh: Vec<ObjectMeta> = self
243            .store
244            .list(listing_prefix)
245            .await?
246            .into_iter()
247            .filter(|m| m.key != lock)
248            .collect();
249        if fresh.is_empty() {
250            writeln!(
251                out,
252                "Branch {} is already gone (concurrent delete during prompt); nothing to do",
253                self.branch,
254            )?;
255            info!(
256                branch = %self.branch,
257                "branch already deleted by concurrent operation",
258            );
259            return Ok(());
260        }
261        if keys::entries_have_protected_marker(&fresh) {
262            return Err(ManageError::Protected(self.branch.clone()));
263        }
264
265        let initial_keys: HashSet<&str> = initial.iter().map(|m| m.key.as_str()).collect();
266        let concurrent_adds = fresh
267            .iter()
268            .filter(|m| !initial_keys.contains(m.key.as_str()))
269            .count();
270        if concurrent_adds > 0 {
271            warn!(
272                branch = %self.branch,
273                added = concurrent_adds,
274                "concurrent activity detected during prompt; sweeping fresh listing",
275            );
276        }
277
278        // Issue #143: if the ref is a packchain ref with a parseable
279        // `chain.json`, write a baseline tombstone naming the current
280        // `full_at` bundle BEFORE the synchronous sweep, then exclude
281        // that bundle key from the delete loop. A concurrent fetcher
282        // that read the prior `chain.json` (t₀) and is mid-range-GET
283        // on `<full_at>.bundle` then completes against the still-live
284        // bundle; `gc sweep` reclaims it after the grace window. The
285        // synchronous sweep still removes `chain.json`,
286        // `path-index.json`, and every other key — from a fresh
287        // reader's perspective the ref is gone the moment those
288        // commit. Bundle-engine refs (no `chain.json`) and refs with
289        // an unparseable chain fall through to the immediate-delete
290        // path: there is nothing for `sweep` to reconcile against, so
291        // deferral would just orphan the bundle.
292        //
293        // The tombstone write runs UNDER the lock (#158): a concurrent
294        // push that landed between the tombstone and the chain.json
295        // delete would otherwise leave the bucket with a tombstone
296        // referencing a SHA no longer in the chain, and `gc sweep`
297        // would reclaim a live bundle.
298        let deferred_bundle_key = self.try_tombstone_baseline(&fresh).await;
299        if let Some(ref key) = deferred_bundle_key {
300            info!(
301                branch = %self.branch,
302                key = %key,
303                "delete-branch: deferred baseline bundle delete via tombstone",
304            );
305        }
306
307        // Collect, don't short-circuit: a transient failure on key #2
308        // of a 4-key listing must not leave #3 and #4 standing with no
309        // inventory of what survived. NotFound continues to be tolerated
310        // (the key is gone — operator intent satisfied). Every other
311        // per-key error is logged and recorded; at the end we either
312        // declare full success or return PartialDelete naming every
313        // surviving key so a retry can converge (#122).
314        let mut undeleted: Vec<String> = Vec::new();
315        for object in &fresh {
316            // The baseline bundle (if any) is left for `gc sweep` —
317            // see the tombstone block above. Other keys (chain.json,
318            // path-index.json, PROTECTED# is already refused earlier)
319            // are deleted synchronously. The lock key was filtered
320            // from `fresh` above, so it is not in the iteration.
321            if deferred_bundle_key.as_deref() == Some(object.key.as_str()) {
322                continue;
323            }
324            match self.store.delete(&object.key).await {
325                Ok(()) | Err(ObjectStoreError::NotFound(_)) => {}
326                Err(err) => {
327                    warn!(
328                        branch = %self.branch,
329                        key = %object.key,
330                        error = %err,
331                        "delete-branch: per-key delete failed; continuing sweep",
332                    );
333                    undeleted.push(object.key.clone());
334                }
335            }
336        }
337        // attempted excludes the deferred bundle (if any): that key was
338        // intentionally skipped via tombstone, not "attempted and missing"
339        // — the operator-facing count must reflect what was swept, not
340        // what was listed.
341        let attempted = fresh.len() - usize::from(deferred_bundle_key.is_some());
342        if !undeleted.is_empty() {
343            return Err(ManageError::PartialDelete {
344                branch: self.branch.clone(),
345                undeleted,
346                attempted,
347            });
348        }
349        // Issue #151 defence-in-depth: post-sweep, with the lock still
350        // held, confirm no `PROTECTED#` marker is present for this ref.
351        // The primary defence is the per-ref lock — `protect` /
352        // `unprotect` both acquire the same `<prefix>/<ref>/LOCK#.lock`
353        // per #159, so a marker cannot land between the under-lock
354        // listing and the sweep. This `head` probe is belt-and-suspenders
355        // surveillance: an orphan marker observed here would indicate a
356        // contract violation (lock bypass, bucket inconsistency, or
357        // misbehaving sibling tool). The helper logs at `error!` and
358        // does NOT change the delete's success outcome — the branch's
359        // bundle artefacts are gone, so the operator's intent stands.
360        verify_no_orphan_protected_after_delete(self.store.as_ref(), self.prefix_opt(), ref_name)
361            .await;
362        writeln!(out, "Branch {} has been deleted", self.branch)?;
363        info!(branch = %self.branch, count = attempted, "branch deleted");
364        Ok(())
365    }
366
367    /// Build a validated `RefName` for `refs/heads/<branch>`. `open`
368    /// already accepted this value, so the parse is effectively
369    /// infallible — but we surface a parse failure as
370    /// [`ManageError::InvalidBranch`] rather than panicking so a
371    /// future loosening of `open`'s validator cannot turn delete-branch
372    /// into a panic surface.
373    fn validated_ref_name(&self) -> Result<RefName, ManageError> {
374        RefName::new(format!("refs/heads/{}", self.branch))
375            .map_err(|_| ManageError::InvalidBranch(self.branch.clone()))
376    }
377
378    /// Returns `Some(&prefix)` when a non-empty bucket prefix is
379    /// configured, `None` for root-prefixed buckets. Centralises the
380    /// `(!self.prefix.is_empty()).then_some(self.prefix.as_str())`
381    /// pattern previously duplicated across delete and tombstone paths.
382    fn prefix_opt(&self) -> Option<&str> {
383        (!self.prefix.is_empty()).then_some(self.prefix.as_str())
384    }
385
386    /// Attempt to tombstone the baseline bundle for a packchain ref so
387    /// the synchronous delete loop can skip it (issue #143). Returns
388    /// the bundle key that was deferred, or `None` if no deferral is
389    /// possible. Thin caller-side wrapper that resolves `&self`'s
390    /// prefix / ref-name and delegates to the shared
391    /// [`try_write_baseline_tombstone`] helper for the actual
392    /// load-chain / listing-check / tombstone-write logic (#221).
393    async fn try_tombstone_baseline(
394        &self,
395        fresh: &[crate::object_store::ObjectMeta],
396    ) -> Option<String> {
397        // `RefName::new` re-runs the same `gix-validate` check `open`
398        // already accepted, so this is effectively infallible. Surface
399        // a parse failure as "no tombstone" rather than panicking — a
400        // future loosening of `open`'s validator must not make
401        // delete-branch unsafe.
402        let ref_name = RefName::new(format!("refs/heads/{}", self.branch)).ok()?;
403        try_write_baseline_tombstone(
404            self.store.as_ref(),
405            self.prefix_opt(),
406            &ref_name,
407            fresh,
408            "delete-branch",
409        )
410        .await
411    }
412
413    /// Mark the branch as protected by writing the `PROTECTED#` sentinel.
414    /// Idempotent — overwrites any existing marker.
415    ///
416    /// # Per-ref lock (#159)
417    ///
418    /// `protect` acquires the same `<prefix>/<ref>/LOCK#.lock` the
419    /// helper-protocol push, helper-protocol delete, and `delete-branch`
420    /// take. Pre-#159, the push path's pre-bundle `is_protected` check
421    /// could race a concurrent `protect`: a force-push that observed no
422    /// marker would still overwrite the bundle even if `protect` landed
423    /// between the under-lock `is_protected` and the bundle upload —
424    /// because `protect` was a lockless `put_bytes`. Taking the same
425    /// lock serialises protection state changes against the writers
426    /// that consult it, closing the entire write window rather than
427    /// narrowing it to a second sample.
428    ///
429    /// If the lock is contended (a push, delete, or compact holds it),
430    /// `protect` returns [`ManageError::LockContended`] and makes no
431    /// changes. Operators can retry. Stale-lock recovery is inherited
432    /// from `acquire_lock` (a previous holder that crashed without
433    /// releasing).
434    ///
435    /// Re-lists the branch prefix under the lock so a concurrent
436    /// `delete-branch` (or last-bundle removal) that landed between
437    /// [`ManageBranch::open`] and the lock window is caught and the
438    /// marker is NOT written for a non-existent branch (#137). Without
439    /// this re-check the orphaned `PROTECTED#` would persist with no
440    /// automated cleanup and would silently block a future recreation
441    /// of the same branch from being force-pushed or deleted. The
442    /// re-listing filters out stale lock keys and any pre-existing
443    /// `PROTECTED#` marker so a branch whose only residue is operational
444    /// metadata is treated as gone.
445    ///
446    /// # Errors
447    ///
448    /// Returns [`ManageError::BranchNotFound`] if the under-lock listing
449    /// shows the branch was deleted concurrently. Returns
450    /// [`ManageError::LockContended`] if another writer holds the
451    /// per-ref lock at acquisition time. Returns [`ManageError::Store`]
452    /// if a list or put operation fails.
453    pub async fn protect(&self) -> Result<(), ManageError> {
454        self.protect_into(&mut std::io::stdout()).await
455    }
456
457    /// Writer-injecting variant of [`Self::protect`] so tests can
458    /// capture the "now protected" operator message. Mirrors the
459    /// pattern established by [`Self::delete_into`] (#145) and the
460    /// management CLI's other writer-aware entry points.
461    ///
462    /// # Errors
463    ///
464    /// Same as [`Self::protect`], plus [`ManageError::Io`] if a write
465    /// to `out` fails.
466    pub(crate) async fn protect_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
467        let (lock_key, guard) = self.acquire_ref_lock("protect").await?;
468        let work = self.protect_under_lock(out).await;
469        self.release_or_warn(guard, &lock_key, "protect").await;
470        work
471    }
472
473    /// Lock-held body of [`Self::protect_into`]: re-list under the
474    /// lock, reject if the branch has been deleted concurrently,
475    /// otherwise write the `PROTECTED#` sentinel. Extracted so the
476    /// acquire/release tail in `protect_into` runs unconditionally on
477    /// every exit path — including the `BranchNotFound` early return.
478    async fn protect_under_lock<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
479        let fresh = self.store.list(&self.branch_prefix()).await?;
480        if !super::has_branch_data(&fresh) {
481            warn!(
482                branch = %self.branch,
483                "branch was deleted concurrently between open and protect; refusing to write orphaned marker",
484            );
485            return Err(ManageError::BranchNotFound(self.branch.clone()));
486        }
487        self.store
488            .put_bytes(&self.protected_key(), Bytes::new(), PutOpts::default())
489            .await?;
490        writeln!(out, "Branch {} is now protected", self.branch)?;
491        Ok(())
492    }
493
494    /// Remove the `PROTECTED#` sentinel. A missing marker is treated as
495    /// already-unprotected rather than an error.
496    ///
497    /// # Per-ref lock (#159)
498    ///
499    /// `unprotect` acquires the same per-ref lock as [`Self::protect`]
500    /// so ALL protection state changes serialise against pushes,
501    /// deletes, and compactions. Without taking the lock here a
502    /// concurrent push observing `is_protected() == true` could
503    /// otherwise commit to the protected refusal path just as
504    /// `unprotect` landed, leaving the writer's behaviour out of step
505    /// with operator intent. Symmetry with `protect` keeps the lock the
506    /// single point of serialisation for protection state.
507    ///
508    /// # Errors
509    ///
510    /// Returns [`ManageError::LockContended`] if another writer holds
511    /// the per-ref lock at acquisition time. Returns
512    /// [`ManageError::Store`] for object-store failures other than
513    /// `NotFound`.
514    pub async fn unprotect(&self) -> Result<(), ManageError> {
515        self.unprotect_into(&mut std::io::stdout()).await
516    }
517
518    /// Writer-injecting variant of [`Self::unprotect`] so tests can
519    /// capture the "now unprotected" operator message.
520    ///
521    /// # Errors
522    ///
523    /// Same as [`Self::unprotect`], plus [`ManageError::Io`] if a
524    /// write to `out` fails.
525    pub(crate) async fn unprotect_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
526        let (lock_key, guard) = self.acquire_ref_lock("unprotect").await?;
527        let work = self.unprotect_under_lock(out).await;
528        self.release_or_warn(guard, &lock_key, "unprotect").await;
529        work
530    }
531
532    /// Lock-held body of [`Self::unprotect_into`]: delete the
533    /// `PROTECTED#` marker, treating `NotFound` as
534    /// already-unprotected. The lock scope is mechanical (no listing
535    /// or recovery work needed); we still hold it so a concurrent
536    /// `protect` cannot land between here and the delete and leave
537    /// the operator's "unprotect" intent silently overridden.
538    async fn unprotect_under_lock<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
539        match self.store.delete(&self.protected_key()).await {
540            Ok(()) | Err(ObjectStoreError::NotFound(_)) => {
541                writeln!(out, "Branch {} is now unprotected", self.branch)?;
542                Ok(())
543            }
544            Err(other) => Err(other.into()),
545        }
546    }
547
548    /// Acquire the per-ref lock for `op` (`delete-branch`, `protect`,
549    /// `unprotect`, or any future ref-mutating caller). Returns the
550    /// resolved lock object-store key alongside the guard so the
551    /// matching `release_or_warn` tail can name the key in its log
552    /// line without re-deriving it.
553    ///
554    /// Contention surfaces as [`ManageError::LockContended`] with the
555    /// branch name, lock key, and current TTL — matching the wording
556    /// `delete-branch` (#158) uses so operators see one shape of error
557    /// across the management surface.
558    async fn acquire_ref_lock(&self, op: &'static str) -> Result<(String, LockGuard), ManageError> {
559        let ref_name = self.validated_ref_name()?;
560        let prefix_opt = self.prefix_opt();
561        let lock_key = lock_key(prefix_opt, &ref_name);
562        let ttl = lock_ttl_from_env();
563        let now = OffsetDateTime::now_utc();
564        let Some(guard) = acquire_lock(Arc::clone(&self.store), &lock_key, ttl, now).await? else {
565            warn!(
566                branch = %self.branch,
567                op = op,
568                key = %lock_key,
569                "{op}: per-ref lock is held by another writer; refusing to race",
570            );
571            return Err(ManageError::LockContended {
572                branch: self.branch.clone(),
573                lock: lock_key,
574                ttl_seconds: ttl.whole_seconds(),
575            });
576        };
577        Ok((lock_key, guard))
578    }
579
580    /// Release a previously acquired lock, downgrading release failures
581    /// to a `warn!` so the caller's primary error (or success) is what
582    /// surfaces. The lock's TTL recovers a leaked key on the next
583    /// acquirer (#150), so the worst case is a delayed retry rather
584    /// than a permanently stuck ref.
585    async fn release_or_warn(&self, guard: LockGuard, lock_key: &str, op: &'static str) {
586        if let Err(e) = release_lock(guard).await {
587            warn!(
588                branch = %self.branch,
589                op = op,
590                key = %lock_key,
591                error = %e,
592                "{op}: failed to release per-ref lock; will age out by TTL",
593            );
594        }
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::manage::{Prompter, ScriptedPrompter, scripted::Answer};
602    use crate::object_store::mock::MockStore;
603    use crate::packchain::gc::baseline_tombstone_listing_prefix;
604    use bytes::Bytes;
605
606    fn seed_with_branch(branch: &str) -> MockStore {
607        let mock = MockStore::new();
608        mock.insert(
609            format!("myrepo/refs/heads/{branch}/abc.bundle"),
610            Bytes::from("body"),
611        );
612        mock
613    }
614
615    #[tokio::test]
616    async fn open_returns_branch_not_found_when_empty() {
617        let mock = MockStore::new();
618        let store: Arc<dyn ObjectStore> = Arc::new(mock);
619        let prompter = ScriptedPrompter::new([]);
620        match ManageBranch::open(store, "myrepo", "missing", &prompter).await {
621            Err(ManageError::BranchNotFound(name)) => assert_eq!(name, "missing"),
622            Err(other) => panic!("expected BranchNotFound, got {other:?}"),
623            Ok(_) => panic!("expected open to fail"),
624        }
625    }
626
627    #[tokio::test]
628    async fn delete_removes_every_key_when_confirmed() {
629        // No PROTECTED# marker — only the bundle. A confirmed delete
630        // must clear it AND release the per-ref lock it acquires
631        // (#158), leaving the bucket empty.
632        let mock = seed_with_branch("main");
633        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
634        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
635
636        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
637            .await
638            .expect("open");
639        mb.delete().await.expect("delete");
640        assert!(
641            mock.keys().is_empty(),
642            "all keys removed (including the LOCK#.lock that delete-branch acquired and released): {:?}",
643            mock.keys()
644        );
645        assert_eq!(prompter.remaining(), 0);
646    }
647
648    #[tokio::test]
649    async fn delete_refuses_when_protected_marker_present() {
650        // `protect` then `delete-branch` must refuse — same wording the
651        // helper-protocol delete path emits. The prompt is never reached,
652        // so the script queues no answer; the marker and bundle survive.
653        let mock = seed_with_branch("main");
654        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
655        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
656        let prompter = ScriptedPrompter::new([]);
657
658        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
659            .await
660            .expect("open");
661        let err = mb
662            .delete()
663            .await
664            .expect_err("delete must refuse when PROTECTED# is present");
665        match &err {
666            ManageError::Protected(name) => assert_eq!(name, "main"),
667            other => panic!("expected ManageError::Protected, got {other:?}"),
668        }
669        assert!(
670            err.to_string()
671                .contains("git-remote-object-store unprotect"),
672            "error message must point at unprotect, got: {err}",
673        );
674        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
675        assert!(mock.contains("myrepo/refs/heads/main/abc.bundle"));
676        // Prompt must not have been consumed.
677        assert_eq!(prompter.remaining(), 0);
678    }
679
680    #[tokio::test]
681    async fn delete_succeeds_after_unprotect_clears_marker() {
682        // Protect, then unprotect, then delete — the canonical recovery
683        // path. The final delete must remove every remaining key.
684        let mock = seed_with_branch("main");
685        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
686        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
687        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
688
689        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
690            .await
691            .expect("open");
692        mb.unprotect().await.expect("unprotect");
693        mb.delete().await.expect("delete after unprotect");
694        assert!(
695            mock.keys().is_empty(),
696            "all keys removed after unprotect+delete: {:?}",
697            mock.keys()
698        );
699    }
700
701    #[tokio::test]
702    async fn delete_no_keeps_keys() {
703        let mock = seed_with_branch("main");
704        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
705        let prompter = ScriptedPrompter::new([Answer::Confirm(false)]);
706
707        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
708            .await
709            .expect("open");
710        mb.delete().await.expect("delete (aborted)");
711        assert_eq!(mock.keys().len(), 1, "branch still present");
712    }
713
714    #[tokio::test]
715    async fn protect_creates_marker_idempotent() {
716        let mock = seed_with_branch("main");
717        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
718        let prompter = ScriptedPrompter::new([]);
719
720        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
721            .await
722            .expect("open");
723        mb.protect().await.expect("protect");
724        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
725        // Second call overwrites without error.
726        mb.protect().await.expect("protect again");
727        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
728    }
729
730    #[tokio::test]
731    async fn protect_refuses_when_branch_deleted_between_open_and_protect() {
732        // Issue #137: TOCTOU between `open` (which lists to verify the
733        // branch exists) and `protect` (which writes the marker). A
734        // concurrent `delete-branch` or last-bundle removal lands
735        // between the two calls. Pre-fix, `protect` wrote a marker for
736        // a non-existent branch — an orphaned `PROTECTED#` that never
737        // gets cleaned up and silently blocks a future recreation of
738        // the same branch. The fix re-lists immediately before the put
739        // and refuses with BranchNotFound if the branch is gone.
740        let mock = seed_with_branch("main");
741        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
742        let prompter = ScriptedPrompter::new([]);
743
744        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
745            .await
746            .expect("open");
747        // Simulate a concurrent delete sweeping every key under the
748        // branch prefix after `open` returned but before `protect` runs.
749        for key in mock.keys() {
750            if key.starts_with("myrepo/refs/heads/main/") {
751                let _ = mock.remove_key(&key);
752            }
753        }
754        let err = mb
755            .protect()
756            .await
757            .expect_err("protect must refuse against a concurrently-deleted branch");
758        match &err {
759            ManageError::BranchNotFound(name) => assert_eq!(name, "main"),
760            other => panic!("expected BranchNotFound, got {other:?}"),
761        }
762        // The orphaned marker must NOT have been written — that is the
763        // exact regression #137 fixes.
764        assert!(
765            !mock.contains("myrepo/refs/heads/main/PROTECTED#"),
766            "orphaned PROTECTED# must not be written when branch is gone",
767        );
768        assert!(
769            mock.keys().is_empty(),
770            "store remains empty: {:?}",
771            mock.keys()
772        );
773    }
774
775    #[tokio::test]
776    async fn protect_refuses_when_only_stale_lock_key_remains() {
777        // A `LOCK#.lock` key is operational metadata, not branch data.
778        // Treating a lock-only listing as "branch exists" would let a
779        // `protect` write a marker for a branch that has no bundles —
780        // the same orphan-marker pathology #137 describes, just with a
781        // lock as the misleading residue instead of an empty listing.
782        //
783        // The lock is seeded stale (older than TTL) so #159's lock
784        // acquisition recovers it rather than reporting contention —
785        // otherwise we would assert the wrong error. The data-presence
786        // re-check then runs and refuses the orphan write.
787        let mock = MockStore::new();
788        mock.insert("myrepo/refs/heads/main/abc.bundle", Bytes::from("body"));
789        let stale = OffsetDateTime::now_utc() - time::Duration::days(1);
790        mock.insert_with(
791            "myrepo/refs/heads/main/LOCK#.lock",
792            Bytes::new(),
793            stale,
794            PutOpts::default(),
795        );
796        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
797        let prompter = ScriptedPrompter::new([]);
798
799        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
800            .await
801            .expect("open");
802        // Concurrent push-then-delete leaves only the lock behind.
803        let _ = mock.remove_key("myrepo/refs/heads/main/abc.bundle");
804        let err = mb
805            .protect()
806            .await
807            .expect_err("protect must refuse when only a lock key remains");
808        assert!(
809            matches!(err, ManageError::BranchNotFound(ref name) if name == "main"),
810            "expected BranchNotFound, got {err:?}",
811        );
812        assert!(!mock.contains("myrepo/refs/heads/main/PROTECTED#"));
813        // protect must recover the stale lock AND release the fresh one
814        // it acquired. A regression that leaked the lock would still
815        // pass the BranchNotFound assertion above.
816        assert!(
817            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
818            "stale lock must be recovered and the acquired lock released",
819        );
820    }
821
822    #[tokio::test]
823    async fn protect_remains_idempotent_when_marker_already_present() {
824        // The pre-existing marker plus a real bundle means the branch
825        // is alive. `protect` must still succeed (idempotent overwrite)
826        // — the data-presence check must not regress to "any marker
827        // means orphan" and refuse a legitimate re-protect.
828        let mock = seed_with_branch("main");
829        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
830        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
831        let prompter = ScriptedPrompter::new([]);
832
833        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
834            .await
835            .expect("open");
836        mb.protect()
837            .await
838            .expect("protect must remain idempotent over an existing marker");
839        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
840        assert!(mock.contains("myrepo/refs/heads/main/abc.bundle"));
841    }
842
843    #[tokio::test]
844    async fn protect_into_writes_operator_message_to_writer() {
845        // Mirror the delete_into pattern (#145): the writer-injecting
846        // variant must emit the operator-visible message through `out`,
847        // not via stdout. A regression that dropped the message — or
848        // emitted it on stdout instead of the writer — would slip past
849        // any test calling `protect()` because that wraps stdout.
850        let mock = seed_with_branch("main");
851        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
852        let prompter = ScriptedPrompter::new([]);
853        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
854            .await
855            .expect("open");
856        let mut out: Vec<u8> = Vec::new();
857        mb.protect_into(&mut out).await.expect("protect_into");
858        let captured = String::from_utf8(out).expect("utf8");
859        assert!(
860            captured.contains("Branch main is now protected"),
861            "protect_into must emit the operator message; got: {captured:?}",
862        );
863    }
864
865    #[tokio::test]
866    async fn unprotect_into_writes_operator_message_to_writer() {
867        let mock = seed_with_branch("main");
868        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
869        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
870        let prompter = ScriptedPrompter::new([]);
871        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
872            .await
873            .expect("open");
874        let mut out: Vec<u8> = Vec::new();
875        mb.unprotect_into(&mut out).await.expect("unprotect_into");
876        let captured = String::from_utf8(out).expect("utf8");
877        assert!(
878            captured.contains("Branch main is now unprotected"),
879            "unprotect_into must emit the operator message; got: {captured:?}",
880        );
881    }
882
883    #[tokio::test]
884    async fn unprotect_deletes_marker_when_present() {
885        let mock = seed_with_branch("main");
886        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
887        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
888        let prompter = ScriptedPrompter::new([]);
889
890        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
891            .await
892            .expect("open");
893        mb.unprotect().await.expect("unprotect");
894        assert!(!mock.contains("myrepo/refs/heads/main/PROTECTED#"));
895    }
896
897    #[tokio::test]
898    async fn unprotect_idempotent_when_marker_absent() {
899        let mock = seed_with_branch("main");
900        let store: Arc<dyn ObjectStore> = Arc::new(mock);
901        let prompter = ScriptedPrompter::new([]);
902
903        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
904            .await
905            .expect("open");
906        mb.unprotect()
907            .await
908            .expect("unprotect should be idempotent");
909    }
910
911    #[tokio::test]
912    async fn open_rejects_invalid_branch_name() {
913        // Attempting `delete-branch foo/../bar` would otherwise build
914        // literal `<prefix>/refs/heads/foo/../bar/...` keys on S3.
915        let mock = MockStore::new();
916        let store: Arc<dyn ObjectStore> = Arc::new(mock);
917        let prompter = ScriptedPrompter::new([]);
918        match ManageBranch::open(store, "myrepo", "foo/../bar", &prompter).await {
919            Err(ManageError::InvalidBranch(name)) => assert_eq!(name, "foo/../bar"),
920            Err(other) => panic!("expected InvalidBranch, got {other:?}"),
921            Ok(_) => panic!("expected open to reject `foo/../bar`"),
922        }
923    }
924
925    #[tokio::test]
926    async fn open_rejects_branch_with_control_char() {
927        let mock = MockStore::new();
928        let store: Arc<dyn ObjectStore> = Arc::new(mock);
929        let prompter = ScriptedPrompter::new([]);
930        match ManageBranch::open(store, "myrepo", "main\nrefs/heads/other", &prompter).await {
931            Err(ManageError::InvalidBranch(_)) => {}
932            Err(other) => panic!("expected InvalidBranch, got {other:?}"),
933            Ok(_) => panic!("expected open to reject control-char branch"),
934        }
935    }
936
937    #[tokio::test]
938    async fn delete_partial_failure_continues_and_returns_structured_error() {
939        // Issue #122: pre-fix, `delete` short-circuited on the first
940        // per-key error, leaving the later keys untouched and the
941        // operator with no inventory of what survived. The fix is to
942        // collect failures, continue past each, and return a structured
943        // `PartialDelete` naming exactly the keys that remain.
944        //
945        // `MockStore::list` returns keys in lexicographic (BTreeMap)
946        // order. The loop deletes aaa, attempts bbb (armed to fail
947        // transiently), and must still attempt ccc. Post-fix: aaa and
948        // ccc are gone, bbb remains, the error names bbb explicitly.
949        let mock = MockStore::new();
950        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
951        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
952        mock.insert("myrepo/refs/heads/main/ccc.bundle", Bytes::from("c"));
953        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
954            key: "myrepo/refs/heads/main/bbb.bundle".to_owned(),
955        });
956        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
957        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
958
959        let mb = ManageBranch::open(
960            Arc::clone(&store),
961            "myrepo",
962            "main",
963            &prompter as &dyn Prompter,
964        )
965        .await
966        .expect("open");
967        let err = mb
968            .delete()
969            .await
970            .expect_err("partial delete must surface PartialDelete");
971        match &err {
972            ManageError::PartialDelete {
973                branch,
974                undeleted,
975                attempted,
976            } => {
977                assert_eq!(branch, "main");
978                assert_eq!(*attempted, 3);
979                assert_eq!(
980                    undeleted.as_slice(),
981                    ["myrepo/refs/heads/main/bbb.bundle"],
982                    "undeleted list must name exactly the failed key",
983                );
984            }
985            other => panic!("expected PartialDelete, got {other:?}"),
986        }
987        // The error message must name the failed key so a copy-paste
988        // retry tool (or human) can act on it.
989        let rendered = err.to_string();
990        assert!(
991            rendered.contains("myrepo/refs/heads/main/bbb.bundle"),
992            "error message must name surviving key, got: {rendered}",
993        );
994        assert!(
995            rendered.contains("retry to converge"),
996            "error message must point at the retry path, got: {rendered}",
997        );
998        assert!(
999            rendered.contains("1 of 3"),
1000            "render should pin the count framing, got: {rendered}",
1001        );
1002        // The loop did NOT short-circuit on bbb — aaa AND ccc are
1003        // both gone, and only bbb survives.
1004        assert!(!mock.contains("myrepo/refs/heads/main/aaa.bundle"));
1005        assert!(mock.contains("myrepo/refs/heads/main/bbb.bundle"));
1006        assert!(!mock.contains("myrepo/refs/heads/main/ccc.bundle"));
1007        assert_eq!(mock.pending_faults(), 0);
1008
1009        // Retry-converges: clear nothing extra (the fault is already
1010        // consumed) and run delete again. The fresh listing inside
1011        // `delete` will only show bbb; the loop deletes it; the branch
1012        // is now fully gone.
1013        let prompter2 = ScriptedPrompter::new([Answer::Confirm(true)]);
1014        let mb2 = ManageBranch::open(store, "myrepo", "main", &prompter2 as &dyn Prompter)
1015            .await
1016            .expect("re-open after partial delete");
1017        mb2.delete().await.expect("retry must converge to Ok");
1018        assert!(
1019            mock.keys().is_empty(),
1020            "retry must remove the surviving key: {:?}",
1021            mock.keys(),
1022        );
1023    }
1024
1025    #[tokio::test]
1026    async fn delete_partial_failure_attempts_every_key_in_listing() {
1027        // Issue #122 explicit four-key case: a transient failure on
1028        // key #2 of a 4-key listing must not stop the loop from
1029        // attempting keys #3 and #4. Pre-fix, this seeded with key
1030        // names a-d, fault on bbb, would leave bbb/ccc/ddd standing.
1031        // Post-fix, only bbb survives (the named failure).
1032        let mock = MockStore::new();
1033        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
1034        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
1035        mock.insert("myrepo/refs/heads/main/ccc.bundle", Bytes::from("c"));
1036        mock.insert("myrepo/refs/heads/main/ddd.bundle", Bytes::from("d"));
1037        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
1038            key: "myrepo/refs/heads/main/bbb.bundle".to_owned(),
1039        });
1040        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1041        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1042
1043        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1044            .await
1045            .expect("open");
1046        let err = mb.delete().await.expect_err("partial delete expected");
1047        match err {
1048            ManageError::PartialDelete {
1049                undeleted,
1050                attempted,
1051                ..
1052            } => {
1053                assert_eq!(attempted, 4, "loop must visit every listed key");
1054                assert_eq!(undeleted.as_slice(), ["myrepo/refs/heads/main/bbb.bundle"]);
1055            }
1056            other => panic!("expected PartialDelete, got {other:?}"),
1057        }
1058        // Keys #1, #3, #4 were all attempted and succeeded; only the
1059        // named failure key survives.
1060        assert!(!mock.contains("myrepo/refs/heads/main/aaa.bundle"));
1061        assert!(mock.contains("myrepo/refs/heads/main/bbb.bundle"));
1062        assert!(!mock.contains("myrepo/refs/heads/main/ccc.bundle"));
1063        assert!(!mock.contains("myrepo/refs/heads/main/ddd.bundle"));
1064    }
1065
1066    #[tokio::test]
1067    async fn delete_all_keys_fail_returns_full_inventory() {
1068        // Two faults arm against two of the three keys, plus a third
1069        // standalone failure. We assert that PartialDelete lists every
1070        // surviving key in lexicographic order so an operator (or
1071        // tooling that reads the structured field) gets a complete
1072        // inventory rather than just the first failure.
1073        let mock = MockStore::new();
1074        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
1075        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
1076        mock.insert("myrepo/refs/heads/main/ccc.bundle", Bytes::from("c"));
1077        for key in [
1078            "myrepo/refs/heads/main/aaa.bundle",
1079            "myrepo/refs/heads/main/bbb.bundle",
1080            "myrepo/refs/heads/main/ccc.bundle",
1081        ] {
1082            mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
1083                key: key.to_owned(),
1084            });
1085        }
1086        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1087        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1088
1089        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1090            .await
1091            .expect("open");
1092        let err = mb.delete().await.expect_err("all-fail must surface error");
1093        match err {
1094            ManageError::PartialDelete {
1095                undeleted,
1096                attempted,
1097                ..
1098            } => {
1099                assert_eq!(attempted, 3);
1100                assert_eq!(
1101                    undeleted,
1102                    vec![
1103                        "myrepo/refs/heads/main/aaa.bundle".to_owned(),
1104                        "myrepo/refs/heads/main/bbb.bundle".to_owned(),
1105                        "myrepo/refs/heads/main/ccc.bundle".to_owned(),
1106                    ],
1107                    "every surviving key must be reported, in listing order",
1108                );
1109            }
1110            other => panic!("expected PartialDelete, got {other:?}"),
1111        }
1112        // All three originals survive — nothing was deleted.
1113        assert_eq!(mock.keys().len(), 3);
1114    }
1115
1116    #[tokio::test]
1117    async fn delete_mixed_notfound_and_failure_only_lists_real_failures() {
1118        // NotFound mid-sweep is tolerated (#139). The PartialDelete
1119        // inventory must NOT include keys that the listing showed but
1120        // that a concurrent sweeper had already removed — those are
1121        // success from the operator's POV. Only the genuine network
1122        // failure on bbb should be in `undeleted`.
1123        let mock = MockStore::new();
1124        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
1125        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
1126        mock.insert("myrepo/refs/heads/main/ccc.bundle", Bytes::from("c"));
1127        // aaa raced and is gone; bbb is a genuine network failure; ccc
1128        // succeeds normally.
1129        mock.arm(crate::object_store::mock::Fault::NotFoundOnDelete {
1130            key: "myrepo/refs/heads/main/aaa.bundle".to_owned(),
1131        });
1132        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
1133            key: "myrepo/refs/heads/main/bbb.bundle".to_owned(),
1134        });
1135        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1136        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1137
1138        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1139            .await
1140            .expect("open");
1141        let err = mb.delete().await.expect_err("bbb failure must surface");
1142        match err {
1143            ManageError::PartialDelete {
1144                undeleted,
1145                attempted,
1146                ..
1147            } => {
1148                assert_eq!(attempted, 3);
1149                assert_eq!(
1150                    undeleted.as_slice(),
1151                    ["myrepo/refs/heads/main/bbb.bundle"],
1152                    "only the genuine non-NotFound failure must appear",
1153                );
1154            }
1155            other => panic!("expected PartialDelete, got {other:?}"),
1156        }
1157        // ccc was deleted by the loop. bbb survives. aaa's NotFound
1158        // fault short-circuited its delete BEFORE the actual removal,
1159        // so the body is still in the mock — same observable as the
1160        // pre-existing `delete_tolerates_notfound_mid_sweep` test.
1161        assert!(!mock.contains("myrepo/refs/heads/main/ccc.bundle"));
1162        assert!(mock.contains("myrepo/refs/heads/main/bbb.bundle"));
1163    }
1164
1165    /// Prompter that performs a side effect against a [`MockStore`]
1166    /// before replying to `confirm`, simulating a concurrent operation
1167    /// landing during the user's prompt window. Each call consumes one
1168    /// queued `(action, answer)` pair; running dry returns
1169    /// [`ManageError::Cancelled`] so an under-armed script fails loudly.
1170    struct ConcurrentPrompter {
1171        store: MockStore,
1172        actions: std::sync::Mutex<std::collections::VecDeque<(ConcurrentAction, bool)>>,
1173    }
1174
1175    enum ConcurrentAction {
1176        /// Insert `(key, body)` into the store.
1177        Insert(String, Bytes),
1178        /// Insert multiple `(key, body)` pairs in one prompt window —
1179        /// used to model an interleaved `git push` + `protect` race
1180        /// against a single user prompt (#131).
1181        InsertMany(Vec<(String, Bytes)>),
1182        /// Delete every key currently under `prefix` (simulates a
1183        /// concurrent `delete-branch` winning the race).
1184        DeleteAllUnder(String),
1185    }
1186
1187    impl ConcurrentPrompter {
1188        fn new(
1189            store: MockStore,
1190            actions: impl IntoIterator<Item = (ConcurrentAction, bool)>,
1191        ) -> Self {
1192            Self {
1193                store,
1194                actions: std::sync::Mutex::new(actions.into_iter().collect()),
1195            }
1196        }
1197    }
1198
1199    impl Prompter for ConcurrentPrompter {
1200        fn select(&self, _prompt: &str, _options: &[String]) -> Result<usize, ManageError> {
1201            panic!("ConcurrentPrompter does not expect select");
1202        }
1203
1204        fn confirm(&self, _prompt: &str) -> Result<bool, ManageError> {
1205            let (action, answer) = self
1206                .actions
1207                .lock()
1208                .expect("concurrent mutex poisoned")
1209                .pop_front()
1210                .ok_or(ManageError::Cancelled)?;
1211            match action {
1212                ConcurrentAction::Insert(key, body) => self.store.insert(key, body),
1213                ConcurrentAction::InsertMany(pairs) => {
1214                    for (key, body) in pairs {
1215                        self.store.insert(key, body);
1216                    }
1217                }
1218                ConcurrentAction::DeleteAllUnder(prefix) => {
1219                    for key in self.store.keys() {
1220                        if key.starts_with(&prefix) {
1221                            let _ = self.store.remove_key(&key);
1222                        }
1223                    }
1224                }
1225            }
1226            Ok(answer)
1227        }
1228    }
1229
1230    #[tokio::test]
1231    async fn delete_sweeps_objects_added_during_prompt() {
1232        // Issue #139: a concurrent push lands a new bundle key between
1233        // the initial LIST and the deletion loop. Pre-fix, that key was
1234        // not in the captured listing and survived the "successful"
1235        // delete. The fix re-lists after the prompt, so the new key is
1236        // included in the sweep.
1237        let mock = seed_with_branch("main");
1238        let new_key = "myrepo/refs/heads/main/concurrent.bundle".to_owned();
1239        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1240        let prompter = ConcurrentPrompter::new(
1241            mock.clone(),
1242            [(
1243                ConcurrentAction::Insert(new_key.clone(), Bytes::from("racing body")),
1244                true,
1245            )],
1246        );
1247
1248        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1249            .await
1250            .expect("open");
1251        mb.delete()
1252            .await
1253            .expect("delete must include concurrently-added key");
1254        assert!(
1255            mock.keys().is_empty(),
1256            "fresh listing must drive sweep; zombie keys remaining: {:?}",
1257            mock.keys(),
1258        );
1259        assert!(
1260            !mock.contains(&new_key),
1261            "concurrently-added bundle must be deleted, not left as a zombie",
1262        );
1263    }
1264
1265    #[tokio::test]
1266    async fn delete_refuses_when_marker_lands_during_prompt() {
1267        // Initial listing has no PROTECTED# marker, so the protection
1268        // check passes and the prompt fires. A concurrent `protect`
1269        // lands during the prompt, then the user answers "yes". The
1270        // fresh-listing protection check must catch the marker and
1271        // refuse — otherwise the operator silently bulldozes a ref that
1272        // was just protected.
1273        let mock = seed_with_branch("main");
1274        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1275        let prompter = ConcurrentPrompter::new(
1276            mock.clone(),
1277            [(
1278                ConcurrentAction::Insert(
1279                    "myrepo/refs/heads/main/PROTECTED#".to_owned(),
1280                    Bytes::new(),
1281                ),
1282                true,
1283            )],
1284        );
1285
1286        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1287            .await
1288            .expect("open");
1289        let err = mb
1290            .delete()
1291            .await
1292            .expect_err("delete must refuse marker that landed during prompt");
1293        assert!(
1294            matches!(err, ManageError::Protected(ref name) if name == "main"),
1295            "expected Protected, got {err:?}",
1296        );
1297        // Both the marker and the original bundle survive.
1298        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
1299        assert!(mock.contains("myrepo/refs/heads/main/abc.bundle"));
1300    }
1301
1302    #[tokio::test]
1303    async fn issue_131_protect_during_prompt_blocks_delete_even_with_concurrent_push() {
1304        // Issue #131 regression: TOCTOU between the initial protection
1305        // check and the deletion loop. This pins the specific scenario
1306        // where a `protect` lands DURING the user prompt — distinct from
1307        // #139's pure-push race. The combined push+protect interleaving
1308        // here proves two things about the post-prompt re-check:
1309        //
1310        //   1. The marker check fires on the FRESH listing, not the
1311        //      stale initial listing (otherwise the marker is missed
1312        //      because it didn't exist when `delete` started).
1313        //   2. The marker check takes precedence over the sweep even
1314        //      when other concurrent activity (a racing push) would
1315        //      otherwise look "successful" — the operator must not
1316        //      silently bulldoze a freshly-protected ref just because
1317        //      the listing also grew.
1318        //
1319        // Pre-#139 the marker check was only on the initial listing, so
1320        // both concurrent writes were ignored and the original bundle
1321        // was deleted. The fix re-lists after the prompt and re-checks
1322        // for the marker, refusing the delete entirely.
1323        let mock = seed_with_branch("main");
1324        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1325        let prompter = ConcurrentPrompter::new(
1326            mock.clone(),
1327            [(
1328                ConcurrentAction::InsertMany(vec![
1329                    ("myrepo/refs/heads/main/PROTECTED#".to_owned(), Bytes::new()),
1330                    (
1331                        "myrepo/refs/heads/main/racing-push.bundle".to_owned(),
1332                        Bytes::from("pushed during prompt"),
1333                    ),
1334                ]),
1335                true,
1336            )],
1337        );
1338
1339        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1340            .await
1341            .expect("open");
1342        let err = mb
1343            .delete()
1344            .await
1345            .expect_err("delete must refuse marker even when push also raced");
1346        assert!(
1347            matches!(err, ManageError::Protected(ref name) if name == "main"),
1348            "expected Protected (post-prompt re-check), got {err:?}",
1349        );
1350        // The marker, the racing push, and the original bundle all
1351        // survive — refusal is total, not partial.
1352        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
1353        assert!(mock.contains("myrepo/refs/heads/main/racing-push.bundle"));
1354        assert!(mock.contains("myrepo/refs/heads/main/abc.bundle"));
1355    }
1356
1357    #[tokio::test]
1358    async fn delete_handles_empty_initial_listing_when_branch_swept_between_open_and_delete() {
1359        // Distinct from the prompt-window race
1360        // (`delete_reports_already_gone_on_concurrent_delete_race`):
1361        // here the branch is swept BETWEEN `open()` succeeding (data
1362        // existed at open time) and the FIRST listing inside
1363        // `delete()`. The function must handle the empty-initial-list
1364        // path without panicking, without spuriously claiming success,
1365        // and without surfacing an unexpected error variant. The fresh
1366        // re-listing inside `delete()` is also empty, so the
1367        // "already gone" branch fires and the function returns Ok(()).
1368        let mock = seed_with_branch("main");
1369        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1370        // One confirm answer queued: the current implementation does
1371        // NOT short-circuit on an empty INITIAL listing — it falls
1372        // through to the prompt (the operator may want to confirm a
1373        // "0 objects" delete) and only the empty FRESH listing path
1374        // (post-prompt) returns Ok(()). Queuing a single `true`
1375        // exercises that exact path.
1376        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1377
1378        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1379            .await
1380            .expect("open succeeds while branch data is still present");
1381
1382        // Sweep every key under the branch between open() and delete().
1383        // Mirrors a concurrent `delete-branch` or last-bundle removal
1384        // that ran while the caller was still holding the open handle.
1385        for key in mock.keys() {
1386            if key.starts_with("myrepo/refs/heads/main/") {
1387                let _ = mock.remove_key(&key);
1388            }
1389        }
1390        assert!(
1391            mock.keys().is_empty(),
1392            "pre-condition: branch must be fully swept before delete()",
1393        );
1394
1395        mb.delete()
1396            .await
1397            .expect("delete() must handle an empty initial listing without error");
1398        assert!(
1399            mock.keys().is_empty(),
1400            "delete() against an already-empty branch must not resurrect any key",
1401        );
1402    }
1403
1404    #[tokio::test]
1405    async fn delete_reports_already_gone_on_concurrent_delete_race() {
1406        // A concurrent `delete-branch` (or last-bundle removal) clears
1407        // every object under the branch prefix during the prompt
1408        // window. The fresh listing is empty; the function must report
1409        // the race and return Ok(()), not claim success without doing
1410        // anything.
1411        //
1412        // The store-state asserts here are intentionally weak: the
1413        // ConcurrentPrompter side effect already cleared the store
1414        // before `delete()` resumed from the prompt, so `keys()` is
1415        // empty regardless of which production branch was taken
1416        // (#145). The load-bearing assert is the captured-stdout
1417        // substring — without the "already gone" notice the operator
1418        // cannot tell a successful delete from a no-op race-loss.
1419        let mock = seed_with_branch("main");
1420        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1421        let prompter = ConcurrentPrompter::new(
1422            mock.clone(),
1423            [(
1424                ConcurrentAction::DeleteAllUnder("myrepo/refs/heads/main/".to_owned()),
1425                true,
1426            )],
1427        );
1428
1429        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1430            .await
1431            .expect("open");
1432        let mut out: Vec<u8> = Vec::new();
1433        mb.delete_into(&mut out)
1434            .await
1435            .expect("empty fresh listing must return Ok, not silent success");
1436        let captured = String::from_utf8(out).expect("captured output must be UTF-8");
1437        assert!(
1438            captured.contains("is already gone"),
1439            "operator message must announce the concurrent race; got: {captured:?}",
1440        );
1441        assert!(
1442            !captured.contains("has been deleted"),
1443            "must not claim a successful delete when nothing was swept; got: {captured:?}",
1444        );
1445        assert!(mock.keys().is_empty(), "store remains empty");
1446    }
1447
1448    #[tokio::test]
1449    async fn delete_tolerates_notfound_mid_sweep() {
1450        // A concurrent sweeper races between our fresh listing and a
1451        // per-key delete: the listing still reports `bbb`, but by the
1452        // time `delete(bbb)` fires the key is gone. Pre-fix, the
1453        // ObjectStoreError::NotFound surfaced as ManageError::Store and
1454        // aborted the sweep mid-flight. The fix tolerates NotFound in
1455        // the loop so a partial concurrent delete doesn't leave the
1456        // rest of the branch standing.
1457        let mock = MockStore::new();
1458        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
1459        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
1460        mock.insert("myrepo/refs/heads/main/ccc.bundle", Bytes::from("c"));
1461        mock.arm(crate::object_store::mock::Fault::NotFoundOnDelete {
1462            key: "myrepo/refs/heads/main/bbb.bundle".to_owned(),
1463        });
1464        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1465        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1466        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1467            .await
1468            .expect("open");
1469        mb.delete()
1470            .await
1471            .expect("NotFound mid-sweep must not abort the loop");
1472        // aaa and ccc are deleted; the NotFound fault on bbb is
1473        // tolerated and the fault is consumed (the body remains because
1474        // the fault fired BEFORE the actual removal).
1475        assert!(!mock.contains("myrepo/refs/heads/main/aaa.bundle"));
1476        assert!(!mock.contains("myrepo/refs/heads/main/ccc.bundle"));
1477        // bbb's body is still present because the fault short-circuited
1478        // the delete with NotFound before removal. In production the
1479        // analogous case is a concurrent sweeper that ALREADY removed
1480        // it — same observable: key gone or not, the loop continues.
1481        assert_eq!(mock.pending_faults(), 0);
1482    }
1483
1484    // --- Root-of-bucket (empty prefix) coverage --------------------------
1485
1486    #[tokio::test]
1487    async fn root_prefix_delete_removes_keys_without_leading_slash() {
1488        // Repo lives at the bucket root: keys have no `<prefix>/`
1489        // segment. A leading-slash regression here would surface as
1490        // `BranchNotFound` (the list of `/refs/heads/main/` returns
1491        // nothing) or as a delete that fails to match the real keys.
1492        // No PROTECTED# marker is seeded — protected-ref refusal is
1493        // covered separately by
1494        // `root_prefix_delete_refuses_when_protected_marker_present`.
1495        // The `LOCK#.lock` is created and removed by `delete`'s own
1496        // acquire/release tail (#158) — pre-seeding a fresh lock here
1497        // would (correctly) surface as `LockContended`.
1498        let mock = MockStore::new();
1499        mock.insert("refs/heads/main/abc.bundle", Bytes::from("body"));
1500        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1501        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1502
1503        let mb = ManageBranch::open(store, "", "main", &prompter as &dyn Prompter)
1504            .await
1505            .expect("open at root");
1506        mb.delete().await.expect("delete at root");
1507        assert!(mock.keys().is_empty(), "all root keys removed");
1508    }
1509
1510    #[tokio::test]
1511    async fn root_prefix_delete_refuses_when_protected_marker_present() {
1512        // Root-of-bucket layout (no `<prefix>/` segment) must use the
1513        // same final-segment match the helper-protocol delete path uses;
1514        // a substring-only check could miss the unprefixed marker key.
1515        let mock = MockStore::new();
1516        mock.insert("refs/heads/main/abc.bundle", Bytes::from("body"));
1517        mock.insert("refs/heads/main/PROTECTED#", Bytes::new());
1518        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1519        let prompter = ScriptedPrompter::new([]);
1520
1521        let mb = ManageBranch::open(store, "", "main", &prompter as &dyn Prompter)
1522            .await
1523            .expect("open at root");
1524        let err = mb
1525            .delete()
1526            .await
1527            .expect_err("delete at root must refuse PROTECTED#");
1528        assert!(
1529            matches!(err, ManageError::Protected(ref name) if name == "main"),
1530            "expected ManageError::Protected, got {err:?}",
1531        );
1532        assert!(mock.contains("refs/heads/main/PROTECTED#"));
1533        assert!(mock.contains("refs/heads/main/abc.bundle"));
1534    }
1535
1536    #[tokio::test]
1537    async fn root_prefix_protect_writes_marker_at_root_layout() {
1538        let mock = MockStore::new();
1539        mock.insert("refs/heads/main/abc.bundle", Bytes::from("body"));
1540        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1541        let prompter = ScriptedPrompter::new([]);
1542
1543        let mb = ManageBranch::open(store, "", "main", &prompter as &dyn Prompter)
1544            .await
1545            .expect("open at root");
1546        mb.protect().await.expect("protect at root");
1547        // Root-of-bucket layout: no leading slash, no synthetic prefix.
1548        assert!(mock.contains("refs/heads/main/PROTECTED#"));
1549        assert!(!mock.contains("/refs/heads/main/PROTECTED#"));
1550    }
1551
1552    #[tokio::test]
1553    async fn root_prefix_unprotect_removes_marker_at_root_layout() {
1554        let mock = MockStore::new();
1555        mock.insert("refs/heads/main/abc.bundle", Bytes::from("body"));
1556        mock.insert("refs/heads/main/PROTECTED#", Bytes::new());
1557        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1558        let prompter = ScriptedPrompter::new([]);
1559
1560        let mb = ManageBranch::open(store, "", "main", &prompter as &dyn Prompter)
1561            .await
1562            .expect("open at root");
1563        mb.unprotect().await.expect("unprotect at root");
1564        assert!(!mock.contains("refs/heads/main/PROTECTED#"));
1565        // The bundle alongside the marker must survive — `unprotect` is
1566        // a marker-only delete and a regression that broadened the
1567        // delete scope would leave the bundle missing.
1568        assert!(mock.contains("refs/heads/main/abc.bundle"));
1569    }
1570
1571    #[tokio::test]
1572    async fn root_prefix_open_reports_branch_not_found_for_missing_branch() {
1573        let mock = MockStore::new();
1574        let store: Arc<dyn ObjectStore> = Arc::new(mock);
1575        let prompter = ScriptedPrompter::new([]);
1576        match ManageBranch::open(store, "", "missing", &prompter).await {
1577            Err(ManageError::BranchNotFound(name)) => assert_eq!(name, "missing"),
1578            Err(other) => panic!("expected BranchNotFound, got {other:?}"),
1579            Ok(_) => panic!("expected open at root to fail with BranchNotFound"),
1580        }
1581    }
1582
1583    // --- Baseline-bundle tombstone on delete-branch (#143) ---------------
1584
1585    /// SHA used as `<full_at>` in the seeded `chain.json` for the
1586    /// tombstone tests below. The exact value is irrelevant — the
1587    /// tests assert that whatever SHA the chain names is the SHA the
1588    /// tombstone references and the SHA whose `<sha>.bundle` survives
1589    /// the synchronous sweep.
1590    const TOMBSTONE_TEST_FULL_AT: &str = "0123456789abcdef0123456789abcdef01234567";
1591
1592    /// Seed a packchain-style branch at `<prefix>/refs/heads/<branch>/`
1593    /// with a baseline bundle, a `chain.json` naming that bundle as
1594    /// `full_at`, and a `path-index.json`. Returns the bundle's
1595    /// full key so tests can pin survival/deletion against the
1596    /// exact byte string.
1597    async fn seed_packchain_branch(
1598        store: &crate::object_store::mock::MockStore,
1599        prefix: &str,
1600        branch: &str,
1601    ) -> String {
1602        use crate::packchain::manifest::write_chain;
1603        use crate::packchain::schema::{ChainManifest, ChainSegment, Sha40};
1604
1605        let ref_name = RefName::new(format!("refs/heads/{branch}")).unwrap();
1606        let prefix_opt = (!prefix.is_empty()).then_some(prefix);
1607        let full_at = Sha40::try_new(TOMBSTONE_TEST_FULL_AT).unwrap();
1608        let chain = ChainManifest {
1609            v: 1,
1610            tip: full_at.clone(),
1611            full_at: full_at.clone(),
1612            segments: vec![ChainSegment {
1613                sha: full_at.clone(),
1614                parent_sha: None,
1615                pack: format!("packs/{TOMBSTONE_TEST_FULL_AT}.pack"),
1616                bytes: 1_024,
1617            }],
1618        };
1619        write_chain(store, prefix_opt, &ref_name, &chain)
1620            .await
1621            .unwrap();
1622        // path-index.json — written verbatim so we can assert the
1623        // synchronous sweep removes it alongside chain.json.
1624        let path_index_key = crate::packchain::keys::path_index_key(prefix_opt, &ref_name);
1625        store.insert(path_index_key, Bytes::from_static(b"{\"v\":1,\"root\":{}}"));
1626        let bundle_key = keys::bundle_key(prefix_opt, ref_name.as_str(), full_at.as_str());
1627        store.insert(bundle_key.clone(), Bytes::from_static(b"PACKBUNDLE"));
1628        bundle_key
1629    }
1630
1631    #[tokio::test]
1632    async fn delete_writes_baseline_tombstone_and_defers_bundle() {
1633        // Issue #143: a packchain delete-branch must write a
1634        // `<prefix>/gc/baseline-tomb-*.json` tombstone naming the
1635        // current `full_at` SHA, and the synchronous sweep must
1636        // leave that bundle in place. chain.json and path-index.json
1637        // ARE removed synchronously — from a fresh reader's
1638        // perspective the ref is gone the moment the chain commits
1639        // its deletion; the bundle stays only so an in-flight
1640        // fetcher that already loaded the prior chain can finish.
1641        let mock = crate::object_store::mock::MockStore::new();
1642        let bundle_key = seed_packchain_branch(&mock, "repo", "main").await;
1643        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1644        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1645
1646        let mb = ManageBranch::open(store, "repo", "main", &prompter as &dyn Prompter)
1647            .await
1648            .expect("open");
1649        mb.delete().await.expect("delete");
1650
1651        // chain.json and path-index.json are deleted synchronously.
1652        assert!(
1653            !mock.contains("repo/refs/heads/main/chain.json"),
1654            "chain.json must be removed synchronously: {:?}",
1655            mock.keys(),
1656        );
1657        assert!(
1658            !mock.contains("repo/refs/heads/main/path-index.json"),
1659            "path-index.json must be removed synchronously: {:?}",
1660            mock.keys(),
1661        );
1662        // The baseline bundle is NOT removed — it is left for `gc sweep`.
1663        assert!(
1664            mock.contains(&bundle_key),
1665            "baseline bundle must survive synchronous delete: {:?}",
1666            mock.keys(),
1667        );
1668        // Exactly one baseline tombstone is written under
1669        // `<prefix>/gc/baseline-tomb-*.json`, and it names the
1670        // bundle's SHA. The shape (UUID-named JSON body) belongs to
1671        // `BaselineTombstone`; this test pins only the listing
1672        // prefix and the SHA inside, since the UUID is intentionally
1673        // non-deterministic.
1674        let tomb_keys: Vec<String> = mock
1675            .keys()
1676            .into_iter()
1677            .filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
1678            .collect();
1679        assert_eq!(
1680            tomb_keys.len(),
1681            1,
1682            "exactly one baseline tombstone must exist: {tomb_keys:?}",
1683        );
1684        let body = mock
1685            .get_bytes(&tomb_keys[0])
1686            .await
1687            .expect("tombstone body present");
1688        let parsed: serde_json::Value =
1689            serde_json::from_slice(&body).expect("tombstone is valid JSON");
1690        assert_eq!(parsed["v"], 1);
1691        assert_eq!(parsed["sha"], TOMBSTONE_TEST_FULL_AT);
1692        assert_eq!(parsed["ref_name"], "refs/heads/main");
1693    }
1694
1695    #[tokio::test]
1696    async fn gc_sweep_after_grace_window_reclaims_deferred_bundle() {
1697        // Round-trip the #143 contract: write a tombstone via
1698        // delete-branch, then run `gc sweep --force` (skips the
1699        // grace window). The bundle must now be gone. This proves
1700        // the tombstone's body is shaped exactly the way the
1701        // existing sweep code expects — a regression in the
1702        // delete-branch tombstone shape would surface as a deferred
1703        // sweep step rather than a reclaim.
1704        let mock = crate::object_store::mock::MockStore::new();
1705        let bundle_key = seed_packchain_branch(&mock, "repo", "main").await;
1706        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1707        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1708
1709        let mb = ManageBranch::open(
1710            Arc::clone(&store),
1711            "repo",
1712            "main",
1713            &prompter as &dyn Prompter,
1714        )
1715        .await
1716        .expect("open");
1717        mb.delete().await.expect("delete");
1718        // Pre-condition: bundle still present, tombstone written.
1719        assert!(mock.contains(&bundle_key));
1720
1721        // `--force` skips the grace window. The sweep finds a
1722        // chain.json-less ref (delete-branch removed it) and so
1723        // proceeds with the bundle delete.
1724        let outcome = crate::packchain::gc::sweep(
1725            store.as_ref(),
1726            "repo",
1727            crate::packchain::gc::SweepOpts {
1728                grace_hours: 0,
1729                force: true,
1730            },
1731        )
1732        .await
1733        .expect("sweep");
1734        assert_eq!(
1735            outcome.swept_tombstones, 1,
1736            "sweep must reclaim exactly the tombstone delete-branch wrote",
1737        );
1738        assert!(
1739            !mock.contains(&bundle_key),
1740            "baseline bundle must be deleted by sweep: surviving keys = {:?}",
1741            mock.keys(),
1742        );
1743        // The tombstone itself is also gone after a successful sweep.
1744        let surviving_tombs: Vec<String> = mock
1745            .keys()
1746            .into_iter()
1747            .filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
1748            .collect();
1749        assert!(
1750            surviving_tombs.is_empty(),
1751            "tombstone must be deleted by sweep: {surviving_tombs:?}",
1752        );
1753    }
1754
1755    #[tokio::test]
1756    async fn delete_bundle_engine_ref_with_no_chain_uses_immediate_delete() {
1757        // Bundle-engine refs (no `chain.json`) have no baseline to
1758        // tombstone. The function must fall through to the existing
1759        // immediate-delete path — no tombstone written, every key
1760        // swept synchronously. This guards against a regression that
1761        // would write a spurious tombstone naming a non-existent
1762        // SHA, or that would leave a bundle-engine ref's `.bundle`
1763        // standing.
1764        let mock = seed_with_branch("main");
1765        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1766        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1767
1768        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1769            .await
1770            .expect("open");
1771        mb.delete().await.expect("delete");
1772        assert!(
1773            mock.keys().is_empty(),
1774            "bundle-engine ref must be fully swept synchronously: {:?}",
1775            mock.keys(),
1776        );
1777        // No baseline tombstone written.
1778        let tomb_keys: Vec<String> = mock
1779            .keys()
1780            .into_iter()
1781            .filter(|k| k.contains(crate::packchain::gc::BASELINE_TOMBSTONE_KEY_FRAGMENT))
1782            .collect();
1783        assert!(
1784            tomb_keys.is_empty(),
1785            "no tombstone must be written for a chain-less ref: {tomb_keys:?}",
1786        );
1787    }
1788
1789    #[tokio::test]
1790    async fn delete_unparseable_chain_falls_back_to_synchronous_bundle_delete() {
1791        // A malformed `chain.json` (truncated, wrong schema version,
1792        // etc.) means `load_chain` fails. The delete path must NOT
1793        // block on this — the operator already confirmed the delete;
1794        // the ref is going away. The fallback is the existing
1795        // immediate-delete behaviour: sweep every key including any
1796        // bundles. Without the fallback an operator could be stuck
1797        // unable to delete a corrupted ref.
1798        let mock = crate::object_store::mock::MockStore::new();
1799        // Hand-craft an unparseable `chain.json` body (not JSON).
1800        mock.insert(
1801            "repo/refs/heads/main/chain.json",
1802            Bytes::from_static(b"not a json"),
1803        );
1804        // Seed a bundle whose name matches what a chain.json MIGHT
1805        // have pointed at — must still be swept synchronously since
1806        // we have no tombstone protection.
1807        let bundle_key = format!("repo/refs/heads/main/{TOMBSTONE_TEST_FULL_AT}.bundle");
1808        mock.insert(bundle_key.clone(), Bytes::from_static(b"BUNDLE"));
1809        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1810        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1811
1812        let mb = ManageBranch::open(store, "repo", "main", &prompter as &dyn Prompter)
1813            .await
1814            .expect("open");
1815        mb.delete().await.expect("delete");
1816
1817        assert!(
1818            mock.keys().is_empty(),
1819            "unparseable chain must fall back to immediate sweep: {:?}",
1820            mock.keys(),
1821        );
1822    }
1823
1824    #[tokio::test]
1825    async fn delete_chain_pointing_at_missing_bundle_sweeps_remaining_keys() {
1826        // Pathological case: `chain.json` parses and names a
1827        // `full_at`, but the corresponding `<sha>.bundle` is NOT in
1828        // the fresh listing (already deleted, or never written).
1829        // `try_tombstone_baseline` returns None on this branch —
1830        // there is nothing to defer. The synchronous sweep must
1831        // still remove chain.json and any other residue.
1832        use crate::packchain::manifest::write_chain;
1833        use crate::packchain::schema::{ChainManifest, ChainSegment, Sha40};
1834        let mock = crate::object_store::mock::MockStore::new();
1835        // Seed chain.json + path-index.json but NOT the bundle.
1836        let ref_name = RefName::new("refs/heads/main").unwrap();
1837        let full_at = Sha40::try_new(TOMBSTONE_TEST_FULL_AT).unwrap();
1838        let chain = ChainManifest {
1839            v: 1,
1840            tip: full_at.clone(),
1841            full_at: full_at.clone(),
1842            segments: vec![ChainSegment {
1843                sha: full_at.clone(),
1844                parent_sha: None,
1845                pack: format!("packs/{TOMBSTONE_TEST_FULL_AT}.pack"),
1846                bytes: 1_024,
1847            }],
1848        };
1849        write_chain(&mock, Some("repo"), &ref_name, &chain)
1850            .await
1851            .unwrap();
1852
1853        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1854        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1855
1856        let mb = ManageBranch::open(store, "repo", "main", &prompter as &dyn Prompter)
1857            .await
1858            .expect("open");
1859        mb.delete().await.expect("delete");
1860
1861        // chain.json removed; no bundle was ever there; no
1862        // tombstone written (deferring nothing is pointless).
1863        assert!(
1864            !mock.contains("repo/refs/heads/main/chain.json"),
1865            "chain.json must be removed: {:?}",
1866            mock.keys(),
1867        );
1868        let tomb_keys: Vec<String> = mock
1869            .keys()
1870            .into_iter()
1871            .filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
1872            .collect();
1873        assert!(
1874            tomb_keys.is_empty(),
1875            "no tombstone for a chain whose bundle is already absent: {tomb_keys:?}",
1876        );
1877    }
1878
1879    // --- Per-ref lock acquisition / release (#158) ----------------------
1880
1881    #[tokio::test]
1882    async fn delete_refuses_when_per_ref_lock_is_held_by_another_writer() {
1883        // Issue #158: pre-fix, `delete-branch` performed a fresh
1884        // listing + sweep without taking the per-ref `LOCK#.lock`. A
1885        // concurrent `git push` that acquired the lock and started
1886        // uploading a new bundle after the listing would land that
1887        // bundle AFTER the delete sweep, leaving the ref alive while
1888        // delete-branch reported success.
1889        //
1890        // The fix takes the same lock the helper-protocol push and
1891        // delete paths take. This test seeds a FRESH lock (matching a
1892        // concurrent push holding it) and asserts that delete-branch
1893        // returns `LockContended` and makes NO changes — the bundle
1894        // and the lock both survive verbatim.
1895        let mock = seed_with_branch("main");
1896        // Fresh `last_modified` = now → `acquire_lock` sees
1897        // `age <= ttl` and reports contention (Ok(None)).
1898        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
1899        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1900        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1901
1902        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1903            .await
1904            .expect("open");
1905        let err = mb
1906            .delete()
1907            .await
1908            .expect_err("delete must refuse to race a fresh lock holder");
1909        match &err {
1910            ManageError::LockContended {
1911                branch,
1912                lock,
1913                ttl_seconds,
1914            } => {
1915                assert_eq!(branch, "main");
1916                assert_eq!(lock, "myrepo/refs/heads/main/LOCK#.lock");
1917                assert!(
1918                    *ttl_seconds > 0,
1919                    "ttl_seconds must be positive, got {ttl_seconds}",
1920                );
1921            }
1922            other => panic!("expected LockContended, got {other:?}"),
1923        }
1924        // The operator-facing wording must name the lock key (so a
1925        // doctor invocation can copy it) and surface the TTL.
1926        let rendered = err.to_string();
1927        assert!(
1928            rendered.contains("myrepo/refs/heads/main/LOCK#.lock"),
1929            "error must name the lock key, got: {rendered}",
1930        );
1931        assert!(
1932            rendered.contains("doctor"),
1933            "error must point operators at doctor, got: {rendered}",
1934        );
1935        // NOTHING was deleted: the bundle, the lock, and the prompt's
1936        // confirmation are all preserved. Pre-#158 this exact race
1937        // produced a "success" with the bundle missing.
1938        assert!(
1939            mock.contains("myrepo/refs/heads/main/abc.bundle"),
1940            "bundle must survive a contended-lock refusal",
1941        );
1942        assert!(
1943            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
1944            "the racing writer's lock must NOT be deleted",
1945        );
1946    }
1947
1948    #[tokio::test]
1949    async fn delete_recovers_stale_lock_and_proceeds() {
1950        // A `LOCK#.lock` older than the TTL means a previous writer
1951        // crashed before releasing it. `acquire_lock` recovers it by
1952        // deleting and re-acquiring. The delete must then complete
1953        // normally — refusing on a stale lock would let a crashed
1954        // writer block the bucket forever.
1955        //
1956        // This pins the staleness boundary by seeding a lock dated
1957        // well in the past (sufficient for any reasonable TTL up to
1958        // hours) and asserting both the sweep and the final release
1959        // ran.
1960        let mock = MockStore::new();
1961        mock.insert("myrepo/refs/heads/main/abc.bundle", Bytes::from("body"));
1962        let stale = OffsetDateTime::now_utc() - time::Duration::days(1);
1963        mock.insert_with(
1964            "myrepo/refs/heads/main/LOCK#.lock",
1965            Bytes::new(),
1966            stale,
1967            PutOpts::default(),
1968        );
1969        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1970        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1971
1972        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
1973            .await
1974            .expect("open");
1975        mb.delete().await.expect("stale lock must be recovered");
1976        assert!(
1977            mock.keys().is_empty(),
1978            "bundle and lock both gone after stale-lock recovery + release: {:?}",
1979            mock.keys(),
1980        );
1981    }
1982
1983    #[tokio::test]
1984    async fn delete_releases_lock_after_successful_sweep() {
1985        // A successful delete must clean up the LOCK#.lock it
1986        // acquired. Without release, a subsequent operation would
1987        // see a fresh (just-released) lock and report contention
1988        // until the TTL elapses — defeating the point of explicit
1989        // release.
1990        let mock = seed_with_branch("main");
1991        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
1992        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
1993
1994        let mb = ManageBranch::open(
1995            Arc::clone(&store),
1996            "myrepo",
1997            "main",
1998            &prompter as &dyn Prompter,
1999        )
2000        .await
2001        .expect("open");
2002        mb.delete().await.expect("delete");
2003        assert!(
2004            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2005            "lock must be released (deleted) after a successful sweep: {:?}",
2006            mock.keys(),
2007        );
2008    }
2009
2010    #[tokio::test]
2011    async fn delete_releases_lock_even_when_sweep_returns_partial_delete() {
2012        // The lock must be released regardless of how the lock-held
2013        // body returned. A `PartialDelete` error means one per-key
2014        // delete failed; the lock is still released so a retry isn't
2015        // gated on TTL recovery.
2016        let mock = MockStore::new();
2017        mock.insert("myrepo/refs/heads/main/aaa.bundle", Bytes::from("a"));
2018        mock.insert("myrepo/refs/heads/main/bbb.bundle", Bytes::from("b"));
2019        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
2020            key: "myrepo/refs/heads/main/bbb.bundle".to_owned(),
2021        });
2022        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2023        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
2024
2025        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2026            .await
2027            .expect("open");
2028        let err = mb
2029            .delete()
2030            .await
2031            .expect_err("partial delete must still surface its error");
2032        assert!(matches!(err, ManageError::PartialDelete { .. }));
2033        assert!(
2034            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2035            "lock must be released even when sweep returns PartialDelete: {:?}",
2036            mock.keys(),
2037        );
2038    }
2039
2040    #[tokio::test]
2041    async fn delete_does_not_iterate_over_its_own_lock_key() {
2042        // Mirrors `protocol::push::delete_remote_ref_under_lock`'s
2043        // lock-key filter (#133): the fresh under-lock listing must
2044        // exclude the lock we hold so the sweep does not delete our
2045        // own coordination key mid-critical-section. The expression
2046        // of this guarantee in the test: a stale lock that the
2047        // acquire path recovered is replaced by OUR fresh lock; the
2048        // sweep must NOT report `attempted = 2` (the lock + the
2049        // bundle) but `attempted = 1` (the bundle only).
2050        //
2051        // Indirect proof: we arm a fault on the lock key. If the
2052        // sweep iterates over the lock, the fault fires and the
2053        // delete returns `PartialDelete { undeleted: [lock] }`. The
2054        // test asserts the fault is NOT consumed — the sweep skipped
2055        // the lock as expected.
2056        let mock = MockStore::new();
2057        mock.insert("myrepo/refs/heads/main/abc.bundle", Bytes::from("body"));
2058        // Arm a fault on the lock key; if `delete` iterates over the
2059        // lock the fault fires.
2060        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
2061            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
2062        });
2063        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2064        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
2065
2066        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2067            .await
2068            .expect("open");
2069        // Note: `release_lock`'s delete also goes through the mock
2070        // and will trip the fault. We accept either outcome here —
2071        // the load-bearing assertion is that the sweep loop did not
2072        // iterate over the lock (which would have produced a
2073        // `PartialDelete`). The release-delete simply returns a
2074        // warn-logged error and is swallowed; the sweep success path
2075        // surfaces as `Ok(())`.
2076        let result = mb.delete().await;
2077        assert!(
2078            !matches!(result, Err(ManageError::PartialDelete { .. })),
2079            "sweep must not iterate over the lock key (no PartialDelete on the lock): {result:?}",
2080        );
2081        // The bundle was deleted by the sweep.
2082        assert!(
2083            !mock.contains("myrepo/refs/heads/main/abc.bundle"),
2084            "bundle must still be swept: {:?}",
2085            mock.keys(),
2086        );
2087    }
2088
2089    #[tokio::test]
2090    async fn delete_release_failure_does_not_mask_sweep_success() {
2091        // Issue #158: release failures are downgraded to `warn!` —
2092        // the operator's "ref is gone" intent is satisfied as soon
2093        // as the sweep succeeds, and an orphan lock will age out via
2094        // the next acquirer's TTL recovery. A regression that
2095        // propagated the release error would surface a spurious
2096        // failure for a delete that actually succeeded.
2097        //
2098        // Arm a fault on the lock-key delete (which is exactly what
2099        // `release_lock` calls). The sweep is unaffected (bundle is
2100        // a different key); the delete must return `Ok(())`.
2101        let mock = MockStore::new();
2102        mock.insert("myrepo/refs/heads/main/abc.bundle", Bytes::from("body"));
2103        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
2104            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
2105        });
2106        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2107        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
2108
2109        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2110            .await
2111            .expect("open");
2112        mb.delete()
2113            .await
2114            .expect("release failure must not mask sweep success");
2115        // The bundle was deleted; only the now-orphan lock survives
2116        // (the release fault consumed the lock's delete).
2117        assert!(!mock.contains("myrepo/refs/heads/main/abc.bundle"));
2118    }
2119
2120    // -----------------------------------------------------------------
2121    // Issue #159 — protect / unprotect must acquire the per-ref lock so
2122    // a concurrent push-in-progress cannot land a force-push between
2123    // the under-lock `is_protected` sample and the bundle upload.
2124    // -----------------------------------------------------------------
2125
2126    #[tokio::test]
2127    async fn protect_refuses_when_per_ref_lock_is_held_by_another_writer() {
2128        // Issue #159: pre-fix, `protect` was a lockless `put_bytes`. A
2129        // concurrent `git push` that had taken the per-ref lock and
2130        // already passed its under-lock `is_protected()` check could
2131        // overwrite the bundle even if `protect` landed between that
2132        // check and the bundle upload. The fix takes the same lock the
2133        // push path takes; this test seeds a fresh lock (matching a
2134        // push holding it) and asserts `protect` returns
2135        // `LockContended` and writes NO marker.
2136        let mock = seed_with_branch("main");
2137        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
2138        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2139        let prompter = ScriptedPrompter::new([]);
2140
2141        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2142            .await
2143            .expect("open");
2144        let err = mb
2145            .protect()
2146            .await
2147            .expect_err("protect must refuse to race a fresh lock holder");
2148        match &err {
2149            ManageError::LockContended {
2150                branch,
2151                lock,
2152                ttl_seconds,
2153            } => {
2154                assert_eq!(branch, "main");
2155                assert_eq!(lock, "myrepo/refs/heads/main/LOCK#.lock");
2156                assert!(*ttl_seconds > 0);
2157            }
2158            other => panic!("expected LockContended, got {other:?}"),
2159        }
2160        // The marker must NOT be written and the racing writer's lock
2161        // must NOT be deleted. Pre-#159 this exact race let `protect`
2162        // land its marker AFTER the push's `is_protected` check, with
2163        // the push completing the force-push anyway.
2164        assert!(
2165            !mock.contains("myrepo/refs/heads/main/PROTECTED#"),
2166            "no marker may be written under a contended lock",
2167        );
2168        assert!(
2169            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2170            "the racing writer's lock must survive a contention refusal",
2171        );
2172        assert!(mock.contains("myrepo/refs/heads/main/abc.bundle"));
2173    }
2174
2175    #[tokio::test]
2176    async fn unprotect_refuses_when_per_ref_lock_is_held_by_another_writer() {
2177        // Symmetry with the protect contention test: `unprotect` must
2178        // also block on a held lock so protection state changes are
2179        // serialised against every other writer. Pre-#159, `unprotect`
2180        // was a lockless `delete`; a concurrent push observing
2181        // `is_protected() == true` and a racing `unprotect` could land
2182        // with the push still on the protected-refusal path.
2183        let mock = seed_with_branch("main");
2184        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2185        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
2186        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2187        let prompter = ScriptedPrompter::new([]);
2188
2189        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2190            .await
2191            .expect("open");
2192        let err = mb
2193            .unprotect()
2194            .await
2195            .expect_err("unprotect must refuse to race a fresh lock holder");
2196        assert!(
2197            matches!(err, ManageError::LockContended { ref branch, .. } if branch == "main"),
2198            "expected LockContended, got {err:?}",
2199        );
2200        // The marker must remain — `unprotect` did not get to remove it.
2201        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
2202        assert!(mock.contains("myrepo/refs/heads/main/LOCK#.lock"));
2203    }
2204
2205    #[tokio::test]
2206    async fn protect_releases_lock_after_successful_write() {
2207        // A successful protect must release the LOCK#.lock it acquired.
2208        // Without release, a subsequent push or unprotect would see a
2209        // fresh lock and report contention until TTL — defeating the
2210        // point of an explicit release.
2211        let mock = seed_with_branch("main");
2212        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2213        let prompter = ScriptedPrompter::new([]);
2214
2215        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2216            .await
2217            .expect("open");
2218        mb.protect().await.expect("protect");
2219        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
2220        assert!(
2221            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2222            "lock must be released after a successful protect: {:?}",
2223            mock.keys(),
2224        );
2225    }
2226
2227    #[tokio::test]
2228    async fn unprotect_releases_lock_after_successful_delete() {
2229        // Mirror of `protect_releases_lock_after_successful_write`: the
2230        // unprotect path must release its lock on the success branch.
2231        let mock = seed_with_branch("main");
2232        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2233        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2234        let prompter = ScriptedPrompter::new([]);
2235
2236        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2237            .await
2238            .expect("open");
2239        mb.unprotect().await.expect("unprotect");
2240        assert!(!mock.contains("myrepo/refs/heads/main/PROTECTED#"));
2241        assert!(
2242            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2243            "lock must be released after a successful unprotect: {:?}",
2244            mock.keys(),
2245        );
2246    }
2247
2248    #[tokio::test]
2249    async fn protect_recovers_stale_lock_and_proceeds() {
2250        // A `LOCK#.lock` older than the TTL means a previous writer
2251        // crashed before releasing it. `acquire_lock` recovers it by
2252        // deleting and re-acquiring. The protect must then complete
2253        // normally — refusing on a stale lock would let a crashed
2254        // writer block protection state changes forever.
2255        let mock = seed_with_branch("main");
2256        let stale = OffsetDateTime::now_utc() - time::Duration::days(1);
2257        mock.insert_with(
2258            "myrepo/refs/heads/main/LOCK#.lock",
2259            Bytes::new(),
2260            stale,
2261            PutOpts::default(),
2262        );
2263        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2264        let prompter = ScriptedPrompter::new([]);
2265
2266        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2267            .await
2268            .expect("open");
2269        mb.protect().await.expect("stale lock must be recovered");
2270        assert!(mock.contains("myrepo/refs/heads/main/PROTECTED#"));
2271        assert!(
2272            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2273            "stale lock recovered and our fresh lock released: {:?}",
2274            mock.keys(),
2275        );
2276    }
2277
2278    #[tokio::test]
2279    async fn protect_release_failure_does_not_mask_marker_write_success() {
2280        // Issue #159 / #158 symmetry: release failures are downgraded
2281        // to `warn!`. A regression that propagated the release error
2282        // would lie to the operator about a `protect` that actually
2283        // succeeded — the marker is on the bucket; the orphan lock
2284        // ages out via the next acquirer's TTL recovery.
2285        let mock = seed_with_branch("main");
2286        mock.arm(crate::object_store::mock::Fault::NetworkOnDelete {
2287            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
2288        });
2289        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2290        let prompter = ScriptedPrompter::new([]);
2291
2292        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2293            .await
2294            .expect("open");
2295        mb.protect()
2296            .await
2297            .expect("release failure must not mask marker-write success");
2298        assert!(
2299            mock.contains("myrepo/refs/heads/main/PROTECTED#"),
2300            "marker must be written even when lock release fails",
2301        );
2302    }
2303
2304    #[tokio::test]
2305    async fn issue_159_protect_cannot_land_during_active_push() {
2306        // The headline regression test for #159. Models the documented
2307        // race verbatim:
2308        //
2309        //   1. push acquires LOCK#.lock
2310        //   2. push reads is_protected -> NotFound
2311        //   3. operator runs `protect`, which (pre-#159) put_bytes the
2312        //      marker without taking the lock — succeeds
2313        //   4. push uploads the new bundle, force-overwriting a now
2314        //      "protected" ref
2315        //
2316        // The fix makes step 3 fail with `LockContended`. With the
2317        // lock still on the bucket from step 1, `protect` cannot run
2318        // until the push releases — at which point the push has
2319        // already either committed or refused on its under-lock
2320        // `is_protected` check, with no half-state in between.
2321        //
2322        // The test seeds the lock directly (representing step 1's
2323        // holder) and asserts step 3 fails. The push's actual upload
2324        // is not exercised here because it is covered by the
2325        // helper-protocol push tests; the load-bearing claim is "no
2326        // mid-push protect can sneak in".
2327        let mock = seed_with_branch("main");
2328        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
2329        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2330        let prompter = ScriptedPrompter::new([]);
2331
2332        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2333            .await
2334            .expect("open");
2335        let err = mb
2336            .protect()
2337            .await
2338            .expect_err("protect must not land during an active push");
2339        assert!(
2340            matches!(err, ManageError::LockContended { .. }),
2341            "expected LockContended, got {err:?}",
2342        );
2343        // Marker NOT written: the under-lock push (when it eventually
2344        // releases) will see no marker, take whichever branch
2345        // is_protected dictates, and operator intent never crosses
2346        // streams with the writer's snapshot.
2347        assert!(!mock.contains("myrepo/refs/heads/main/PROTECTED#"));
2348        // The racing writer's lock must survive the contention refusal —
2349        // protect must not have touched LOCK#.lock owned by another
2350        // operation. Pinning this directly makes the test self-sufficient.
2351        assert!(
2352            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2353            "the writer's LOCK#.lock must survive a contended protect attempt",
2354        );
2355    }
2356
2357    // -----------------------------------------------------------------
2358    // Issue #151 — delete paths must not miss a `PROTECTED#` marker
2359    // written after the under-lock listing. Closed mechanically by the
2360    // per-ref lock (#158 for delete-branch, #159 for protect/unprotect):
2361    // `protect` blocks on the same key the delete acquired, so a marker
2362    // cannot land between the under-lock listing and the sweep. These
2363    // tests pin the lock-contract guarantee and the post-sweep
2364    // defensive verification that surfaces a contract violation if one
2365    // ever arises.
2366    // -----------------------------------------------------------------
2367
2368    #[tokio::test]
2369    async fn issue_151_protect_cannot_inject_marker_during_active_delete() {
2370        // The headline regression test for #151. Models the documented
2371        // race verbatim:
2372        //
2373        //   1. delete-branch acquires LOCK#.lock and does its
2374        //      under-lock listing (no marker).
2375        //   2. operator runs `protect`, which (pre-#159) put_bytes the
2376        //      marker without taking the lock — would succeed.
2377        //   3. delete-branch sweeps the listing it took at step 1,
2378        //      missing the marker entirely; delete reports success
2379        //      while the marker is orphaned.
2380        //
2381        // The fix (#159) makes step 2 fail with `LockContended` because
2382        // `protect` now serialises through the same per-ref lock
2383        // delete-branch holds. The test seeds the lock directly
2384        // (representing the delete-branch holder at step 1) and asserts
2385        // step 2 fails — proving the race window is mechanically closed.
2386        let mock = seed_with_branch("main");
2387        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
2388        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2389        let prompter = ScriptedPrompter::new([]);
2390
2391        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2392            .await
2393            .expect("open");
2394        let err = mb
2395            .protect()
2396            .await
2397            .expect_err("protect must not land during an active delete-branch");
2398        assert!(
2399            matches!(err, ManageError::LockContended { .. }),
2400            "expected LockContended (lock held by delete-branch), got {err:?}",
2401        );
2402        // No marker landed — the delete's sweep will not encounter a
2403        // mid-flow PROTECTED# the listing did not see.
2404        assert!(
2405            !mock.contains("myrepo/refs/heads/main/PROTECTED#"),
2406            "no marker may be written while a delete holds the lock",
2407        );
2408        // The delete-branch holder's lock survives the contention
2409        // refusal verbatim.
2410        assert!(
2411            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
2412            "the delete-branch holder's lock must survive contention",
2413        );
2414    }
2415
2416    #[tokio::test]
2417    async fn issue_151_post_sweep_verification_passes_on_clean_delete() {
2418        // The post-sweep `verify_no_orphan_protected_after_delete`
2419        // probe is belt-and-suspenders telemetry: with the lock
2420        // contract in place there is no way for a marker to appear
2421        // post-sweep. The probe must be silent on the happy path so an
2422        // operator reading logs is not chasing phantoms.
2423        //
2424        // This test exercises the success path end-to-end: seed only
2425        // the bundle, confirm, sweep. The delete must return Ok and
2426        // the bucket must be empty (including the lock the release
2427        // step removed). A regression that flipped the post-sweep
2428        // probe into a hard error (rather than telemetry) would surface
2429        // here as an unexpected `Err`.
2430        let mock = seed_with_branch("main");
2431        let store: Arc<dyn ObjectStore> = Arc::new(mock.clone());
2432        let prompter = ScriptedPrompter::new([Answer::Confirm(true)]);
2433
2434        let mb = ManageBranch::open(store, "myrepo", "main", &prompter as &dyn Prompter)
2435            .await
2436            .expect("open");
2437        mb.delete()
2438            .await
2439            .expect("clean delete must pass the post-sweep probe silently");
2440        assert!(
2441            mock.keys().is_empty(),
2442            "bundle + lock both gone after a clean delete-and-release: {:?}",
2443            mock.keys(),
2444        );
2445    }
2446}