Skip to main content

git_remote_object_store/manage/
doctor.rs

1//! `doctor` analyzer + fixers.
2//!
3//! The flow is: analyze → write report → fix duplicate bundles per ref
4//! → fix invalid HEAD → list and (optionally) delete stale locks.
5//!
6//! The `Doctor` value is constructed once per CLI invocation; all
7//! interaction goes through the injected [`Prompter`] so the same code
8//! path drives both the binary (via [`DialoguerPrompter`]) and unit
9//! tests (via `ScriptedPrompter`, gated on `test-util`).
10//!
11//! All human-readable output flows through [`Doctor::run_into`]'s
12//! `impl Write` parameter so tests can capture exact bytes without
13//! spawning the management binary. [`Doctor::run`] is the thin
14//! public wrapper that passes [`std::io::stdout()`] (per-write
15//! locking, keeping the future `Send`).
16//!
17//! [`DialoguerPrompter`]: super::DialoguerPrompter
18
19use std::io::Write;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use time::OffsetDateTime;
25use tracing::{info, warn};
26use uuid::Uuid;
27
28use super::snapshot::{
29    BundleEntry, MalformedBundleKey, RefSnapshot, RepoSnapshot, analyze_objects,
30};
31use super::{ManageError, Prompter, StaleReason};
32// `DEFAULT_LOCK_TTL_SECONDS` is only referenced from the tests below
33// and from doc-comments. Tests bring it in via the `use super::*;`
34// inside `mod tests`. The `#[cfg(test)]` gate makes this `use`
35// invisible to `cargo doc`, so doc-comments must reach the constant
36// through its fully-qualified path
37// (`crate::protocol::push::DEFAULT_LOCK_TTL_SECONDS`) rather than the
38// bare identifier.
39#[cfg(test)]
40use super::DEFAULT_LOCK_TTL_SECONDS;
41use crate::keys;
42use crate::object_store::{ObjectMeta, ObjectStore, ObjectStoreError, PutOpts};
43use crate::packchain::audit::{self, AuditReport, BranchRow};
44use crate::protocol::push::lock_ttl_from_env_seconds;
45use crate::url::StorageEngine;
46
/// Tunables for [`Doctor::run`].
///
/// The struct is `Copy`, so a `Doctor` stores its own copy and later
/// changes to the caller's value do not affect a constructed runner.
/// [`DoctorOpts::default`] enables nothing destructive: losing bundles
/// are quarantined, stale locks are only reported, and the engine is
/// [`StorageEngine::Bundle`].
#[derive(Debug, Clone, Copy)]
pub struct DoctorOpts {
    /// When `true`, `fix_multiple_bundles` deletes the losing bundles
    /// outright. When `false` (default), they are quarantined to a
    /// fresh `<ref>_<uuid8>` ref so a human can recover them later.
    pub delete_bundle: bool,
    /// Locks older than this TTL are considered stale. `None` falls
    /// back to [`crate::protocol::push::lock_ttl_from_env`] which
    /// honours `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS` (defaulting
    /// to [`crate::protocol::push::DEFAULT_LOCK_TTL_SECONDS`] when
    /// unset).
    ///
    /// An explicit `Some(n)` — including `Some(0)` — passes through
    /// unchanged: `doctor` only compares lock ages, it never acquires
    /// a lock, so `--lock-ttl-seconds 0` is a deliberate "treat every
    /// lock as stale" request rather than a footgun. The push and
    /// compact paths, which DO acquire locks, route through
    /// [`crate::protocol::push::resolve_lock_ttl_seconds`] for the
    /// zero-clamp that protects them.
    pub lock_ttl_seconds: Option<u64>,
    /// When `true`, scanned stale locks are deleted; otherwise, the
    /// doctor only reports them and recommends re-running with the
    /// flag.
    pub delete_stale_locks: bool,
    /// Engine resolved from the bucket's `FORMAT` key. Drives
    /// engine-aware reporting: a `Packchain` value enables the
    /// orphan / tombstone / compaction / dangling-reference section.
    /// Bundle remotes see the existing report unchanged.
    pub engine: StorageEngine,
}
77
78impl Default for DoctorOpts {
79    fn default() -> Self {
80        Self {
81            delete_bundle: false,
82            lock_ttl_seconds: None,
83            delete_stale_locks: false,
84            engine: StorageEngine::Bundle,
85        }
86    }
87}
88
/// `doctor` runner.
///
/// Bundles everything one run needs: the object store to inspect and
/// repair, the repository prefix inside it, the run options, and the
/// prompter used for every interactive decision.
pub struct Doctor<'a> {
    /// Shared handle to the object store holding the repository.
    store: Arc<dyn ObjectStore>,
    /// Repository prefix without a trailing `/`; empty for a
    /// repository stored at the bucket/container root.
    prefix: String,
    /// Tunables for this run (see [`DoctorOpts`]).
    opts: DoctorOpts,
    /// Source of interactive answers (`select` / `confirm`); borrowed
    /// for the lifetime of the runner.
    prompter: &'a dyn Prompter,
}
96
97impl<'a> Doctor<'a> {
98    /// Construct a new runner. `prefix` is the parsed remote URL's
99    /// repository prefix without a trailing `/`. Pass an empty string
100    /// for repositories stored at the bucket/container root.
101    #[must_use]
102    pub fn new(
103        store: Arc<dyn ObjectStore>,
104        prefix: impl Into<String>,
105        opts: DoctorOpts,
106        prompter: &'a dyn Prompter,
107    ) -> Self {
108        Self {
109            store,
110            prefix: prefix.into(),
111            opts,
112            prompter,
113        }
114    }
115
116    /// Resolve the stale-lock TTL in seconds. `None` defers to
117    /// `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS` (or
118    /// [`DEFAULT_LOCK_TTL_SECONDS`] when the env var is unset) via
119    /// [`crate::protocol::push::lock_ttl_from_env`]. An explicit
120    /// `Some(n)` — including `Some(0)` — passes through unchanged:
121    /// `doctor` only compares lock ages, it never acquires a lock,
122    /// so `--lock-ttl-seconds 0` is a deliberate "treat every lock as
123    /// stale" request and is left to the operator's discretion. The
124    /// push/compact paths, which DO acquire locks, route through
125    /// `resolve_lock_ttl_seconds` for the zero-clamp that protects
126    /// them.
127    fn resolved_lock_ttl_seconds(&self) -> u64 {
128        self.opts
129            .lock_ttl_seconds
130            .unwrap_or_else(lock_ttl_from_env_seconds)
131    }
132
133    /// Analyze, report, and fix — writing human-readable output to
134    /// stdout.
135    ///
136    /// Thin wrapper around [`run_into`](Self::run_into) that passes
137    /// [`std::io::stdout()`]. Each write acquires the stdout lock
138    /// individually, keeping the returned future `Send`. Use
139    /// `run_into` directly when you need to capture output (e.g. in
140    /// tests).
141    ///
142    /// # Errors
143    ///
144    /// Returns [`ManageError::Store`] if an object-store call fails,
145    /// [`ManageError::Internal`] if a prompter returns an out-of-range
146    /// index, [`ManageError::Cancelled`] if the user cancels an interactive
147    /// prompt, or [`ManageError::Io`] for prompt or write I/O failures.
148    pub async fn run(&self) -> Result<(), ManageError> {
149        self.run_into(&mut std::io::stdout()).await
150    }
151
152    /// Analyze, report, and fix — writing human-readable output to
153    /// `out`.
154    ///
155    /// Errors short-circuit the run — partial fixes are committed
156    /// immediately (each `delete` / `copy` / `put` is its own request).
157    /// This is a "best-effort" stance.
158    ///
159    /// # Errors
160    ///
161    /// Returns [`ManageError::Store`] if an object-store call fails,
162    /// [`ManageError::Internal`] if a prompter returns an out-of-range
163    /// index, [`ManageError::Cancelled`] if the user cancels an interactive
164    /// prompt, or [`ManageError::Io`] for prompt or write I/O failures.
165    pub(crate) async fn run_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
166        // Share one LIST between snapshot analysis, the packchain
167        // audit, and stale-lock scanning so a doctor run is a single
168        // bucket walk regardless of repo size. Empty `prefix`
169        // (root-of-bucket repo) collapses to a bucket-wide list.
170        let list_prefix = keys::join(Some(&self.prefix), "");
171        let objects = self.store.list(&list_prefix).await?;
172        let mut snapshot = analyze_objects(&objects, &list_prefix, &self.store).await?;
173        write!(out, "{}", self.report(&snapshot))?;
174
175        // Surface malformed bundle keys that push silently filters
176        // (issue #124). Read-only: the doctor never deletes these, it
177        // points the operator at the keys and lets them decide. This
178        // sits between the snapshot report and the packchain section
179        // so a bundle-engine repo (which has no packchain section)
180        // still sees the warning right after its ref list.
181        if !snapshot.malformed_bundle_keys.is_empty() {
182            write!(
183                out,
184                "{}",
185                render_malformed_bundles_section(&snapshot.malformed_bundle_keys),
186            )?;
187        }
188
189        // Engine-aware diagnostic. The packchain section is purely
190        // read-only — no bucket mutations — so it runs before any of
191        // the fixers below to keep the report ordering stable
192        // regardless of what fixers do later.
193        if let Some(section) = self.maybe_render_packchain_section(&objects).await? {
194            write!(out, "{section}")?;
195        }
196
197        // Fix duplicates ref-by-ref. We need owned ref-names because
198        // `fix_multiple_bundles` mutates the snapshot under `&mut`.
199        let dup_refs: Vec<String> = snapshot
200            .refs
201            .iter()
202            .filter(|(_, r)| r.bundles.len() > 1)
203            .map(|(name, _)| name.clone())
204            .collect();
205        for ref_path in dup_refs {
206            self.fix_multiple_bundles(out, &mut snapshot, &ref_path)
207                .await?;
208        }
209
210        if !snapshot.is_head_valid() {
211            self.fix_head(out, &mut snapshot).await?;
212        }
213
214        self.list_and_handle_stale_locks(out, &objects).await?;
215        Ok(())
216    }
217
218    /// Run the packchain audit and render its section, but only when
219    /// the configured engine is [`StorageEngine::Packchain`]. Returns
220    /// `Ok(None)` for bundle remotes (cheap — no I/O).
221    ///
222    /// Exposed at `pub(crate)` so the tests can pin the engine gate
223    /// without going through `run`'s `print!` side effect.
224    ///
225    /// # Errors
226    ///
227    /// Returns [`ManageError::Packchain`] for engine-level failures
228    /// during the audit.
229    pub(crate) async fn maybe_render_packchain_section(
230        &self,
231        objects: &[ObjectMeta],
232    ) -> Result<Option<String>, ManageError> {
233        if !matches!(self.opts.engine, StorageEngine::Packchain) {
234            return Ok(None);
235        }
236        let report = audit::audit(&*self.store, &self.prefix, objects).await?;
237        Ok(Some(render_packchain_section(&report)))
238    }
239
240    /// Render the snapshot to a human-readable report. Returns the
241    /// finished string (with trailing newline) so callers can route
242    /// it to stdout, a logger, or a test buffer.
243    #[must_use]
244    pub(crate) fn report(&self, snapshot: &RepoSnapshot) -> String {
245        use std::fmt::Write;
246        let mut out = String::new();
247        let _ = writeln!(out, "{}:", self.report_label());
248        for (ref_path, r) in &snapshot.refs {
249            let star = if r.is_protected { "*" } else { "" };
250            let status = match r.bundles.len() {
251                0 if r.has_chain => "Ok",
252                0 => "No bundles",
253                1 => "Ok",
254                _ => "Multiple bundles",
255            };
256            let _ = writeln!(out, " {star} {ref_path}: {status}");
257        }
258        // Match `is_head_valid`'s predicate (issue #154): a ref whose
259        // only residue is a `PROTECTED#` marker counts as "no branch
260        // data", so the report must show `HEAD: Invalid` for that case
261        // — otherwise the report would contradict the repair path that
262        // follows in `run_into`.
263        let head_label = snapshot
264            .head
265            .as_deref()
266            .filter(|h| {
267                snapshot
268                    .refs
269                    .get(*h)
270                    .is_some_and(RefSnapshot::has_branch_data)
271            })
272            .unwrap_or("Invalid");
273        let _ = writeln!(out, "  HEAD: {head_label}");
274        out
275    }
276
277    /// Human-readable label for the repo in printed output. Empty
278    /// `prefix` (root-of-bucket repo) renders as `(root)` so the report
279    /// header isn't a bare colon.
280    fn report_label(&self) -> &str {
281        if self.prefix.is_empty() {
282            "(root)"
283        } else {
284            &self.prefix
285        }
286    }
287
288    async fn fix_multiple_bundles<W: Write>(
289        &self,
290        out: &mut W,
291        snapshot: &mut RepoSnapshot,
292        ref_path: &str,
293    ) -> Result<(), ManageError> {
294        writeln!(
295            out,
296            "\nFix multiple bundles for repo {} and ref {ref_path}",
297            self.report_label()
298        )?;
299
300        // The caller filtered for refs with `bundles.len() > 1`; if the
301        // map shape changed in between, surface a structured internal
302        // error rather than aborting the process.
303        let ref_entry = snapshot.refs.get_mut(ref_path).ok_or_else(|| {
304            ManageError::Internal(format!(
305                "fix_multiple_bundles called with ref {ref_path} absent from snapshot"
306            ))
307        })?;
308
309        let labels: Vec<String> = ref_entry
310            .bundles
311            .iter()
312            .map(|b| format!("{} {}", b.sha, b.last_modified))
313            .collect();
314
315        let keep_idx = self.prompter.select("Choose the bundle to keep", &labels)?;
316        // `dialoguer::Select` validates the index against the option count
317        // before returning, so out-of-range here means a test prompter
318        // queued an invalid script. Propagate as a structured internal
319        // error so the helper doesn't abort the whole run.
320        let keeper = ref_entry.bundles.get(keep_idx).ok_or_else(|| {
321            ManageError::Internal(format!(
322                "prompter returned out-of-range index {keep_idx} for {} bundle(s)",
323                ref_entry.bundles.len()
324            ))
325        })?;
326        let keeper_sha = keeper.sha.clone();
327        let keeper_key = keeper.key.clone();
328
329        if !self.prompter.confirm("Confirm and apply changes")? {
330            // Match `ManageBranch::delete`: an interactive "no" is the user
331            // declining this fix, not an abort of the whole run. Doctor
332            // continues to the next ref / stale-lock scan with exit 0.
333            writeln!(out, "Aborted")?;
334            return Ok(());
335        }
336
337        // Re-verify the keeper bundle still exists on the bucket before
338        // evicting any losers. The bundle list above is taken from the
339        // top-of-run snapshot, which can be minutes old after the
340        // interactive prompt; a concurrent `git push` / `manage
341        // delete-branch` / another `doctor` run may have removed the
342        // keeper in between. Without this re-check, partitioning and
343        // deleting losers on the stale snapshot can evict the last live
344        // bundle for the ref (issue #155).
345        //
346        // We HEAD the keeper's exact key (rather than re-listing the
347        // ref prefix) because the question here is precisely "does
348        // this one object still exist?". A targeted HEAD is the
349        // cheapest unambiguous existence check and matches the
350        // singleton-marker pattern documented in lesson #12.
351        match self.store.head(&keeper_key).await {
352            Ok(_) => {}
353            Err(ObjectStoreError::NotFound(_)) => {
354                writeln!(
355                    out,
356                    "Selected keeper bundle {keeper_sha} for {ref_path} is no longer \
357                     present on the bucket; refusing to evict losers. Re-run doctor."
358                )?;
359                warn!(
360                    ref_path,
361                    keeper = %keeper_sha,
362                    key = %keeper_key,
363                    "doctor fix_multiple_bundles: keeper disappeared between snapshot and eviction",
364                );
365                return Err(ManageError::StaleSnapshot {
366                    entity: ref_path.to_owned(),
367                    reason: StaleReason::Deleted,
368                });
369            }
370            Err(e) => return Err(e.into()),
371        }
372
373        writeln!(out, "Keeping {keeper_sha}")?;
374        // Partition into (keep, evict). The snapshot is updated in place
375        // so subsequent steps (HEAD validation in particular) see the
376        // resolved layout.
377        let bundles = std::mem::take(&mut ref_entry.bundles);
378        let (keepers, losers): (Vec<_>, Vec<_>) =
379            bundles.into_iter().partition(|b| b.sha == keeper_sha);
380        ref_entry.bundles = keepers;
381
382        for losing in &losers {
383            self.evict_losing_bundle(out, ref_path, losing).await?;
384        }
385        Ok(())
386    }
387
388    async fn evict_losing_bundle<W: Write>(
389        &self,
390        out: &mut W,
391        ref_path: &str,
392        losing: &BundleEntry,
393    ) -> Result<(), ManageError> {
394        // Both branches end with `self.store.delete(&losing.key)` (after
395        // this `if/else`): the bundle is always removed from its losing
396        // location. The branches differ only in whether the bundle is
397        // first quarantined under a new ref. Adding a "dry-run" or
398        // "preserve in place" branch here must NOT fall through to the
399        // unconditional delete below — keep the delete inside the
400        // appropriate branch when adding new modes.
401        //
402        // Loser-side `NotFound` is treated as success in both branches.
403        // The bundle list reflects the top-of-run snapshot; a
404        // concurrent push, delete-branch, or doctor run can sweep a
405        // loser between the snapshot and the eviction. The keeper
406        // re-check in `fix_multiple_bundles` already proved the keeper
407        // survived — once that holds, an already-gone loser is the
408        // desired end state, not a failure (issue #155).
409        if self.opts.delete_bundle {
410            writeln!(out, "Removing {}", losing.sha)?;
411        } else {
412            // `Uuid::Simple`'s `Display` impl does NOT honor the
413            // precision specifier (`{:.8}`), so encode into a stack
414            // buffer and slice to 8 chars to produce the `<ref>_<uuid8>`
415            // quarantine ref name.
416            let mut buf = [0u8; uuid::fmt::Simple::LENGTH];
417            let suffix = &Uuid::new_v4().simple().encode_lower(&mut buf)[..8];
418            let new_ref = format!("{ref_path}_{suffix}");
419            // `keys::bundle_key` accepts an `Option<&str>` prefix and
420            // collapses `Some("")` to the no-prefix shape; passing
421            // `Some(&self.prefix)` therefore handles both the prefixed
422            // and root-of-bucket cases without an explicit branch.
423            let dst_key = keys::bundle_key(Some(&self.prefix), &new_ref, &losing.sha);
424            writeln!(out, "Moving {} to new branch {new_ref}", losing.sha)?;
425            match self.store.copy(&losing.key, &dst_key).await {
426                Ok(()) => {}
427                Err(ObjectStoreError::NotFound(_)) => {
428                    writeln!(
429                        out,
430                        "Skipping {}: loser bundle already gone before quarantine copy",
431                        losing.sha,
432                    )?;
433                    return Ok(());
434                }
435                Err(e) => return Err(e.into()),
436            }
437        }
438        match self.store.delete(&losing.key).await {
439            Ok(()) => Ok(()),
440            Err(ObjectStoreError::NotFound(_)) => {
441                writeln!(
442                    out,
443                    "Skipping {}: loser bundle already gone before delete",
444                    losing.sha,
445                )?;
446                Ok(())
447            }
448            Err(e) => Err(e.into()),
449        }
450    }
451
452    async fn fix_head<W: Write>(
453        &self,
454        out: &mut W,
455        snapshot: &mut RepoSnapshot,
456    ) -> Result<(), ManageError> {
457        writeln!(out, "\nFix invalid HEAD for repo {}", self.report_label())?;
458
459        // Only offer refs that pass `has_branch_data`: a PROTECTED#-only
460        // ref appears in `snapshot.refs` (see #154) but would always fail
461        // the keeper recheck. Surfacing it in the Select picker is a
462        // usability trap.
463        let candidates: Vec<&str> = snapshot
464            .refs
465            .iter()
466            .filter(|(k, r)| k.starts_with("refs/heads/") && r.has_branch_data())
467            .map(|(k, _)| k.as_str())
468            .collect();
469        if candidates.is_empty() {
470            writeln!(
471                out,
472                "No `refs/heads/*` available to assign as HEAD; skipping."
473            )?;
474            return Ok(());
475        }
476
477        let labels: Vec<String> = candidates
478            .iter()
479            .map(|k| short_branch_name(k).to_owned())
480            .collect();
481        let chosen = self
482            .prompter
483            .select("Choose the new HEAD branch", &labels)?;
484        // `dialoguer::Select` cannot return an out-of-range index; an
485        // out-of-range answer here is a test-script bug (see
486        // `fix_multiple_bundles`). Surface as a structured internal
487        // error rather than aborting the process.
488        let new_head = candidates
489            .get(chosen)
490            .copied()
491            .ok_or_else(|| {
492                ManageError::Internal(format!(
493                    "prompter returned out-of-range index {chosen} for {} HEAD candidate(s)",
494                    candidates.len()
495                ))
496            })?
497            .to_owned();
498
499        // Re-verify the chosen branch is still present on the bucket
500        // before writing HEAD. The candidate list above is taken from
501        // the top-of-run snapshot, which can be minutes old after the
502        // interactive prompt; a concurrent `git push :<branch>` or
503        // `manage delete-branch` may have removed the chosen branch
504        // since the snapshot was taken. Writing HEAD anyway would
505        // re-create the invalid-HEAD condition the doctor was trying
506        // to fix (issue #138).
507        //
508        // We re-list the branch prefix (rather than HEADing a single
509        // object) because a packchain branch is healthy with
510        // `chain.json` and `packs/*` but no `.bundle` — there is no
511        // single object key that uniquely identifies "branch exists"
512        // across both engines. `keys::ref_listing_prefix` produces the
513        // canonical `<prefix>/<ref>/` shape shared with the push
514        // engines and `ManageBranch::protect`.
515        //
516        // The "branch exists" predicate uses [`super::has_branch_data`],
517        // which excludes `*.lock` keys and the `PROTECTED#` marker.
518        // Those keys are operational metadata — a stale lock file or a
519        // surviving protection marker — that can outlive the user-data
520        // keys (`chain.json`, `packs/*`, `*.bundle`) when a concurrent
521        // delete runs partially. Writing HEAD against a branch whose
522        // only residue is operational metadata would re-create the
523        // invalid-HEAD condition #138 set out to prevent, so we treat
524        // that residue as evidence the branch is gone. Sharing the
525        // helper with `ManageBranch::protect` keeps the two
526        // race-detection paths in lockstep.
527        let branch_prefix = keys::ref_listing_prefix(Some(&self.prefix), &new_head);
528        let recheck = self.store.list(&branch_prefix).await?;
529        if !super::has_branch_data(&recheck) {
530            // `recheck` may still be non-empty here if it contains only
531            // operational metadata (lock files, `PROTECTED#` markers);
532            // surface that distinction to the operator so a residual
533            // lock or marker doesn't read as "the branch is back".
534            let residue_only = !recheck.is_empty();
535            if residue_only {
536                writeln!(
537                    out,
538                    "Selected branch {new_head} is considered gone — only operational \
539                     metadata (lock files / PROTECTED# marker) remains under its prefix. \
540                     Refusing to write stale HEAD. Re-run doctor."
541                )?;
542            } else {
543                writeln!(
544                    out,
545                    "Selected branch {new_head} was deleted between selection and HEAD write; \
546                     refusing to write stale HEAD. Re-run doctor."
547                )?;
548            }
549            warn!(
550                branch = %new_head,
551                residue_only,
552                "doctor fix_head: chosen branch disappeared between snapshot and HEAD write"
553            );
554            let reason = if residue_only {
555                StaleReason::ResidueOnly
556            } else {
557                StaleReason::Deleted
558            };
559            return Err(ManageError::StaleSnapshot {
560                entity: new_head,
561                reason,
562            });
563        }
564
565        let head_key = keys::join(Some(&self.prefix), "HEAD");
566        writeln!(out, "Setting {new_head} as HEAD")?;
567        self.store
568            .put_bytes(&head_key, Bytes::from(new_head.clone()), PutOpts::default())
569            .await?;
570        snapshot.head = Some(new_head);
571        Ok(())
572    }
573
574    async fn list_and_handle_stale_locks<W: Write>(
575        &self,
576        out: &mut W,
577        objects: &[ObjectMeta],
578    ) -> Result<(), ManageError> {
579        writeln!(out, "\nScanning for stale locks...")?;
580        let ttl = Duration::from_secs(self.resolved_lock_ttl_seconds());
581        let stale = scan_stale_locks(objects, ttl);
582
583        if stale.is_empty() {
584            writeln!(out, "No stale locks found.")?;
585            return Ok(());
586        }
587
588        writeln!(out, "Found stale locks:")?;
589        for (key, age) in &stale {
590            writeln!(out, " - {key} (age: {}s)", age.as_secs())?;
591        }
592
593        if !self.opts.delete_stale_locks {
594            writeln!(
595                out,
596                "\nRun with --delete-stale-locks to remove them automatically."
597            )?;
598            return Ok(());
599        }
600
601        writeln!(out, "\nDeleting stale locks...")?;
602        let mut deleted = 0usize;
603        let mut skipped = 0usize;
604        for (key, _) in &stale {
605            match delete_stale_lock_if_still_stale(self.store.as_ref(), key, ttl, out).await? {
606                DeleteOutcome::Deleted => deleted += 1,
607                DeleteOutcome::SkippedNotStale
608                | DeleteOutcome::SkippedDisappeared
609                | DeleteOutcome::SkippedHeadError => skipped += 1,
610                DeleteOutcome::DeleteFailed => {}
611            }
612        }
613        if skipped > 0 {
614            writeln!(
615                out,
616                "Skipped {skipped} lock(s) that became fresh or disappeared since listing; deleted {deleted}."
617            )?;
618        }
619        Ok(())
620    }
621}
622
623/// Stale-lock scan result: each entry pairs the original listing's key
624/// with the age (relative to the scan's `now`) that exceeded the TTL.
625/// The age is preserved for the operator-visible report;
626/// `delete_stale_lock_if_still_stale` re-derives staleness from a
627/// fresh HEAD before any destructive action.
628fn scan_stale_locks(objects: &[ObjectMeta], ttl: Duration) -> Vec<(&str, Duration)> {
629    scan_stale_locks_at(OffsetDateTime::now_utc(), objects, ttl)
630}
631
632/// Same as [`scan_stale_locks`] but with `now` injected so tests can pin
633/// the `age > ttl` strict-inequality boundary deterministically. The
634/// wall-clock `OffsetDateTime::now_utc()` ticks between any two
635/// instructions, so a test that constructs `last_modified = now - ttl`
636/// cannot observe `age == ttl` without controlling `now`.
637fn scan_stale_locks_at(
638    now: OffsetDateTime,
639    objects: &[ObjectMeta],
640    ttl: Duration,
641) -> Vec<(&str, Duration)> {
642    objects
643        .iter()
644        .filter(|o| super::is_lock_key(&o.key))
645        .filter_map(|o| {
646            let raw_age = now - o.last_modified;
647            if raw_age.is_negative() {
648                warn!(
649                    key = %o.key,
650                    skew_secs = -raw_age.whole_seconds(),
651                    "lock's last_modified is in the future; treating as \
652                     out-of-tolerance (clock skew?)",
653                );
654                // Future-stamped lock (issue #223): NOT "older than" any
655                // positive TTL, so honor `--lock-ttl-seconds N>0` by
656                // excluding it. The operator-explicit
657                // `--lock-ttl-seconds 0` ("treat every lock as stale")
658                // path still evicts so the skew is observable and the
659                // operator can clear it. Pre-fix this branch swallowed
660                // the `try_from` `Err` as `None`, silently skipping the
661                // lock at every TTL.
662                return ttl.is_zero().then_some((o.key.as_str(), Duration::ZERO));
663            }
664            // `raw_age >= 0` from the negative branch above, so the
665            // signed-to-unsigned conversion cannot fail. `expect`
666            // surfaces a future regression that loosens this invariant
667            // rather than masking it with a fallback.
668            let age = Duration::try_from(raw_age)
669                .expect("raw_age is non-negative — branch above returned for negatives");
670            (age > ttl).then_some((o.key.as_str(), age))
671        })
672        .collect()
673}
674
/// Outcome of a single re-check + delete attempt against one stale-listed
/// lock key. Each variant maps 1:1 to a branch in the original
/// `list_and_handle_stale_locks` loop so the orchestrator can aggregate
/// counts without re-inspecting the store or the writer.
///
/// `list_and_handle_stale_locks` counts `Deleted` as deleted, the three
/// `Skipped*` variants as skipped, and `DeleteFailed` as neither.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteOutcome {
    /// The fresh HEAD confirmed staleness and `delete` succeeded.
    Deleted,
    /// The fresh HEAD reported a `last_modified` within the TTL; the
    /// lock is now active and must not be deleted (issue #132).
    SkippedNotStale,
    /// The fresh HEAD returned `NotFound` — another client's
    /// stale-lock recovery already cleaned up the key.
    SkippedDisappeared,
    /// The fresh HEAD failed with a non-`NotFound` error (transient
    /// network / 5xx). Treated as inconclusive: skip the delete
    /// rather than acting on the stale-listing assumption.
    SkippedHeadError,
    /// HEAD confirmed staleness but `delete` itself failed. Logged,
    /// counted as neither deleted nor skipped, so the caller can
    /// continue with the next key (best-effort sweep contract).
    DeleteFailed,
}
698
699/// Re-HEAD `key` and, if it is still stale relative to `ttl`, delete
700/// it. Writes the operator-visible status line for whichever branch
701/// fires so the orchestrator stays a pure aggregator.
702///
703/// The re-HEAD shrinks the TOCTOU window between the original
704/// `list` (potentially minutes old, after interactive prompts) and
705/// the destructive `delete` from minutes to milliseconds. A
706/// conditional-delete primitive on `ObjectStore` would eliminate the
707/// residual window, but that is a trait-wide change tracked separately
708/// (issue #132's "suggested fix" accepts this trade-off).
709async fn delete_stale_lock_if_still_stale<W: Write>(
710    store: &dyn ObjectStore,
711    key: &str,
712    ttl: Duration,
713    out: &mut W,
714) -> Result<DeleteOutcome, ManageError> {
715    match store.head(key).await {
716        Ok(meta) => {
717            let raw_age = OffsetDateTime::now_utc() - meta.last_modified;
718            let still_stale = if raw_age.is_negative() {
719                warn!(
720                    key,
721                    skew_secs = -raw_age.whole_seconds(),
722                    "lock's last_modified is in the future on re-check; \
723                     treating as out-of-tolerance (clock skew?)",
724                );
725                // Mirror `scan_stale_locks_at` (issue #223): a future-
726                // stamped lock is "stale" only on the operator-explicit
727                // TTL=0 sweep. Any positive TTL means "older than N
728                // seconds", which a future-stamped lock is not.
729                ttl.is_zero()
730            } else {
731                let age = Duration::try_from(raw_age)
732                    .expect("raw_age is non-negative — branch above returned for negatives");
733                age > ttl
734            };
735            if !still_stale {
736                writeln!(
737                    out,
738                    "Skipping {key}: lock no longer stale, refusing to delete"
739                )?;
740                warn!(key, "lock no longer stale, skipping doctor delete");
741                return Ok(DeleteOutcome::SkippedNotStale);
742            }
743        }
744        Err(ObjectStoreError::NotFound(_)) => {
745            writeln!(out, "Skipping {key}: lock disappeared concurrently")?;
746            warn!(key, "lock disappeared between listing and delete, skipping");
747            return Ok(DeleteOutcome::SkippedDisappeared);
748        }
749        Err(e) => {
750            // Best-effort sweep: a transient HEAD failure must not
751            // abort the run or delete on a stale assumption.
752            writeln!(out, "Failed to re-check {key}: {e}")?;
753            warn!(key, error = %e, "head re-check failed; skipping delete");
754            return Ok(DeleteOutcome::SkippedHeadError);
755        }
756    }
757    match store.delete(key).await {
758        Ok(()) => {
759            writeln!(out, "Deleted {key}")?;
760            info!(key, "deleted stale lock");
761            Ok(DeleteOutcome::Deleted)
762        }
763        Err(e) => {
764            // Best-effort: report each failure but keep going.
765            writeln!(out, "Failed to delete {key}: {e}")?;
766            Ok(DeleteOutcome::DeleteFailed)
767        }
768    }
769}
770
/// Display label for a ref path: everything after its final `/`, as
/// shown in `fix_head`'s branch picker.
///
/// A path without any `/` (a single-component ref, or the empty
/// string) is returned unchanged, and a path ending in `/` yields the
/// empty string. The function never panics, which is all the caller
/// needs since the result is only ever used as a human-readable label.
fn short_branch_name(full: &str) -> &str {
    full.rsplit_once('/').map_or(full, |(_, leaf)| leaf)
}
784
785/// Render the malformed-bundle-key section. Caller must check
786/// `entries.is_empty()` first — this function always emits a header.
787///
788/// Doctor does not auto-delete: the safe action is the operator's, not
789/// ours. Each row is the full key plus a `aws s3 rm` / `az storage blob
790/// delete`-style hint so an operator can act without re-deriving the
791/// path from the ref name.
792fn render_malformed_bundles_section(entries: &[MalformedBundleKey]) -> String {
793    use std::fmt::Write;
794    let mut out = String::new();
795    let _ = writeln!(
796        out,
797        "\nMalformed bundle keys (push silently ignores these):"
798    );
799    for entry in entries {
800        let _ = writeln!(out, "  - {} (ref {})", entry.key, entry.ref_path);
801    }
802    let _ = writeln!(
803        out,
804        "  Delete each key manually (`aws s3 rm` / `az storage blob delete`) and re-push the ref.",
805    );
806    out
807}
808
809/// Render the packchain audit section. The output ends with a trailing
810/// newline so callers can `print!` it without manual spacing. The shape
811/// is the one specified in #68: a header line, then four sub-sections
812/// (orphans, tombstones, branches needing compaction, errors).
813fn render_packchain_section(report: &AuditReport) -> String {
814    use std::fmt::Write;
815    let mut out = String::new();
816    let _ = writeln!(out, "\n=== Packchain ===");
817    let _ = writeln!(
818        out,
819        "Orphans: {} pack(s), {}",
820        report.orphans.pack_count,
821        format_bytes(report.orphans.bytes),
822    );
823
824    if report.tombstones.is_empty() {
825        let _ = writeln!(out, "Tombstones (pending sweep): none");
826    } else {
827        let _ = writeln!(out, "Tombstones (pending sweep):");
828        for t in &report.tombstones {
829            let age = format_age(t.age_hours);
830            let _ = writeln!(
831                out,
832                "  - run id {}, marked {} ({}), {} pack(s)",
833                t.run_id, t.marked_at, age, t.orphan_count,
834            );
835        }
836    }
837
838    let candidates: Vec<&BranchRow> = report
839        .branches
840        .iter()
841        .filter(|r| r.recommend_compact)
842        .collect();
843    if candidates.is_empty() {
844        let _ = writeln!(out, "Branches needing compaction: none");
845    } else {
846        let _ = writeln!(out, "Branches needing compaction:");
847        for r in candidates {
848            let _ = writeln!(
849                out,
850                "  - {}: {} segment(s), {} since full_at  [recommend compact]",
851                r.ref_path,
852                r.segments_total,
853                format_bytes(r.bytes_total),
854            );
855        }
856    }
857
858    let has_corrupt = report.branches.iter().any(|r| !r.has_full_at_segment);
859    if report.dangling.is_empty() && !has_corrupt {
860        let _ = writeln!(out, "ERRORS: none");
861    } else {
862        let _ = writeln!(out, "ERRORS:");
863        for d in &report.dangling {
864            let _ = writeln!(
865                out,
866                "  - {}/chain.json references missing pack {}",
867                d.ref_path, d.missing_pack_key,
868            );
869        }
870        for b in report.branches.iter().filter(|r| !r.has_full_at_segment) {
871            let _ = writeln!(
872                out,
873                "  - {}/chain.json full_at not present in segments (corrupt manifest)",
874                b.ref_path,
875            );
876        }
877    }
878    out
879}
880
/// Human-readable byte total.
///
/// Values below 1 KiB are printed as bare bytes (`"512 B"`); larger
/// values are scaled to KiB, MiB or GiB (the largest unit — there is
/// no TiB tier) and rounded to one decimal place. The binary units
/// mirror the numbers operators see in `gc` output and the issue
/// thresholds (100 MiB, etc.).
///
/// A value that would round up to `1024.0` of a unit is promoted to
/// the next unit instead, so `1_048_575` renders as `"1.0 MiB"` rather
/// than `"1024.0 KiB"`.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1_024;
    const MIB: u64 = 1_024 * KIB;
    const GIB: u64 = 1_024 * MIB;
    /// Scaled units, largest first.
    const TIERS: [(u64, &str); 3] = [(GIB, "GiB"), (MIB, "MiB"), (KIB, "KiB")];
    /// Smallest scaled value that `{:.1}` prints as `1024.0`.
    const ROUNDS_TO_NEXT_UNIT: f64 = 1_023.95;

    // f64 precision loss is acceptable here: the result is rendered
    // to one decimal place for human consumption only.
    #[allow(clippy::cast_precision_loss)]
    let scaled = |unit: u64| bytes as f64 / unit as f64;

    for (idx, &(unit, label)) in TIERS.iter().enumerate() {
        // Promote into this tier when the next smaller one would
        // display a full 1024.0 after rounding.
        let spills_over = TIERS
            .get(idx + 1)
            .is_some_and(|&(smaller, _)| scaled(smaller) >= ROUNDS_TO_NEXT_UNIT);
        if bytes >= unit || spills_over {
            return format!("{:.1} {label}", scaled(unit));
        }
    }
    // Below 1 KiB the count is exact, so no rounding or promotion applies.
    format!("{bytes} B")
}
903
/// Render an age given in whole hours.
///
/// * zero or negative (clock skew) → `"<1h"`, so signed arithmetic
///   never leaks into the report;
/// * 1 through 47 → `"<n>h ago"`;
/// * 48 and above → `"<n>d ago"`, using integer division by 24.
fn format_age(hours: i64) -> String {
    match hours {
        ..=0 => "<1h".to_owned(),
        1..=47 => format!("{hours}h ago"),
        _ => format!("{}d ago", hours / 24),
    }
}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920    use crate::manage::snapshot::analyze;
921    use crate::manage::{ScriptedPrompter, scripted::Answer};
922    use crate::object_store::mock::{Fault, MockStore};
923    use bytes::Bytes;
924    use time::OffsetDateTime;
925
926    fn store_arc(mock: &MockStore) -> Arc<dyn ObjectStore> {
927        Arc::new(mock.clone())
928    }
929
    // Valid 40-character lowercase-hex stems used as bundle-key
    // fixtures. Earlier doctor tests used short stems like "abc"; #124
    // added stem validation in the snapshot pass, so well-formed-bundle
    // test fixtures must now carry real-length stems. The three values
    // sort lexicographically as A < B < C.
    const SHA_A: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    const SHA_B: &str = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
    const SHA_C: &str = "cccccccccccccccccccccccccccccccccccccccc";
937
938    #[tokio::test]
939    async fn no_issues_round_trip_runs_clean() {
940        let mock = MockStore::new();
941        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
942        mock.insert(
943            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
944            Bytes::from("b"),
945        );
946        let initial_keys = mock.keys();
947        let prompter = ScriptedPrompter::new([]);
948        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
949        doctor
950            .run_into(&mut std::io::sink())
951            .await
952            .expect("doctor.run");
953        // A clean run must not mutate the bucket — no objects added,
954        // moved, or removed. This catches a `Doctor::run` regressed to
955        // a no-op as well as one that over-eagerly fixes a non-issue.
956        assert_eq!(mock.keys(), initial_keys);
957        assert_eq!(prompter.remaining(), 0);
958    }
959
960    #[tokio::test]
961    async fn fix_multiple_bundles_quarantines_losers_by_default() {
962        let mock = MockStore::new();
963        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
964        mock.insert(
965            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
966            Bytes::from("body-a"),
967        );
968        mock.insert(
969            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
970            Bytes::from("body-b"),
971        );
972        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(true)]);
973        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
974        doctor
975            .run_into(&mut std::io::sink())
976            .await
977            .expect("doctor.run");
978
979        // Original bundle for the keeper is still present.
980        assert!(mock.contains(&format!("myrepo/refs/heads/main/{SHA_A}.bundle")));
981        // Loser was moved off the main ref.
982        assert!(!mock.contains(&format!("myrepo/refs/heads/main/{SHA_B}.bundle")));
983        // The new quarantine ref has a key with the moved bundle, and
984        // the suffix is exactly 8 lowercase hex characters
985        // (`<ref>_<uuid8>`).
986        let loser_tail = format!("/{SHA_B}.bundle");
987        let moved = mock
988            .keys()
989            .into_iter()
990            .find(|k| k.starts_with("myrepo/refs/heads/main_") && k.ends_with(&loser_tail))
991            .expect("quarantine key created");
992        let suffix = moved
993            .strip_prefix("myrepo/refs/heads/main_")
994            .and_then(|rest| rest.strip_suffix(&loser_tail))
995            .expect("quarantine key matches `<ref>_<suffix>/<sha>.bundle`");
996        assert_eq!(suffix.len(), 8, "expected 8-char suffix, got {suffix:?}");
997        assert!(
998            suffix
999                .chars()
1000                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()),
1001            "expected lowercase hex suffix, got {suffix:?}"
1002        );
1003    }
1004
1005    #[tokio::test]
1006    async fn fix_multiple_bundles_delete_mode_removes_losers() {
1007        let mock = MockStore::new();
1008        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1009        mock.insert(
1010            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1011            Bytes::from("a"),
1012        );
1013        mock.insert(
1014            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1015            Bytes::from("b"),
1016        );
1017        let prompter = ScriptedPrompter::new([Answer::Select(1), Answer::Confirm(true)]);
1018        let opts = DoctorOpts {
1019            delete_bundle: true,
1020            ..DoctorOpts::default()
1021        };
1022        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1023        doctor
1024            .run_into(&mut std::io::sink())
1025            .await
1026            .expect("doctor.run");
1027        assert!(!mock.contains(&format!("myrepo/refs/heads/main/{SHA_A}.bundle")));
1028        assert!(mock.contains(&format!("myrepo/refs/heads/main/{SHA_B}.bundle")));
1029    }
1030
1031    #[tokio::test]
1032    async fn fix_multiple_bundles_user_aborts_keeps_originals() {
1033        let mock = MockStore::new();
1034        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1035        mock.insert(
1036            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1037            Bytes::from("a"),
1038        );
1039        mock.insert(
1040            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1041            Bytes::from("b"),
1042        );
1043        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(false)]);
1044        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1045        // User-no on the confirmation declines this fix but is not an
1046        // abort of the whole run — the doctor continues to scan stale
1047        // locks and exits 0. Both bundles must remain untouched.
1048        doctor
1049            .run_into(&mut std::io::sink())
1050            .await
1051            .expect("user-no should not error");
1052        assert!(mock.contains(&format!("myrepo/refs/heads/main/{SHA_A}.bundle")));
1053        assert!(mock.contains(&format!("myrepo/refs/heads/main/{SHA_B}.bundle")));
1054    }
1055
1056    #[tokio::test]
1057    async fn fix_multiple_bundles_out_of_range_select_returns_internal_error() {
1058        // A scripted prompter that returns an out-of-range index used to
1059        // panic the process via `expect`. The defensive path now returns
1060        // a structured `ManageError::Internal` so the helper / management
1061        // CLI can surface the bug without aborting (issue #33).
1062        let mock = MockStore::new();
1063        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1064        mock.insert(
1065            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1066            Bytes::from("a"),
1067        );
1068        mock.insert(
1069            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1070            Bytes::from("b"),
1071        );
1072        // Two bundles → valid indices are 0 and 1; 99 is out of range.
1073        let prompter = ScriptedPrompter::new([Answer::Select(99)]);
1074        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1075        let err = doctor
1076            .run_into(&mut std::io::sink())
1077            .await
1078            .expect_err("out-of-range index propagates");
1079        assert!(
1080            matches!(err, ManageError::Internal(ref msg) if msg.contains("out-of-range")),
1081            "expected ManageError::Internal, got {err:?}",
1082        );
1083    }
1084
    /// Prompter that runs a one-shot side effect immediately before
    /// returning its first `confirm` answer. The bundle-fixer's
    /// keeper re-verification fires *after* the operator confirms
    /// (issue #155), so simulating a concurrent delete that lands
    /// between the prompt and the eviction requires firing the side
    /// effect in `confirm`, not `select`.
    struct DeleteOnConfirmPrompter {
        /// Index returned from every `select` call.
        select_index: usize,
        /// Answer returned from every `confirm` call.
        confirm_answer: bool,
        /// Store handle through which the simulated concurrent delete
        /// is performed.
        mock: MockStore,
        /// Keys removed from `mock` on the first `confirm` call only.
        keys_to_delete: Vec<String>,
        /// Latched to `true` once the side effect has run, so later
        /// `confirm` calls leave the store alone.
        fired: std::sync::Mutex<bool>,
    }
1098
1099    impl Prompter for DeleteOnConfirmPrompter {
1100        fn select(&self, _prompt: &str, _options: &[String]) -> Result<usize, ManageError> {
1101            Ok(self.select_index)
1102        }
1103
1104        fn confirm(&self, _prompt: &str) -> Result<bool, ManageError> {
1105            let mut fired = self.fired.lock().expect("fired mutex poisoned");
1106            if !*fired {
1107                for key in &self.keys_to_delete {
1108                    let _ = self.mock.remove_key(key);
1109                }
1110                *fired = true;
1111            }
1112            Ok(self.confirm_answer)
1113        }
1114    }
1115
1116    #[tokio::test]
1117    async fn fix_multiple_bundles_refuses_when_keeper_disappears_after_confirm() {
1118        // Issue #155: `fix_multiple_bundles` partitions the top-of-run
1119        // snapshot and evicts losers without revalidating that the
1120        // chosen keeper still exists. A concurrent push / delete-branch
1121        // / another doctor that removes the keeper between the prompt
1122        // and the eviction would otherwise leave the ref with no live
1123        // bundle. The fix re-HEADs the keeper's exact key after the
1124        // operator confirms; on `NotFound` it aborts the run with a
1125        // typed `StaleSnapshot` and does NOT delete any losers.
1126        let mock = MockStore::new();
1127        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1128        mock.insert(
1129            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1130            Bytes::from("a"),
1131        );
1132        mock.insert(
1133            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1134            Bytes::from("b"),
1135        );
1136        let initial_keys = mock.keys();
1137        // Snapshot order is listing order: SHA_A comes before SHA_B
1138        // (lexicographic), so index 0 is the SHA_A bundle — that's
1139        // the one we delete out from under the doctor.
1140        let prompter = DeleteOnConfirmPrompter {
1141            select_index: 0,
1142            confirm_answer: true,
1143            mock: mock.clone(),
1144            keys_to_delete: vec![format!("myrepo/refs/heads/main/{SHA_A}.bundle")],
1145            fired: std::sync::Mutex::new(false),
1146        };
1147        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1148        let mut out = Vec::new();
1149        let err = doctor
1150            .run_into(&mut out)
1151            .await
1152            .expect_err("missing keeper must surface as stale snapshot");
1153        assert!(
1154            matches!(
1155                &err,
1156                ManageError::StaleSnapshot { entity, reason: StaleReason::Deleted }
1157                    if entity == "refs/heads/main"
1158            ),
1159            "expected ManageError::StaleSnapshot {{ refs/heads/main, Deleted }}, got {err:?}",
1160        );
1161        // Pin the rendered Display so the operator-facing wording does
1162        // not silently regress to the residue-only phrasing.
1163        let rendered = err.to_string();
1164        assert!(
1165            rendered.contains("refs/heads/main")
1166                && rendered.contains("was deleted between selection and write")
1167                && rendered.contains("re-run doctor"),
1168            "expected Deleted-flavoured Display, got: {rendered}",
1169        );
1170
1171        // Critical invariant: the loser must NOT have been evicted.
1172        // Before the fix, the doctor partitioned the stale snapshot
1173        // and deleted the loser, leaving the ref with no live bundle.
1174        // Now: keeper is gone (the prompter deleted it to simulate the
1175        // race), but the loser must remain so the ref still has at
1176        // least one bundle.
1177        assert!(
1178            mock.contains(&format!("myrepo/refs/heads/main/{SHA_B}.bundle")),
1179            "loser must NOT be evicted when keeper recheck fails (bucket: {:?})",
1180            mock.keys(),
1181        );
1182        // Pin the strongest invariant: only the simulated concurrent
1183        // delete of SHA_A changed the bucket. No quarantine ref, no
1184        // copy, no extra delete fired.
1185        let mut expected = initial_keys.clone();
1186        expected.retain(|k| k != &format!("myrepo/refs/heads/main/{SHA_A}.bundle"));
1187        let mut actual = mock.keys();
1188        expected.sort();
1189        actual.sort();
1190        assert_eq!(
1191            actual, expected,
1192            "doctor mutated bucket beyond the simulated concurrent delete",
1193        );
1194
1195        // Operator-facing message names the keeper SHA, the ref, and
1196        // tells the operator to re-run.
1197        let output = String::from_utf8(out).expect("doctor output is utf-8");
1198        assert!(
1199            output.contains(SHA_A),
1200            "expected keeper SHA in operator output:\n{output}",
1201        );
1202        assert!(
1203            output.contains("no longer present"),
1204            "expected 'no longer present' message:\n{output}",
1205        );
1206        assert!(
1207            output.contains("Re-run doctor"),
1208            "expected re-run instruction:\n{output}",
1209        );
1210        // The "Keeping" line MUST NOT appear — we aborted before
1211        // committing to the eviction plan.
1212        assert!(
1213            !output.contains(&format!("Keeping {SHA_A}")),
1214            "doctor printed keeper announcement despite aborting:\n{output}",
1215        );
1216        // And the eviction "Moving ..." line MUST NOT appear either.
1217        assert!(
1218            !output.contains(&format!("Moving {SHA_B}")),
1219            "doctor printed eviction line despite aborting:\n{output}",
1220        );
1221    }
1222
1223    #[tokio::test]
1224    async fn fix_multiple_bundles_proceeds_when_keeper_still_present_after_confirm() {
1225        // Negative control for #155: if the concurrent delete removes
1226        // a non-keeper bundle between the prompt and the eviction,
1227        // the keeper re-verification passes and the doctor proceeds
1228        // normally. This pins the recheck to the keeper only — it
1229        // does not re-list everything and abort on any drift.
1230        let mock = MockStore::new();
1231        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1232        mock.insert(
1233            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1234            Bytes::from("a"),
1235        );
1236        mock.insert(
1237            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1238            Bytes::from("b"),
1239        );
1240        mock.insert(
1241            format!("myrepo/refs/heads/main/{SHA_C}.bundle"),
1242            Bytes::from("c"),
1243        );
1244        // Keep SHA_A (index 0). Concurrent delete removes SHA_B (a
1245        // non-keeper loser) between prompt and recheck. The doctor
1246        // must still proceed: SHA_A survives (it is the keeper) and
1247        // SHA_C is the only loser to evict — SHA_B is already gone.
1248        let prompter = DeleteOnConfirmPrompter {
1249            select_index: 0,
1250            confirm_answer: true,
1251            mock: mock.clone(),
1252            keys_to_delete: vec![format!("myrepo/refs/heads/main/{SHA_B}.bundle")],
1253            fired: std::sync::Mutex::new(false),
1254        };
1255        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1256        doctor
1257            .run_into(&mut std::io::sink())
1258            .await
1259            .expect("unrelated concurrent delete must not block keeper-survival path");
1260
1261        // Keeper survived.
1262        assert!(mock.contains(&format!("myrepo/refs/heads/main/{SHA_A}.bundle")));
1263        // The non-keeper that the simulated race deleted is gone.
1264        assert!(!mock.contains(&format!("myrepo/refs/heads/main/{SHA_B}.bundle")));
1265        // SHA_C is the remaining loser — eviction must have run on it.
1266        // Without this assertion, a regression where the keeper recheck
1267        // passes but the eviction loop is skipped would silently pass.
1268        assert!(
1269            !mock.contains(&format!("myrepo/refs/heads/main/{SHA_C}.bundle")),
1270            "loser SHA_C must be evicted from the ref after a successful keeper recheck",
1271        );
1272    }
1273
1274    #[tokio::test]
1275    async fn fix_multiple_bundles_propagates_transient_head_failure_on_keeper() {
1276        // A non-`NotFound` `head` failure on the keeper re-check must
1277        // propagate as a `Store` error rather than silently proceed
1278        // with the eviction on the stale snapshot. The strongest
1279        // invariant: no losers are evicted when the recheck is
1280        // inconclusive — proceeding would risk deleting the last
1281        // live bundle on a transient blip.
1282        let mock = MockStore::new();
1283        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1284        mock.insert(
1285            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1286            Bytes::from("a"),
1287        );
1288        mock.insert(
1289            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
1290            Bytes::from("b"),
1291        );
1292        let initial_keys = mock.keys();
1293        let keeper_key = format!("myrepo/refs/heads/main/{SHA_A}.bundle");
1294        mock.arm(Fault::NetworkOnHead {
1295            key: keeper_key.clone(),
1296        });
1297        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(true)]);
1298        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1299        let err = doctor
1300            .run_into(&mut std::io::sink())
1301            .await
1302            .expect_err("transient HEAD failure must propagate, not silently succeed");
1303        assert!(
1304            matches!(err, ManageError::Store(_)),
1305            "expected ManageError::Store from the head-fault, got {err:?}",
1306        );
1307        // No losers evicted on an inconclusive HEAD.
1308        let mut actual = mock.keys();
1309        let mut expected = initial_keys.clone();
1310        actual.sort();
1311        expected.sort();
1312        assert_eq!(
1313            actual, expected,
1314            "doctor mutated bucket despite inconclusive keeper recheck",
1315        );
1316    }
1317
1318    #[tokio::test]
1319    async fn fix_head_out_of_range_select_returns_internal_error() {
1320        // Sibling guard for the HEAD-candidate prompter path (#33). The
1321        // doctor reaches `fix_head` when the existing HEAD is invalid (or
1322        // the bucket has no HEAD object) AND there is at least one
1323        // `refs/heads/*` candidate to assign. An out-of-range script
1324        // index from a bad test prompter must surface as
1325        // `ManageError::Internal`, not panic the process.
1326        let mock = MockStore::new();
1327        // No HEAD object → snapshot.is_head_valid() is false.
1328        mock.insert(
1329            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1330            Bytes::from("b"),
1331        );
1332        mock.insert(
1333            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
1334            Bytes::from("c"),
1335        );
1336        // Two HEAD candidates → valid indices are 0 and 1; 42 is out of
1337        // range. No prior bundle-fix prompts because no ref has > 1
1338        // bundle.
1339        let prompter = ScriptedPrompter::new([Answer::Select(42)]);
1340        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1341        let err = doctor
1342            .run_into(&mut std::io::sink())
1343            .await
1344            .expect_err("out-of-range HEAD index propagates");
1345        assert!(
1346            matches!(err, ManageError::Internal(ref msg) if msg.contains("HEAD candidate")),
1347            "expected ManageError::Internal naming HEAD candidate, got {err:?}",
1348        );
1349    }
1350
1351    #[tokio::test]
1352    async fn fix_head_writes_chosen_branch() {
1353        let mock = MockStore::new();
1354        mock.insert(
1355            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1356            Bytes::from("b"),
1357        );
1358        mock.insert(
1359            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
1360            Bytes::from("c"),
1361        );
1362        // Refs are surfaced in BTreeMap order (lexicographic), so the
1363        // candidate list is `[refs/heads/dev, refs/heads/main]` and
1364        // index 1 is `main`.
1365        let prompter = ScriptedPrompter::new([Answer::Select(1)]);
1366        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1367        doctor
1368            .run_into(&mut std::io::sink())
1369            .await
1370            .expect("doctor.run");
1371
1372        let head_bytes = mock.get_bytes("myrepo/HEAD").await.expect("HEAD written");
1373        assert_eq!(&head_bytes[..], b"refs/heads/main");
1374    }
1375
    /// Prompter that runs a one-shot side effect immediately before
    /// returning its first `select` answer. Used by the issue #138
    /// regression tests to simulate a concurrent
    /// `git push :<branch>` / `manage delete-branch` that fires
    /// **between** the operator's selection and the HEAD write.
    struct DeleteBeforeReturnPrompter {
        /// Index returned from every `select` call.
        index: usize,
        /// Store handle through which the simulated concurrent
        /// mutation is performed.
        mock: MockStore,
        /// Keys removed from `mock` on the first `select` call only.
        keys_to_delete: Vec<String>,
        /// Keys to insert (with an empty body) immediately before
        /// returning the answer — lets a race test seed residue
        /// (lock files, `PROTECTED#` markers) into the branch prefix
        /// the doctor is about to re-check.
        keys_to_insert: Vec<String>,
        /// Latched to `true` once the side effect has run, so later
        /// `select` calls leave the store alone.
        fired: std::sync::Mutex<bool>,
    }
1392
1393    impl Prompter for DeleteBeforeReturnPrompter {
1394        fn select(&self, _prompt: &str, _options: &[String]) -> Result<usize, ManageError> {
1395            let mut fired = self.fired.lock().expect("fired mutex poisoned");
1396            if !*fired {
1397                for key in &self.keys_to_delete {
1398                    let _ = self.mock.remove_key(key);
1399                }
1400                for key in &self.keys_to_insert {
1401                    self.mock.insert(key.clone(), Bytes::new());
1402                }
1403                *fired = true;
1404            }
1405            Ok(self.index)
1406        }
1407
1408        fn confirm(&self, _prompt: &str) -> Result<bool, ManageError> {
1409            panic!("DeleteBeforeReturnPrompter does not expect confirm prompts")
1410        }
1411    }
1412
1413    #[tokio::test]
1414    async fn fix_head_refuses_when_chosen_branch_deleted_between_select_and_write() {
1415        // Issue #138: `fix_head` presents candidates from the
1416        // top-of-run snapshot. If a concurrent push / delete-branch
1417        // removes the chosen branch between the operator's selection
1418        // and the HEAD write, the doctor must NOT write HEAD pointing
1419        // at a now-deleted branch. Simulate the race by deleting the
1420        // chosen branch's only object inside the prompter's `select`
1421        // implementation (which fires after the candidate list is
1422        // computed but before `put_bytes(HEAD)` runs).
1423        let mock = MockStore::new();
1424        mock.insert(
1425            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1426            Bytes::from("b"),
1427        );
1428        mock.insert(
1429            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
1430            Bytes::from("c"),
1431        );
1432        // Index 1 == `refs/heads/main` (lexicographic ordering).
1433        let prompter = DeleteBeforeReturnPrompter {
1434            index: 1,
1435            mock: mock.clone(),
1436            keys_to_delete: vec![format!("myrepo/refs/heads/main/{SHA_A}.bundle")],
1437            keys_to_insert: vec![],
1438            fired: std::sync::Mutex::new(false),
1439        };
1440        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1441        let mut out = Vec::new();
1442        let err = doctor
1443            .run_into(&mut out)
1444            .await
1445            .expect_err("stale snapshot must surface as an error, not silent success");
1446        assert!(
1447            matches!(
1448                &err,
1449                ManageError::StaleSnapshot { entity, reason: StaleReason::Deleted }
1450                    if entity == "refs/heads/main"
1451            ),
1452            "expected ManageError::StaleSnapshot {{ refs/heads/main, Deleted }}, got {err:?}",
1453        );
1454        // Pin the rendered Display so the wording stays in lockstep
1455        // with the StaleReason::Deleted branch.
1456        let rendered = err.to_string();
1457        assert!(
1458            rendered.contains("refs/heads/main")
1459                && rendered.contains("was deleted between selection and write")
1460                && rendered.contains("re-run doctor"),
1461            "expected Deleted-flavoured Display, got: {rendered}",
1462        );
1463
1464        // HEAD must NOT have been written — the doctor refused to
1465        // create a HEAD pointing at the deleted branch.
1466        assert!(
1467            !mock.contains("myrepo/HEAD"),
1468            "HEAD was written despite chosen branch being deleted",
1469        );
1470
1471        // The operator-facing output names the deleted branch and
1472        // tells them to re-run.
1473        let output = String::from_utf8(out).expect("doctor output is utf-8");
1474        assert!(
1475            output.contains("refs/heads/main")
1476                && output.contains("deleted between selection and HEAD write")
1477                && output.contains("Re-run doctor"),
1478            "expected race-detection message, got:\n{output}",
1479        );
1480        // The "Setting … as HEAD" line MUST NOT appear — we aborted
1481        // before that write.
1482        assert!(
1483            !output.contains("Setting refs/heads/main as HEAD"),
1484            "doctor printed the HEAD-write confirmation despite aborting:\n{output}",
1485        );
1486    }
1487
1488    #[tokio::test]
1489    async fn fix_head_succeeds_when_unrelated_branch_deleted_during_prompt() {
1490        // Companion to the race-detection test above: if a concurrent
1491        // delete removes a branch the operator did NOT select, the
1492        // re-verification must still let the HEAD write proceed. This
1493        // pins the re-check to the chosen branch only — it does not
1494        // re-list everything and abort on any drift.
1495        let mock = MockStore::new();
1496        mock.insert(
1497            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1498            Bytes::from("b"),
1499        );
1500        mock.insert(
1501            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
1502            Bytes::from("c"),
1503        );
1504        // Index 1 == `refs/heads/main`. Delete `refs/heads/dev` (the
1505        // non-chosen branch) during the prompt to prove the chosen
1506        // branch's re-verification is the only thing that matters.
1507        let prompter = DeleteBeforeReturnPrompter {
1508            index: 1,
1509            mock: mock.clone(),
1510            keys_to_delete: vec![format!("myrepo/refs/heads/dev/{SHA_C}.bundle")],
1511            keys_to_insert: vec![],
1512            fired: std::sync::Mutex::new(false),
1513        };
1514        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1515        doctor
1516            .run_into(&mut std::io::sink())
1517            .await
1518            .expect("unrelated concurrent delete must not block HEAD write");
1519
1520        let head_bytes = mock.get_bytes("myrepo/HEAD").await.expect("HEAD written");
1521        assert_eq!(&head_bytes[..], b"refs/heads/main");
1522    }
1523
1524    #[tokio::test]
1525    async fn fix_head_refuses_when_chosen_branch_left_with_only_lock_or_marker() {
1526        // Tightens the #138 race-detection check: an empty re-listing is
1527        // not the only "branch is gone" signal. A concurrent partial
1528        // delete can leave a stale `LOCK#.lock` and / or a `PROTECTED#`
1529        // marker behind after every user-data key (bundles, `chain.json`,
1530        // `packs/*`) has been swept. Those keys are operational
1531        // metadata, not branch contents — writing HEAD against them would
1532        // recreate exactly the invalid-HEAD condition the doctor exists
1533        // to prevent.
1534        //
1535        // We drive the race by seeding the branch with one bundle and
1536        // letting the prompter both delete the bundle AND insert the
1537        // residue (lock + marker) between candidate selection and the
1538        // HEAD-write re-check.
1539        let mock = MockStore::new();
1540        mock.insert(
1541            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1542            Bytes::from("b"),
1543        );
1544        mock.insert(
1545            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
1546            Bytes::from("c"),
1547        );
1548        // Index 1 == `refs/heads/main` (lexicographic ordering).
1549        let prompter = DeleteBeforeReturnPrompter {
1550            index: 1,
1551            mock: mock.clone(),
1552            keys_to_delete: vec![format!("myrepo/refs/heads/main/{SHA_A}.bundle")],
1553            keys_to_insert: vec![
1554                "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
1555                format!("myrepo/refs/heads/main/{}", keys::PROTECTED_MARKER_SEGMENT),
1556            ],
1557            fired: std::sync::Mutex::new(false),
1558        };
1559        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1560        let mut out = Vec::new();
1561        let err = doctor
1562            .run_into(&mut out)
1563            .await
1564            .expect_err("residue-only branch must surface as stale snapshot");
1565        assert!(
1566            matches!(
1567                &err,
1568                ManageError::StaleSnapshot { entity, reason: StaleReason::ResidueOnly }
1569                    if entity == "refs/heads/main"
1570            ),
1571            "expected ManageError::StaleSnapshot {{ refs/heads/main, ResidueOnly }}, \
1572             got {err:?}",
1573        );
1574        // Pin the rendered Display so the residue-only wording does
1575        // not silently regress to the "was deleted" phrasing — the
1576        // distinction is the whole point of issue #199.
1577        let rendered = err.to_string();
1578        assert!(
1579            rendered.contains("refs/heads/main")
1580                && rendered.contains("only operational metadata")
1581                && rendered.contains("PROTECTED# marker")
1582                && rendered.contains("re-run doctor"),
1583            "expected ResidueOnly-flavoured Display, got: {rendered}",
1584        );
1585        assert!(
1586            !rendered.contains("was deleted between selection and write"),
1587            "ResidueOnly Display must not use the Deleted wording: {rendered}",
1588        );
1589
1590        // HEAD must NOT have been written — the doctor refused even
1591        // though the lock and marker keys make the listing non-empty.
1592        assert!(
1593            !mock.contains("myrepo/HEAD"),
1594            "HEAD was written despite chosen branch having only operational metadata",
1595        );
1596
1597        // The operator-facing output names the branch and explains why
1598        // a non-empty listing was still treated as "gone".
1599        let output = String::from_utf8(out).expect("doctor output is utf-8");
1600        assert!(
1601            output.contains("refs/heads/main"),
1602            "expected branch name in output:\n{output}",
1603        );
1604        assert!(
1605            output.contains("considered gone"),
1606            "expected 'considered gone' framing in output:\n{output}",
1607        );
1608        assert!(
1609            output.contains("operational metadata"),
1610            "expected residue rationale in output:\n{output}",
1611        );
1612        assert!(
1613            !output.contains("Setting refs/heads/main as HEAD"),
1614            "doctor printed the HEAD-write confirmation despite aborting:\n{output}",
1615        );
1616    }
1617
1618    /// Prompter that arms a one-shot list fault on the
1619    /// [`MockStore`] immediately before returning its first `select`
1620    /// answer. Used by
1621    /// `fix_head_propagates_recheck_list_failure_without_writing_head`
1622    /// to land an `AccessDeniedOnList` fault into the re-verification
1623    /// list call without tripping the top-of-run snapshot list (which
1624    /// has already completed by the time the prompter is invoked).
1625    struct ArmFaultPrompter {
1626        mock: MockStore,
1627        branch_prefix: String,
1628        fired: std::sync::Mutex<bool>,
1629    }
1630
1631    impl Prompter for ArmFaultPrompter {
1632        fn select(&self, _prompt: &str, _options: &[String]) -> Result<usize, ManageError> {
1633            let mut fired = self.fired.lock().expect("fired mutex poisoned");
1634            if !*fired {
1635                self.mock.arm(Fault::AccessDeniedOnList {
1636                    prefix: self.branch_prefix.clone(),
1637                });
1638                *fired = true;
1639            }
1640            Ok(0)
1641        }
1642        fn confirm(&self, _prompt: &str) -> Result<bool, ManageError> {
1643            panic!("ArmFaultPrompter does not expect confirm prompts")
1644        }
1645    }
1646
1647    #[tokio::test]
1648    async fn fix_head_propagates_recheck_list_failure_without_writing_head() {
1649        // The re-verification list at the start of `fix_head` (just
1650        // before the HEAD put_bytes) propagates list failures via `?`.
1651        // A regression that swallowed the error and proceeded against
1652        // the stale top-of-run snapshot would silently re-create the
1653        // invalid-HEAD condition #138 was meant to prevent. Arm a
1654        // single-shot list fault scoped to the chosen branch's prefix
1655        // so only the re-check listing trips; the top-of-run snapshot
1656        // listing has already completed by the time the prompter is
1657        // invoked.
1658        let mock = MockStore::new();
1659        mock.insert(
1660            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1661            Bytes::from("b"),
1662        );
1663        // Lexicographic order puts `refs/heads/main` at index 0 (it is
1664        // the only candidate). The prompter arms the AccessDenied
1665        // fault on its first select call — after the top-of-run
1666        // snapshot list has already completed but before the
1667        // re-verification list runs.
1668        let prompter = ArmFaultPrompter {
1669            mock: mock.clone(),
1670            branch_prefix: "myrepo/refs/heads/main/".to_owned(),
1671            fired: std::sync::Mutex::new(false),
1672        };
1673        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1674        let err = doctor
1675            .run_into(&mut std::io::sink())
1676            .await
1677            .expect_err("list-fault on the re-check must propagate, not silently succeed");
1678        // The exact wording isn't pinned — only the variant chain
1679        // (ManageError::Store wrapping an ObjectStoreError). A
1680        // regression that swallowed the error would surface as Ok(_),
1681        // failing the expect_err above.
1682        assert!(
1683            matches!(err, ManageError::Store(_)),
1684            "expected ManageError::Store from the list-fault, got {err:?}",
1685        );
1686        // The critical invariant: HEAD must NOT have been written.
1687        // A re-check list failure that proceeded with the put would
1688        // re-create the invalid-HEAD condition the doctor is supposed
1689        // to fix.
1690        assert!(
1691            !mock.contains("myrepo/HEAD"),
1692            "HEAD was written despite the re-check list failing",
1693        );
1694    }
1695
1696    #[tokio::test]
1697    async fn stale_lock_listed_but_not_deleted_by_default() {
1698        let mock = MockStore::new();
1699        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1700        mock.insert(
1701            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1702            Bytes::from("b"),
1703        );
1704        let stale = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1705        mock.insert_with(
1706            "myrepo/refs/heads/main/LOCK#.lock",
1707            Bytes::new(),
1708            stale,
1709            PutOpts::default(),
1710        );
1711        let prompter = ScriptedPrompter::new([]);
1712        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
1713        doctor
1714            .run_into(&mut std::io::sink())
1715            .await
1716            .expect("doctor.run");
1717        assert!(
1718            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
1719            "lock retained without --delete-stale-locks"
1720        );
1721    }
1722
1723    #[tokio::test]
1724    async fn stale_lock_deleted_when_flag_set() {
1725        let mock = MockStore::new();
1726        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1727        mock.insert(
1728            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1729            Bytes::from("b"),
1730        );
1731        let stale = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1732        mock.insert_with(
1733            "myrepo/refs/heads/main/LOCK#.lock",
1734            Bytes::new(),
1735            stale,
1736            PutOpts::default(),
1737        );
1738        let opts = DoctorOpts {
1739            delete_stale_locks: true,
1740            ..DoctorOpts::default()
1741        };
1742        let prompter = ScriptedPrompter::new([]);
1743        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1744        doctor
1745            .run_into(&mut std::io::sink())
1746            .await
1747            .expect("doctor.run");
1748        assert!(!mock.contains("myrepo/refs/heads/main/LOCK#.lock"));
1749    }
1750
1751    #[tokio::test]
1752    async fn fresh_lock_is_not_flagged_stale() {
1753        let mock = MockStore::new();
1754        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
1755        mock.insert(
1756            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
1757            Bytes::from("b"),
1758        );
1759        // Stamped now → not stale.
1760        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
1761        let opts = DoctorOpts {
1762            delete_stale_locks: true,
1763            ..DoctorOpts::default()
1764        };
1765        let prompter = ScriptedPrompter::new([]);
1766        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1767        doctor
1768            .run_into(&mut std::io::sink())
1769            .await
1770            .expect("doctor.run");
1771        assert!(mock.contains("myrepo/refs/heads/main/LOCK#.lock"));
1772    }
1773
1774    // --- Stale-listing race window (#132) ---------------------------------
1775    //
1776    // The initial bucket listing is reused for stale-lock detection after
1777    // interactive duplicate-bundle / HEAD-fix prompts have run, which can
1778    // take minutes. In that window the stale lock may have been cleaned up
1779    // and replaced by a fresh, active lock at the same key — and an
1780    // unconditional delete would silently revoke the new client's mutual
1781    // exclusion. The fix re-HEADs each lock key immediately before the
1782    // delete and skips keys that are no longer stale (or vanished).
1783
1784    /// Lock key was stale at listing time but a fresh lock now sits at the
1785    /// same key. The re-HEAD must catch it and refuse to delete.
1786    #[tokio::test]
1787    async fn stale_listing_with_fresh_head_skips_delete() {
1788        let mock = MockStore::new();
1789        // Store the lock with a FRESH timestamp — this is what HEAD will
1790        // see at delete time, simulating the lock having been refreshed
1791        // by another client between listing and stale-handling.
1792        mock.insert("myrepo/refs/heads/main/LOCK#.lock", Bytes::new());
1793
1794        // Synthesize the stale listing the way `run_into` would see it if
1795        // the lock had been stale when the initial `list` ran. Calling
1796        // `list_and_handle_stale_locks` directly lets the test inject the
1797        // race condition deterministically.
1798        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1799        let synthetic_listing = vec![ObjectMeta {
1800            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
1801            size: 0,
1802            last_modified: stale_ts,
1803            etag: None,
1804        }];
1805
1806        let opts = DoctorOpts {
1807            delete_stale_locks: true,
1808            ..DoctorOpts::default()
1809        };
1810        let prompter = ScriptedPrompter::new([]);
1811        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1812        let mut out = Vec::new();
1813        doctor
1814            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
1815            .await
1816            .expect("stale-lock handler");
1817        let captured = String::from_utf8(out).expect("utf-8 output");
1818
1819        assert!(
1820            mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
1821            "fresh lock at stale-listed key must not be deleted",
1822        );
1823        assert!(
1824            captured.contains("no longer stale"),
1825            "skip reason missing from operator output: {captured:?}",
1826        );
1827        assert!(
1828            captured.contains("Skipped 1 lock(s)"),
1829            "skipped-count summary missing: {captured:?}",
1830        );
1831    }
1832
1833    /// Lock key was stale at listing time AND still stale at HEAD time —
1834    /// the re-HEAD must NOT suppress the legitimate delete.
1835    #[tokio::test]
1836    async fn stale_listing_with_stale_head_deletes() {
1837        let mock = MockStore::new();
1838        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1839        mock.insert_with(
1840            "myrepo/refs/heads/main/LOCK#.lock",
1841            Bytes::new(),
1842            stale_ts,
1843            PutOpts::default(),
1844        );
1845        let synthetic_listing = vec![ObjectMeta {
1846            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
1847            size: 0,
1848            last_modified: stale_ts,
1849            etag: None,
1850        }];
1851
1852        let opts = DoctorOpts {
1853            delete_stale_locks: true,
1854            ..DoctorOpts::default()
1855        };
1856        let prompter = ScriptedPrompter::new([]);
1857        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1858        let mut out = Vec::new();
1859        doctor
1860            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
1861            .await
1862            .expect("stale-lock handler");
1863        let captured = String::from_utf8(out).expect("utf-8 output");
1864
1865        assert!(
1866            !mock.contains("myrepo/refs/heads/main/LOCK#.lock"),
1867            "stale lock must be deleted when HEAD confirms staleness",
1868        );
1869        assert!(
1870            captured.contains("Deleted myrepo/refs/heads/main/LOCK#.lock"),
1871            "delete confirmation missing: {captured:?}",
1872        );
1873    }
1874
1875    /// Lock vanished between listing and stale-handling (e.g. another
1876    /// client's stale-lock recovery already cleaned it up). HEAD returns
1877    /// `NotFound` — the doctor must skip cleanly without erroring.
1878    #[tokio::test]
1879    async fn stale_listing_with_vanished_head_skips_without_error() {
1880        let mock = MockStore::new();
1881        // No lock object inserted — HEAD will return NotFound.
1882        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1883        let synthetic_listing = vec![ObjectMeta {
1884            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
1885            size: 0,
1886            last_modified: stale_ts,
1887            etag: None,
1888        }];
1889
1890        let opts = DoctorOpts {
1891            delete_stale_locks: true,
1892            ..DoctorOpts::default()
1893        };
1894        let prompter = ScriptedPrompter::new([]);
1895        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1896        let mut out = Vec::new();
1897        doctor
1898            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
1899            .await
1900            .expect("vanished lock must not error");
1901        let captured = String::from_utf8(out).expect("utf-8 output");
1902
1903        assert!(
1904            captured.contains("disappeared concurrently"),
1905            "concurrent-cleanup skip reason missing: {captured:?}",
1906        );
1907        assert!(
1908            !captured.contains("Deleted myrepo/refs/heads/main/LOCK#.lock"),
1909            "must not log a delete for a vanished key: {captured:?}",
1910        );
1911    }
1912
1913    /// Transient (non-`NotFound`) failure on the re-HEAD must be
1914    /// treated as inconclusive: the doctor must NOT delete on the
1915    /// stale-listing assumption, must NOT abort the run, and must
1916    /// surface the failure to the operator. Issue #132's "best-effort
1917    /// sweep" contract pins this arm — a regression that deletes on
1918    /// any non-`Ok` HEAD outcome would silently revoke another
1919    /// client's lock the moment the network burped.
1920    #[tokio::test]
1921    async fn stale_listing_with_transient_head_error_skips_lock_without_aborting() {
1922        let mock = MockStore::new();
1923        let key = "myrepo/refs/heads/main/LOCK#.lock";
1924        // Lock body present with a stale timestamp; the HEAD fault
1925        // fires before the timestamp is consulted, so the lock's
1926        // freshness is irrelevant. What matters is that delete is
1927        // skipped on the inconclusive HEAD.
1928        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1929        mock.insert_with(key, Bytes::new(), stale_ts, PutOpts::default());
1930        mock.arm(Fault::NetworkOnHead {
1931            key: key.to_owned(),
1932        });
1933
1934        let synthetic_listing = vec![ObjectMeta {
1935            key: key.to_owned(),
1936            size: 0,
1937            last_modified: stale_ts,
1938            etag: None,
1939        }];
1940
1941        let opts = DoctorOpts {
1942            delete_stale_locks: true,
1943            ..DoctorOpts::default()
1944        };
1945        let prompter = ScriptedPrompter::new([]);
1946        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
1947        let mut out = Vec::new();
1948        doctor
1949            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
1950            .await
1951            .expect("transient HEAD error must not abort the sweep");
1952        let captured = String::from_utf8(out).expect("utf-8 output");
1953
1954        assert!(
1955            mock.contains(key),
1956            "lock must survive a transient HEAD failure (best-effort sweep)",
1957        );
1958        assert!(
1959            captured.contains("Failed to re-check"),
1960            "operator output missing re-check failure line: {captured:?}",
1961        );
1962        assert!(
1963            !captured.contains("Deleted myrepo/refs/heads/main/LOCK#.lock"),
1964            "must not log a delete when HEAD was inconclusive: {captured:?}",
1965        );
1966        assert!(
1967            captured.contains("Skipped 1 lock(s)"),
1968            "skipped-count summary missing: {captured:?}",
1969        );
1970    }
1971
1972    /// A per-key `delete` failure must be logged and the sweep must
1973    /// continue to the next stale lock. This pins the "best-effort:
1974    /// report each failure but keep going" contract — a regression
1975    /// that returned the first delete error would leave the second
1976    /// stale lock in place AND abort the run.
1977    #[tokio::test]
1978    async fn stale_listing_with_delete_failure_logs_and_continues() {
1979        let mock = MockStore::new();
1980        let main_key = "myrepo/refs/heads/main/LOCK#.lock";
1981        let dev_key = "myrepo/refs/heads/dev/LOCK#.lock";
1982        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
1983        mock.insert_with(main_key, Bytes::new(), stale_ts, PutOpts::default());
1984        mock.insert_with(dev_key, Bytes::new(), stale_ts, PutOpts::default());
1985        // First key's delete fails (still-stale at HEAD, then network on
1986        // delete). Second key's delete should still fire.
1987        mock.arm(Fault::NetworkOnDelete {
1988            key: main_key.to_owned(),
1989        });
1990
1991        // `list_and_handle_stale_locks` iterates the synthetic listing
1992        // in order; pin order so the failing key is processed first.
1993        let synthetic_listing = vec![
1994            ObjectMeta {
1995                key: main_key.to_owned(),
1996                size: 0,
1997                last_modified: stale_ts,
1998                etag: None,
1999            },
2000            ObjectMeta {
2001                key: dev_key.to_owned(),
2002                size: 0,
2003                last_modified: stale_ts,
2004                etag: None,
2005            },
2006        ];
2007
2008        let opts = DoctorOpts {
2009            delete_stale_locks: true,
2010            ..DoctorOpts::default()
2011        };
2012        let prompter = ScriptedPrompter::new([]);
2013        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
2014        let mut out = Vec::new();
2015        doctor
2016            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
2017            .await
2018            .expect("delete failure must not abort the sweep");
2019        let captured = String::from_utf8(out).expect("utf-8 output");
2020
2021        assert!(
2022            mock.contains(main_key),
2023            "first key must survive its delete failure",
2024        );
2025        assert!(
2026            !mock.contains(dev_key),
2027            "second key must still be deleted after the first failure",
2028        );
2029        assert!(
2030            captured.contains(&format!("Failed to delete {main_key}")),
2031            "operator output missing delete-failure line for {main_key}: {captured:?}",
2032        );
2033        assert!(
2034            captured.contains(&format!("Deleted {dev_key}")),
2035            "operator output missing delete confirmation for {dev_key}: {captured:?}",
2036        );
2037    }
2038
2039    // --- `delete_stale_lock_if_still_stale` per-branch unit coverage -----
2040    //
2041    // The five `DeleteOutcome` variants each correspond to one of the
2042    // failure / success paths the orchestrator counts against. Driving
2043    // the helper directly (instead of through `list_and_handle_stale_locks`)
2044    // pins each branch in isolation: a regression that conflates two
2045    // outcomes — e.g. counts `DeleteFailed` as `Deleted` — fails the
2046    // outcome-equality assertion even when the operator-visible writer
2047    // text still looks plausible.
2048
2049    const DELETE_TTL: Duration = Duration::from_mins(1);
2050    const LOCK_KEY: &str = "myrepo/refs/heads/main/LOCK#.lock";
2051
2052    /// Fresh HEAD confirms staleness; `delete` succeeds → `Deleted`.
2053    #[tokio::test]
2054    async fn helper_returns_deleted_when_head_confirms_stale() {
2055        let mock = MockStore::new();
2056        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
2057        mock.insert_with(LOCK_KEY, Bytes::new(), stale_ts, PutOpts::default());
2058
2059        let mut out = Vec::new();
2060        let outcome =
2061            super::delete_stale_lock_if_still_stale(&mock, LOCK_KEY, DELETE_TTL, &mut out)
2062                .await
2063                .expect("helper must not error on the happy path");
2064
2065        assert_eq!(outcome, DeleteOutcome::Deleted);
2066        assert!(!mock.contains(LOCK_KEY), "lock body must be removed");
2067        let captured = String::from_utf8(out).expect("utf-8 output");
2068        assert!(
2069            captured.contains(&format!("Deleted {LOCK_KEY}")),
2070            "delete confirmation missing: {captured:?}",
2071        );
2072    }
2073
2074    /// Fresh HEAD shows a `last_modified` inside the TTL → `SkippedNotStale`.
2075    /// Pins the issue-#132 race-window guard.
2076    #[tokio::test]
2077    async fn helper_returns_skipped_not_stale_when_head_shows_fresh() {
2078        let mock = MockStore::new();
2079        // Insert with `now` — HEAD will read this as fresh.
2080        mock.insert(LOCK_KEY, Bytes::new());
2081
2082        let mut out = Vec::new();
2083        let outcome =
2084            super::delete_stale_lock_if_still_stale(&mock, LOCK_KEY, DELETE_TTL, &mut out)
2085                .await
2086                .expect("helper must not error when refusing to delete");
2087
2088        assert_eq!(outcome, DeleteOutcome::SkippedNotStale);
2089        assert!(
2090            mock.contains(LOCK_KEY),
2091            "fresh lock must survive a stale-listing call",
2092        );
2093        let captured = String::from_utf8(out).expect("utf-8 output");
2094        assert!(
2095            captured.contains("no longer stale"),
2096            "skip-reason text missing: {captured:?}",
2097        );
2098    }
2099
2100    /// HEAD returns `NotFound` → `SkippedDisappeared`.
2101    #[tokio::test]
2102    async fn helper_returns_skipped_disappeared_on_not_found() {
2103        // No insert — HEAD will return NotFound.
2104        let mock = MockStore::new();
2105        let mut out = Vec::new();
2106        let outcome =
2107            super::delete_stale_lock_if_still_stale(&mock, LOCK_KEY, DELETE_TTL, &mut out)
2108                .await
2109                .expect("vanished key must not surface as an error");
2110
2111        assert_eq!(outcome, DeleteOutcome::SkippedDisappeared);
2112        let captured = String::from_utf8(out).expect("utf-8 output");
2113        assert!(
2114            captured.contains("disappeared concurrently"),
2115            "disappeared-skip line missing: {captured:?}",
2116        );
2117    }
2118
2119    /// HEAD fails with a transient (non-`NotFound`) error →
2120    /// `SkippedHeadError`. The lock must NOT be deleted on the
2121    /// inconclusive HEAD (best-effort sweep contract).
2122    #[tokio::test]
2123    async fn helper_returns_skipped_head_error_on_transient_failure() {
2124        let mock = MockStore::new();
2125        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
2126        mock.insert_with(LOCK_KEY, Bytes::new(), stale_ts, PutOpts::default());
2127        mock.arm(Fault::NetworkOnHead {
2128            key: LOCK_KEY.to_owned(),
2129        });
2130
2131        let mut out = Vec::new();
2132        let outcome =
2133            super::delete_stale_lock_if_still_stale(&mock, LOCK_KEY, DELETE_TTL, &mut out)
2134                .await
2135                .expect("transient HEAD failure must not abort the sweep");
2136
2137        assert_eq!(outcome, DeleteOutcome::SkippedHeadError);
2138        assert!(
2139            mock.contains(LOCK_KEY),
2140            "lock must survive an inconclusive HEAD",
2141        );
2142        let captured = String::from_utf8(out).expect("utf-8 output");
2143        assert!(
2144            captured.contains("Failed to re-check"),
2145            "re-check failure line missing: {captured:?}",
2146        );
2147    }
2148
2149    /// HEAD confirms staleness but `delete` itself fails →
2150    /// `DeleteFailed`. The helper must return Ok so the orchestrator
2151    /// can proceed to the next key.
2152    #[tokio::test]
2153    async fn helper_returns_delete_failed_when_delete_errors() {
2154        let mock = MockStore::new();
2155        let stale_ts = OffsetDateTime::now_utc() - time::Duration::seconds(120);
2156        mock.insert_with(LOCK_KEY, Bytes::new(), stale_ts, PutOpts::default());
2157        mock.arm(Fault::NetworkOnDelete {
2158            key: LOCK_KEY.to_owned(),
2159        });
2160
2161        let mut out = Vec::new();
2162        let outcome =
2163            super::delete_stale_lock_if_still_stale(&mock, LOCK_KEY, DELETE_TTL, &mut out)
2164                .await
2165                .expect("delete failure must surface as Ok(DeleteFailed), not Err");
2166
2167        assert_eq!(outcome, DeleteOutcome::DeleteFailed);
2168        assert!(
2169            mock.contains(LOCK_KEY),
2170            "failed delete must not have removed the lock",
2171        );
2172        let captured = String::from_utf8(out).expect("utf-8 output");
2173        assert!(
2174            captured.contains(&format!("Failed to delete {LOCK_KEY}")),
2175            "delete-failure line missing: {captured:?}",
2176        );
2177    }
2178
2179    #[tokio::test]
2180    async fn report_renders_protected_multi_bundle_and_invalid_head() {
2181        // Build a snapshot covering every report-line shape: a
2182        // protected ref with one bundle, a duplicate-bundle ref, an
2183        // empty ref, plus a HEAD body that does not match any ref so
2184        // the trailing label reads `Invalid`.
2185        let mock = MockStore::new();
2186        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/missing"));
2187        mock.insert(
2188            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2189            Bytes::from("b"),
2190        );
2191        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2192        mock.insert(
2193            format!("myrepo/refs/heads/dev/{SHA_A}.bundle"),
2194            Bytes::from("a"),
2195        );
2196        mock.insert(
2197            format!("myrepo/refs/heads/dev/{SHA_B}.bundle"),
2198            Bytes::from("a"),
2199        );
2200        mock.insert("myrepo/refs/heads/empty/PROTECTED#", Bytes::new());
2201
2202        let prompter = ScriptedPrompter::new([]);
2203        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2204        let snapshot = super::analyze_objects(
2205            &mock.list("myrepo/").await.expect("list"),
2206            "myrepo/",
2207            &store_arc(&mock),
2208        )
2209        .await
2210        .expect("analyze");
2211
2212        let report = doctor.report(&snapshot);
2213        assert_eq!(
2214            report,
2215            "myrepo:\n  \
2216             refs/heads/dev: Multiple bundles\n \
2217             * refs/heads/empty: No bundles\n \
2218             * refs/heads/main: Ok\n  \
2219             HEAD: Invalid\n",
2220        );
2221    }
2222
2223    #[tokio::test]
2224    async fn report_renders_valid_head_as_ref_label() {
2225        let mock = MockStore::new();
2226        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2227        mock.insert(
2228            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2229            Bytes::from("b"),
2230        );
2231        let prompter = ScriptedPrompter::new([]);
2232        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2233        let snapshot = super::analyze_objects(
2234            &mock.list("myrepo/").await.expect("list"),
2235            "myrepo/",
2236            &store_arc(&mock),
2237        )
2238        .await
2239        .expect("analyze");
2240
2241        let report = doctor.report(&snapshot);
2242        assert_eq!(
2243            report,
2244            "myrepo:\n  refs/heads/main: Ok\n  HEAD: refs/heads/main\n",
2245        );
2246    }
2247
2248    // --- Root-of-bucket (empty prefix) coverage --------------------------
2249
2250    #[tokio::test]
2251    async fn root_prefix_clean_run_does_not_mutate_bucket() {
2252        // Repo lives at the bucket root — keys have no `<prefix>/`
2253        // segment. A regression that re-introduces the leading slash
2254        // would surface either as "no objects found" (the doctor's
2255        // listing key would be `/`, which matches nothing) or as the
2256        // doctor failing to identify the bundle by its relative path.
2257        let mock = MockStore::new();
2258        mock.insert("HEAD", Bytes::from("refs/heads/main"));
2259        mock.insert(
2260            format!("refs/heads/main/{SHA_A}.bundle"),
2261            Bytes::from("body"),
2262        );
2263        let initial_keys = mock.keys();
2264        let prompter = ScriptedPrompter::new([]);
2265        let doctor = Doctor::new(store_arc(&mock), "", DoctorOpts::default(), &prompter);
2266        doctor
2267            .run_into(&mut std::io::sink())
2268            .await
2269            .expect("doctor.run at root");
2270        assert_eq!(mock.keys(), initial_keys);
2271    }
2272
2273    #[tokio::test]
2274    async fn root_prefix_fix_head_writes_to_root_head_key() {
2275        // No HEAD object → fix_head writes one. The key must be the
2276        // bare `HEAD`, not `/HEAD`.
2277        let mock = MockStore::new();
2278        mock.insert(format!("refs/heads/main/{SHA_A}.bundle"), Bytes::from("b"));
2279        let prompter = ScriptedPrompter::new([Answer::Select(0)]);
2280        let doctor = Doctor::new(store_arc(&mock), "", DoctorOpts::default(), &prompter);
2281        doctor
2282            .run_into(&mut std::io::sink())
2283            .await
2284            .expect("doctor.run at root");
2285
2286        let head_bytes = mock.get_bytes("HEAD").await.expect("HEAD at root");
2287        assert_eq!(&head_bytes[..], b"refs/heads/main");
2288        assert!(!mock.contains("/HEAD"), "no leading-slash HEAD key");
2289    }
2290
2291    #[tokio::test]
2292    async fn root_prefix_fix_multiple_bundles_quarantines_at_root() {
2293        let mock = MockStore::new();
2294        mock.insert("HEAD", Bytes::from("refs/heads/main"));
2295        mock.insert(format!("refs/heads/main/{SHA_A}.bundle"), Bytes::from("a"));
2296        mock.insert(format!("refs/heads/main/{SHA_B}.bundle"), Bytes::from("b"));
2297        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(true)]);
2298        let doctor = Doctor::new(store_arc(&mock), "", DoctorOpts::default(), &prompter);
2299        doctor
2300            .run_into(&mut std::io::sink())
2301            .await
2302            .expect("doctor.run at root");
2303
2304        // Loser was moved to a quarantine ref `refs/heads/main_<uuid8>`,
2305        // and the destination key has no leading slash.
2306        let moved = mock
2307            .keys()
2308            .into_iter()
2309            .find(|k| k.starts_with("refs/heads/main_") && k.ends_with(&format!("/{SHA_B}.bundle")))
2310            .expect("quarantine key created at root");
2311        assert!(
2312            !moved.starts_with('/'),
2313            "quarantine key must not have a leading slash: {moved:?}"
2314        );
2315    }
2316
2317    // --- Packchain section --------------------------------------------
2318
2319    #[tokio::test]
2320    async fn bundle_engine_clean_run_does_not_mutate_bucket() {
2321        // Doctor against a bundle-engine remote does not run the
2322        // packchain audit (the engine gate at run() guards it). This
2323        // test pins that the run path is non-mutating; the *absence*
2324        // of the packchain section under bundle is verified
2325        // separately via `bundle_engine_audit_runs_against_packchain_only`
2326        // (mutation-tested: inverting the engine gate makes that
2327        // test fail).
2328        let mock = MockStore::new();
2329        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2330        mock.insert(
2331            format!("repo/refs/heads/main/{SHA_A}.bundle"),
2332            Bytes::from("b"),
2333        );
2334        let initial_keys = mock.keys();
2335        let prompter = ScriptedPrompter::new([]);
2336        let doctor = Doctor::new(store_arc(&mock), "repo", DoctorOpts::default(), &prompter);
2337        doctor
2338            .run_into(&mut std::io::sink())
2339            .await
2340            .expect("clean bundle run");
2341        assert_eq!(mock.keys(), initial_keys);
2342    }
2343
2344    /// Build a small packchain-shape mock used by the engine-gate
2345    /// tests below: chain.json + live pack + orphan pack + tombstone.
2346    /// Returns the bucket-wide listing the doctor would compute, so
2347    /// tests can drive `maybe_render_packchain_section` with the
2348    /// same `objects` slice the production code receives.
2349    async fn packchain_mock_with_listing() -> (MockStore, Vec<ObjectMeta>) {
2350        let mock = MockStore::new();
2351        mock.insert(
2352            "repo/refs/heads/main/chain.json",
2353            Bytes::from(
2354                r#"{"v":1,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000001","segments":[{"sha":"0000000000000000000000000000000000000001","parent_sha":null,"pack":"packs/1111111111111111111111111111111111111111.pack","bytes":1024}]}"#,
2355            ),
2356        );
2357        mock.insert(
2358            "repo/packs/1111111111111111111111111111111111111111.pack",
2359            Bytes::from_static(b"live"),
2360        );
2361        mock.insert(
2362            "repo/packs/2222222222222222222222222222222222222222.pack",
2363            Bytes::from_static(b"orphan-body-len-eq-19"),
2364        );
2365        let marked_at = (OffsetDateTime::now_utc() - time::Duration::hours(2))
2366            .format(&time::format_description::well_known::Rfc3339)
2367            .unwrap();
2368        let tombstone_body = format!(
2369            r#"{{"v":1,"run_id":"abc-1","marked_at":"{marked_at}","orphan_packs":["2222222222222222222222222222222222222222"]}}"#
2370        );
2371        let tombstone_key = format!("repo/gc/tombstones-abc-1-{marked_at}.json");
2372        mock.insert(tombstone_key, Bytes::from(tombstone_body));
2373
2374        let objects = mock.list("repo/").await.expect("list");
2375        (mock, objects)
2376    }
2377
2378    #[tokio::test]
2379    async fn packchain_engine_renders_section_with_orphan_and_tombstone() {
2380        // Mutation-tested: stubbing `render_packchain_section` to
2381        // return an empty string makes this test fail.
2382        let (mock, objects) = packchain_mock_with_listing().await;
2383        let prompter = ScriptedPrompter::new([]);
2384        let doctor = Doctor::new(
2385            store_arc(&mock),
2386            "repo",
2387            DoctorOpts {
2388                engine: StorageEngine::Packchain,
2389                ..DoctorOpts::default()
2390            },
2391            &prompter,
2392        );
2393        let rendered = doctor
2394            .maybe_render_packchain_section(&objects)
2395            .await
2396            .expect("packchain audit succeeds")
2397            .expect("packchain engine produces a section");
2398
2399        // Pin actual content. A regression in either audit
2400        // classification OR renderer fails one of these.
2401        assert!(rendered.contains("=== Packchain ==="), "{rendered}");
2402        assert!(rendered.contains("Orphans: 1 pack(s)"), "{rendered}");
2403        assert!(rendered.contains("21 B"), "{rendered}");
2404        assert!(rendered.contains("run id abc-1"), "{rendered}");
2405        assert!(rendered.contains("1 pack(s)"), "{rendered}");
2406    }
2407
2408    #[tokio::test]
2409    async fn bundle_engine_returns_no_packchain_section() {
2410        // Mutation-tested: inverting `maybe_render_packchain_section`'s
2411        // engine gate (`Packchain` → `Bundle`) makes this test fail.
2412        // The same packchain-shape bucket that fills `packchain_engine_renders_section_with_orphan_and_tombstone`
2413        // produces None here because the engine is Bundle.
2414        let (mock, objects) = packchain_mock_with_listing().await;
2415        let prompter = ScriptedPrompter::new([]);
2416        let doctor = Doctor::new(store_arc(&mock), "repo", DoctorOpts::default(), &prompter);
2417        let rendered = doctor
2418            .maybe_render_packchain_section(&objects)
2419            .await
2420            .expect("audit gate skips cleanly under bundle");
2421        assert!(
2422            rendered.is_none(),
2423            "bundle engine must not render a packchain section, got: {rendered:?}",
2424        );
2425    }
2426
/// A dangling pack reference must show up under `ERRORS:` together
/// with the ref that points at the missing pack.
///
/// The `AuditReport` is built by hand so the renderer is pinned
/// without going through the live store.
#[test]
fn render_packchain_section_lists_dangling_references_as_errors() {
    let dangling_row = super::audit::DanglingRow {
        ref_path: "refs/heads/dev".to_owned(),
        missing_pack_key: "packs/abcdef0123456789abcdef0123456789abcdef01.pack".to_owned(),
    };
    let report = AuditReport {
        dangling: vec![dangling_row],
        branches: Vec::new(),
        tombstones: Vec::new(),
        orphans: super::audit::OrphanSummary::default(),
    };

    let rendered = super::render_packchain_section(&report);

    assert!(rendered.contains("ERRORS:"));
    assert!(rendered.contains("references missing pack"));
    assert!(rendered.contains("refs/heads/dev"));
}
2445
/// A default (empty) audit report still renders every heading, each
/// with its explicit "nothing to report" wording.
#[test]
fn render_packchain_section_clean_bucket_says_none() {
    let rendered = super::render_packchain_section(&AuditReport::default());

    // Section banner first, then one line per audit category.
    assert!(rendered.contains("=== Packchain ==="));
    assert!(rendered.contains("Orphans: 0 pack(s)"));
    assert!(rendered.contains("Tombstones (pending sweep): none"));
    assert!(rendered.contains("Branches needing compaction: none"));
    assert!(rendered.contains("ERRORS: none"));
}
2456
/// Pins `format_bytes` output on both sides of each unit boundary,
/// including the `1024.0 KiB` rendering one byte below a MiB.
#[test]
fn format_bytes_unit_boundaries() {
    let cases = [
        (0, "0 B"),
        (1023, "1023 B"),
        (1024, "1.0 KiB"),
        (1024 * 1024 - 1, "1024.0 KiB"),
        (1024 * 1024, "1.0 MiB"),
        (1024 * 1024 * 1024, "1.0 GiB"),
    ];
    for (size, expected) in cases {
        assert_eq!(super::format_bytes(size), expected);
    }
}
2466
/// Pins `format_age` across its three regimes: non-positive input
/// (clock skew or zero hours) renders `<1h`, 1..=47 stays in hours, and
/// 48 and above is reported in days.
#[test]
fn format_age_handles_clock_skew_and_rollover() {
    let cases = [
        // Negative (future timestamp / clock skew) and zero hours.
        (-1, "<1h"),
        (0, "<1h"),
        // Hours, up to the last value before the day rollover.
        (1, "1h ago"),
        (47, "47h ago"),
        // Day rollover (>= 48h reports days).
        (48, "2d ago"),
        (72, "3d ago"),
    ];
    for (hours, expected) in cases {
        assert_eq!(super::format_age(hours), expected);
    }
}
2480
/// A branch row with `recommend_compact` set must render its segment
/// count, its formatted byte total, and the `[recommend compact]` tag.
#[test]
fn render_packchain_section_compaction_candidate_is_flagged() {
    let candidate = BranchRow {
        ref_path: "refs/heads/main".to_owned(),
        segments_total: 27,
        bytes_total: 142 * 1024 * 1024,
        recommend_compact: true,
        has_full_at_segment: true,
    };
    let report = AuditReport {
        branches: vec![candidate],
        dangling: Vec::new(),
        tombstones: Vec::new(),
        orphans: super::audit::OrphanSummary::default(),
    };

    let rendered = super::render_packchain_section(&report);

    assert!(rendered.contains("refs/heads/main: 27 segment(s)"));
    assert!(rendered.contains("[recommend compact]"));
    assert!(rendered.contains("142.0 MiB"));
}
2500
2501    // --- Packchain report correctness (#75) ------------------------------
2502
2503    #[tokio::test]
2504    async fn packchain_report_shows_no_gc_or_packs_lines() {
2505        // Against a packchain-shape bucket, packs/ and gc/ must NOT appear
2506        // in the bundle-shape report section. This pins the cleaned output
2507        // shape specified in #75.
2508        let mock = MockStore::new();
2509        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2510        mock.insert("repo/refs/heads/main/chain.json", Bytes::from(r#"{"v":1}"#));
2511        mock.insert(
2512            "repo/packs/1111111111111111111111111111111111111111.pack",
2513            Bytes::from_static(b"live"),
2514        );
2515        mock.insert(
2516            "repo/gc/tombstones-abc-1-2025-01-01T00:00:00Z.json",
2517            Bytes::from_static(b"{}"),
2518        );
2519        mock.insert("repo/lfs/abcdef0123456789", Bytes::from("lfs-body"));
2520        let prompter = ScriptedPrompter::new([]);
2521        let store = store_arc(&mock);
2522        let doctor = Doctor::new(Arc::clone(&store), "repo", DoctorOpts::default(), &prompter);
2523        let snapshot = analyze(&*store, "repo").await.expect("analyze");
2524
2525        let report = doctor.report(&snapshot);
2526        // No "packs", "gc", or "lfs" ref lines.
2527        assert!(
2528            !report.contains(" packs:"),
2529            "packs must not appear in report: {report:?}",
2530        );
2531        assert!(
2532            !report.contains(" gc:"),
2533            "gc must not appear in report: {report:?}",
2534        );
2535        assert!(
2536            !report.contains(" lfs:"),
2537            "lfs must not appear in report: {report:?}",
2538        );
2539        // chain.json-bearing ref shows "Ok", not "No bundles".
2540        assert!(
2541            report.contains("refs/heads/main: Ok"),
2542            "chain.json ref must show Ok: {report:?}",
2543        );
2544        assert!(
2545            !report.contains("No bundles"),
2546            "healthy packchain report must have no 'No bundles' lines: {report:?}",
2547        );
2548    }
2549
2550    #[tokio::test]
2551    async fn packchain_chain_json_ref_reports_ok_not_no_bundles() {
2552        // A ref with only chain.json (no .bundle) must report "Ok" in
2553        // the bundle-shape section, because the pack data lives in packs/.
2554        // This is the per-ref case from #75.
2555        let mock = MockStore::new();
2556        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2557        mock.insert("repo/refs/heads/main/chain.json", Bytes::from(r#"{"v":1}"#));
2558        let prompter = ScriptedPrompter::new([]);
2559        let store = store_arc(&mock);
2560        let doctor = Doctor::new(Arc::clone(&store), "repo", DoctorOpts::default(), &prompter);
2561        let snapshot = analyze(&*store, "repo").await.expect("analyze");
2562
2563        let report = doctor.report(&snapshot);
2564        assert_eq!(
2565            report, "repo:\n  refs/heads/main: Ok\n  HEAD: refs/heads/main\n",
2566            "chain.json ref must report Ok, got: {report:?}",
2567        );
2568    }
2569
2570    #[tokio::test]
2571    async fn bundle_engine_report_unchanged_by_chain_json_fix() {
2572        // A bundle-engine repo with no chain.json files must produce the
2573        // same output as before the #75 fix: "No bundles" for empty refs,
2574        // "Ok" for single-bundle refs, "Multiple bundles" for dup refs.
2575        let mock = MockStore::new();
2576        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/missing"));
2577        mock.insert(
2578            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2579            Bytes::from("b"),
2580        );
2581        mock.insert("myrepo/refs/heads/empty/PROTECTED#", Bytes::new());
2582        let prompter = ScriptedPrompter::new([]);
2583        let store = store_arc(&mock);
2584        let doctor = Doctor::new(
2585            Arc::clone(&store),
2586            "myrepo",
2587            DoctorOpts::default(),
2588            &prompter,
2589        );
2590        let snapshot = analyze(&*store, "myrepo").await.expect("analyze");
2591
2592        let report = doctor.report(&snapshot);
2593        // "refs/heads/empty" has no bundles and no chain.json → "No bundles".
2594        assert!(
2595            report.contains("refs/heads/empty: No bundles"),
2596            "empty ref without chain.json must still show No bundles: {report:?}",
2597        );
2598        // "refs/heads/main" has one bundle → "Ok".
2599        assert!(
2600            report.contains("refs/heads/main: Ok"),
2601            "single-bundle ref must show Ok: {report:?}",
2602        );
2603    }
2604
2605    #[tokio::test]
2606    async fn packchain_multiple_bundles_still_reports_multiple() {
2607        // A packchain ref with chain.json AND multiple .bundle files must
2608        // still report "Multiple bundles" — the has_chain guard only fires
2609        // when bundles.len() == 0. Prevents a future mistaken edit from
2610        // suppressing the duplicate-bundle warning for packchain refs.
2611        let mock = MockStore::new();
2612        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2613        mock.insert("repo/refs/heads/main/chain.json", Bytes::from(r#"{"v":1}"#));
2614        mock.insert(
2615            format!("repo/refs/heads/main/{SHA_A}.bundle"),
2616            Bytes::from("a"),
2617        );
2618        mock.insert(
2619            format!("repo/refs/heads/main/{SHA_B}.bundle"),
2620            Bytes::from("b"),
2621        );
2622        let prompter = ScriptedPrompter::new([]);
2623        let store = store_arc(&mock);
2624        let doctor = Doctor::new(Arc::clone(&store), "repo", DoctorOpts::default(), &prompter);
2625        let snapshot = analyze(&*store, "repo").await.expect("analyze");
2626
2627        let report = doctor.report(&snapshot);
2628        assert!(
2629            report.contains("refs/heads/main: Multiple bundles"),
2630            "packchain ref with multiple bundles must still warn: {report:?}",
2631        );
2632    }
2633
2634    #[tokio::test]
2635    async fn packchain_chain_json_with_one_bundle_reports_ok() {
2636        // A packchain ref with chain.json AND exactly one .bundle hits
2637        // the `1 => "Ok"` arm (not the `0 if has_chain => "Ok"` arm).
2638        // Pin the rendered status so a future refactor of the match
2639        // doesn't accidentally regress this case.
2640        let mock = MockStore::new();
2641        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2642        mock.insert("repo/refs/heads/main/chain.json", Bytes::from(r#"{"v":1}"#));
2643        mock.insert(
2644            format!("repo/refs/heads/main/{SHA_A}.bundle"),
2645            Bytes::from("b"),
2646        );
2647        let prompter = ScriptedPrompter::new([]);
2648        let store = store_arc(&mock);
2649        let doctor = Doctor::new(Arc::clone(&store), "repo", DoctorOpts::default(), &prompter);
2650        let snapshot = analyze(&*store, "repo").await.expect("analyze");
2651
2652        let report = doctor.report(&snapshot);
2653        assert_eq!(
2654            report, "repo:\n  refs/heads/main: Ok\n  HEAD: refs/heads/main\n",
2655            "chain.json + 1 bundle must report Ok, got: {report:?}",
2656        );
2657    }
2658
2659    #[tokio::test]
2660    async fn packchain_protected_ref_shows_star_and_ok() {
2661        // A packchain ref that is both protected and chain.json-bearing
2662        // must render the star prefix and "Ok" status together.
2663        let mock = MockStore::new();
2664        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2665        mock.insert("repo/refs/heads/main/chain.json", Bytes::from(r#"{"v":1}"#));
2666        mock.insert("repo/refs/heads/main/PROTECTED#", Bytes::new());
2667        let prompter = ScriptedPrompter::new([]);
2668        let store = store_arc(&mock);
2669        let doctor = Doctor::new(Arc::clone(&store), "repo", DoctorOpts::default(), &prompter);
2670        let snapshot = analyze(&*store, "repo").await.expect("analyze");
2671
2672        let report = doctor.report(&snapshot);
2673        assert!(
2674            report.contains("* refs/heads/main: Ok"),
2675            "protected packchain ref must show star + Ok: {report:?}",
2676        );
2677    }
2678
2679    #[tokio::test]
2680    async fn root_prefix_report_renders_root_label() {
2681        // The first line of the report uses `(root)` so the empty
2682        // prefix doesn't produce a bare `:` header.
2683        let mock = MockStore::new();
2684        mock.insert("HEAD", Bytes::from("refs/heads/main"));
2685        mock.insert(format!("refs/heads/main/{SHA_A}.bundle"), Bytes::from("b"));
2686        let prompter = ScriptedPrompter::new([]);
2687        let doctor = Doctor::new(store_arc(&mock), "", DoctorOpts::default(), &prompter);
2688        let snapshot = super::analyze_objects(
2689            &mock.list("").await.expect("list at root"),
2690            "",
2691            &store_arc(&mock),
2692        )
2693        .await
2694        .expect("analyze");
2695
2696        let report = doctor.report(&snapshot);
2697        assert_eq!(
2698            report,
2699            "(root):\n  refs/heads/main: Ok\n  HEAD: refs/heads/main\n",
2700        );
2701    }
2702
2703    // --- run_into output-capture tests -----------------------------------
2704
2705    /// Helper: run `doctor.run_into` into a `Vec<u8>` and return the
2706    /// captured output as a `String`.
2707    async fn capture_run(doctor: &Doctor<'_>) -> (Result<(), ManageError>, String) {
2708        let mut buf = Vec::new();
2709        let result = doctor.run_into(&mut buf).await;
2710        let output = String::from_utf8(buf).expect("doctor output is valid UTF-8");
2711        (result, output)
2712    }
2713
2714    #[tokio::test]
2715    async fn run_into_bundle_engine_section_order() {
2716        // A clean bundle-engine run: snapshot report followed by the
2717        // stale-lock scan. The packchain section must NOT appear.
2718        // Expected output derives from `report()` (already pinned) plus
2719        // the stale-lock trailer.
2720        let mock = MockStore::new();
2721        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2722        mock.insert(
2723            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2724            Bytes::from("b"),
2725        );
2726        let prompter = ScriptedPrompter::new([]);
2727        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2728        let (result, output) = capture_run(&doctor).await;
2729        result.expect("clean bundle run");
2730
2731        // Section 1: snapshot report
2732        assert!(
2733            output.starts_with("myrepo:\n"),
2734            "output must start with snapshot report header, got: {output:?}",
2735        );
2736        // Section 2: stale-lock scan (no packchain section in between)
2737        assert!(
2738            output.contains("\nScanning for stale locks...\n"),
2739            "stale-lock scan missing from output: {output:?}",
2740        );
2741        assert!(
2742            output.contains("No stale locks found.\n"),
2743            "no-stale-locks trailer missing: {output:?}",
2744        );
2745        // Packchain section must NOT appear under bundle engine.
2746        assert!(
2747            !output.contains("=== Packchain ==="),
2748            "packchain section must not appear under bundle engine: {output:?}",
2749        );
2750
2751        // Assert ordering: snapshot header appears before stale-lock scan.
2752        let report_pos = output.find("myrepo:").expect("report header");
2753        let locks_pos = output
2754            .find("Scanning for stale locks")
2755            .expect("stale-lock scan");
2756        assert!(
2757            report_pos < locks_pos,
2758            "snapshot report must precede stale-lock scan"
2759        );
2760    }
2761
2762    #[tokio::test]
2763    async fn run_into_packchain_engine_section_order() {
2764        // A packchain-engine run: snapshot report, then packchain
2765        // section, then stale-lock scan. This test pins the three-part
2766        // section ordering that is only observable via captured output.
2767        let mock = MockStore::new();
2768        mock.insert("repo/HEAD", Bytes::from("refs/heads/main"));
2769        mock.insert(
2770            format!("repo/refs/heads/main/{SHA_A}.bundle"),
2771            Bytes::from("b"),
2772        );
2773        // A minimal packchain shape: chain.json + live pack.
2774        mock.insert(
2775            "repo/refs/heads/main/chain.json",
2776            Bytes::from(
2777                r#"{"v":1,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000001","segments":[{"sha":"0000000000000000000000000000000000000001","parent_sha":null,"pack":"packs/1111111111111111111111111111111111111111.pack","bytes":1024}]}"#,
2778            ),
2779        );
2780        mock.insert(
2781            "repo/packs/1111111111111111111111111111111111111111.pack",
2782            Bytes::from_static(b"live"),
2783        );
2784        let prompter = ScriptedPrompter::new([]);
2785        let doctor = Doctor::new(
2786            store_arc(&mock),
2787            "repo",
2788            DoctorOpts {
2789                engine: StorageEngine::Packchain,
2790                ..DoctorOpts::default()
2791            },
2792            &prompter,
2793        );
2794        let (result, output) = capture_run(&doctor).await;
2795        result.expect("clean packchain run");
2796
2797        // All three sections must appear, in order.
2798        let report_pos = output.find("repo:").expect("snapshot report header");
2799        let packchain_pos = output.find("=== Packchain ===").expect("packchain section");
2800        let locks_pos = output
2801            .find("Scanning for stale locks")
2802            .expect("stale-lock scan");
2803        assert!(
2804            report_pos < packchain_pos,
2805            "snapshot report must precede packchain section"
2806        );
2807        assert!(
2808            packchain_pos < locks_pos,
2809            "packchain section must precede stale-lock scan"
2810        );
2811    }
2812
2813    #[tokio::test]
2814    async fn run_into_captures_fix_multiple_bundles_output() {
2815        // Exercises the duplicate-bundle fixer path through `run_into`
2816        // and pins the interactive-prompt output in the capture buffer.
2817        let mock = MockStore::new();
2818        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2819        mock.insert(
2820            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2821            Bytes::from("body-a"),
2822        );
2823        mock.insert(
2824            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
2825            Bytes::from("body-b"),
2826        );
2827        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(true)]);
2828        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2829        let (result, output) = capture_run(&doctor).await;
2830        result.expect("fix-multiple run");
2831
2832        assert!(
2833            output.contains("Fix multiple bundles for repo myrepo and ref refs/heads/main"),
2834            "fixer header missing: {output:?}",
2835        );
2836        assert!(
2837            output.contains(&format!("Keeping {SHA_A}")),
2838            "keeper announcement missing: {output:?}",
2839        );
2840        assert!(
2841            output.contains(&format!("Moving {SHA_B} to new branch")),
2842            "eviction line missing: {output:?}",
2843        );
2844    }
2845
2846    #[tokio::test]
2847    async fn run_into_captures_fix_head_output() {
2848        // Exercises the HEAD-fixer path and pins its output lines.
2849        let mock = MockStore::new();
2850        mock.insert(
2851            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2852            Bytes::from("b"),
2853        );
2854        let prompter = ScriptedPrompter::new([Answer::Select(0)]);
2855        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2856        let (result, output) = capture_run(&doctor).await;
2857        result.expect("fix-head run");
2858
2859        assert!(
2860            output.contains("Fix invalid HEAD for repo myrepo"),
2861            "HEAD fixer header missing: {output:?}",
2862        );
2863        assert!(
2864            output.contains("Setting refs/heads/main as HEAD"),
2865            "HEAD assignment line missing: {output:?}",
2866        );
2867    }
2868
2869    #[tokio::test]
2870    async fn fix_head_offered_when_head_points_at_protected_only_ref() {
2871        // Issue #154: HEAD points at `refs/heads/main`, but the only
2872        // residue under that ref is a `PROTECTED#` marker — no bundles,
2873        // no chain. The previous implementation classified this HEAD as
2874        // valid (because the ref entry existed in the snapshot) and
2875        // skipped the repair path, leaving HEAD silently dangling.
2876        //
2877        // Drive the fix end-to-end: the report must read `HEAD: Invalid`
2878        // and the runner must offer `fix_head`, pointing HEAD at the
2879        // only branch that carries actual data (`refs/heads/dev`).
2880        let mock = MockStore::new();
2881        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2882        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2883        mock.insert(
2884            format!("myrepo/refs/heads/dev/{SHA_C}.bundle"),
2885            Bytes::from("c"),
2886        );
2887        // Lexicographic order: `[refs/heads/dev, refs/heads/main]`,
2888        // index 0 == `refs/heads/dev` (the only branch with real data).
2889        let prompter = ScriptedPrompter::new([Answer::Select(0)]);
2890        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2891        let (result, output) = capture_run(&doctor).await;
2892        result.expect("doctor.run");
2893
2894        // Report shows `Invalid`, not the stale `refs/heads/main`.
2895        assert!(
2896            output.contains("HEAD: Invalid"),
2897            "expected `HEAD: Invalid` in report; got:\n{output}",
2898        );
2899        // Repair path ran and rewrote HEAD to the only ref with data.
2900        assert!(
2901            output.contains("Fix invalid HEAD for repo myrepo"),
2902            "fix_head header missing; got:\n{output}",
2903        );
2904        assert!(
2905            output.contains("Setting refs/heads/dev as HEAD"),
2906            "HEAD reassignment line missing; got:\n{output}",
2907        );
2908        // And the bucket reflects the new HEAD.
2909        let head_bytes = mock.get_bytes("myrepo/HEAD").await.expect("HEAD written");
2910        assert_eq!(&head_bytes[..], b"refs/heads/dev");
2911        assert_eq!(prompter.remaining(), 0);
2912    }
2913
2914    #[tokio::test]
2915    async fn fix_head_skipped_when_head_points_at_protected_ref_with_bundle() {
2916        // Negative control for #154: protection does NOT invalidate a
2917        // HEAD that points at a ref with real branch data. A protected
2918        // branch with a bundle is structurally healthy — the doctor
2919        // must NOT offer `fix_head`. A regression that broadened the
2920        // invalidation check would trip the empty-script prompter on
2921        // an unexpected `select` call.
2922        let mock = MockStore::new();
2923        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2924        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2925        mock.insert(
2926            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2927            Bytes::from("b"),
2928        );
2929        let initial_keys = mock.keys();
2930        let prompter = ScriptedPrompter::new([]);
2931        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2932        let (result, output) = capture_run(&doctor).await;
2933        result.expect("doctor.run");
2934
2935        // No HEAD-fix attempted; bucket unchanged.
2936        assert!(
2937            !output.contains("Fix invalid HEAD"),
2938            "fix_head fired against a protected ref with real data: {output}",
2939        );
2940        assert!(
2941            output.contains("HEAD: refs/heads/main"),
2942            "expected valid HEAD label in report; got:\n{output}",
2943        );
2944        assert_eq!(mock.keys(), initial_keys);
2945        assert_eq!(prompter.remaining(), 0);
2946    }
2947
2948    #[tokio::test]
2949    async fn fix_head_reports_no_candidates_when_all_refs_are_protected_only() {
2950        // Wave 3 hygiene (cross-wave /review T3): when HEAD is invalid
2951        // AND every `refs/heads/*` ref in the snapshot fails the
2952        // `has_branch_data` predicate (e.g., all PROTECTED#-only
2953        // residue), the candidate picker must be empty and the
2954        // operator-visible message must say so. Without the filter on
2955        // `r.has_branch_data()`, the picker would offer dead refs that
2956        // would fail the keeper-recheck anyway — a usability trap.
2957        let mock = MockStore::new();
2958        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2959        mock.insert("myrepo/refs/heads/main/PROTECTED#", Bytes::new());
2960        mock.insert("myrepo/refs/heads/dev/PROTECTED#", Bytes::new());
2961        let prompter = ScriptedPrompter::new([]);
2962        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
2963        let (result, output) = capture_run(&doctor).await;
2964        result.expect("doctor.run");
2965
2966        assert!(
2967            output.contains("HEAD: Invalid"),
2968            "expected `HEAD: Invalid` in report; got:\n{output}",
2969        );
2970        assert!(
2971            output.contains("Fix invalid HEAD for repo myrepo"),
2972            "fix_head header missing; got:\n{output}",
2973        );
2974        // Load-bearing assertion: the picker must NOT offer a
2975        // protected-only ref. The skipping message proves the filter
2976        // ran. An empty-script prompter would also catch a regression
2977        // that fell through to `select` against a now-stale candidate.
2978        assert!(
2979            output.contains("No `refs/heads/*` available to assign as HEAD"),
2980            "candidate picker must surface the no-candidates message: {output}",
2981        );
2982        assert_eq!(prompter.remaining(), 0);
2983    }
2984
2985    #[tokio::test]
2986    async fn run_into_captures_stale_lock_output() {
2987        // Exercises the stale-lock listing + deletion path and pins
2988        // the report lines in the capture buffer.
2989        let mock = MockStore::new();
2990        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
2991        mock.insert(
2992            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
2993            Bytes::from("b"),
2994        );
2995        let stale = OffsetDateTime::now_utc() - time::Duration::seconds(120);
2996        mock.insert_with(
2997            "myrepo/refs/heads/main/LOCK#.lock",
2998            Bytes::new(),
2999            stale,
3000            PutOpts::default(),
3001        );
3002        let opts = DoctorOpts {
3003            delete_stale_locks: true,
3004            ..DoctorOpts::default()
3005        };
3006        let prompter = ScriptedPrompter::new([]);
3007        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
3008        let (result, output) = capture_run(&doctor).await;
3009        result.expect("stale-lock-delete run");
3010
3011        assert!(
3012            output.contains("Found stale locks:"),
3013            "stale-lock listing header missing: {output:?}",
3014        );
3015        assert!(
3016            output.contains("Deleting stale locks..."),
3017            "deletion progress line missing: {output:?}",
3018        );
3019        assert!(
3020            output.contains("Deleted myrepo/refs/heads/main/LOCK#.lock"),
3021            "individual deletion confirmation missing: {output:?}",
3022        );
3023    }
3024
3025    // --- Malformed bundle keys (#124) ------------------------------------
3026
3027    #[tokio::test]
3028    async fn malformed_bundle_key_surfaces_in_doctor_output() {
3029        // Push silently filters keys whose stem fails the 40-hex
3030        // check (#109 + #124). The doctor must surface them so an
3031        // operator can clean them up.
3032        let mock = MockStore::new();
3033        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
3034        mock.insert(
3035            "myrepo/refs/heads/main/0123456789abcdef0123456789abcdef01234567.bundle",
3036            Bytes::from("body"),
3037        );
3038        mock.insert(
3039            "myrepo/refs/heads/main/not-a-valid-sha.bundle",
3040            Bytes::from("junk"),
3041        );
3042        let prompter = ScriptedPrompter::new([]);
3043        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
3044        let (result, output) = capture_run(&doctor).await;
3045        result.expect("doctor.run");
3046
3047        assert!(
3048            output.contains("Malformed bundle keys"),
3049            "section header missing: {output:?}",
3050        );
3051        assert!(
3052            output.contains("myrepo/refs/heads/main/not-a-valid-sha.bundle"),
3053            "malformed key not listed: {output:?}",
3054        );
3055        assert!(
3056            output.contains("(ref refs/heads/main)"),
3057            "ref-path context missing: {output:?}",
3058        );
3059        assert!(
3060            output.contains("Delete each key manually"),
3061            "remediation hint missing: {output:?}",
3062        );
3063
3064        // Doctor must NOT delete the malformed key — operator's call.
3065        assert!(
3066            mock.contains("myrepo/refs/heads/main/not-a-valid-sha.bundle"),
3067            "doctor must not auto-delete malformed bundle keys",
3068        );
3069        // The valid sibling is untouched as well.
3070        assert!(mock.contains(
3071            "myrepo/refs/heads/main/0123456789abcdef0123456789abcdef01234567.bundle",
3072        ));
3073    }
3074
3075    #[tokio::test]
3076    async fn clean_bucket_emits_no_malformed_section() {
3077        let mock = MockStore::new();
3078        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
3079        mock.insert(
3080            "myrepo/refs/heads/main/0123456789abcdef0123456789abcdef01234567.bundle",
3081            Bytes::from("body"),
3082        );
3083        let prompter = ScriptedPrompter::new([]);
3084        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
3085        let (result, output) = capture_run(&doctor).await;
3086        result.expect("clean run");
3087        assert!(
3088            !output.contains("Malformed bundle keys"),
3089            "no-malformed runs must not emit the section: {output:?}",
3090        );
3091    }
3092
3093    #[tokio::test]
3094    async fn well_formed_stem_is_not_flagged_as_malformed() {
3095        // Sanity test mirroring the snapshot-level check: a valid
3096        // 40-hex stem must not end up in the doctor's malformed
3097        // section.
3098        let mock = MockStore::new();
3099        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
3100        mock.insert(
3101            "myrepo/refs/heads/main/0123456789abcdef0123456789abcdef01234567.bundle",
3102            Bytes::from("body"),
3103        );
3104        let prompter = ScriptedPrompter::new([]);
3105        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
3106        let (result, output) = capture_run(&doctor).await;
3107        result.expect("clean run");
3108        assert!(!output.contains("Malformed bundle keys"));
3109        assert!(!output.contains("0123456789abcdef0123456789abcdef01234567.bundle (ref"));
3110    }
3111
3112    #[test]
3113    fn render_malformed_bundles_section_pins_format() {
3114        let entries = vec![
3115            MalformedBundleKey {
3116                ref_path: "refs/heads/main".to_owned(),
3117                key: "myrepo/refs/heads/main/not-a-valid-sha.bundle".to_owned(),
3118            },
3119            MalformedBundleKey {
3120                ref_path: "refs/heads/dev".to_owned(),
3121                key: "myrepo/refs/heads/dev/short.bundle".to_owned(),
3122            },
3123        ];
3124        let rendered = super::render_malformed_bundles_section(&entries);
3125        assert_eq!(
3126            rendered,
3127            "\nMalformed bundle keys (push silently ignores these):\n  \
3128             - myrepo/refs/heads/main/not-a-valid-sha.bundle (ref refs/heads/main)\n  \
3129             - myrepo/refs/heads/dev/short.bundle (ref refs/heads/dev)\n  \
3130             Delete each key manually (`aws s3 rm` / `az storage blob delete`) and re-push the ref.\n",
3131        );
3132    }
3133
3134    #[tokio::test]
3135    async fn run_into_captures_aborted_output() {
3136        // Exercises the user-decline path in fix_multiple_bundles and
3137        // pins the "Aborted" line in the capture buffer. The early
3138        // return after "Aborted" must prevent any keeper/eviction
3139        // output from appearing.
3140        let mock = MockStore::new();
3141        mock.insert("myrepo/HEAD", Bytes::from("refs/heads/main"));
3142        mock.insert(
3143            format!("myrepo/refs/heads/main/{SHA_A}.bundle"),
3144            Bytes::from("a"),
3145        );
3146        mock.insert(
3147            format!("myrepo/refs/heads/main/{SHA_B}.bundle"),
3148            Bytes::from("b"),
3149        );
3150        let prompter = ScriptedPrompter::new([Answer::Select(0), Answer::Confirm(false)]);
3151        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
3152        let (result, output) = capture_run(&doctor).await;
3153        result.expect("user-abort run");
3154
3155        assert!(
3156            output.contains("Fix multiple bundles for repo myrepo"),
3157            "fixer header missing: {output:?}",
3158        );
3159        assert!(
3160            output.contains("Aborted"),
3161            "abort message missing: {output:?}",
3162        );
3163        // The early return after "Aborted" must prevent the
3164        // keeper announcement and eviction lines from appearing.
3165        assert!(
3166            !output.contains("Keeping"),
3167            "keeper line must not appear after user abort: {output:?}",
3168        );
3169        assert!(
3170            !output.contains("Moving"),
3171            "eviction line must not appear after user abort: {output:?}",
3172        );
3173    }
3174
3175    // --- `DoctorOpts::default()` honours the env fallback (issue #189) ---
3176    //
3177    // The library Default impl used to bake `DEFAULT_LOCK_TTL_SECONDS`
3178    // into the struct, so library consumers that instantiate
3179    // `DoctorOpts::default()` would silently ignore
3180    // `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS`. The fix moves the env
3181    // read inside the runner so `Default` defers to it.
3182
3183    #[test]
3184    fn default_lock_ttl_is_none() {
3185        assert!(DoctorOpts::default().lock_ttl_seconds.is_none());
3186    }
3187
3188    #[test]
3189    fn resolved_lock_ttl_honors_env_explicit_and_zero() {
3190        // The env var is process-global; one `EnvGuard` holds the
3191        // per-key lock for the whole test, serialising against the
3192        // env-touching test in `protocol::push::tests`. Drop restores
3193        // the prior value on every exit path, including assertion
3194        // panics — the manual cleanup-before-assert dance is no longer
3195        // needed.
3196        use crate::protocol::push::ENV_LOCK_TTL_SECONDS;
3197        let env = crate::test_util::EnvGuard::take(ENV_LOCK_TTL_SECONDS);
3198        let mock = MockStore::new();
3199        let prompter = ScriptedPrompter::new([]);
3200        let doctor = Doctor::new(store_arc(&mock), "myrepo", DoctorOpts::default(), &prompter);
3201
3202        // Env override wins when `opts.lock_ttl_seconds` is `None`.
3203        env.set_to("120");
3204        assert_eq!(
3205            doctor.resolved_lock_ttl_seconds(),
3206            120,
3207            "env override must propagate to Doctor default",
3208        );
3209
3210        // Unset env falls back to the project default.
3211        env.clear();
3212        assert_eq!(
3213            doctor.resolved_lock_ttl_seconds(),
3214            DEFAULT_LOCK_TTL_SECONDS,
3215            "unset env must fall back to DEFAULT_LOCK_TTL_SECONDS",
3216        );
3217
3218        // Explicit `Some` always wins over the env var.
3219        let opts = DoctorOpts {
3220            lock_ttl_seconds: Some(7),
3221            ..DoctorOpts::default()
3222        };
3223        let doctor_with_opt = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
3224        env.set_to("120");
3225        assert_eq!(
3226            doctor_with_opt.resolved_lock_ttl_seconds(),
3227            7,
3228            "explicit opts.lock_ttl_seconds must win",
3229        );
3230
3231        // Explicit `Some(0)` is an operator-deliberate "treat every
3232        // lock as stale" request. `doctor` only compares ages — it
3233        // never acquires a lock — so the zero passes through. The
3234        // push/compact paths use `resolve_lock_ttl_seconds` which
3235        // applies the zero-clamp; doctor must not.
3236        env.clear();
3237        let zero_opts = DoctorOpts {
3238            lock_ttl_seconds: Some(0),
3239            ..DoctorOpts::default()
3240        };
3241        let doctor_zero = Doctor::new(store_arc(&mock), "myrepo", zero_opts, &prompter);
3242        assert_eq!(
3243            doctor_zero.resolved_lock_ttl_seconds(),
3244            0,
3245            "explicit Some(0) must pass through to scan_stale_locks",
3246        );
3247        // Env value is irrelevant when the operator passed an explicit
3248        // value — the explicit value wins, including 0.
3249        env.set_to("240");
3250        assert_eq!(
3251            doctor_zero.resolved_lock_ttl_seconds(),
3252            0,
3253            "explicit Some(0) must override the env var (operator opt-in)",
3254        );
3255    }
3256
3257    /// Pin the `--lock-ttl-seconds 0` operator contract end-of-pipeline:
3258    /// once `resolved_lock_ttl_seconds()` returns `0`, the resulting
3259    /// `Duration::ZERO` must flow through `scan_stale_locks` and flag
3260    /// every lock as stale. Without this pin, a future regression that
3261    /// changed the comparison to `age >= ttl` (or otherwise short-circuited
3262    /// on `ttl.is_zero()`) would silently disable the zero-TTL contract
3263    /// while `resolved_lock_ttl_honors_env_explicit_and_zero` continued
3264    /// to pass on the resolver alone.
3265    #[tokio::test]
3266    async fn scan_stale_locks_with_zero_ttl_flags_every_lock() {
3267        let now = OffsetDateTime::now_utc();
3268        let listing = [
3269            ObjectMeta {
3270                key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
3271                size: 0,
3272                last_modified: now - Duration::from_secs(1),
3273                etag: None,
3274            },
3275            ObjectMeta {
3276                key: "myrepo/refs/heads/feature/LOCK#.lock".to_owned(),
3277                size: 0,
3278                last_modified: now - Duration::from_hours(1),
3279                etag: None,
3280            },
3281        ];
3282        let stale = scan_stale_locks(&listing, Duration::ZERO);
3283        let keys: Vec<&str> = stale.iter().map(|(k, _)| *k).collect();
3284        assert_eq!(
3285            keys,
3286            vec![
3287                "myrepo/refs/heads/main/LOCK#.lock",
3288                "myrepo/refs/heads/feature/LOCK#.lock",
3289            ],
3290            "TTL=0 must flag every lock with a positive age as stale",
3291        );
3292    }
3293
3294    /// Pin the strict-greater-than comparison `age > ttl` in
3295    /// `scan_stale_locks` at the equality boundary. A regression that
3296    /// loosened it to `age >= ttl` would silently flag locks whose age
3297    /// exactly equals the operator's TTL — a stricter staleness
3298    /// threshold than the documented "older than" semantics.
3299    ///
3300    /// Constructs the boundary case via `scan_stale_locks_at`: a lock
3301    /// whose `last_modified` is exactly `ttl` before the injected
3302    /// `now`, so `age == ttl`. Wall-clock `OffsetDateTime::now_utc()`
3303    /// ticks between any two instructions, so reaching `age == ttl`
3304    /// deterministically requires the `_at` helper — see its doc.
3305    #[tokio::test]
3306    async fn scan_stale_locks_treats_age_equal_to_ttl_as_fresh() {
3307        let now = OffsetDateTime::now_utc();
3308        let ttl = Duration::from_mins(1);
3309        let listing = [ObjectMeta {
3310            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
3311            size: 0,
3312            last_modified: now - time::Duration::minutes(1),
3313            etag: None,
3314        }];
3315        let stale = scan_stale_locks_at(now, &listing, ttl);
3316        assert!(
3317            stale.is_empty(),
3318            "lock with age == ttl must NOT be flagged (contract is `age > ttl`, \
3319             not `>=`); a regression to `>=` would flag this lock: {stale:?}",
3320        );
3321    }
3322
3323    /// Issue #223: a future-stamped lock (clock skew between the lock
3324    /// writer and the doctor host) MUST be evicted on the operator-
3325    /// explicit `--lock-ttl-seconds 0` sweep. Pre-fix, the signed
3326    /// `time::Duration` from `now - last_modified` was negative, the
3327    /// `Duration::try_from` returned `Err`, and the filter swallowed
3328    /// the lock as if it were fresh — even at TTL=0, the operator's
3329    /// "treat every lock as stale" intent was silently violated.
3330    #[tokio::test]
3331    async fn scan_stale_locks_evicts_future_stamped_at_zero_ttl() {
3332        let now = OffsetDateTime::now_utc();
3333        let listing = [ObjectMeta {
3334            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
3335            size: 0,
3336            // 60 seconds AHEAD of `now` — pathological clock skew.
3337            last_modified: now + time::Duration::minutes(1),
3338            etag: None,
3339        }];
3340        let stale = scan_stale_locks_at(now, &listing, Duration::ZERO);
3341        let keys: Vec<&str> = stale.iter().map(|(k, _)| *k).collect();
3342        assert_eq!(
3343            keys,
3344            vec!["myrepo/refs/heads/main/LOCK#.lock"],
3345            "TTL=0 must evict future-stamped locks (operator-explicit \
3346             treat-every-lock-as-stale opt-in)",
3347        );
3348    }
3349
3350    /// Issue #223 negative-direction pin: a future-stamped lock is NOT
3351    /// "older than" any positive TTL, so a `--lock-ttl-seconds 60` sweep
3352    /// must leave it in place. Without this, an operator with a tuned
3353    /// positive TTL and a single skewed lock would see the lock evicted
3354    /// alongside genuinely-stale ones.
3355    #[tokio::test]
3356    async fn scan_stale_locks_keeps_future_stamped_at_positive_ttl() {
3357        let now = OffsetDateTime::now_utc();
3358        let listing = [ObjectMeta {
3359            key: "myrepo/refs/heads/main/LOCK#.lock".to_owned(),
3360            size: 0,
3361            last_modified: now + time::Duration::minutes(1),
3362            etag: None,
3363        }];
3364        let stale = scan_stale_locks_at(now, &listing, Duration::from_mins(1));
3365        assert!(
3366            stale.is_empty(),
3367            "positive TTL must NOT flag a future-stamped lock: {stale:?}",
3368        );
3369    }
3370
3371    /// Issue #223: the orchestration helper's re-HEAD path has the same
3372    /// signed-Duration bug shape as `scan_stale_locks_at`. Even if the
3373    /// scan flagged the lock, a future-stamped `last_modified` returned
3374    /// by HEAD pre-fix made `try_from` fail, `is_ok_and` return false,
3375    /// and the helper report `SkippedNotStale` — refusing the explicit
3376    /// operator request. Pin the TTL=0 path: the helper must delete.
3377    #[tokio::test]
3378    async fn delete_stale_lock_if_still_stale_evicts_future_stamped_at_zero_ttl() {
3379        let mock = MockStore::new();
3380        let key = "myrepo/refs/heads/main/LOCK#.lock";
3381        let future = OffsetDateTime::now_utc() + time::Duration::minutes(1);
3382        mock.insert_with(key, Bytes::new(), future, PutOpts::default());
3383
3384        let mut out = Vec::new();
3385        let outcome = super::delete_stale_lock_if_still_stale(&mock, key, Duration::ZERO, &mut out)
3386            .await
3387            .expect("helper must not error on the future-stamped path");
3388
3389        assert_eq!(outcome, DeleteOutcome::Deleted);
3390        assert!(
3391            !mock.contains(key),
3392            "future-stamped lock must be deleted at TTL=0"
3393        );
3394    }
3395
3396    /// Issue #223 negative-direction pin for the re-HEAD path: a
3397    /// future-stamped lock at a POSITIVE TTL must NOT be deleted (it
3398    /// is not "older than" any positive threshold). Without this pin,
3399    /// a "fix" that flipped negative-age to "ancient" would erroneously
3400    /// evict legitimately-future-stamped locks at every TTL.
3401    #[tokio::test]
3402    async fn delete_stale_lock_if_still_stale_skips_future_stamped_at_positive_ttl() {
3403        let mock = MockStore::new();
3404        let key = "myrepo/refs/heads/main/LOCK#.lock";
3405        let future = OffsetDateTime::now_utc() + time::Duration::minutes(1);
3406        mock.insert_with(key, Bytes::new(), future, PutOpts::default());
3407
3408        let mut out = Vec::new();
3409        let outcome =
3410            super::delete_stale_lock_if_still_stale(&mock, key, Duration::from_mins(1), &mut out)
3411                .await
3412                .expect("helper must not error when refusing a future-stamped lock");
3413
3414        assert_eq!(outcome, DeleteOutcome::SkippedNotStale);
3415        assert!(
3416            mock.contains(key),
3417            "future-stamped lock must survive at positive TTL",
3418        );
3419    }
3420
3421    /// Full-pipeline pin for the `--lock-ttl-seconds 0 --delete-stale-locks`
3422    /// operator contract: a freshly-PUT lock (RECENT `last_modified`)
3423    /// MUST be deleted when `lock_ttl_seconds: Some(0)` is set.
3424    ///
3425    /// Wires the regression-prone path end-to-end inside `cargo test`:
3426    /// `DoctorOpts { lock_ttl_seconds: Some(0), delete_stale_locks: true }`
3427    /// → `Doctor::resolved_lock_ttl_seconds()` → `scan_stale_locks` →
3428    /// per-key re-HEAD → `store.delete`. The previous regression
3429    /// (#208 narrowed, 7dfa5dc) clamped `Some(0)` to env-or-default
3430    /// at the resolver boundary, so the freshly-PUT lock looked
3431    /// non-stale (age < 60s) and survived. The unit test
3432    /// `resolved_lock_ttl_honors_env_explicit_and_zero` covers the
3433    /// resolver in isolation; this test pins the same contract at the
3434    /// orchestration boundary that lives shellspec exercises (lesson 15
3435    /// — the in-tree analog of the live `doctor --delete-stale-locks`
3436    /// test that originally caught the regression).
3437    #[tokio::test]
3438    async fn doctor_with_explicit_zero_lock_ttl_deletes_fresh_lock() {
3439        let mock = MockStore::new();
3440        let lock_key = "myrepo/refs/heads/main/LOCK#.lock";
3441        // Fresh lock — `last_modified` defaults to `OffsetDateTime::now_utc()`
3442        // inside the mock, so the lock's age is essentially zero. With
3443        // the project default TTL (60s), this lock is NOT stale; with
3444        // an operator-explicit `--lock-ttl-seconds 0`, it IS stale.
3445        mock.insert(lock_key, Bytes::new());
3446        let synthetic_listing = vec![ObjectMeta {
3447            key: lock_key.to_owned(),
3448            size: 0,
3449            last_modified: OffsetDateTime::now_utc(),
3450            etag: None,
3451        }];
3452
3453        let opts = DoctorOpts {
3454            lock_ttl_seconds: Some(0),
3455            delete_stale_locks: true,
3456            ..DoctorOpts::default()
3457        };
3458        let prompter = ScriptedPrompter::new([]);
3459        let doctor = Doctor::new(store_arc(&mock), "myrepo", opts, &prompter);
3460        let mut out = Vec::new();
3461        doctor
3462            .list_and_handle_stale_locks(&mut out, &synthetic_listing)
3463            .await
3464            .expect("stale-lock handler");
3465        let captured = String::from_utf8(out).expect("utf-8 output");
3466
3467        assert!(
3468            !mock.contains(lock_key),
3469            "explicit --lock-ttl-seconds 0 must evict the fresh lock; \
3470             a regression that clamped Some(0) to env-or-default would \
3471             leave the lock in place. Captured output: {captured:?}",
3472        );
3473        assert!(
3474            captured.contains(&format!("Deleted {lock_key}")),
3475            "per-key deletion line missing from operator output: {captured:?}",
3476        );
3477    }
3478}